Bladeren bron

优化MQTT

zhouhao 3 jaren geleden
bovenliggende
commit
bc985539e2

+ 28 - 27
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -182,44 +182,45 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
                 String deviceId = device.getDeviceId();
                 //认证通过
                 if (resp.isSuccess()) {
+                    //监听断开连接
+                    connection.onClose(conn -> {
+                        counter.decrement();
+                        //监控信息
+                        monitor.disconnected();
+                        monitor.totalConnection(counter.sum());
+
+                        sessionManager
+                            .getSession(deviceId)
+                            .flatMap(_tmp -> {
+                                //只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
+                                //或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
+                                if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) {
+                                    MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
+                                    if (connectionSession.getConnection() == conn) {
+                                        return sessionManager.remove(deviceId, true);
+                                    }
+                                }
+                                return Mono.empty();
+                            })
+                            .subscribe();
+                    });
+
                     counter.increment();
                     return sessionManager
                         .compute(deviceId, old -> {
                             MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor);
                             return old
-                                .doOnNext(session -> {
-                                    if (session instanceof ReplaceableDeviceSession) {
-                                        //如果是可替换的会话,则替换为新的会话
-                                        //通常是设置了keepOnline或者之前的会话还没有来得及移除时,直接更新为新的会话.
-                                        ((ReplaceableDeviceSession) session).replaceWith(newSession);
+                                .<DeviceSession>map(session -> {
+                                    if (session instanceof KeepOnlineSession) {
+                                        //KeepOnlineSession 则依旧保持keepOnline
+                                        return new KeepOnlineSession(newSession, session.getKeepAliveTimeout());
                                     }
+                                    return newSession;
                                 })
                                 .defaultIfEmpty(newSession);
                         })
                         .flatMap(session -> Mono.fromCallable(() -> {
                             try {
-                                //监听断开连接
-                                connection.onClose(conn -> {
-                                    counter.decrement();
-                                    //监控信息
-                                    monitor.disconnected();
-                                    monitor.totalConnection(counter.sum());
-
-                                    sessionManager
-                                        .getSession(session.getDeviceId())
-                                        .flatMap(_tmp -> {
-                                            //只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
-                                            //或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
-                                            if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) {
-                                                MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
-                                                if (connectionSession.getConnection() == conn) {
-                                                    return sessionManager.remove(deviceId, true);
-                                                }
-                                            }
-                                            return Mono.empty();
-                                        })
-                                        .subscribe();
-                                });
                                 return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
                             } catch (IllegalStateException ignore) {
                                 //忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?

+ 2 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java

@@ -1,8 +1,9 @@
 package org.jetlinks.community.network.mqtt.server;
 
 import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.server.mqtt.MqttPublishingMessage;
 
-public interface MqttPublishing {
+public interface MqttPublishing extends MqttPublishingMessage {
 
     MqttMessage getMessage();
 

+ 106 - 68
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.network.mqtt.server.vertx;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.buffer.Buffer;
@@ -18,17 +19,16 @@ import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttPublishing;
 import org.jetlinks.community.network.mqtt.server.MqttSubscription;
 import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
+import org.jetlinks.core.message.codec.EncodedMessage;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.core.server.mqtt.MqttAuth;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.*;
 
 import javax.annotation.Nonnull;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -42,14 +42,6 @@ class VertxMqttConnection implements MqttConnection {
     @Getter
     private long lastPingTime = System.currentTimeMillis();
     private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
-
-    private final EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
-
-    private final FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
-
-    private final EmitterProcessor<MqttSubscription> subscription = EmitterProcessor.create(false);
-    private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
-
     private static final MqttAuth emptyAuth = new MqttAuth() {
         @Override
         public String getUsername() {
@@ -61,6 +53,20 @@ class VertxMqttConnection implements MqttConnection {
             return "";
         }
     };
+    private final Sinks.Many<MqttPublishing> messageProcessor = Sinks
+        .many()
+        .multicast()
+        .onBackpressureBuffer(Integer.MAX_VALUE);
+
+    private final Sinks.Many<MqttSubscription> subscription = Sinks
+        .many()
+        .multicast()
+        .onBackpressureBuffer(Integer.MAX_VALUE);
+    private final Sinks.Many<MqttUnSubscription> unsubscription = Sinks
+        .many()
+        .multicast()
+        .onBackpressureBuffer(Integer.MAX_VALUE);
+
 
     public VertxMqttConnection(MqttEndpoint endpoint) {
         this.endpoint = endpoint;
@@ -68,10 +74,10 @@ class VertxMqttConnection implements MqttConnection {
     }
 
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
-        log.debug("mqtt client [{}] disconnected", getClientId());
-        subscription.onComplete();
-        unsubscription.onComplete();
-        messageProcessor.onComplete();
+        VertxMqttConnection.log.debug("mqtt client [{}] disconnected", getClientId());
+        subscription.tryEmitComplete();
+        unsubscription.tryEmitComplete();
+        messageProcessor.tryEmitComplete();
 
     };
 
@@ -92,20 +98,28 @@ class VertxMqttConnection implements MqttConnection {
         if (closed) {
             return;
         }
-        endpoint.reject(code);
-        complete();
+        try {
+            endpoint.reject(code);
+        } catch (Throwable ignore) {
+        }
+        try {
+            complete();
+        } catch (Throwable ignore) {
+
+        }
     }
 
     @Override
     public Optional<MqttMessage> getWillMessage() {
         return Optional.ofNullable(endpoint.will())
-            .filter(will -> will.getWillMessageBytes() != null)
-            .map(will -> SimpleMqttMessage.builder()
-                .will(true)
-                .payload(Unpooled.wrappedBuffer(will.getWillMessageBytes()))
-                .topic(will.getWillTopic())
-                .qosLevel(will.getWillQos())
-                .build());
+                       .filter(will -> will.getWillMessageBytes() != null)
+                       .map(will -> SimpleMqttMessage
+                           .builder()
+                           .will(true)
+                           .payload(Unpooled.wrappedBuffer(will.getWillMessageBytes()))
+                           .topic(will.getWillTopic())
+                           .qosLevel(will.getWillQos())
+                           .build());
     }
 
     @Override
@@ -141,6 +155,15 @@ class VertxMqttConnection implements MqttConnection {
         this.endpoint
             .disconnectHandler(ignore -> this.complete())
             .closeHandler(ignore -> this.complete())
+            .exceptionHandler(error -> {
+                if (error instanceof DecoderException) {
+                    if (error.getMessage().contains("too large message")) {
+                        log.error("MQTT消息过大,请在网络组件中设置[最大消息长度].", error);
+                        return;
+                    }
+                }
+                log.error(error.getMessage(), error);
+            })
             .pingHandler(ignore -> {
                 this.ping();
                 if (!endpoint.isAutoKeepAlive()) {
@@ -150,12 +173,12 @@ class VertxMqttConnection implements MqttConnection {
             .publishHandler(msg -> {
                 ping();
                 VertxMqttPublishing publishing = new VertxMqttPublishing(msg, false);
-                boolean hasDownstream = this.messageProcessor.hasDownstreams();
-                if (autoAckMsg || !hasDownstream) {
+                boolean hasDownstream = this.messageProcessor.currentSubscriberCount() > 0;
+                if (autoAckMsg && hasDownstream) {
                     publishing.acknowledge();
                 }
                 if (hasDownstream) {
-                    this.publishingFluxSink.next(publishing);
+                    this.messageProcessor.tryEmitNext(publishing);
                 }
             })
             //QoS 1 PUBACK
@@ -183,23 +206,23 @@ class VertxMqttConnection implements MqttConnection {
             .subscribeHandler(msg -> {
                 ping();
                 VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false);
-                boolean hasDownstream = this.subscription.hasDownstreams();
+                boolean hasDownstream = this.subscription.currentSubscriberCount() > 0;
                 if (autoAckSub || !hasDownstream) {
                     subscription.acknowledge();
                 }
                 if (hasDownstream) {
-                    this.subscription.onNext(subscription);
+                    this.subscription.tryEmitNext(subscription);
                 }
             })
             .unsubscribeHandler(msg -> {
                 ping();
                 VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false);
-                boolean hasDownstream = this.unsubscription.hasDownstreams();
+                boolean hasDownstream = this.unsubscription.currentSubscriberCount() > 0;
                 if (autoAckUnSub || !hasDownstream) {
                     unSubscription.acknowledge();
                 }
                 if (hasDownstream) {
-                    this.unsubscription.onNext(unSubscription);
+                    this.unsubscription.tryEmitNext(unSubscription);
                 }
             });
     }
@@ -220,7 +243,7 @@ class VertxMqttConnection implements MqttConnection {
                     clientAddress = new InetSocketAddress(address.host(), address.port());
                 }
             }
-        }catch (Throwable ignore){
+        } catch (Throwable ignore) {
 
         }
         return clientAddress;
@@ -233,11 +256,7 @@ class VertxMqttConnection implements MqttConnection {
 
     @Override
     public Flux<MqttPublishing> handleMessage() {
-        if (messageProcessor.isCancelled()) {
-            return Flux.empty();
-        }
-        return messageProcessor
-            .map(Function.identity());
+        return messageProcessor.asFlux();
     }
 
     @Override
@@ -247,17 +266,17 @@ class VertxMqttConnection implements MqttConnection {
             .<Void>create(sink -> {
                 Buffer buffer = Buffer.buffer(message.getPayload());
                 endpoint.publish(message.getTopic(),
-                    buffer,
-                    MqttQoS.valueOf(message.getQosLevel()),
-                    message.isDup(),
-                    message.isRetain(),
-                    result -> {
-                        if (result.succeeded()) {
-                            sink.success();
-                        } else {
-                            sink.error(result.cause());
-                        }
-                    }
+                                 buffer,
+                                 MqttQoS.valueOf(message.getQosLevel()),
+                                 message.isDup(),
+                                 message.isRetain(),
+                                 result -> {
+                                     if (result.succeeded()) {
+                                         sink.success();
+                                     } else {
+                                         sink.error(result.cause());
+                                     }
+                                 }
                 );
             });
     }
@@ -266,13 +285,13 @@ class VertxMqttConnection implements MqttConnection {
     public Flux<MqttSubscription> handleSubscribe(boolean autoAck) {
 
         autoAckSub = autoAck;
-        return subscription.map(Function.identity());
+        return subscription.asFlux();
     }
 
     @Override
     public Flux<MqttUnSubscription> handleUnSubscribe(boolean autoAck) {
         autoAckUnSub = autoAck;
-        return unsubscription.map(Function.identity());
+        return unsubscription.asFlux();
     }
 
     @Override
@@ -282,11 +301,19 @@ class VertxMqttConnection implements MqttConnection {
 
     @Override
     public Mono<Void> close() {
+        if (closed) {
+            return Mono.empty();
+        }
         return Mono.<Void>fromRunnable(() -> {
-            if (endpoint.isConnected()) {
-                endpoint.close();
+            try {
+                if (endpoint.isConnected()) {
+                    endpoint.close();
+                } else {
+                    complete();
+                }
+            } catch (Throwable ignore) {
             }
-        }).doFinally(s -> this.complete());
+        });
 
     }
 
@@ -296,12 +323,15 @@ class VertxMqttConnection implements MqttConnection {
         }
         closed = true;
         disconnectConsumer.accept(this);
-        disconnectConsumer = defaultListener;
     }
 
+
     @AllArgsConstructor
-    class VertxMqttMessage implements MqttMessage {
-        MqttPublishMessage message;
+    class VertxMqttPublishing implements MqttPublishing {
+
+        private final MqttPublishMessage message;
+
+        private volatile boolean acknowledged;
 
         @Nonnull
         @Override
@@ -349,18 +379,10 @@ class VertxMqttConnection implements MqttConnection {
         public String toString() {
             return print();
         }
-    }
-
-    @AllArgsConstructor
-    class VertxMqttPublishing implements MqttPublishing {
-
-        private final MqttPublishMessage message;
-
-        private volatile boolean acknowledged;
 
         @Override
         public MqttMessage getMessage() {
-            return new VertxMqttMessage(message);
+            return this;
         }
 
         @Override
@@ -397,8 +419,11 @@ class VertxMqttConnection implements MqttConnection {
                 return;
             }
             acknowledged = true;
-            endpoint.subscribeAcknowledge(message.messageId(), message.topicSubscriptions().stream()
-                .map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList()));
+            endpoint.subscribeAcknowledge(message.messageId(), message
+                .topicSubscriptions()
+                .stream()
+                .map(MqttTopicSubscription::qualityOfService)
+                .collect(Collectors.toList()));
         }
     }
 
@@ -438,4 +463,17 @@ class VertxMqttConnection implements MqttConnection {
             return endpoint.auth().getPassword();
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        VertxMqttConnection that = (VertxMqttConnection) o;
+        return Objects.equals(endpoint, that.endpoint);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(endpoint);
+    }
 }