浏览代码

优化buffer

zhou-hao 5 年之前
父节点
当前提交
a03b86cb29

+ 21 - 9
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -38,18 +38,30 @@ import java.util.stream.Collectors;
 @Slf4j
 class VertxMqttConnection implements MqttConnection {
 
-    private MqttEndpoint endpoint;
+    private final MqttEndpoint endpoint;
     private long keepAliveTimeoutMs;
     @Getter
     private long lastPingTime = System.currentTimeMillis();
     private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
 
-    private EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
+    private final EmitterProcessor<MqttPublishing> messageProcessor = EmitterProcessor.create(false);
 
-    private FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink();
+    private final FluxSink<MqttPublishing> publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
 
-    private EmitterProcessor<MqttSubscription> subscription = EmitterProcessor.create(false);
-    private EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
+    private final EmitterProcessor<MqttSubscription> subscription = EmitterProcessor.create(false);
+    private final EmitterProcessor<MqttUnSubscription> unsubscription = EmitterProcessor.create(false);
+
+    private static final MqttAuth emptyAuth = new MqttAuth() {
+        @Override
+        public String getUsername() {
+            return "";
+        }
+
+        @Override
+        public String getPassword() {
+            return "";
+        }
+    };
 
     public VertxMqttConnection(MqttEndpoint endpoint) {
         this.endpoint = endpoint;
@@ -73,7 +85,7 @@ class VertxMqttConnection implements MqttConnection {
 
     @Override
     public Optional<MqttAuth> getAuth() {
-        return endpoint.auth() == null ? Optional.empty() : Optional.of(new VertxMqttAuth());
+        return endpoint.auth() == null ? Optional.of(emptyAuth) : Optional.of(new VertxMqttAuth());
     }
 
     @Override
@@ -339,7 +351,7 @@ class VertxMqttConnection implements MqttConnection {
     @AllArgsConstructor
     class VertxMqttPublishing implements MqttPublishing {
 
-        private MqttPublishMessage message;
+        private final MqttPublishMessage message;
 
         private volatile boolean acknowledged;
 
@@ -367,7 +379,7 @@ class VertxMqttConnection implements MqttConnection {
     @AllArgsConstructor
     class VertxMqttSubscription implements MqttSubscription {
 
-        private MqttSubscribeMessage message;
+        private final MqttSubscribeMessage message;
 
         private volatile boolean acknowledged;
 
@@ -390,7 +402,7 @@ class VertxMqttConnection implements MqttConnection {
     @AllArgsConstructor
     class VertxMqttMqttUnSubscription implements MqttUnSubscription {
 
-        private MqttUnsubscribeMessage message;
+        private final MqttUnsubscribeMessage message;
 
         private volatile boolean acknowledged;
 

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -30,12 +30,12 @@ public class DeviceMessageConnector
     MessageConnection,
     MessagePublisher {
 
-    private EmitterProcessor<TopicMessage> messageProcessor = EmitterProcessor.create(false);
+    private final EmitterProcessor<TopicMessage> messageProcessor = EmitterProcessor.create(false);
 
-    private FluxSink<TopicMessage> sink = messageProcessor.sink();
+    private final FluxSink<TopicMessage> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
 
     //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用.
-    private String[] appendConfigHeader = {"orgId", "productId", "deviceName"};
+    private final String[] appendConfigHeader = {"orgId", "productId", "deviceName"};
     //设备注册中心
     private final DeviceRegistry registry;