소스 검색

mqtt client增加emqx 共享订阅支持

zhou-hao 4 년 전
부모
커밋
208c56cb9c

+ 27 - 13
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -15,11 +15,13 @@ import reactor.core.Disposables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Slf4j
 public class VertxMqttClient implements MqttClient {
@@ -27,7 +29,7 @@ public class VertxMqttClient implements MqttClient {
     @Getter
     private io.vertx.mqtt.MqttClient client;
 
-    private final Topic<Tuple2<FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
+    private final Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
 
     private final String id;
 
@@ -78,7 +80,7 @@ public class VertxMqttClient implements MqttClient {
                     .flatMapIterable(Topic::getSubscribers)
                     .subscribe(sink -> {
                         try {
-                            sink.getT1().next(mqttMessage);
+                            sink.getT2().next(mqttMessage);
                         } catch (Exception e) {
                             log.error("handle mqtt message error", e);
                         }
@@ -96,7 +98,7 @@ public class VertxMqttClient implements MqttClient {
         subscriber
             .findTopic("/**")
             .filter(topic -> topic.getSubscribers().size() > 0)
-            .collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2())
+            .collectMap(topic -> convertMqttTopic(topic.getSubscribers().iterator().next().getT1()), topic -> topic.getSubscribers().iterator().next().getT3())
             .filter(MapUtils::isNotEmpty)
             .subscribe(topics -> {
                 log.debug("subscribe mqtt topic {}", topics);
@@ -108,6 +110,18 @@ public class VertxMqttClient implements MqttClient {
         return topic.replace("**", "#").replace("*", "+");
     }
 
+    protected String parseTopic(String topic) {
+        //适配emqx共享订阅
+        if (topic.startsWith("$share")) {
+            return Stream.of(topic.split("/"))
+                .skip(2)
+                .collect(Collectors.joining("/", "/", ""));
+        } else if (topic.startsWith("$queue")) {
+            return topic.substring(6);
+        }
+        return topic;
+    }
+
     @Override
     public Flux<MqttMessage> subscribe(List<String> topics, int qos) {
         return Flux.create(sink -> {
@@ -115,9 +129,11 @@ public class VertxMqttClient implements MqttClient {
             Disposable.Composite composite = Disposables.composite();
 
             for (String topic : topics) {
-                Topic<Tuple2<FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber.append(topic.replace("#", "**").replace("+", "*"));
+                String realTopic = parseTopic(topic);
 
-                Tuple2<FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(sink, qos);
+                Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber.append(realTopic.replace("#", "**").replace("+", "*"));
+
+                Tuple3<String, FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(topic, sink, qos);
 
                 boolean first = sinkTopic.getSubscribers().size() == 0;
                 sinkTopic.subscribe(topicQos);
@@ -172,14 +188,12 @@ public class VertxMqttClient implements MqttClient {
     @Override
     public Mono<Void> publish(MqttMessage message) {
         if (loading) {
-            return Mono.create(sink -> {
-                loadSuccessListener.add(() -> {
-                    doPublish(message)
+            return Mono.create(sink ->
+                loadSuccessListener
+                    .add(() -> doPublish(message)
                         .doOnSuccess(sink::success)
                         .doOnError(sink::error)
-                        .subscribe();
-                });
-            });
+                        .subscribe()));
         }
         return doPublish(message);
     }
@@ -209,7 +223,7 @@ public class VertxMqttClient implements MqttClient {
 
     @Override
     public boolean isAlive() {
-        return client != null&& client.isConnected();
+        return client != null && client.isConnected();
     }
 
     @Override