瀏覽代碼

优化子设备消息处理

zhou-hao 4 年之前
父節點
當前提交
1ce8d1e0d9

+ 29 - 26
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -5,6 +5,7 @@ import org.jetlinks.community.PropertyConstants;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.*;
+import org.jetlinks.core.server.session.ChildrenDeviceSession;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.server.session.KeepOnlineSession;
@@ -45,33 +46,28 @@ public class DeviceGatewayHelper {
     }
 
     public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
-                                                    Function<DeviceOperator, DeviceSession> sessionBuilder,
-                                                    Consumer<DeviceSession> sessionConsumer,
-                                                    Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
-
-        return handleDeviceMessage(message, sessionBuilder, sessionConsumer, deviceNotFoundListener, false);
-    }
-
-    private Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
                                                      Function<DeviceOperator, DeviceSession> sessionBuilder,
                                                      Consumer<DeviceSession> sessionConsumer,
-                                                     Supplier<Mono<DeviceOperator>> deviceNotFoundListener,
-                                                     boolean children) {
+                                                     Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
         String deviceId = message.getDeviceId();
-        Mono<DeviceOperator> then = Mono.empty();
+        Mono<Void> then = Mono.empty();
+        boolean doHandle = true;
         if (message instanceof ChildDeviceMessage) {
-//            then = handleDeviceMessage((DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(),
-//                                       sessionBuilder,
-//                                       sessionConsumer,
-//                                       deviceNotFoundListener, true);
+            DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
+            ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId());
+            if (deviceSession == null) {
+                then = sessionManager
+                    .registerChildren(deviceId, childrenMessage.getDeviceId())
+                    .then(Mono.empty());
+            }
         } else if (message instanceof ChildDeviceMessageReply) {
-//            then = handleDeviceMessage((DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage(),
-//                                       sessionBuilder,
-//                                       sessionConsumer,
-//                                       deviceNotFoundListener, true);
-        } else if (message instanceof DeviceOnlineMessage) {
-            //设备在线消息
-            then = registry.getDevice(deviceId);
+            DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
+            ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId());
+            if (deviceSession == null) {
+                then = sessionManager
+                    .registerChildren(deviceId, childrenMessage.getDeviceId())
+                    .then(Mono.empty());
+            }
         } else if (message instanceof DeviceOfflineMessage) {
             //设备离线消息
             DeviceSession session = sessionManager.unregister(deviceId);
@@ -84,13 +80,16 @@ public class DeviceGatewayHelper {
                         .thenReturn(device));
             }
             return registry.getDevice(deviceId);
+        } else if (message instanceof DeviceOnlineMessage) {
+            //设备在线消息
+            doHandle = false;
         }
         DeviceSession session = sessionManager.getSession(deviceId);
         //session不存在,可能是同一个连接返回多个设备消息
         if (session == null) {
             return registry
                 .getDevice(deviceId)
-                .switchIfEmpty(children ? Mono.empty() : Mono.defer(() -> {
+                .switchIfEmpty(Mono.defer(() -> {
                     //设备注册
                     if (message instanceof DeviceRegisterMessage) {
                         if (message.getHeader(PropertyConstants.deviceName).isPresent()
@@ -117,22 +116,26 @@ public class DeviceGatewayHelper {
                     sessionManager.register(newSession);
                     sessionConsumer.accept(newSession);
                     newSession.keepAlive();
-                    if (message instanceof DeviceRegisterMessage) {
+                    if (!(message instanceof DeviceRegisterMessage) &&
+                        !(message instanceof DeviceOnlineMessage)) {
                         return messageHandler
                             .handleMessage(device, message)
                             .thenReturn(device);
                     }
                     return Mono.just(device);
                 })
-                .then(then);
+                .switchIfEmpty(then.then(Mono.empty()))
+                .flatMap(then::thenReturn)
+                ;
         } else {
             sessionConsumer.accept(session);
             session.keepAlive();
             return then
-                .then(messageHandler.handleMessage(session.getOperator(), message))
+                .then(doHandle ? messageHandler.handleMessage(session.getOperator(), message) : Mono.empty())
                 .then(registry.getDevice(deviceId));
         }
 
     }
 
+
 }

+ 0 - 8
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -244,14 +244,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
     protected Mono<Boolean> handleChildrenDeviceMessage(DeviceOperator device, String childrenId, Message message) {
         if (message instanceof DeviceMessageReply) {
             return doReply(((DeviceMessageReply) message));
-        } else if (message instanceof DeviceOnlineMessage) {
-            return sessionManager.registerChildren(device.getDeviceId(), childrenId)
-                                 .thenReturn(true)
-                                 .defaultIfEmpty(false);
-        } else if (message instanceof DeviceOfflineMessage) {
-            return sessionManager.unRegisterChildren(device.getDeviceId(), childrenId)
-                                 .thenReturn(true)
-                                 .defaultIfEmpty(false);
         }
         return Mono.just(true);
     }