Explorar o código

优化MQTT客户端网关

zhouhao %!s(int64=5) %!d(string=hai) anos
pai
achega
6286ea8c7c

+ 46 - 61
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -2,19 +2,28 @@ package org.jetlinks.community.network.mqtt.gateway.device;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.DeviceGateway;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.community.network.mqtt.gateway.device.session.MqttClientSession;
+import org.jetlinks.community.network.mqtt.gateway.device.session.UnknownDeviceMqttClientSession;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.ProtocolSupports;
-import org.jetlinks.core.device.*;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.DeviceOfflineMessage;
 import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.Message;
-import org.jetlinks.core.message.codec.*;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.FromDeviceMessageContext;
+import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.core.server.MessageHandler;
-import org.jetlinks.community.gateway.DeviceGateway;
-import org.jetlinks.community.network.DefaultNetworkType;
-import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import reactor.core.Disposable;
 import reactor.core.publisher.EmitterProcessor;
@@ -22,6 +31,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -46,7 +56,6 @@ public class MqttClientDeviceGateway implements DeviceGateway {
 
     private DecodedClientMessageHandler clientMessageHandler;
 
-    private MessageHandler messageHandler;
 
     private EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
 
@@ -56,26 +65,28 @@ public class MqttClientDeviceGateway implements DeviceGateway {
 
     private List<Disposable> disposable = new CopyOnWriteArrayList<>();
 
+    private DeviceSessionManager sessionManager;
+
     public MqttClientDeviceGateway(String id,
                                    MqttClient mqttClient,
                                    DeviceRegistry registry,
                                    ProtocolSupports protocolSupport,
                                    String protocol,
+                                   DeviceSessionManager sessionManager,
                                    DecodedClientMessageHandler clientMessageHandler,
-                                   MessageHandler messageHandler,
                                    List<String> topics) {
+
         this.id = Objects.requireNonNull(id, "id");
         this.mqttClient = Objects.requireNonNull(mqttClient, "mqttClient");
         this.registry = Objects.requireNonNull(registry, "registry");
         this.protocolSupport = Objects.requireNonNull(protocolSupport, "protocolSupport");
         this.protocol = Objects.requireNonNull(protocol, "protocol");
+        this.sessionManager = Objects.requireNonNull(sessionManager, "sessionManager");
         this.clientMessageHandler = Objects.requireNonNull(clientMessageHandler, "clientMessageHandler");
-        this.messageHandler = Objects.requireNonNull(messageHandler, "messageHandler");
         this.topics = Objects.requireNonNull(topics, "topics");
     }
 
 
-
     protected Mono<ProtocolSupport> getProtocol() {
         return protocolSupport.getProtocol(protocol);
     }
@@ -85,55 +96,30 @@ public class MqttClientDeviceGateway implements DeviceGateway {
             return;
         }
 
-        messageHandler
-            .handleGetDeviceState(getId(), idPublisher ->
-                Flux.from(idPublisher)
-                    .map(id -> new DeviceStateInfo(id, DeviceState.online)));
-
-        disposable.add(messageHandler
-            .handleSendToDeviceMessage(getId())
-            .filter((msg) -> started.get())
-            .flatMap(msg -> {
-                if (msg instanceof DeviceMessage) {
-                    DeviceMessage deviceMessage = ((DeviceMessage) msg);
-                    return registry.getDevice(deviceMessage.getDeviceId())
-                        .flatMapMany(device -> device.getProtocol()
-                            .flatMapMany(protocol ->
-                                protocol.getMessageCodec(getTransport())
-                                    .flatMapMany(codec -> codec.encode(new MessageEncodeContext() {
-                                        @Override
-                                        public Message getMessage() {
-                                            return deviceMessage;
-                                        }
-
-                                        @Override
-                                        public DeviceOperator getDevice() {
-                                            return device;
-                                        }
-                                    }))))
-                        .flatMap(message -> mqttClient.publish(((MqttMessage) message)));
-                }
-                return Mono.empty();
-            })
-            .onErrorContinue((err, res) -> log.error("处理MQTT消息失败", err))
-            .subscribe());
-
         disposable.add(mqttClient
             .subscribe(topics)
             .filter((msg) -> started.get())
             .flatMap(mqttMessage -> getProtocol()
                 .flatMap(codec -> codec.getMessageCodec(getTransport()))
-                .flatMapMany(codec -> codec.decode(new MessageDecodeContext() {
-                    @Override
-                    public EncodedMessage getMessage() {
-                        return mqttMessage;
-                    }
-
-                    @Override
-                    public DeviceOperator getDevice() {
-                        throw new UnsupportedOperationException();
-                    }
-                }))
+                .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
+                        @Override
+                        public EncodedMessage getMessage() {
+                            return mqttMessage;
+                        }
+
+                        @Override
+                        public DeviceSession getSession() {
+                            return new UnknownDeviceMqttClientSession(id + ":unknown", mqttClient);
+                        }
+
+                        @Override
+                        public DeviceOperator getDevice() {
+                            return null;
+                        }
+                    })
+                )
+                .doOnError((err) -> log.error("解码MQTT客户端消息失败 {}:{}",
+                    mqttMessage.getTopic(), mqttMessage.getPayload().toString(StandardCharsets.UTF_8), err))
                 .cast(DeviceMessage.class)
                 .flatMap(msg -> {
                     if (messageProcessor.hasDownstreams()) {
@@ -142,17 +128,16 @@ public class MqttClientDeviceGateway implements DeviceGateway {
                     return registry
                         .getDevice(msg.getDeviceId())
                         .flatMap(device -> {
-                            Mono<Void> handle = clientMessageHandler.handleMessage(device, msg).then();
-                            if (msg instanceof DeviceOfflineMessage) {
-                                handle = handle.then(device.offline().then());
-                            }
                             if (msg instanceof DeviceOnlineMessage) {
-                                handle = handle.then(device.online(getId(), getId()).then());
+                                return Mono.fromRunnable(() -> sessionManager.register(new MqttClientSession(id + ":" + device.getDeviceId(), device, mqttClient)));
+                            } else if (msg instanceof DeviceOfflineMessage) {
+                                return Mono.fromRunnable(() -> sessionManager.unregister(device.getDeviceId()));
+                            } else {
+                                return clientMessageHandler.handleMessage(device, msg).then();
                             }
-                            return handle;
                         });
                 }))
-            .onErrorContinue((err, res) -> log.error("处理MQTT消息失败", err))
+            .onErrorContinue((err, ms) -> log.error("处理MQTT客户端消息失败", err))
             .subscribe());
     }
 

