Ver código fonte

优化tcp解析

zhouhao 5 anos atrás
pai
commit
a51516d4d1

+ 8 - 7
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -82,6 +82,7 @@ public class VertxTcpClient extends AbstractTcpClient {
 
     @Override
     public void shutdown() {
+        log.debug("tcp client [{}] disconnect", getId());
         if (null != client) {
             execute(client::close);
             client = null;
@@ -94,6 +95,9 @@ public class VertxTcpClient extends AbstractTcpClient {
             execute(payloadParser::close);
             payloadParser = null;
         }
+        for (Runnable runnable : disconnectListener) {
+            execute(runnable);
+        }
         disconnectListener.clear();
     }
 
@@ -110,9 +114,9 @@ public class VertxTcpClient extends AbstractTcpClient {
         }
         this.payloadParser = payloadParser;
         this.payloadParser
-                .handlePayload()
-                .onErrorContinue((err, res) ->log.error(err.getMessage(),err))
-                .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
+            .handlePayload()
+            .onErrorContinue((err, res) -> log.error(err.getMessage(), err))
+            .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
     }
 
     public void setSocket(NetSocket socket) {
@@ -122,10 +126,7 @@ public class VertxTcpClient extends AbstractTcpClient {
         }
         this.socket = socket;
         this.socket.closeHandler(v -> {
-            for (Runnable runnable : disconnectListener) {
-                runnable.run();
-            }
-            disconnectListener.clear();
+            shutdown();
         });
         this.socket.handler(buffer -> {
             keepAlive();

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -147,7 +147,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
 
                             @Override
                             public DeviceOperator getDevice() {
-                                throw new UnsupportedOperationException();
+                                return null;
                             }
                         })))
                     .cast(DeviceMessage.class)

+ 7 - 6
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/ScriptPayloadParserBuilder.java

@@ -23,11 +23,11 @@ public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy
     @SneakyThrows
     public PayloadParser build(Values config) {
         String script = config.getValue("script")
-                .map(Value::asString)
-                .orElseThrow(() -> new IllegalArgumentException("script不能为空"));
+            .map(Value::asString)
+            .orElseThrow(() -> new IllegalArgumentException("script不能为空"));
         String lang = config.getValue("lang")
-                .map(Value::asString)
-                .orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
+            .map(Value::asString)
+            .orElseThrow(() -> new IllegalArgumentException("lang不能为空"));
 
         DynamicScriptEngine engine = DynamicScriptEngineFactory.getEngine(lang);
         if (engine == null) {
@@ -35,8 +35,9 @@ public class ScriptPayloadParserBuilder implements PayloadParserBuilderStrategy
         }
         PipePayloadParser parser = new PipePayloadParser();
         String id = DigestUtils.md5Hex(script);
-
-        engine.compile(id, script);
+        if (!engine.compiled(id)) {
+            engine.compile(id, script);
+        }
         Map<String, Object> ctx = new HashMap<>();
         ctx.put("parser", parser);
         engine.execute(id, ctx).getIfSuccess();

+ 3 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java

@@ -52,6 +52,9 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
 
     private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
         NetServer netServer = vertx.createNetServer(properties.getOptions());
+
+        payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration()));
+
         tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
         tcpServer.setServer(netServer);
         tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -62,7 +62,7 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
             socket.exceptionHandler(err -> {
                 log.error("tcp server client [{}] error", socket.remoteAddress(), err);
             }).closeHandler((nil) -> {
-                log.info("tcp server client [{}] closed", socket.remoteAddress());
+                log.debug("tcp server client [{}] closed", socket.remoteAddress());
                 client.shutdown();
             });
             client.setRecordParser(parserSupplier.get());