Pārlūkot izejas kodu

mqtt client网关支持qos

zhou-hao 4 gadi atpakaļ
vecāks
revīzija
fe5b364a94

+ 6 - 2
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java

@@ -51,6 +51,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
 
     private final String protocol;
 
+    private final int qos;
+
     private final ProtocolSupports protocolSupport;
 
     private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
@@ -72,7 +74,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
                                    String protocol,
                                    DeviceSessionManager sessionManager,
                                    DecodedClientMessageHandler clientMessageHandler,
-                                   List<String> topics) {
+                                   List<String> topics,
+                                   int qos) {
         this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
 
         this.id = Objects.requireNonNull(id, "id");
@@ -82,6 +85,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
         this.protocol = Objects.requireNonNull(protocol, "protocol");
         this.topics = Objects.requireNonNull(topics, "topics");
         this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);
+        this.qos = qos;
     }
 
 
@@ -95,7 +99,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
         }
         disposable
             .add(mqttClient
-                     .subscribe(topics)
+                     .subscribe(topics,qos)
                      .filter((msg) -> started.get())
                      .flatMap(mqttMessage -> {
                          AtomicReference<Duration> timeoutRef = new AtomicReference<>();

+ 3 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGatewayProvider.java

@@ -64,6 +64,7 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
 
                 String protocol = (String) properties.getConfiguration().get("protocol");
                 String topics = (String) properties.getConfiguration().get("topics");
+                int qos = properties.getInt("topics").orElse(0);
                 Objects.requireNonNull(topics, "topics");
 
                 return new MqttClientDeviceGateway(properties.getId(),
@@ -73,7 +74,8 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
                                                    protocol,
                                                    sessionManager,
                                                    clientMessageHandler,
-                                                   Arrays.asList(topics.split("[,;\n]"))
+                                                   Arrays.asList(topics.split("[,;\n]")),
+                                                   qos
                 );
 
             });