Browse Source

修复状态错误

zhouhao 3 years ago
parent
commit
cf8d1285eb

+ 1 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -44,8 +44,6 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
 
     private final ProtocolSupports protocolSupport;
 
-    private final AtomicBoolean started = new AtomicBoolean();
-
     private Disposable disposable = null;
 
     private final DeviceGatewayHelper helper;
@@ -80,7 +78,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
         }
         disposable = mqttClient
             .subscribe(topics, qos)
-            .filter((msg) -> started.get())
+            .filter((msg) -> isStarted())
             .flatMap(mqttMessage -> {
                 AtomicReference<Duration> timeoutRef = new AtomicReference<>();
                 return this