Bladeren bron

tcp 增加多实例(多工作线程)支持

zhou-hao 5 jaren geleden
bovenliggende
commit
30726eac89

+ 3 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java

@@ -40,6 +40,9 @@ public class TcpServerProperties implements ValueObject {
 
     private boolean ssl;
 
+    //服务实例数量(线程数)
+    private int instance = Runtime.getRuntime().availableProcessors();
+
     private String certId;
 
     public SocketAddress createSocketAddress() {

+ 19 - 13
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -17,6 +17,8 @@ import reactor.core.publisher.Mono;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 
 @Component
 @Slf4j
@@ -51,20 +53,24 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
     }
 
     private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
-        NetServer netServer = vertx.createNetServer(properties.getOptions());
-
+        int instance = Math.max(2, properties.getInstance());
+        List<NetServer> instances = new ArrayList<>(instance);
+        for (int i = 0; i < instance; i++) {
+            instances.add(vertx.createNetServer(properties.getOptions()));
+        }
         payloadParserBuilder.build(properties.getParserType(), properties);
-
-        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(),properties));
-        tcpServer.setServer(netServer);
-        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
-        netServer.listen(properties.createSocketAddress(), result -> {
-            if (result.succeeded()) {
-                log.info("tcp server startup on {}", result.result().actualPort());
-            } else {
-                log.error("startup tcp server error", result.cause());
-            }
-        });
+        tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
+        tcpServer.setServer(instances);
+        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
+        for (NetServer netServer : instances) {
+            netServer.listen(properties.createSocketAddress(), result -> {
+                if (result.succeeded()) {
+                    log.info("tcp server startup on {}", result.result().actualPort());
+                } else {
+                    log.error("startup tcp server error", result.cause());
+                }
+            });
+        }
     }
 
     @Override

+ 18 - 11
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -11,6 +11,7 @@ import org.jetlinks.community.network.tcp.client.VertxTcpClient;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 
 import java.time.Duration;
+import java.util.Collection;
 import java.util.function.Supplier;
 
 /**
@@ -20,8 +21,7 @@ import java.util.function.Supplier;
 @Slf4j
 public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
-    @Getter
-    volatile NetServer server;
+    Collection<NetServer> tcpServers;
 
     private Supplier<PayloadParser> parserSupplier;
 
@@ -47,14 +47,19 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
         this.parserSupplier = parserSupplier;
     }
 
-    public void setServer(NetServer server) {
-        if (this.server != null && this.server != server) {
-            this.server.close();
+    public void setServer(Collection<NetServer> mqttServer) {
+        if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
+            shutdown();
+        }
+        this.tcpServers = mqttServer;
+
+        for (NetServer tcpServer : this.tcpServers) {
+            tcpServer.connectHandler(this::acceptTcpConnection);
         }
-        this.server = server;
-        this.server.connectHandler(this::acceptTcpConnection);
+
     }
 
+
     protected void acceptTcpConnection(NetSocket socket) {
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
         client.setKeepAliveTimeoutMs(keepAliveTimeout);
@@ -82,15 +87,17 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
     @Override
     public void shutdown() {
-        if (null != server) {
-            execute(server::close);
-            server = null;
+        if (null != tcpServers) {
+            for (NetServer tcpServer : tcpServers) {
+                execute(tcpServer::close);
+            }
+            tcpServers = null;
         }
     }
 
     @Override
     public boolean isAlive() {
-        return server != null;
+        return tcpServers != null;
     }
 
     @Override