浏览代码

优化TCP

zhouhao 5 年之前
父节点
当前提交
f934758be5

+ 9 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java

@@ -2,16 +2,19 @@ package org.jetlinks.community.network.tcp.client;
 
 import io.vertx.core.net.NetClientOptions;
 import lombok.*;
+import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 @Getter
 @Setter
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class TcpClientProperties {
+public class TcpClientProperties implements ValueObject {
 
     private String id;
 
@@ -25,10 +28,14 @@ public class TcpClientProperties {
 
     private PayloadParserType parserType;
 
-    private Map<String,Object> parserConfiguration;
+    private Map<String, Object> parserConfiguration = new HashMap<>();
 
     private NetClientOptions options;
 
     private boolean enabled;
 
+    @Override
+    public Optional<Object> get(String name) {
+        return Optional.ofNullable(parserConfiguration).map(map -> map.get(name));
+    }
 }

+ 2 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.time.Duration;
 
 @Component
 @Slf4j
@@ -58,6 +59,7 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
         client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         client.setClient(netClient);
+        client.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
             if (result.succeeded()) {
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());

+ 10 - 3
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java

@@ -3,11 +3,14 @@ package org.jetlinks.community.network.tcp.server;
 import io.vertx.core.net.NetServerOptions;
 import io.vertx.core.net.SocketAddress;
 import lombok.*;
+import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.network.tcp.parser.PayloadParserType;
 import org.jetlinks.rule.engine.executor.PayloadType;
 import org.springframework.util.StringUtils;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * @author bsetfeng
@@ -19,7 +22,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class TcpServerProperties {
+public class TcpServerProperties implements ValueObject {
 
     private String id;
 
@@ -29,7 +32,7 @@ public class TcpServerProperties {
 
     private PayloadParserType parserType;
 
-    private Map<String, Object> parserConfiguration;
+    private Map<String, Object> parserConfiguration = new HashMap<>();
 
     private String host;
 
@@ -39,11 +42,15 @@ public class TcpServerProperties {
 
     private String certId;
 
-
     public SocketAddress createSocketAddress() {
         if (StringUtils.isEmpty(host)) {
             host = "localhost";
         }
         return SocketAddress.inetSocketAddress(port, host);
     }
+
+    @Override
+    public Optional<Object> get(String name) {
+        return Optional.ofNullable(parserConfiguration).map(map -> map.get(name));
+    }
 }

+ 7 - 4
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.time.Duration;
 
 @Component
 @Slf4j
@@ -45,6 +46,7 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
 
         VertxTcpServer tcpServer = new VertxTcpServer(properties.getId());
         initTcpServer(tcpServer, properties);
+
         return tcpServer;
     }
 
@@ -52,6 +54,7 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
         NetServer netServer = vertx.createNetServer(properties.getOptions());
         tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
         tcpServer.setServer(netServer);
+        tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         netServer.listen(properties.createSocketAddress(), result -> {
             if (result.succeeded()) {
                 log.info("tcp server startup on {}", result.result().actualPort());
@@ -86,10 +89,10 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
             if (config.isSsl()) {
                 config.getOptions().setSsl(true);
                 return certificateManager.getCertificate(config.getCertId())
-                        .map(VertxKeyCertTrustOptions::new)
-                        .doOnNext(config.getOptions()::setKeyCertOptions)
-                        .doOnNext(config.getOptions()::setTrustOptions)
-                        .thenReturn(config);
+                    .map(VertxKeyCertTrustOptions::new)
+                    .doOnNext(config.getOptions()::setKeyCertOptions)
+                    .doOnNext(config.getOptions()::setTrustOptions)
+                    .thenReturn(config);
             }
             return Mono.just(config);
         });

+ 6 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -3,12 +3,14 @@ package org.jetlinks.community.network.tcp.server;
 import io.vertx.core.net.NetServer;
 import io.vertx.core.net.NetSocket;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.tcp.client.VertxTcpClient;
 import org.jetlinks.community.network.tcp.parser.PayloadParser;
 
+import java.time.Duration;
 import java.util.function.Supplier;
 
 /**
@@ -23,6 +25,9 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
     private Supplier<PayloadParser> parserSupplier;
 
+    @Setter
+    private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
+
     @Getter
     private String id;
 
@@ -52,6 +57,7 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
     protected void acceptTcpConnection(NetSocket socket) {
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
+        client.setKeepAliveTimeout(keepAliveTimeout);
         try {
             socket.exceptionHandler(err -> {
                 log.error("tcp server client [{}] error", socket.remoteAddress(), err);