Prechádzať zdrojové kódy

优化网络组件调试 jetlinks/jetlinks-ui-antd#31

zhou-hao 5 rokov pred
rodič
commit
edee5508b8

+ 14 - 5
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/DefaultMessagingManager.java

@@ -20,13 +20,22 @@ public class DefaultMessagingManager implements MessagingManager, BeanPostProces
     @Override
     public Flux<Message> subscribe(SubscribeRequest request) {
 
-        for (Map.Entry<String, SubscriptionProvider> entry : subProvider.entrySet()) {
-            if (matcher.match(entry.getKey(), request.getTopic())) {
-                return entry.getValue().subscribe(request);
+        return Flux.defer(() -> {
+            for (Map.Entry<String, SubscriptionProvider> entry : subProvider.entrySet()) {
+                if (matcher.match(entry.getKey(), request.getTopic())) {
+                    return entry.getValue()
+                        .subscribe(request)
+                        .map(v -> {
+                            if (v instanceof Message) {
+                                return ((Message) v);
+                            }
+                            return Message.success(request.getId(), request.getTopic(), v);
+                        });
+                }
             }
-        }
 
-        return Flux.empty();
+            return Flux.error(new UnsupportedOperationException("不支持的topic"));
+        });
     }
 
     public void register(SubscriptionProvider provider) {

+ 1 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscriptionProvider.java

@@ -10,6 +10,6 @@ public interface SubscriptionProvider {
 
     String[] getTopicPattern();
 
-    Flux<Message> subscribe(SubscribeRequest request);
+    Flux<?> subscribe(SubscribeRequest request);
 
 }

+ 8 - 5
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -198,14 +198,17 @@ class VertxMqttConnection implements MqttConnection {
         keepAliveTimeoutMs = duration.toMillis();
     }
 
+    private volatile InetSocketAddress clientAddress;
+
     @Override
     public InetSocketAddress getClientAddress() {
-
-        SocketAddress address = endpoint.remoteAddress();
-        if (address != null) {
-            return new InetSocketAddress(address.host(), address.port());
+        if (clientAddress == null) {
+            SocketAddress address = endpoint.remoteAddress();
+            if (address != null) {
+                clientAddress = new InetSocketAddress(address.host(), address.port());
+            }
         }
-        return null;
+        return clientAddress;
     }
 
     @Override

+ 14 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugAuthenticationHandler.java

@@ -0,0 +1,14 @@
+package org.jetlinks.community.network.manager.debug;
+
+import org.hswebframework.web.authorization.exception.AccessDenyException;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+
+public class DebugAuthenticationHandler {
+
+    public static void handle(SubscribeRequest request){
+        if (!request.getAuthentication().hasPermission("network-config", "save")) {
+          throw new AccessDenyException();
+        }
+    }
+
+}

+ 87 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttClientDebugSubscriptionProvider.java

@@ -0,0 +1,87 @@
+package org.jetlinks.community.network.manager.debug;
+
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.gateway.external.Message;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.manager.web.request.MqttMessageRequest;
+import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Component
+public class MqttClientDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public MqttClientDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-client-mqtt-debug";
+    }
+
+    @Override
+    public String name() {
+        return "MQTT客户端调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/mqtt/client/*/_subscribe/*",
+            "/network/mqtt/client/*/_publish/*"
+        };
+    }
+
+    @Override
+    public Flux<Object> subscribe(SubscribeRequest request) {
+        DebugAuthenticationHandler.handle(request);
+        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/client/{id}/{pubsub}/{type}", request.getTopic());
+
+        String clientId = vars.get("id");
+        String pubsub = vars.get("pubsub");
+        PayloadType type = PayloadType.valueOf(vars.get("type").toUpperCase());
+
+        return networkManager
+            .<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, clientId)
+            .flatMapMany(mqtt ->
+                "_subscribe".equals(pubsub)
+                    ? mqttClientSubscribe(mqtt, type, request)
+                    : mqttClientPublish(mqtt, type, request))
+            ;
+    }
+
+    public Flux<Object> mqttClientSubscribe(MqttClient client,
+                                            PayloadType type,
+                                            SubscribeRequest request) {
+        String topics = request.getString("topics", "/#");
+
+        return client
+            .subscribe(Arrays.asList(topics.split("[\n]")))
+            .map(mqttMessage -> Message.success(request.getId(), request.getTopic(), MqttMessageResponse.of(mqttMessage, type)));
+
+    }
+
+    public Flux<String> mqttClientPublish(MqttClient client,
+                                          PayloadType type,
+                                          SubscribeRequest request) {
+        MqttMessageRequest messageRequest = FastBeanCopier.copy(request.getAll(), new MqttMessageRequest());
+
+        return client
+            .publish(MqttMessageRequest.of(messageRequest, type))
+            .thenReturn("推送成功")
+            .flux();
+
+    }
+}

