|
@@ -4,6 +4,7 @@ import io.netty.handler.codec.mqtt.MqttQoS;
|
|
import io.vertx.core.buffer.Buffer;
|
|
import io.vertx.core.buffer.Buffer;
|
|
import lombok.Getter;
|
|
import lombok.Getter;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.commons.collections4.MapUtils;
|
|
import org.jetlinks.core.message.codec.MqttMessage;
|
|
import org.jetlinks.core.message.codec.MqttMessage;
|
|
import org.jetlinks.core.message.codec.SimpleMqttMessage;
|
|
import org.jetlinks.core.message.codec.SimpleMqttMessage;
|
|
import org.jetlinks.core.topic.Topic;
|
|
import org.jetlinks.core.topic.Topic;
|
|
@@ -46,7 +47,6 @@ public class VertxMqttClient implements MqttClient {
|
|
return loading;
|
|
return loading;
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
public VertxMqttClient(String id) {
|
|
public VertxMqttClient(String id) {
|
|
this.id = id;
|
|
this.id = id;
|
|
}
|
|
}
|
|
@@ -74,7 +74,7 @@ public class VertxMqttClient implements MqttClient {
|
|
.build();
|
|
.build();
|
|
log.debug("handle mqtt message \n{}", mqttMessage);
|
|
log.debug("handle mqtt message \n{}", mqttMessage);
|
|
subscriber
|
|
subscriber
|
|
- .findTopic(msg.topicName().replace("#","**").replace("+","*"))
|
|
|
|
|
|
+ .findTopic(msg.topicName().replace("#", "**").replace("+", "*"))
|
|
.flatMapIterable(Topic::getSubscribers)
|
|
.flatMapIterable(Topic::getSubscribers)
|
|
.subscribe(sink -> {
|
|
.subscribe(sink -> {
|
|
try {
|
|
try {
|
|
@@ -84,10 +84,10 @@ public class VertxMqttClient implements MqttClient {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
});
|
|
});
|
|
- if (isAlive()) {
|
|
|
|
- reSubscribe();
|
|
|
|
- } else if (loading) {
|
|
|
|
|
|
+ if (loading) {
|
|
loadSuccessListener.add(this::reSubscribe);
|
|
loadSuccessListener.add(this::reSubscribe);
|
|
|
|
+ } else if (isAlive()) {
|
|
|
|
+ reSubscribe();
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -97,6 +97,7 @@ public class VertxMqttClient implements MqttClient {
|
|
.findTopic("/**")
|
|
.findTopic("/**")
|
|
.filter(topic -> topic.getSubscribers().size() > 0)
|
|
.filter(topic -> topic.getSubscribers().size() > 0)
|
|
.collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2())
|
|
.collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2())
|
|
|
|
+ .filter(MapUtils::isNotEmpty)
|
|
.subscribe(topics -> {
|
|
.subscribe(topics -> {
|
|
log.debug("subscribe mqtt topic {}", topics);
|
|
log.debug("subscribe mqtt topic {}", topics);
|
|
client.subscribe(topics);
|
|
client.subscribe(topics);
|
|
@@ -197,14 +198,18 @@ public class VertxMqttClient implements MqttClient {
|
|
public void shutdown() {
|
|
public void shutdown() {
|
|
loading = false;
|
|
loading = false;
|
|
if (isAlive()) {
|
|
if (isAlive()) {
|
|
- client.disconnect();
|
|
|
|
|
|
+ try {
|
|
|
|
+ client.disconnect();
|
|
|
|
+ } catch (Exception ignore) {
|
|
|
|
+
|
|
|
|
+ }
|
|
client = null;
|
|
client = null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public boolean isAlive() {
|
|
public boolean isAlive() {
|
|
- return client != null && client.isConnected();
|
|
|
|
|
|
+ return client != null&& client.isConnected();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|