Forráskód Böngészése

优化tcp client 关闭时,释放FluxProcessor

zhou-hao 4 éve
szülő
commit
f6dc39a668

+ 7 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -48,6 +48,8 @@ public class VertxTcpClient implements TcpClient {
 
     private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
 
+    private final boolean serverClient;
+
     @Override
     public void keepAlive() {
         lastKeepAliveTime = System.currentTimeMillis();
@@ -75,8 +77,9 @@ public class VertxTcpClient implements TcpClient {
         return true;
     }
 
-    public VertxTcpClient(String id) {
+    public VertxTcpClient(String id,boolean serverClient) {
         this.id = id;
+        this.serverClient=serverClient;
     }
 
     protected void received(TcpMessage message) {
@@ -136,6 +139,9 @@ public class VertxTcpClient implements TcpClient {
             execute(runnable);
         }
         disconnectListener.clear();
+        if(serverClient){
+            processor.onComplete();
+        }
     }
 
     public void setClient(NetClient client) {

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -43,7 +43,7 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
     @Nonnull
     @Override
     public VertxTcpClient createNetwork(@Nonnull TcpClientProperties properties) {
-        VertxTcpClient client = new VertxTcpClient(properties.getId());
+        VertxTcpClient client = new VertxTcpClient(properties.getId(),false);
 
         initClient(client, properties);
 

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -81,7 +81,7 @@ public class VertxTcpServer implements TcpServer {
             socket.close();
             return;
         }
-        VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
+        VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress(), true);
         client.setKeepAliveTimeoutMs(keepAliveTimeout);
         try {
             socket.exceptionHandler(err -> {