Explorar el Código

优化tcp网关

zhou-hao hace 5 años
padre
commit
0e37ee7f31

+ 2 - 3
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -125,9 +125,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                 });
                 AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
                 AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
-                client.subscribe()
+                disposable.add(client.subscribe()
                     .filter(r -> started.get())
-                    .takeWhile(r -> disposable != null)
                     .doOnNext(r -> {
                         log.debug("收到TCP报文:\n{}", r);
                         gatewayMonitor.receivedMessage();
@@ -226,7 +225,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                 , err))))
                     .onErrorResume((err) -> Mono.empty())
                     .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
-                    .subscribe();
+                    .subscribe());
             }));
     }