Kaynağa Gözat

优化mqtt client topic处理

zhouhao 2 yıl önce
ebeveyn
işleme
24a45f5b67

+ 8 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -113,11 +113,14 @@ public class VertxMqttClient implements MqttClient {
     protected String parseTopic(String topic) {
         //适配emqx共享订阅
         if (topic.startsWith("$share")) {
-            return Stream.of(topic.split("/"))
+            topic= Stream.of(topic.split("/"))
                 .skip(2)
                 .collect(Collectors.joining("/", "/", ""));
         } else if (topic.startsWith("$queue")) {
-            return topic.substring(6);
+            topic= topic.substring(6);
+        }
+        if(topic.startsWith("//")){
+            return topic.substring(1);
         }
         return topic;
     }
@@ -131,7 +134,9 @@ public class VertxMqttClient implements MqttClient {
             for (String topic : topics) {
                 String realTopic = parseTopic(topic);
 
-                Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber.append(realTopic.replace("#", "**").replace("+", "*"));
+                Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber
+                    .append(realTopic.replace("#", "**")
+                                     .replace("+", "*"));
 
                 Tuple3<String, FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(topic, sink, qos);