Explorar o código

优化MQTT client

zhou-hao %!s(int64=5) %!d(string=hai) anos
pai
achega
2be268a1f8

+ 61 - 25
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.gateway.DeviceGateway;
 import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
 import org.jetlinks.community.network.mqtt.client.MqttClient;
@@ -32,10 +33,12 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 @Slf4j
@@ -67,6 +70,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
 
     private DeviceSessionManager sessionManager;
 
+    private DeviceGatewayMonitor gatewayMonitor;
+
     public MqttClientDeviceGateway(String id,
                                    MqttClient mqttClient,
                                    DeviceRegistry registry,
@@ -75,6 +80,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
                                    DeviceSessionManager sessionManager,
                                    DecodedClientMessageHandler clientMessageHandler,
                                    List<String> topics) {
+        this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
 
         this.id = Objects.requireNonNull(id, "id");
         this.mqttClient = Objects.requireNonNull(mqttClient, "mqttClient");
@@ -95,13 +101,14 @@ public class MqttClientDeviceGateway implements DeviceGateway {
         if (started.getAndSet(true) || !disposable.isEmpty()) {
             return;
         }
-
         disposable.add(mqttClient
             .subscribe(topics)
             .filter((msg) -> started.get())
-            .flatMap(mqttMessage -> getProtocol()
-                .flatMap(codec -> codec.getMessageCodec(getTransport()))
-                .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
+            .flatMap(mqttMessage -> {
+                AtomicReference<Duration> timeoutRef = new AtomicReference<>();
+                return getProtocol()
+                    .flatMap(codec -> codec.getMessageCodec(getTransport()))
+                    .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
                         @Override
                         public EncodedMessage getMessage() {
                             return mqttMessage;
@@ -109,34 +116,63 @@ public class MqttClientDeviceGateway implements DeviceGateway {
 
                         @Override
                         public DeviceSession getSession() {
-                            return new UnknownDeviceMqttClientSession(id + ":unknown", mqttClient);
+                            return new UnknownDeviceMqttClientSession(id + ":unknown", mqttClient) {
+                                @Override
+                                public Mono<Boolean> send(EncodedMessage encodedMessage) {
+                                    return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
+                                }
+
+                                @Override
+                                public void setKeepAliveTimeout(Duration timeout) {
+                                    timeoutRef.set(timeout);
+                                }
+                            };
                         }
 
                         @Override
                         public DeviceOperator getDevice() {
                             return null;
                         }
+                    }))
+                    .doOnError((err) -> log.error("解码MQTT客户端消息失败 {}:{}",
+                        mqttMessage.getTopic(), mqttMessage.getPayload().toString(StandardCharsets.UTF_8), err))
+                    .cast(DeviceMessage.class)
+                    .flatMap(msg -> {
+                        gatewayMonitor.receivedMessage();
+                        if (messageProcessor.hasDownstreams()) {
+                            sink.next(msg);
+                        }
+                        return registry
+                            .getDevice(msg.getDeviceId())
+                            .switchIfEmpty(Mono.fromRunnable(() -> log.debug("无法识别的设备:{}", msg)))
+                            .flatMap(device -> {
+                                DeviceSession session = sessionManager.getSession(device.getDeviceId());
+                                if (session == null) {
+                                    session = new MqttClientSession(id + ":" + device.getDeviceId(), device, mqttClient) {
+                                        @Override
+                                        public Mono<Boolean> send(EncodedMessage encodedMessage) {
+                                            return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
+                                        }
+                                    };
+                                    if (!(msg instanceof DeviceOfflineMessage)) {
+                                        sessionManager.register(session);
+                                    }
+                                }
+                                session.keepAlive();
+                                if (timeoutRef.get() != null) {
+                                    session.setKeepAliveTimeout(timeoutRef.get());
+                                }
+                                if (msg instanceof DeviceOnlineMessage) {
+                                    return Mono.empty();
+                                } else if (msg instanceof DeviceOfflineMessage) {
+                                    return Mono.fromRunnable(() -> sessionManager.unregister(device.getDeviceId()));
+                                } else {
+                                    return clientMessageHandler.handleMessage(device, msg).then();
+                                }
+                            });
                     })
-                )
-                .doOnError((err) -> log.error("解码MQTT客户端消息失败 {}:{}",
-                    mqttMessage.getTopic(), mqttMessage.getPayload().toString(StandardCharsets.UTF_8), err))
-                .cast(DeviceMessage.class)
-                .flatMap(msg -> {
-                    if (messageProcessor.hasDownstreams()) {
-                        sink.next(msg);
-                    }
-                    return registry
-                        .getDevice(msg.getDeviceId())
-                        .flatMap(device -> {
-                            if (msg instanceof DeviceOnlineMessage) {
-                                return Mono.fromRunnable(() -> sessionManager.register(new MqttClientSession(id + ":" + device.getDeviceId(), device, mqttClient)));
-                            } else if (msg instanceof DeviceOfflineMessage) {
-                                return Mono.fromRunnable(() -> sessionManager.unregister(device.getDeviceId()));
-                            } else {
-                                return clientMessageHandler.handleMessage(device, msg).then();
-                            }
-                        });
-                }))
+                    .onErrorContinue((err, ms) -> log.error("处理MQTT消息失败:{}", mqttMessage, err));
+            })
             .onErrorContinue((err, ms) -> log.error("处理MQTT客户端消息失败", err))
             .subscribe());
     }

+ 25 - 4
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java

@@ -10,6 +10,8 @@ import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.community.network.mqtt.client.MqttClient;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
+
 public class MqttClientSession implements DeviceSession {
     @Getter
     private String id;
@@ -19,6 +21,12 @@ public class MqttClientSession implements DeviceSession {
 
     private MqttClient client;
 
+    private long connectTime = System.currentTimeMillis();
+
+    private long lastPingTime = System.currentTimeMillis();
+
+    private long keepAliveTimeout = -1;
+
     public MqttClientSession(String id,
                              DeviceOperator operator,
                              MqttClient client) {
@@ -34,12 +42,12 @@ public class MqttClientSession implements DeviceSession {
 
     @Override
     public long lastPingTime() {
-        return 0;
+        return lastPingTime;
     }
 
     @Override
     public long connectTime() {
-        return 0;
+        return connectTime;
     }
 
     @Override
@@ -63,16 +71,29 @@ public class MqttClientSession implements DeviceSession {
 
     @Override
     public void ping() {
-
+        lastPingTime = System.currentTimeMillis();
     }
 
     @Override
     public boolean isAlive() {
-        return client.isAlive();
+        return client.isAlive() &&
+            (keepAliveTimeout <= 0 || System.currentTimeMillis() - lastPingTime < keepAliveTimeout);
     }
 
     @Override
     public void onClose(Runnable call) {
 
     }
+
+    @Override
+    public void setKeepAliveTimeout(Duration timeout) {
+        this.keepAliveTimeout = timeout.toMillis();
+    }
+
+    @Override
+    public String toString() {
+        return "MqttClientSession{" +
+            "id=" + id + ",device=" + getDeviceId() +
+            '}';
+    }
 }