ソースを参照

Merge pull request #7 from jetlinks/1.2

tcp server没有消费者时拒绝连接
老周 5 年 前
コミット
98cefd9e71

+ 23 - 3
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -7,11 +7,16 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.tcp.client.TcpClient;
 import org.jetlinks.community.network.tcp.client.VertxTcpClient;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -19,7 +24,7 @@ import java.util.function.Supplier;
  * @since 1.0
  **/
 @Slf4j
-public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
+public class VertxTcpServer implements TcpServer {
 
     Collection<NetServer> tcpServers;
 
@@ -29,12 +34,22 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
     private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
 
     @Getter
-    private String id;
+    private final String id;
+
+    private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
+
+    private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
 
     public VertxTcpServer(String id) {
         this.id = id;
     }
 
+    @Override
+    public Flux<TcpClient> handleConnection() {
+        return processor
+            .map(Function.identity());
+    }
+
     private void execute(Runnable runnable) {
         try {
             runnable.run();
@@ -61,6 +76,11 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
 
     protected void acceptTcpConnection(NetSocket socket) {
+        if (!processor.hasDownstreams()) {
+            log.warn("not handler for tcp client[{}]", socket.remoteAddress());
+            socket.close();
+            return;
+        }
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
         client.setKeepAliveTimeoutMs(keepAliveTimeout);
         try {
@@ -72,7 +92,7 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
             });
             client.setRecordParser(parserSupplier.get());
             client.setSocket(socket);
-            received(client);
+            sink.next(client);
             log.debug("accept tcp client [{}] connection", socket.remoteAddress());
         } catch (Exception e) {
             log.error("create tcp server client error", e);