Browse Source

优化websocket消息

zhou-hao 5 years ago
parent
commit
f2c2245c90

+ 21 - 4
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java

@@ -8,16 +8,33 @@ public interface Message {
 
     Object getPayload();
 
-    boolean isSuccess();
-
     String getMessage();
 
+    Type getType();
+
+    static Message authError() {
+
+        return new SimpleMessage(null, null, null, Type.authError, "认证失败");
+    }
+
     static Message error(String id, String topic, String message) {
-        return new ErrorMessage(id, topic, message);
+
+        return new SimpleMessage(id, topic, null, Type.error, message);
     }
 
     static Message success(String id, String topic, Object payload) {
-        return new SuccessMessage(id, topic, payload);
+        return new SimpleMessage(id, topic, payload, Type.result, null);
+    }
+
+    static Message complete(String id) {
+        return new SimpleMessage(id, null, null, Type.complete, null);
+    }
+
+    enum Type {
+        authError,
+        result,
+        error,
+        complete
     }
 
 }

+ 3 - 9
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SuccessMessage.java

@@ -9,7 +9,7 @@ import lombok.Setter;
 @AllArgsConstructor
 @Getter
 @Setter
-public class SuccessMessage implements Message {
+public class SimpleMessage implements Message {
 
     private String requestId;
 
@@ -17,14 +17,8 @@ public class SuccessMessage implements Message {
 
     private Object payload;
 
+    private Type type;
 
-    @Override
-    public boolean isSuccess() {
-        return true;
-    }
+    private String message;
 
-    @Override
-    public String getMessage() {
-        return null;
-    }
 }

+ 2 - 5
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java

@@ -5,9 +5,8 @@ import org.jetlinks.community.dashboard.MeasurementParameter;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.supports.utils.MqttTopicUtils;
 import org.springframework.stereotype.Component;
-import org.springframework.util.AntPathMatcher;
-import org.springframework.util.PathMatcher;
 import reactor.core.publisher.Flux;
 
 import java.util.Map;
@@ -21,8 +20,6 @@ public class DashBoardSubscriptionProvider implements SubscriptionProvider {
         this.dashboardManager = dashboardManager;
     }
 
-    private static final PathMatcher pathMatcher = new AntPathMatcher();
-
     @Override
     public String id() {
         return "dashboard";
@@ -42,7 +39,7 @@ public class DashBoardSubscriptionProvider implements SubscriptionProvider {
     public Flux<Message> subscribe(SubscribeRequest request) {
         return Flux.defer(() -> {
             try {
-                Map<String, String> variables = pathMatcher.extractUriTemplateVariables(
+                Map<String, String> variables = MqttTopicUtils.getPathVariables(
                     "/dashboard/{dashboard}/{object}/{measurement}/{dimension}", request.getTopic());
                 return dashboardManager.getDashboard(variables.get("dashboard"))
                     .flatMap(dashboard -> dashboard.getObject(variables.get("object")))

+ 32 - 17
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java

@@ -12,9 +12,14 @@ import org.springframework.util.StringUtils;
 import org.springframework.web.reactive.socket.CloseStatus;
 import org.springframework.web.reactive.socket.WebSocketHandler;
 import org.springframework.web.reactive.socket.WebSocketSession;
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 @AllArgsConstructor
@@ -28,7 +33,8 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
 
     // /messaging/{token}
     @Override
-    public Mono<Void> handle(WebSocketSession session) {
+    @Nonnull
+    public Mono<Void> handle(@Nonnull WebSocketSession session) {
         String[] path = session.getHandshakeInfo().getUri().getPath().split("[/]");
         if (path.length == 0) {
             return session.send(Mono.just(session.textMessage(JSON.toJSONString(
@@ -37,50 +43,59 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
         }
         String token = path[path.length - 1];
 
-        Set<String> disposable = new ConcurrentSkipListSet<>();
+        Map<String, Disposable> subs = new ConcurrentHashMap<>();
 
         return userTokenManager.getByToken(token)
             .map(UserToken::getUserId)
             .flatMap(authenticationManager::getByUserId)
             .switchIfEmpty(session
                 .send(Mono.just(session.textMessage(JSON.toJSONString(
-                    Message.error("auth", null, "认证失败")
+                    Message.authError()
                 ))))
                 .then(session.close(CloseStatus.BAD_DATA))
                 .then(Mono.empty()))
             .flatMap(auth -> session
                 .receive()
-                .flatMap(message -> {
+                .doOnNext(message -> {
                     MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
                     if (StringUtils.isEmpty(request.getId())) {
-                        return session
+                        session
                             .send(Mono.just(session.textMessage(JSON.toJSONString(
                                 Message.error(request.getType().name(), null, "id不能为空")
-                            ))));
+                            )))).subscribe();
                     }
                     if (request.getType() == MessagingRequest.Type.sub) {
                         //重复订阅
-                        if (disposable.contains(request.getId())) {
-                            return Mono.empty();
+                        if (subs.containsKey(request.getId())) {
+                            return;
                         }
-                        disposable.add(request.getId());
-                        return session.send(messagingManager
+                        subs.put(request.getId(), messagingManager
                             .subscribe(SubscribeRequest.of(request, auth))
                             .onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage())))
-                            .takeWhile(r -> disposable.contains(request.getId()))
-                            .switchIfEmpty(Mono.fromSupplier(() -> Message.error(request.getId(), request.getTopic(), "不支持的Topic")))
                             .map(msg -> session.textMessage(JSON.toJSONString(msg)))
-                            .doOnComplete(() -> disposable.remove(request.getId()))
+                            .doOnComplete(() -> {
+                                subs.remove(request.getId());
+                                Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
+                                    .as(session::send)
+                                    .subscribe();
+                            })
+                            .flatMap(msg -> session.send(Mono.just(msg)))
+                            .subscribe()
                         );
+
                     } else if (request.getType() == MessagingRequest.Type.unsub) {
-                        return Mono.fromRunnable(() -> disposable.remove(request.getId()));
+                        Optional.ofNullable(subs.remove(request.getId()))
+                            .ifPresent(Disposable::dispose);
                     } else {
-                        return session.send(Mono.just(session.textMessage(JSON.toJSONString(
+                        session.send(Mono.just(session.textMessage(JSON.toJSONString(
                             Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
-                        ))));
+                        )))).subscribe();
                     }
                 }).then())
-            .doFinally(r -> disposable.clear());
+            .doFinally(r -> {
+                subs.values().forEach(Disposable::dispose);
+                subs.clear();
+            });
 
     }
 }

+ 99 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendSubscriptionProvider.java

@@ -0,0 +1,99 @@
+package org.jetlinks.community.device.message;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.DeviceMessageReply;
+import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.RepayableDeviceMessage;
+import org.jetlinks.community.device.entity.DeviceInstanceEntity;
+import org.jetlinks.community.device.service.LocalDeviceInstanceService;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+@AllArgsConstructor
+public class DeviceMessageSendSubscriptionProvider implements SubscriptionProvider {
+
+    private final DeviceRegistry registry;
+
+    private final LocalDeviceInstanceService instanceService;
+
+    @Override
+    public String id() {
+        return "device-message-sender";
+    }
+
+    @Override
+    public String name() {
+        return "设备消息发送";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/device-message-sender/*/*"
+        };
+    }
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+
+        String topic = request.getTopic();
+
+        Map<String, String> variables = MqttTopicUtils.getPathVariables("/device-message-sender/{productId}/{deviceId}", topic);
+        String deviceId = variables.get("deviceId");
+        String productId = variables.get("productId");
+
+        //发给所有设备
+        if ("*".equals(deviceId)) {
+            return instanceService.createQuery()
+                .select(DeviceInstanceEntity::getId)
+                .where(DeviceInstanceEntity::getProductId, productId)
+                //.and(DeviceInstanceEntity::getState, DeviceState.online)
+                .fetch()
+                .map(DeviceInstanceEntity::getId)
+                .flatMap(id -> doSend(request.getId(), topic, id, new HashMap<>(request.getParameter())));
+        }
+        return Flux.fromArray(deviceId.split("[,]"))
+            .flatMap(id -> doSend(request.getId(), topic, id, new HashMap<>(request.getParameter())));
+
+    }
+
+    public Flux<Message> doSend(String requestId, String topic, String deviceId, Map<String, Object> message) {
+        message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
+        message.put("deviceId", deviceId);
+
+        RepayableDeviceMessage<?> msg = MessageType.convertMessage(message)
+            .filter(RepayableDeviceMessage.class::isInstance)
+            .map(RepayableDeviceMessage.class::cast)
+            .orElseThrow(() -> new UnsupportedOperationException("不支持的消息格式"));
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(Mono.error(() -> new DeviceOperationException(ErrorCode.CLIENT_OFFLINE)))
+            .flatMapMany(deviceOperator -> deviceOperator
+                .messageSender()
+                .send(Mono.just(msg)))
+            .map(reply -> Message.success(requestId, topic, reply))
+            .onErrorResume(error -> {
+                DeviceMessageReply reply = msg.newReply();
+                if (error instanceof DeviceOperationException) {
+                    reply.error(((DeviceOperationException) error).getCode());
+                } else {
+                    reply.error(error);
+                }
+                return Mono.just(Message.success(requestId, topic, reply));
+            })
+            ;
+    }
+}

+ 46 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSubscriptionProvider.java

@@ -0,0 +1,46 @@
+package org.jetlinks.community.device.message;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collections;
+
+@Component
+@AllArgsConstructor
+public class DeviceMessageSubscriptionProvider implements SubscriptionProvider {
+
+    private final MessageGateway messageGateway;
+
+    @Override
+    public String id() {
+        return "device-message-subscriber";
+    }
+
+    @Override
+    public String name() {
+        return "订阅设备消息";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/device/*/*/**"
+        };
+    }
+
+    @Override
+    public Flux<Message> subscribe(SubscribeRequest request) {
+        return messageGateway
+            .subscribe(Collections.singletonList(new Subscription(request.getTopic())), true)
+            .flatMap(topic -> Mono.justOrEmpty(DeviceMessageUtils.convert(topic)
+                .map(msg -> Message.success(request.getId(), topic.getTopic(), msg))
+            ));
+    }
+}