|
@@ -44,8 +44,6 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
|
|
|
|
|
|
private final ProtocolSupports protocolSupport;
|
|
private final ProtocolSupports protocolSupport;
|
|
|
|
|
|
- private final AtomicBoolean started = new AtomicBoolean();
|
|
|
|
-
|
|
|
|
private Disposable disposable = null;
|
|
private Disposable disposable = null;
|
|
|
|
|
|
private final DeviceGatewayHelper helper;
|
|
private final DeviceGatewayHelper helper;
|
|
@@ -80,7 +78,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
|
|
}
|
|
}
|
|
disposable = mqttClient
|
|
disposable = mqttClient
|
|
.subscribe(topics, qos)
|
|
.subscribe(topics, qos)
|
|
- .filter((msg) -> started.get())
|
|
|
|
|
|
+ .filter((msg) -> isStarted())
|
|
.flatMap(mqttMessage -> {
|
|
.flatMap(mqttMessage -> {
|
|
AtomicReference<Duration> timeoutRef = new AtomicReference<>();
|
|
AtomicReference<Duration> timeoutRef = new AtomicReference<>();
|
|
return this
|
|
return this
|