Kaynağa Gözat

Merge branch '1.4'

zhou-hao 4 yıl önce
ebeveyn
işleme
864e6eb65d

+ 5 - 1
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClient.java

@@ -9,7 +9,11 @@ import java.util.List;
 
 public interface MqttClient extends Network {
 
-    Flux<MqttMessage> subscribe(List<String> topics);
+    default Flux<MqttMessage> subscribe(List<String> topics){
+        return subscribe(topics,0);
+    }
+
+    Flux<MqttMessage> subscribe(List<String> topics,int qos);
 
     Mono<Void> publish(MqttMessage message);
 

+ 26 - 13
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java

@@ -1,12 +1,17 @@
 package org.jetlinks.community.network.mqtt.client;
 
+import com.alibaba.fastjson.JSONObject;
 import io.vertx.core.Vertx;
 import io.vertx.mqtt.MqttClient;
 import io.vertx.mqtt.MqttClientOptions;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.bean.FastBeanCopier;
-import org.jetlinks.community.network.*;
 import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DefaultConfigMetadata;
+import org.jetlinks.core.metadata.types.BooleanType;
+import org.jetlinks.core.metadata.types.IntType;
+import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.community.network.*;
 import org.jetlinks.community.network.security.CertificateManager;
 import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
 import org.springframework.stereotype.Component;
@@ -44,19 +49,23 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
 
     @Override
     public void reload(@Nonnull Network network, @Nonnull MqttClientProperties properties) {
-        VertxMqttClient mqttClient = ((VertxMqttClient) network);
-        mqttClient.shutdown();
-
+       VertxMqttClient mqttClient = ((VertxMqttClient) network);
+        if (mqttClient.isLoading()) {
+            return;
+        }
         initMqttClient(mqttClient, properties);
     }
 
     public void initMqttClient(VertxMqttClient mqttClient, MqttClientProperties properties) {
+        mqttClient.setLoading(true);
         MqttClient client = MqttClient.create(vertx, properties.getOptions());
+        mqttClient.setClient(client);
         client.connect(properties.getPort(), properties.getHost(), result -> {
+            mqttClient.setLoading(false);
             if (!result.succeeded()) {
                 log.warn("connect mqtt [{}] error", properties.getId(), result.cause());
             } else {
-                mqttClient.setClient(client);
+                log.debug("connect mqtt [{}] success", properties.getId());
             }
         });
     }
@@ -64,8 +73,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
     @Nullable
     @Override
     public ConfigMetadata getConfigMetadata() {
-        // TODO: 2019/12/19
-        return null;
+        return new DefaultConfigMetadata()
+            .add("id", "id", "", new StringType())
+            .add("instance", "服务实例数量(线程数)", "", new IntType())
+            .add("certId", "证书id", "", new StringType())
+            .add("ssl", "是否开启ssl", "", new BooleanType())
+            .add("options.port", "MQTT服务设置", "", new IntType());
     }
 
     @Nonnull
@@ -74,12 +87,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
         return Mono.defer(() -> {
             MqttClientProperties config = FastBeanCopier.copy(properties.getConfigurations(), new MqttClientProperties());
             config.setId(properties.getId());
-            if (config.getOptions() == null) {
-                config.setOptions(new MqttClientOptions());
-                config.getOptions().setClientId(config.getClientId());
-                config.getOptions().setPassword(config.getPassword());
-                config.getOptions().setUsername(config.getUsername());
-            }
+            config.setOptions(new JSONObject(properties.getConfigurations()).toJavaObject(MqttClientOptions.class));
+
+            config.getOptions().setClientId(config.getClientId());
+            config.getOptions().setPassword(config.getPassword());
+            config.getOptions().setUsername(config.getUsername());
+
             if (config.isSsl()) {
                 config.getOptions().setSsl(true);
                 return certificateManager.getCertificate(config.getCertId())

+ 117 - 77
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -6,19 +6,19 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
+import org.jetlinks.core.topic.Topic;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.core.utils.TopicUtils;
-import reactor.core.publisher.*;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 @Slf4j
 public class VertxMqttClient implements MqttClient {
@@ -26,101 +26,126 @@ public class VertxMqttClient implements MqttClient {
     @Getter
     private io.vertx.mqtt.MqttClient client;
 
-    private final FluxProcessor<MqttMessage, MqttMessage> messageProcessor = EmitterProcessor.create(false);
+    private final Topic<Tuple2<FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
 
-    private final FluxSink<MqttMessage> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
+    private final String id;
 
-    private final Map<String, AtomicInteger> topicsSubscribeCounter = new ConcurrentHashMap<>();
+    private volatile boolean loading;
 
-    private boolean neverSubscribe = true;
+    private final List<Runnable> loadSuccessListener = new CopyOnWriteArrayList<>();
 
-    private final String id;
+    public void setLoading(boolean loading) {
+        this.loading = loading;
+        if (!loading) {
+            loadSuccessListener.forEach(Runnable::run);
+            loadSuccessListener.clear();
+        }
+    }
+
+    public boolean isLoading() {
+        return loading;
+    }
 
-    @Getter
-    private final AtomicInteger reloadCounter = new AtomicInteger();
 
     public VertxMqttClient(String id) {
         this.id = id;
     }
 
     public void setClient(io.vertx.mqtt.MqttClient client) {
+        if (this.client != null && this.client != client) {
+            this.client.disconnect();
+        }
         this.client = client;
-        if (isAlive()) {
-            reloadCounter.set(0);
-            client.publishHandler(msg -> {
-                //从未订阅,可能消息是还没来得及
-                //或者已经有了下游消费者
-                if (neverSubscribe || messageProcessor.hasDownstreams()) {
-                    sink.next(SimpleMqttMessage
-                        .builder()
-                        .topic(msg.topicName())
-                        .clientId(client.clientId())
-                        .qosLevel(msg.qosLevel().value())
-                        .retain(msg.isRetain())
-                        .dup(msg.isDup())
-                        .payload(msg.payload().getByteBuf())
-                        .messageId(msg.messageId())
-                        .build());
-                }
+        client
+            .closeHandler(nil -> log.debug("mqtt client [{}] closed", id))
+            .publishHandler(msg -> {
+                MqttMessage mqttMessage = SimpleMqttMessage
+                    .builder()
+                    .messageId(msg.messageId())
+                    .topic(msg.topicName())
+                    .payload(msg.payload().getByteBuf())
+                    .dup(msg.isDup())
+                    .retain(msg.isRetain())
+                    .qosLevel(msg.qosLevel().value())
+                    .build();
+                log.debug("handle mqtt message \n{}", mqttMessage);
+                subscriber
+                    .findTopic(msg.topicName().replace("#","**").replace("+","*"))
+                    .flatMapIterable(Topic::getSubscribers)
+                    .subscribe(sink -> {
+                        try {
+                            sink.getT1().next(mqttMessage);
+                        } catch (Exception e) {
+                            log.error("handle mqtt message error", e);
+                        }
+                    });
             });
-            if (!topicsSubscribeCounter.isEmpty()) {
-                Map<String, Integer> reSubscribe = topicsSubscribeCounter
-                    .entrySet()
-                    .stream()
-                    .filter(e -> e.getValue().get() > 0)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toMap(Function.identity(), (r) -> 0));
-                if (!reSubscribe.isEmpty()) {
-                    log.info("re subscribe [{}] topic {}", client.clientId(), reSubscribe.keySet());
-                    client.subscribe(reSubscribe);
-                }
-            }
+        if (isAlive()) {
+            reSubscribe();
+        } else if (loading) {
+            loadSuccessListener.add(this::reSubscribe);
         }
 
     }
 
-    private AtomicInteger getTopicCounter(String topic) {
-        return topicsSubscribeCounter.computeIfAbsent(topic, (ignore) -> new AtomicInteger());
+    private void reSubscribe() {
+        subscriber
+            .findTopic("/**")
+            .filter(topic -> topic.getSubscribers().size() > 0)
+            .collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2())
+            .subscribe(topics -> {
+                log.debug("subscribe mqtt topic {}", topics);
+                client.subscribe(topics);
+            });
     }
 
+    private String convertMqttTopic(String topic) {
+        return topic.replace("**", "#").replace("*", "+");
+    }
 
     @Override
-    public Flux<MqttMessage> subscribe(List<String> topics) {
-        neverSubscribe = false;
-        AtomicBoolean canceled = new AtomicBoolean();
-        return Flux.defer(() -> {
-            Map<String, Integer> subscribeTopic = topics.stream()
-                .filter(r -> getTopicCounter(r).getAndIncrement() == 0)
-                .collect(Collectors.toMap(Function.identity(), (r) -> 0));
-            if (isAlive()) {
-                if (!subscribeTopic.isEmpty()) {
-                    log.info("subscribe mqtt [{}] topic : {}", client.clientId(), subscribeTopic);
-                    client.subscribe(subscribeTopic);
-                }
-            }
-            return messageProcessor
-                .filter(msg -> topics
-                    .stream()
-                    .anyMatch(topic -> TopicUtils.match(topic, msg.getTopic())));
-        }).doOnCancel(() -> {
-            if (!canceled.getAndSet(true)) {
-                for (String topic : topics) {
-                    if (getTopicCounter(topic).decrementAndGet() <= 0 && isAlive()) {
-                        log.info("unsubscribe mqtt [{}] topic : {}", client.clientId(), topic);
-                        client.unsubscribe(topic);
+    public Flux<MqttMessage> subscribe(List<String> topics, int qos) {
+        return Flux.create(sink -> {
+
+            Disposable.Composite composite = Disposables.composite();
+
+            for (String topic : topics) {
+                Topic<Tuple2<FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber.append(topic.replace("#", "**").replace("+", "*"));
+
+                Tuple2<FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(sink, qos);
+
+                boolean first = sinkTopic.getSubscribers().size() == 0;
+                sinkTopic.subscribe(topicQos);
+                composite.add(() -> {
+                    if (sinkTopic.unsubscribe(topicQos).size() > 0) {
+                        client.unsubscribe(convertMqttTopic(topic), result -> {
+                            if (result.succeeded()) {
+                                log.debug("unsubscribe mqtt topic {}", topic);
+                            } else {
+                                log.debug("unsubscribe mqtt topic {} error", topic, result.cause());
+                            }
+                        });
                     }
+                });
+
+                //首次订阅
+                if (isAlive() && first) {
+                    log.debug("subscribe mqtt topic {}", topic);
+                    client.subscribe(convertMqttTopic(topic), qos, result -> {
+                        if (!result.succeeded()) {
+                            sink.error(result.cause());
+                        }
+                    });
                 }
             }
+
+            sink.onDispose(composite);
+
         });
     }
 
-    @Override
-    public Mono<Void> publish(MqttMessage message) {
+    private Mono<Void> doPublish(MqttMessage message) {
         return Mono.create((sink) -> {
-            if (!isAlive()) {
-                sink.error(new IOException("mqtt client not alive"));
-                return;
-            }
             Buffer buffer = Buffer.buffer(message.getPayload());
             client.publish(message.getTopic(),
                 buffer,
@@ -139,6 +164,21 @@ public class VertxMqttClient implements MqttClient {
         });
     }
 
+    @Override
+    public Mono<Void> publish(MqttMessage message) {
+        if (loading) {
+            return Mono.create(sink -> {
+                loadSuccessListener.add(() -> {
+                    doPublish(message)
+                        .doOnSuccess(sink::success)
+                        .doOnError(sink::error)
+                        .subscribe();
+                });
+            });
+        }
+        return doPublish(message);
+    }
+
     @Override
     public String getId() {
         return id;
@@ -151,11 +191,11 @@ public class VertxMqttClient implements MqttClient {
 
     @Override
     public void shutdown() {
+        loading = false;
         if (isAlive()) {
             client.disconnect();
             client = null;
         }
-
     }
 
     @Override