|
@@ -6,19 +6,19 @@ import lombok.Getter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.jetlinks.core.message.codec.MqttMessage;
|
|
|
import org.jetlinks.core.message.codec.SimpleMqttMessage;
|
|
|
+import org.jetlinks.core.topic.Topic;
|
|
|
import org.jetlinks.community.network.DefaultNetworkType;
|
|
|
import org.jetlinks.community.network.NetworkType;
|
|
|
-import org.jetlinks.core.utils.TopicUtils;
|
|
|
-import reactor.core.publisher.*;
|
|
|
+import reactor.core.Disposable;
|
|
|
+import reactor.core.Disposables;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.FluxSink;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
+import reactor.util.function.Tuple2;
|
|
|
+import reactor.util.function.Tuples;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
-import java.util.function.Function;
|
|
|
-import java.util.stream.Collectors;
|
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
|
|
@Slf4j
|
|
|
public class VertxMqttClient implements MqttClient {
|
|
@@ -26,101 +26,126 @@ public class VertxMqttClient implements MqttClient {
|
|
|
@Getter
|
|
|
private io.vertx.mqtt.MqttClient client;
|
|
|
|
|
|
- private final FluxProcessor<MqttMessage, MqttMessage> messageProcessor = EmitterProcessor.create(false);
|
|
|
+ private final Topic<Tuple2<FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
|
|
|
|
|
|
- private final FluxSink<MqttMessage> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
|
|
|
+ private final String id;
|
|
|
|
|
|
- private final Map<String, AtomicInteger> topicsSubscribeCounter = new ConcurrentHashMap<>();
|
|
|
+ private volatile boolean loading;
|
|
|
|
|
|
- private boolean neverSubscribe = true;
|
|
|
+ private final List<Runnable> loadSuccessListener = new CopyOnWriteArrayList<>();
|
|
|
|
|
|
- private final String id;
|
|
|
+ public void setLoading(boolean loading) {
|
|
|
+ this.loading = loading;
|
|
|
+ if (!loading) {
|
|
|
+ loadSuccessListener.forEach(Runnable::run);
|
|
|
+ loadSuccessListener.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isLoading() {
|
|
|
+ return loading;
|
|
|
+ }
|
|
|
|
|
|
- @Getter
|
|
|
- private final AtomicInteger reloadCounter = new AtomicInteger();
|
|
|
|
|
|
public VertxMqttClient(String id) {
|
|
|
this.id = id;
|
|
|
}
|
|
|
|
|
|
public void setClient(io.vertx.mqtt.MqttClient client) {
|
|
|
+ if (this.client != null && this.client != client) {
|
|
|
+ this.client.disconnect();
|
|
|
+ }
|
|
|
this.client = client;
|
|
|
- if (isAlive()) {
|
|
|
- reloadCounter.set(0);
|
|
|
- client.publishHandler(msg -> {
|
|
|
- //从未订阅,可能消息是还没来得及
|
|
|
- //或者已经有了下游消费者
|
|
|
- if (neverSubscribe || messageProcessor.hasDownstreams()) {
|
|
|
- sink.next(SimpleMqttMessage
|
|
|
- .builder()
|
|
|
- .topic(msg.topicName())
|
|
|
- .clientId(client.clientId())
|
|
|
- .qosLevel(msg.qosLevel().value())
|
|
|
- .retain(msg.isRetain())
|
|
|
- .dup(msg.isDup())
|
|
|
- .payload(msg.payload().getByteBuf())
|
|
|
- .messageId(msg.messageId())
|
|
|
- .build());
|
|
|
- }
|
|
|
+ client
|
|
|
+ .closeHandler(nil -> log.debug("mqtt client [{}] closed", id))
|
|
|
+ .publishHandler(msg -> {
|
|
|
+ MqttMessage mqttMessage = SimpleMqttMessage
|
|
|
+ .builder()
|
|
|
+ .messageId(msg.messageId())
|
|
|
+ .topic(msg.topicName())
|
|
|
+ .payload(msg.payload().getByteBuf())
|
|
|
+ .dup(msg.isDup())
|
|
|
+ .retain(msg.isRetain())
|
|
|
+ .qosLevel(msg.qosLevel().value())
|
|
|
+ .build();
|
|
|
+ log.debug("handle mqtt message \n{}", mqttMessage);
|
|
|
+ subscriber
|
|
|
+ .findTopic(msg.topicName().replace("#","**").replace("+","*"))
|
|
|
+ .flatMapIterable(Topic::getSubscribers)
|
|
|
+ .subscribe(sink -> {
|
|
|
+ try {
|
|
|
+ sink.getT1().next(mqttMessage);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("handle mqtt message error", e);
|
|
|
+ }
|
|
|
+ });
|
|
|
});
|
|
|
- if (!topicsSubscribeCounter.isEmpty()) {
|
|
|
- Map<String, Integer> reSubscribe = topicsSubscribeCounter
|
|
|
- .entrySet()
|
|
|
- .stream()
|
|
|
- .filter(e -> e.getValue().get() > 0)
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .collect(Collectors.toMap(Function.identity(), (r) -> 0));
|
|
|
- if (!reSubscribe.isEmpty()) {
|
|
|
- log.info("re subscribe [{}] topic {}", client.clientId(), reSubscribe.keySet());
|
|
|
- client.subscribe(reSubscribe);
|
|
|
- }
|
|
|
- }
|
|
|
+ if (isAlive()) {
|
|
|
+ reSubscribe();
|
|
|
+ } else if (loading) {
|
|
|
+ loadSuccessListener.add(this::reSubscribe);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
- private AtomicInteger getTopicCounter(String topic) {
|
|
|
- return topicsSubscribeCounter.computeIfAbsent(topic, (ignore) -> new AtomicInteger());
|
|
|
+ private void reSubscribe() {
|
|
|
+ subscriber
|
|
|
+ .findTopic("/**")
|
|
|
+ .filter(topic -> topic.getSubscribers().size() > 0)
|
|
|
+ .collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2())
|
|
|
+ .subscribe(topics -> {
|
|
|
+ log.debug("subscribe mqtt topic {}", topics);
|
|
|
+ client.subscribe(topics);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
+ private String convertMqttTopic(String topic) {
|
|
|
+ return topic.replace("**", "#").replace("*", "+");
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
- public Flux<MqttMessage> subscribe(List<String> topics) {
|
|
|
- neverSubscribe = false;
|
|
|
- AtomicBoolean canceled = new AtomicBoolean();
|
|
|
- return Flux.defer(() -> {
|
|
|
- Map<String, Integer> subscribeTopic = topics.stream()
|
|
|
- .filter(r -> getTopicCounter(r).getAndIncrement() == 0)
|
|
|
- .collect(Collectors.toMap(Function.identity(), (r) -> 0));
|
|
|
- if (isAlive()) {
|
|
|
- if (!subscribeTopic.isEmpty()) {
|
|
|
- log.info("subscribe mqtt [{}] topic : {}", client.clientId(), subscribeTopic);
|
|
|
- client.subscribe(subscribeTopic);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageProcessor
|
|
|
- .filter(msg -> topics
|
|
|
- .stream()
|
|
|
- .anyMatch(topic -> TopicUtils.match(topic, msg.getTopic())));
|
|
|
- }).doOnCancel(() -> {
|
|
|
- if (!canceled.getAndSet(true)) {
|
|
|
- for (String topic : topics) {
|
|
|
- if (getTopicCounter(topic).decrementAndGet() <= 0 && isAlive()) {
|
|
|
- log.info("unsubscribe mqtt [{}] topic : {}", client.clientId(), topic);
|
|
|
- client.unsubscribe(topic);
|
|
|
+ public Flux<MqttMessage> subscribe(List<String> topics, int qos) {
|
|
|
+ return Flux.create(sink -> {
|
|
|
+
|
|
|
+ Disposable.Composite composite = Disposables.composite();
|
|
|
+
|
|
|
+ for (String topic : topics) {
|
|
|
+ Topic<Tuple2<FluxSink<MqttMessage>, Integer>> sinkTopic = subscriber.append(topic.replace("#", "**").replace("+", "*"));
|
|
|
+
|
|
|
+ Tuple2<FluxSink<MqttMessage>, Integer> topicQos = Tuples.of(sink, qos);
|
|
|
+
|
|
|
+ boolean first = sinkTopic.getSubscribers().size() == 0;
|
|
|
+ sinkTopic.subscribe(topicQos);
|
|
|
+ composite.add(() -> {
|
|
|
+ if (sinkTopic.unsubscribe(topicQos).size() > 0) {
|
|
|
+ client.unsubscribe(convertMqttTopic(topic), result -> {
|
|
|
+ if (result.succeeded()) {
|
|
|
+ log.debug("unsubscribe mqtt topic {}", topic);
|
|
|
+ } else {
|
|
|
+ log.debug("unsubscribe mqtt topic {} error", topic, result.cause());
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
+ });
|
|
|
+
|
|
|
+ //首次订阅
|
|
|
+ if (isAlive() && first) {
|
|
|
+ log.debug("subscribe mqtt topic {}", topic);
|
|
|
+ client.subscribe(convertMqttTopic(topic), qos, result -> {
|
|
|
+ if (!result.succeeded()) {
|
|
|
+ sink.error(result.cause());
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ sink.onDispose(composite);
|
|
|
+
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Mono<Void> publish(MqttMessage message) {
|
|
|
+ private Mono<Void> doPublish(MqttMessage message) {
|
|
|
return Mono.create((sink) -> {
|
|
|
- if (!isAlive()) {
|
|
|
- sink.error(new IOException("mqtt client not alive"));
|
|
|
- return;
|
|
|
- }
|
|
|
Buffer buffer = Buffer.buffer(message.getPayload());
|
|
|
client.publish(message.getTopic(),
|
|
|
buffer,
|
|
@@ -139,6 +164,21 @@ public class VertxMqttClient implements MqttClient {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public Mono<Void> publish(MqttMessage message) {
|
|
|
+ if (loading) {
|
|
|
+ return Mono.create(sink -> {
|
|
|
+ loadSuccessListener.add(() -> {
|
|
|
+ doPublish(message)
|
|
|
+ .doOnSuccess(sink::success)
|
|
|
+ .doOnError(sink::error)
|
|
|
+ .subscribe();
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+ return doPublish(message);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public String getId() {
|
|
|
return id;
|
|
@@ -151,11 +191,11 @@ public class VertxMqttClient implements MqttClient {
|
|
|
|
|
|
@Override
|
|
|
public void shutdown() {
|
|
|
+ loading = false;
|
|
|
if (isAlive()) {
|
|
|
client.disconnect();
|
|
|
client = null;
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|