Jelajahi Sumber

优化设备会话处理

zhouhao 3 tahun lalu
induk
melakukan
0a683684ed

+ 32 - 9
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -38,18 +38,29 @@ public class DeviceGatewayHelper {
     private final DeviceSessionManager sessionManager;
     private final DecodedClientMessageHandler messageHandler;
 
-    @Deprecated
     public static Consumer<DeviceSession> applySessionKeepaliveTimeout(DeviceMessage msg, Supplier<Duration> timeoutSupplier) {
         return session -> {
-            //do nothing
-
+            Integer timeout = msg.getHeaderOrElse(Headers.keepOnlineTimeoutSeconds, () -> null);
+            if (null != timeout) {
+                session.setKeepAliveTimeout(Duration.ofSeconds(timeout));
+            } else {
+                Duration defaultTimeout = timeoutSupplier.get();
+                if (null != defaultTimeout) {
+                    session.setKeepAliveTimeout(defaultTimeout);
+                }
+            }
         };
     }
 
     public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
                                                     Function<DeviceOperator, DeviceSession> sessionBuilder) {
 
-        return handleDeviceMessage(message, sessionBuilder, (ignore)->{}, () -> {});
+        return handleDeviceMessage(message,
+                                   sessionBuilder,
+                                   (ignore) -> {
+                                   },
+                                   () -> {
+                                   });
     }
 
     /**
@@ -94,7 +105,7 @@ public class DeviceGatewayHelper {
             //注销会话,这里子设备可能会收到多次离线消息
             //注销会话一次离线,消息网关转发子设备消息一次
             return sessionManager
-                .remove(childrenId, children.getHeaderOrDefault(Headers.clearAllSession))
+                .remove(childrenId, removeSessionOnlyLocal(children))
                 .doOnNext(total -> {
                     if (total > 0) {
                         children.addHeader(Headers.ignore, true);
@@ -160,7 +171,7 @@ public class DeviceGatewayHelper {
         //设备离线消息
         else if (message instanceof DeviceOfflineMessage) {
             return sessionManager
-                .remove(deviceId, message.getHeaderOrDefault(Headers.clearAllSession))
+                .remove(deviceId, removeSessionOnlyLocal(message))
                 .flatMap(l -> {
                     if (l == 0) {
                         return registry
@@ -174,7 +185,9 @@ public class DeviceGatewayHelper {
         }
         //设备上线消息,不发送到messageHandler,防止设备上线存在重复消息
         else if (message instanceof DeviceOnlineMessage) {
-            doHandle = false;
+            doHandle = message
+                .getHeader(Headers.force)
+                .orElse(false);
         }
 
         //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
@@ -280,9 +293,10 @@ public class DeviceGatewayHelper {
         }
         //KeepOnline的连接丢失时(服务重启等操作),设备上线后替换丢失的会话,让其能恢复下行能力。
         if (isKeeOnlineLost(session)) {
+            Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
             after = sessionBuilder
                 .apply(session.getOperator())
-                .doOnNext(((KeepOnlineSession) session)::replaceWith)
+                .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)))
                 .then();
         }
         applySessionKeepaliveTimeout(message, session);
@@ -299,6 +313,15 @@ public class DeviceGatewayHelper {
         }
     }
 
+    //是否只移除当前节点中的会话,默认false,表示下行则移除整个集群的会话.
+    //设置addHeader("clearAllSession",false); 表示只移除本地会话
+    private boolean removeSessionOnlyLocal(DeviceMessage message) {
+        return message
+            .getHeader(Headers.clearAllSession)
+            .map(val -> !val)
+            .orElse(false);
+    }
+
     //判断是否需要更新会话
     private static boolean needUpdateSession(DeviceSession session, DeviceMessage message) {
         return isNewKeeOnline(session, message) || isKeeOnlineLost(session);
@@ -306,7 +329,7 @@ public class DeviceGatewayHelper {
 
     //判断是否为新的保持在线消息
     private static boolean isNewKeeOnline(DeviceSession session, DeviceMessage message) {
-        return message.getHeaderOrDefault(Headers.keepOnline) && !(session instanceof KeepOnlineSession);
+        return message.getHeader(Headers.keepOnline).orElse(false) && !(session instanceof KeepOnlineSession);
     }
 
     //判断保持在线的会话是否以及丢失(服务重启后可能出现)