+ 5 - 4
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGatewayProvider.java

@@ -10,6 +10,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.core.server.session.DeviceSessionManager;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
@@ -24,7 +25,7 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
 
     private final DeviceRegistry registry;
 
-    private final MessageHandler messageHandler;
+    private final DeviceSessionManager sessionManager;
 
     private final DecodedClientMessageHandler clientMessageHandler;
 
@@ -32,12 +33,12 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
 
     public MqttClientDeviceGatewayProvider(NetworkManager networkManager,
                                            DeviceRegistry registry,
-                                           MessageHandler messageHandler,
+                                           DeviceSessionManager sessionManager,
                                            DecodedClientMessageHandler clientMessageHandler,
                                            ProtocolSupports protocolSupports) {
         this.networkManager = networkManager;
         this.registry = registry;
-        this.messageHandler = messageHandler;
+        this.sessionManager = sessionManager;
         this.clientMessageHandler = clientMessageHandler;
         this.protocolSupports = protocolSupports;
     }
@@ -72,8 +73,8 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
                     registry,
                     protocolSupports,
                     protocol,
+                    sessionManager,
                     clientMessageHandler,
-                    messageHandler,
                     Arrays.asList(topics.split("[,;\n]"))
                 );
 

+ 78 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java

@@ -0,0 +1,78 @@
+package org.jetlinks.community.network.mqtt.gateway.device.session;
+
+import lombok.Getter;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import reactor.core.publisher.Mono;
+
+public class MqttClientSession implements DeviceSession {
+    @Getter
+    private String id;
+
+    @Getter
+    private DeviceOperator operator;
+
+    private MqttClient client;
+
+    public MqttClientSession(String id,
+                             DeviceOperator operator,
+                             MqttClient client) {
+        this.id = id;
+        this.operator = operator;
+        this.client = client;
+    }
+
+    @Override
+    public String getDeviceId() {
+        return operator.getDeviceId();
+    }
+
+    @Override
+    public long lastPingTime() {
+        return 0;
+    }
+
+    @Override
+    public long connectTime() {
+        return 0;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        if (encodedMessage instanceof MqttMessage) {
+            return client.publish(((MqttMessage) encodedMessage))
+                .thenReturn(true);
+        }
+        return Mono.error(new UnsupportedOperationException("unsupported message type:" + encodedMessage.getClass()));
+    }
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.MQTT;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void ping() {
+
+    }
+
+    @Override
+    public boolean isAlive() {
+        return client.isAlive();
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+
+    }
+}

+ 78 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/UnknownDeviceMqttClientSession.java

@@ -0,0 +1,78 @@
+package org.jetlinks.community.network.mqtt.gateway.device.session;
+
+import lombok.Getter;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.message.codec.DefaultTransport;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.message.codec.Transport;
+import org.jetlinks.core.server.session.DeviceSession;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import reactor.core.publisher.Mono;
+
+public class UnknownDeviceMqttClientSession implements DeviceSession {
+    @Getter
+    private String id;
+
+    private MqttClient client;
+
+    public UnknownDeviceMqttClientSession(String id,
+                                          MqttClient client) {
+        this.id = id;
+        this.client = client;
+    }
+
+    @Override
+    public String getDeviceId() {
+        return null;
+    }
+
+    @Override
+    public DeviceOperator getOperator() {
+        return null;
+    }
+
+    @Override
+    public long lastPingTime() {
+        return 0;
+    }
+
+    @Override
+    public long connectTime() {
+        return 0;
+    }
+
+    @Override
+    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+        if (encodedMessage instanceof MqttMessage) {
+            return client.publish(((MqttMessage) encodedMessage))
+                .thenReturn(true);
+        }
+        return Mono.error(new UnsupportedOperationException("unsupported message type:" + encodedMessage.getClass()));
+    }
+
+    @Override
+    public Transport getTransport() {
+        return DefaultTransport.MQTT;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void ping() {
+
+    }
+
+    @Override
+    public boolean isAlive() {
+        return client.isAlive();
+    }
+
+    @Override
+    public void onClose(Runnable call) {
+
+    }
+}