Browse Source

优化设备会话处理

zhouhao 3 năm trước cách đây
mục cha
commit
a9594d28c4

+ 1 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -112,7 +112,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
                         return helper
                             .handleDeviceMessage(message,
                                                  device -> createDeviceSession(device, mqttClient),
-                                                 DeviceGatewayHelper.applySessionKeepaliveTimeout(message, timeoutRef::get),
+                                                 ignore->{},
                                                  () -> log.warn("无法从MQTT[{}]消息中获取设备信息:{}", mqttMessage.print(), message)
                             );
                     })

+ 170 - 142
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java

@@ -24,12 +24,12 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
- * 设备网关处理工具
- * <p>
- * 封装常用的设备消息处理操作
- * </p>
+ * 设备网关消息处理,会话管理工具类,用于统一封装对设备消息和会话的处理逻辑
  *
  * @author zhouhao
+ * @see DeviceRegistry
+ * @see DecodedClientMessageHandler
+ * @since 1.5
  */
 @AllArgsConstructor
 public class DeviceGatewayHelper {
@@ -38,20 +38,20 @@ public class DeviceGatewayHelper {
     private final DeviceSessionManager sessionManager;
     private final DecodedClientMessageHandler messageHandler;
 
-
+    @Deprecated
     public static Consumer<DeviceSession> applySessionKeepaliveTimeout(DeviceMessage msg, Supplier<Duration> timeoutSupplier) {
         return session -> {
-            //从消息头里获取keepOnlineTimeoutSeconds来设置会话有效期
-            Duration timeout = msg
-                .getHeader(Headers.keepOnlineTimeoutSeconds)
-                .map(Duration::ofSeconds)
-                .orElseGet(timeoutSupplier);
-            if (null != timeout) {
-                session.setKeepAliveTimeout(timeout);
-            }
+            //do nothing
+
         };
     }
 
+    public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
+                                                    Function<DeviceOperator, DeviceSession> sessionBuilder) {
+
+        return handleDeviceMessage(message, sessionBuilder, (ignore)->{}, () -> {});
+    }
+
     /**
      * 处理设备消息
      *
@@ -69,14 +69,15 @@ public class DeviceGatewayHelper {
         return handleDeviceMessage(message, sessionBuilder, sessionConsumer, () -> Mono.fromRunnable(deviceNotFoundCallback));
     }
 
-    protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
+    private Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
         //设备状态检查,断开设备连接的消息都忽略
         //这些消息属于状态管理,通常是用来自定义子设备状态的,所以这些消息都忽略处理会话
         if (deviceId == null
             || children instanceof DeviceStateCheckMessage
             || children instanceof DeviceStateCheckMessageReply
             || children instanceof DisconnectDeviceMessage
-            || children instanceof DisconnectDeviceMessageReply) {
+            || children instanceof DisconnectDeviceMessageReply
+            || children.getHeaderOrDefault(Headers.ignoreSession)) {
             return Mono.empty();
         }
         //子设备回复失败的也忽略
@@ -105,19 +106,14 @@ public class DeviceGatewayHelper {
             if (children instanceof DeviceOnlineMessage) {
                 children.addHeader(Headers.ignore, true);
             }
-            Mono<DeviceSession> registerSession = sessionManager
+            //子设备会话处理
+            Mono<DeviceSession> sessionHandler = sessionManager
                 .getSession(deviceId)
-                .flatMap(parentSession -> sessionManager
-                    .compute(childrenId, old -> old
-                        .switchIfEmpty(Mono.defer(() -> registry
-                            .getDevice(childrenId)
-                            .map(child -> new ChildrenDeviceSession(childrenId, parentSession, child)))))
-                )
-                .doOnNext(session -> {
-                    session.keepAlive();
-                    applySessionKeepaliveTimeout(children, () -> null)
-                        .accept(session);
-                });
+                .flatMap(parentSession -> this
+                    .createOrUpdateSession(childrenId,
+                                           children,
+                                           child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)),
+                                           Mono::empty));
 
 
             //子设备注册
@@ -132,10 +128,10 @@ public class DeviceGatewayHelper {
                                   .getSelfConfig(DeviceConfigKey.selfManageState)
                                   .defaultIfEmpty(false)
                                   .filter(Boolean.FALSE::equals))
-                              .flatMap(ignore -> registerSession))
+                              .flatMap(ignore -> sessionHandler))
                     .then();
             }
-            return registerSession.then();
+            return sessionHandler.then();
         }
     }
 
@@ -144,20 +140,22 @@ public class DeviceGatewayHelper {
                                                     Function<DeviceSession, Mono<Void>> sessionConsumer,
                                                     Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
         String deviceId = message.getDeviceId();
-        if (StringUtils.isEmpty(deviceId)) {
+        if (!StringUtils.hasText(deviceId)) {
             return Mono.empty();
         }
-        Mono<Void> then = Mono.empty();
+        Mono<DeviceOperator> then = null;
         boolean doHandle = true;
         //子设备消息
         if (message instanceof ChildDeviceMessage) {
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
-            then = handleChildrenDeviceMessage(deviceId, childrenMessage);
+            then = handleChildrenDeviceMessage(deviceId, childrenMessage)
+                .then(registry.getDevice(deviceId));
         }
         //子设备消息回复
         else if (message instanceof ChildDeviceMessageReply) {
             DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
-            then = handleChildrenDeviceMessage(deviceId, childrenMessage);
+            then = handleChildrenDeviceMessage(deviceId, childrenMessage)
+                .then(registry.getDevice(deviceId));
         }
         //设备离线消息
         else if (message instanceof DeviceOfflineMessage) {
@@ -171,119 +169,156 @@ public class DeviceGatewayHelper {
                     }
                     return Mono.empty();
                 })
-                .then(
-                    registry.getDevice(deviceId)
-                )
+                .then(registry.getDevice(deviceId))
                 .contextWrite(Context.of(DeviceMessage.class, message));
         }
-        //设备在线消
+        //设备上线消息,不发送到messageHandler,防止设备上线存在重复消
         else if (message instanceof DeviceOnlineMessage) {
-
             doHandle = false;
         }
 
-        boolean fHandle = doHandle;
-        return sessionManager
-            .compute(deviceId, (old) -> old
-                .map(session -> {
-                    //会话已存在
-                    Mono<Void> after = Mono.empty();
-                    //没有忽略会话
-                    if (!message.getHeader(Headers.ignoreSession).orElse(false)) {
-                        //消息中指定保存在线
-                        if (message.getHeader(Headers.keepOnline).orElse(false)
-                            && !(session instanceof KeepOnlineSession)) {
-                            Duration timeout = message
-                                .getHeader(Headers.keepOnlineTimeoutSeconds)
-                                .map(Duration::ofSeconds)
-                                .orElse(Duration.ofHours(1));
-                            //替换session
-                            session = new KeepOnlineSession(session, timeout);
-                        }
-                        //KeepOnline的连接丢失时,重新创建会话,并替换丢失的会话。
-                        if (session.isWrapFrom(KeepOnlineSession.class) && session.isWrapFrom(LostDeviceSession.class)) {
-                            after = sessionBuilder
-                                .apply(session.getOperator())
-                                .doOnNext(session.unwrap(KeepOnlineSession.class)::replaceWith)
-                                .then();
-                        }
-                        after = after.then(
-                            sessionConsumer.apply(session)
-                        );
-
-                    }
-                    session.keepAlive();
-                    if (fHandle) {
-                        //处理消息
+        //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
+        if (message.getHeaderOrDefault(Headers.ignoreSession)) {
+            return registry
+                .getDevice(deviceId)
+                .flatMap(device -> {
+                    if (!isDoRegister(message)) {
                         return messageHandler
-                            .handleMessage(session.getOperator(), message)
-                            .then(after)
-                            .thenReturn(session);
+                            .handleMessage(device, message)
+                            .thenReturn(device);
                     }
-                    return after
-                        .thenReturn(session);
-                })
-                .defaultIfEmpty(Mono.defer(() -> registry
-                    .getDevice(deviceId)
-                    .switchIfEmpty(Mono.defer(() -> {
-                        //设备注册
-                        if (isDoRegister(message)) {
-                            return messageHandler
-                                .handleMessage(null, message)
-                                //延迟2秒后尝试重新获取设备并上线
-                                .then(Mono.delay(Duration.ofSeconds(2)))
-                                .then(registry.getDevice(deviceId));
-                        }
-                        if (deviceNotFoundCallback != null) {
-                            return deviceNotFoundCallback.get();
-                        }
-                        return Mono.empty();
-                    }))
-                    .flatMap(device -> {
-                        //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
-                        if (message.getHeader(Headers.ignoreSession).orElse(false)) {
-                            if (!isDoRegister(message)) {
-                                return messageHandler
-                                    .handleMessage(device, message)
-                                    .then(Mono.empty());
-                            }
-                            return Mono.empty();
-                        }
-                        return sessionBuilder
-                            .apply(device)
-                            .flatMap(newSession -> {
-                                //保持会话,在低功率设备上,可能无法保持长连接.
-                                if (message.getHeader(Headers.keepOnline).orElse(false)) {
-                                    int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
-                                    newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
-                                }
-                                //执行自定义会话回调
-                                sessionConsumer.apply(newSession);
-                                //保活
-                                newSession.keepAlive();
-                                if (!(message instanceof DeviceRegisterMessage) &&
-                                    !(message instanceof DeviceOnlineMessage)) {
-                                    return
-                                        sessionConsumer
-                                            .apply(newSession)
-                                            .then(
-                                                messageHandler
-                                                    .handleMessage(device, message)
-                                            )
-                                            .thenReturn(newSession);
-                                } else {
-                                    return sessionConsumer
-                                        .apply(newSession)
-                                        .thenReturn(newSession);
-                                }
-                            });
-                    })))
-                .flatMap(Function.identity()))
+                    return Mono.just(device);
+                });
+
+        }
+
+        if (then == null) {
+            then = registry.getDevice(deviceId);
+        }
+
+        if (doHandle) {
+            then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt));
+        }
+
+        return this
+            .createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
+            .flatMap(sessionConsumer)
             .then(then)
-            .then(registry.getDevice(deviceId))
-            .contextWrite(Context.of(DeviceMessage.class,message));
+            .contextWrite(Context.of(DeviceMessage.class, message));
+
+    }
+
+    private Mono<DeviceSession> createOrUpdateSession(String deviceId,
+                                                      DeviceMessage message,
+                                                      Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
+                                                      Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
+        return sessionManager
+            .getSession(deviceId)
+            .map(old -> {
+                //需要更新会话时才进行更新
+                if (needUpdateSession(old, message)) {
+                    return sessionManager
+                        .compute(deviceId, null, session -> updateSession(session, message, sessionBuilder));
+                }
+                applySessionKeepaliveTimeout(message, old);
+                old.keepAlive();
+                return Mono.just(old);
+            })
+            //会话不存在则尝试创建或者更新
+            .defaultIfEmpty(Mono.defer(() -> sessionManager
+                .compute(deviceId,
+                         registerNewSession(
+                             deviceId,
+                             message,
+                             sessionBuilder,
+                             () -> {
+                                 //设备注册
+                                 if (isDoRegister(message)) {
+                                     return messageHandler
+                                         .handleMessage(null, message)
+                                         //延迟2秒后尝试重新获取设备并上线
+                                         .then(Mono.delay(Duration.ofSeconds(2)))
+                                         .then(registry.getDevice(deviceId));
+                                 }
+                                 if (deviceNotFoundCallback != null) {
+                                     return deviceNotFoundCallback.get();
+                                 }
+                                 return Mono.empty();
+                             }),
+                         session -> updateSession(session, message, sessionBuilder))))
+            .flatMap(Function.identity());
+    }
+
+    private Mono<DeviceSession> registerNewSession(String deviceId,
+                                                   DeviceMessage message,
+                                                   Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
+                                                   Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
+        return registry
+            .getDevice(deviceId)
+            .switchIfEmpty(Mono.defer(deviceNotFoundCallback))
+            .flatMap(device -> sessionBuilder
+                .apply(device)
+                .map(newSession -> {
+                    //保持在线,在低功率设备上,可能无法保持长连接,通过keepOnline的header来标识让设备保持在线
+                    if (message.getHeader(Headers.keepOnline).orElse(false)) {
+                        int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
+                        newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
+                    }
+                    return newSession;
+                }));
+    }
+
+    private Mono<DeviceSession> updateSession(DeviceSession session,
+                                              DeviceMessage message,
+                                              Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
+        Mono<Void> after = null;
+        //消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
+        if (isNewKeeOnline(session, message)) {
+            Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
+            //替换session
+            session = new KeepOnlineSession(session, Duration.ofSeconds(timeoutSeconds));
+        }
+        //KeepOnline的连接丢失时(服务重启等操作),设备上线后替换丢失的会话,让其能恢复下行能力。
+        if (isKeeOnlineLost(session)) {
+            after = sessionBuilder
+                .apply(session.getOperator())
+                .doOnNext(((KeepOnlineSession) session)::replaceWith)
+                .then();
+        }
+        applySessionKeepaliveTimeout(message, session);
+        session.keepAlive();
+        return after == null
+            ? Mono.just(session)
+            : after.thenReturn(session);
+    }
+
+    private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
+        Integer timeout = msg.getHeaderOrElse(Headers.keepOnlineTimeoutSeconds, () -> null);
+        if (null != timeout) {
+            session.setKeepAliveTimeout(Duration.ofSeconds(timeout));
+        }
+    }
+
+    //判断是否需要更新会话
+    private static boolean needUpdateSession(DeviceSession session, DeviceMessage message) {
+        return isNewKeeOnline(session, message) || isKeeOnlineLost(session);
+    }
+
+    //判断是否为新的保持在线消息
+    private static boolean isNewKeeOnline(DeviceSession session, DeviceMessage message) {
+        return message.getHeaderOrDefault(Headers.keepOnline) && !(session instanceof KeepOnlineSession);
+    }
 
+    //判断保持在线的会话是否以及丢失(服务重启后可能出现)
+    private static boolean isKeeOnlineLost(DeviceSession session) {
+        return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class);
+    }
 
+    //判断是否为设备注册
+    private static boolean isDoRegister(DeviceMessage message) {
+        return message instanceof DeviceRegisterMessage
+            && message.getHeader(PropertyConstants.deviceName).isPresent()
+            && message.getHeader(PropertyConstants.productId).isPresent();
     }
 
 
@@ -313,12 +348,5 @@ public class DeviceGatewayHelper {
 
     }
 
-    private boolean isDoRegister(DeviceMessage message) {
-        return message instanceof DeviceRegisterMessage
-            && message.getHeader(PropertyConstants.deviceName).isPresent()
-            && message.getHeader(PropertyConstants.productId).isPresent();
-    }
-
-
 
 }

+ 8 - 10
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
 @Slf4j(topic = "system.tcp.gateway")
-class TcpServerDeviceGateway extends AbstractDeviceGateway implements  MonitorSupportDeviceGateway {
+class TcpServerDeviceGateway extends AbstractDeviceGateway implements MonitorSupportDeviceGateway {
 
     private final TcpServer tcpServer;
 
@@ -161,13 +161,11 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements  MonitorSu
             return helper
                 .handleDeviceMessage(message,
                                      device -> new TcpDeviceSession(device, client, getTransport(), monitor),
-                                     DeviceGatewayHelper
-                                         .applySessionKeepaliveTimeout(message, keepaliveTimeout::get)
-                                         .andThen(session -> {
-                                             TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
-                                             deviceSession.setClient(client);
-                                             sessionRef.set(deviceSession);
-                                         }),
+                                     session -> {
+                                         TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
+                                         deviceSession.setClient(client);
+                                         sessionRef.set(deviceSession);
+                                     },
                                      () -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", address, message.getDeviceId(), message)
                 )
                 .thenReturn(message);
@@ -190,7 +188,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements  MonitorSu
     }
 
     private void doStart() {
-        if ( disposable != null) {
+        if (disposable != null) {
             disposable.dispose();
         }
         disposable = tcpServer
@@ -214,7 +212,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements  MonitorSu
 
     @Override
     protected Mono<Void> doShutdown() {
-        if(disposable!=null){
+        if (disposable != null) {
             disposable.dispose();
         }
         return Mono.empty();