Explorar o código

优化子设备状态管理

zhou-hao %!s(int64=4) %!d(string=hai) anos
pai
achega
127028d32a

+ 61 - 22
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -2,14 +2,18 @@ package org.jetlinks.community.network.utils;
 
 
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import org.jetlinks.community.PropertyConstants;
 import org.jetlinks.community.PropertyConstants;
+import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.*;
+import org.jetlinks.core.message.state.DeviceStateCheckMessage;
+import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
 import org.jetlinks.core.server.session.ChildrenDeviceSession;
 import org.jetlinks.core.server.session.ChildrenDeviceSession;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.core.server.session.KeepOnlineSession;
 import org.jetlinks.core.server.session.KeepOnlineSession;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
+import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 import java.time.Duration;
 import java.time.Duration;
@@ -45,25 +49,55 @@ public class DeviceGatewayHelper {
     }
     }
 
 
     protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
     protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
+        if (deviceId == null
+            || children instanceof DeviceStateCheckMessage
+            || children instanceof DeviceStateCheckMessageReply
+            || children instanceof DisconnectDeviceMessage
+            || children instanceof DisconnectDeviceMessageReply) {
+            return Mono.empty();
+        }
+        if (children instanceof DeviceMessageReply) {
+            DeviceMessageReply reply = ((DeviceMessageReply) children);
+            if (!reply.isSuccess()) {
+                return Mono.empty();
+            }
+        }
         ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
         ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
-        //子设备离线
-        if(children instanceof DeviceOfflineMessage){
+        //子设备离线或者注销
+        if (children instanceof DeviceOfflineMessage || children instanceof DeviceUnRegisterMessage) {
             //注销会话,这里子设备可能会收到多次离线消息
             //注销会话,这里子设备可能会收到多次离线消息
             //注销会话一次离线,消息网关转发子设备消息一次
             //注销会话一次离线,消息网关转发子设备消息一次
+            if (deviceSession != null && children instanceof DeviceOfflineMessage) {
+                //忽略离线消息,因为注销会话时,会自动发送一个离线消息
+                children.addHeader(Headers.ignore, true);
+            }
             return sessionManager
             return sessionManager
-                .unRegisterChildren(deviceId,children.getDeviceId())
+                .unRegisterChildren(deviceId, children.getDeviceId())
                 .then();
                 .then();
         }
         }
         if (deviceSession == null && null != children.getDeviceId()) {
         if (deviceSession == null && null != children.getDeviceId()) {
-            Mono<Void> then = sessionManager
+            //忽略上线消息,因为注册会话时,会自动发送一个上线消息
+            if (children instanceof DeviceOnlineMessage) {
+                children.addHeader(Headers.ignore, true);
+            }
+            Mono<Void> registerSession = sessionManager
                 .registerChildren(deviceId, children.getDeviceId())
                 .registerChildren(deviceId, children.getDeviceId())
                 .then();
                 .then();
             //子设备注册
             //子设备注册
             if (isDoRegister(children)) {
             if (isDoRegister(children)) {
-                then = Mono.delay(Duration.ofSeconds(2))
-                           .then(then);
+                return Mono
+                    .delay(Duration.ofSeconds(2))
+                    .then(registry
+                              .getDevice(children.getDeviceId())
+                              .flatMap(device -> device
+                                  //没有配置状态自管理才自动上线
+                                  .getSelfConfig(DeviceConfigKey.selfManageState)
+                                  .defaultIfEmpty(false)
+                                  .filter(Boolean.FALSE::equals)
+                                  .flatMap(ignore -> registerSession))
+                    );
             }
             }
-            return then;
+            return registerSession;
         }
         }
         return Mono.empty();
         return Mono.empty();
     }
     }
@@ -73,14 +107,17 @@ public class DeviceGatewayHelper {
                                                     Consumer<DeviceSession> sessionConsumer,
                                                     Consumer<DeviceSession> sessionConsumer,
                                                     Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
                                                     Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
         String deviceId = message.getDeviceId();
         String deviceId = message.getDeviceId();
+        if (StringUtils.isEmpty(deviceId)) {
+            return Mono.empty();
+        }
         Mono<Void> then = Mono.empty();
         Mono<Void> then = Mono.empty();
         boolean doHandle = true;
         boolean doHandle = true;
         if (message instanceof ChildDeviceMessage) {
         if (message instanceof ChildDeviceMessage) {
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
-            then = handleChildrenDeviceMessage(deviceId,childrenMessage);
+            then = handleChildrenDeviceMessage(deviceId, childrenMessage);
         } else if (message instanceof ChildDeviceMessageReply) {
         } else if (message instanceof ChildDeviceMessageReply) {
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
-            then = handleChildrenDeviceMessage(deviceId,childrenMessage);
+            then = handleChildrenDeviceMessage(deviceId, childrenMessage);
         } else if (message instanceof DeviceOfflineMessage) {
         } else if (message instanceof DeviceOfflineMessage) {
             //设备离线消息
             //设备离线消息
             DeviceSession session = sessionManager.unregister(deviceId);
             DeviceSession session = sessionManager.unregister(deviceId);
@@ -118,19 +155,21 @@ public class DeviceGatewayHelper {
                 }))
                 }))
                 .flatMap(device -> {
                 .flatMap(device -> {
                     DeviceSession newSession = sessionBuilder.apply(device);
                     DeviceSession newSession = sessionBuilder.apply(device);
-                    //保持会话,在低功率设备上,可能无法保持mqtt长连接.
-                    if (message.getHeader(Headers.keepOnline).orElse(false)) {
-                        int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
-                        newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
-                    }
-                    sessionManager.register(newSession);
-                    sessionConsumer.accept(newSession);
-                    newSession.keepAlive();
-                    if (!(message instanceof DeviceRegisterMessage) &&
-                        !(message instanceof DeviceOnlineMessage)) {
-                        return messageHandler
-                            .handleMessage(device, message)
-                            .thenReturn(device);
+                    if (null != newSession) {
+                        //保持会话,在低功率设备上,可能无法保持mqtt长连接.
+                        if (message.getHeader(Headers.keepOnline).orElse(false)) {
+                            int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
+                            newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
+                        }
+                        sessionManager.register(newSession);
+                        sessionConsumer.accept(newSession);
+                        newSession.keepAlive();
+                        if (!(message instanceof DeviceRegisterMessage) &&
+                            !(message instanceof DeviceOnlineMessage)) {
+                            return messageHandler
+                                .handleMessage(device, message)
+                                .thenReturn(device);
+                        }
                     }
                     }
                     return Mono.just(device);
                     return Mono.just(device);
                 })
                 })