+ 148 - 0
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.java

@@ -0,0 +1,148 @@
+package org.jetlinks.community.network.manager.debug;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.external.SubscribeRequest;
+import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
+import org.jetlinks.community.network.mqtt.server.*;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Component
+@Slf4j
+public class MqttServerDebugSubscriptionProvider implements SubscriptionProvider {
+
+    private final NetworkManager networkManager;
+
+    public MqttServerDebugSubscriptionProvider(NetworkManager networkManager) {
+        this.networkManager = networkManager;
+    }
+
+    @Override
+    public String id() {
+        return "network-server-mqtt-debug";
+    }
+
+    @Override
+    public String name() {
+        return "MQTT服务调试";
+    }
+
+    @Override
+    public String[] getTopicPattern() {
+        return new String[]{
+            "/network/mqtt/server/*/_subscribe/*"
+        };
+    }
+
+    @Override
+    public Flux<MqttClientMessage> subscribe(SubscribeRequest request) {
+        DebugAuthenticationHandler.handle(request);
+
+        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/server/{id}/_subscribe/{type}", request.getTopic());
+
+        String clientId = vars.get("id");
+        PayloadType type = PayloadType.valueOf(vars.get("type").toUpperCase());
+
+        return Flux.create(sink ->
+            sink.onDispose(networkManager
+                .<MqttServer>getNetwork(DefaultNetworkType.MQTT_SERVER, clientId)
+                .flatMap(mqtt ->
+                    mqtt
+                        .handleConnection()
+                        .doOnNext(conn -> {
+                            sink.next(MqttClientMessage.of(conn.accept()));
+                            conn.onClose(disconnect -> sink.next(MqttClientMessage.ofDisconnect(disconnect)));
+                        })
+                        .flatMap(conn -> Flux.merge(
+                            conn.handleSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)),
+                            conn.handleUnSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)),
+                            conn.handleMessage().map(sub -> MqttClientMessage.of(conn, sub, type)))
+                        )
+                        .doOnNext(sink::next)
+                        .then()
+                )
+                .doOnError(sink::error)
+                .doOnSubscribe(sub -> log.debug("start mqtt server[{}] debug", clientId))
+                .doOnCancel(() -> log.debug("stop mqtt server[{}] debug", clientId))
+                .subscribe()
+            ));
+    }
+
+
+    @AllArgsConstructor(staticName = "of")
+    @Getter
+    @Setter
+    public static class MqttClientMessage {
+        private String type;
+
+        private String typeText;
+
+        private Object data;
+
+        public static MqttClientMessage of(MqttConnection connection) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            connection.getAuth().ifPresent(auth -> {
+                data.put("username", auth.getUsername());
+                data.put("password", auth.getPassword());
+            });
+            return MqttClientMessage.of("connection", "连接", data);
+        }
+
+        public static MqttClientMessage ofDisconnect(MqttConnection connection) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            connection.getAuth().ifPresent(auth -> {
+                data.put("username", auth.getUsername());
+                data.put("password", auth.getPassword());
+            });
+            return MqttClientMessage.of("disconnection", "断开连接", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttSubscription subscription) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("topics", subscription
+                .getMessage()
+                .topicSubscriptions()
+                .stream()
+                .map(subs -> "QoS:" + subs.qualityOfService().value() + " Topic:" + subs.topicName())
+            );
+            return MqttClientMessage.of("subscription", "订阅", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttUnSubscription subscription) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("topics", subscription
+                .getMessage()
+                .topics()
+            );
+            return MqttClientMessage.of("unsubscription", "取消订阅", data);
+        }
+
+        public static MqttClientMessage of(MqttConnection connection, MqttPublishing subscription, PayloadType type) {
+            Map<String, Object> data = new HashMap<>();
+            data.put("clientId", connection.getClientId());
+            data.put("address", connection.getClientAddress().toString());
+            data.put("message", MqttMessageResponse.of(subscription.getMessage(), type));
+            return MqttClientMessage.of("publish", "推送消息", data);
+        }
+
+    }
+}