瀏覽代碼

优化网络组件调试

zhou-hao 5 年之前
父節點
當前提交
af5d3a1f53

+ 1 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -110,6 +110,7 @@ public class VertxTcpClient extends AbstractTcpClient {
         if (this.client != null && this.client != client) {
             this.client.close();
         }
+        keepAlive();
         this.client = client;
     }
 

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -56,13 +56,13 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
     }
 
     public void initClient(VertxTcpClient client, TcpClientProperties properties) {
-        client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         client.setClient(netClient);
         client.setKeepAliveTimeoutMs(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
             if (result.succeeded()) {
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
+                client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties));
                 client.setSocket(result.result());
             } else {
                 log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause());

+ 22 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java

@@ -0,0 +1,22 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.ByteBufUtil;
+import org.springframework.util.StringUtils;
+
+public class DebugUtils {
+
+    static byte[] stringToBytes(String text){
+        byte[] payload;
+        if (StringUtils.isEmpty(text)) {
+            payload = new byte[0];
+        } else {
+            if (text.startsWith("0x")) {
+                payload = ByteBufUtil.decodeHexDump(text, 2, text.length()-2);
+            } else {
+                payload = text.getBytes();
+            }
+        }
+        return payload;
+    }
+
+}

+ 82 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java

@@ -0,0 +1,82 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.Unpooled;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+
+@Component
+public class TcpClientDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public TcpClientDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-tcp-client-debug";
+    }
+
+    @Override
+    public String name() {
+        return "TCP客户端调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/tcp/client/*/_send",
+            "/network/tcp/client/*/_subscribe"
+        };
+    }
+
+    @Override
+    public Flux<String> subscribe(SubscribeRequest request) {
+        String id = request.getTopic().split("[/]")[4];
+        if (request.getTopic().endsWith("_send")) {
+            return send(id, request);
+        } else {
+            return subscribe(id, request);
+        }
+    }
+
+    public Flux<String> send(String id, SubscribeRequest request) {
+        String message = request.getString("request")
+            .orElseThrow(() -> new IllegalArgumentException("参数[request]不能为空"));
+
+        byte[] payload=DebugUtils.stringToBytes(message);
+
+        return networkManager
+            .<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, id)
+            .flatMap(client -> client.send(new TcpMessage(Unpooled.wrappedBuffer(payload))))
+            .thenReturn("推送成功")
+            .flux();
+    }
+
+    @SuppressWarnings("all")
+    public Flux<String> subscribe(String id, SubscribeRequest request) {
+        String message = request.getString("response").filter(StringUtils::hasText).orElse(null);
+
+        byte[] payload =DebugUtils.stringToBytes(message);
+
+        return networkManager
+            .<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, id)
+            .flatMapMany(client -> client
+                .subscribe()
+                .flatMap(msg -> client
+                    .send(new TcpMessage(Unpooled.wrappedBuffer(payload)))
+                    .thenReturn(msg))
+                .map(TcpMessage::toString)
+            );
+    }
+
+
+}

+ 128 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java

@@ -0,0 +1,128 @@
+package org.jetlinks.community.network.manager.debug;
+
+import io.netty.buffer.Unpooled;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.jetlinks.community.network.tcp.server.TcpServer;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+public class TcpServerDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public TcpServerDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-tcp-server-debug";
+    }
+
+    @Override
+    public String name() {
+        return "TCP服务调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/tcp/server/*/_subscribe"
+        };
+    }
+
+    @Override
+    public Flux<TcpClientMessage> subscribe(SubscribeRequest request) {
+        String id = request.getTopic().split("[/]")[4];
+
+        return subscribe(id, request);
+    }
+
+    @SuppressWarnings("all")
+    public Flux<TcpClientMessage> subscribe(String id, SubscribeRequest request) {
+
+        String message = request.getString("response").filter(StringUtils::hasText).orElse(null);
+
+        byte[] payload = DebugUtils.stringToBytes(message);
+
+        return Flux.create(sink ->
+            sink.onDispose(networkManager
+                .<TcpServer>getNetwork(DefaultNetworkType.TCP_SERVER, id)
+                .flatMap(server ->
+                    server
+                        .handleConnection()
+                        .doOnNext(client -> sink.next(TcpClientMessage.of(client)))
+                        .flatMap(client -> {
+                            client.onDisconnect(() -> {
+                                sink.next(TcpClientMessage.ofDisconnect(client));
+                            });
+                            return client
+                                .subscribe()
+                                .map(msg -> TcpClientMessage.of(client, msg))
+                                .doOnNext(sink::next)
+                                .flatMap(msg -> {
+                                    if (payload.length > 0) {
+                                        return client.send(new TcpMessage(Unpooled.wrappedBuffer(payload)));
+                                    }
+                                    return Mono.empty();
+                                })
+                                .then();
+                        })
+                        .then()
+                )
+                .doOnError(sink::error)
+                .subscriberContext(sink.currentContext())
+                .subscribe()
+            ));
+    }
+
+
+    @AllArgsConstructor(staticName = "of")
+    @Getter
+    @Setter
+    public static class TcpClientMessage {
+        private String type;
+
+        private String typeText;
+
+        private Object data;
+
+        public static TcpClientMessage of(TcpClient client) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", client.getRemoteAddress());
+
+            return TcpClientMessage.of("connection", "连接", data);
+        }
+
+        public static TcpClientMessage ofDisconnect(TcpClient client) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", client.getRemoteAddress());
+
+            return TcpClientMessage.of("disconnection", "断开连接", data);
+        }
+
+        public static TcpClientMessage of(TcpClient connection, TcpMessage message) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("address", connection.getRemoteAddress().toString());
+            data.put("message", message.toString());
+
+            return TcpClientMessage.of("publish", "订阅", data);
+        }
+
+
+    }
+}