zhou-hao 5 năm trước cách đây
mục cha
commit
7d116896fa

+ 3 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -125,8 +125,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                 });
                 AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
                 AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
-                disposable.add(client.subscribe()
+                client.subscribe()
                     .filter(r -> started.get())
+                    .takeWhile(r -> !disposable.isEmpty())
                     .doOnNext(r -> {
                         log.debug("收到TCP报文:\n{}", r);
                         gatewayMonitor.receivedMessage();
@@ -225,7 +226,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                 , err))))
                     .onErrorResume((err) -> Mono.empty())
                     .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
-                    .subscribe());
+                    .subscribe();
             }));
     }