|
@@ -4,6 +4,7 @@ import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.Unpooled;
|
|
import io.netty.buffer.Unpooled;
|
|
import io.netty.handler.codec.DecoderException;
|
|
import io.netty.handler.codec.DecoderException;
|
|
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
|
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
|
|
|
+import io.netty.handler.codec.mqtt.MqttProperties;
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
import io.netty.util.ReferenceCountUtil;
|
|
import io.netty.util.ReferenceCountUtil;
|
|
import io.vertx.core.buffer.Buffer;
|
|
import io.vertx.core.buffer.Buffer;
|
|
@@ -43,7 +44,9 @@ class VertxMqttConnection implements MqttConnection {
|
|
private long keepAliveTimeoutMs;
|
|
private long keepAliveTimeoutMs;
|
|
@Getter
|
|
@Getter
|
|
private long lastPingTime = System.currentTimeMillis();
|
|
private long lastPingTime = System.currentTimeMillis();
|
|
- private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
|
|
|
|
|
|
+ private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
|
|
|
|
+ private int messageIdCounter;
|
|
|
|
+
|
|
private static final MqttAuth emptyAuth = new MqttAuth() {
|
|
private static final MqttAuth emptyAuth = new MqttAuth() {
|
|
@Override
|
|
@Override
|
|
public String getUsername() {
|
|
public String getUsername() {
|
|
@@ -56,7 +59,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
|
|
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
|
|
- private final Sinks.Many<MqttSubscription> subscription =Reactors.createMany(Integer.MAX_VALUE, false);
|
|
|
|
|
|
+ private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
|
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
|
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
|
|
|
|
|
|
|
|
|
@@ -203,7 +206,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
.subscribeHandler(msg -> {
|
|
.subscribeHandler(msg -> {
|
|
ping();
|
|
ping();
|
|
VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false);
|
|
VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false);
|
|
- boolean hasDownstream = this.subscription.currentSubscriberCount()>0;
|
|
|
|
|
|
+ boolean hasDownstream = this.subscription.currentSubscriberCount() > 0;
|
|
if (autoAckSub || !hasDownstream) {
|
|
if (autoAckSub || !hasDownstream) {
|
|
subscription.acknowledge();
|
|
subscription.acknowledge();
|
|
}
|
|
}
|
|
@@ -214,7 +217,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
.unsubscribeHandler(msg -> {
|
|
.unsubscribeHandler(msg -> {
|
|
ping();
|
|
ping();
|
|
VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false);
|
|
VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false);
|
|
- boolean hasDownstream = this.unsubscription.currentSubscriberCount()>0;
|
|
|
|
|
|
+ boolean hasDownstream = this.unsubscription.currentSubscriberCount() > 0;
|
|
if (autoAckUnSub || !hasDownstream) {
|
|
if (autoAckUnSub || !hasDownstream) {
|
|
unSubscription.acknowledge();
|
|
unSubscription.acknowledge();
|
|
}
|
|
}
|
|
@@ -259,6 +262,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
@Override
|
|
@Override
|
|
public Mono<Void> publish(MqttMessage message) {
|
|
public Mono<Void> publish(MqttMessage message) {
|
|
ping();
|
|
ping();
|
|
|
|
+ int messageId = message.getMessageId() <= 0 ? nextMessageId() : message.getMessageId();
|
|
return Mono
|
|
return Mono
|
|
.<Void>create(sink -> {
|
|
.<Void>create(sink -> {
|
|
ByteBuf buf = message.getPayload();
|
|
ByteBuf buf = message.getPayload();
|
|
@@ -269,6 +273,8 @@ class VertxMqttConnection implements MqttConnection {
|
|
MqttQoS.valueOf(message.getQosLevel()),
|
|
MqttQoS.valueOf(message.getQosLevel()),
|
|
message.isDup(),
|
|
message.isDup(),
|
|
message.isRetain(),
|
|
message.isRetain(),
|
|
|
|
+ messageId,
|
|
|
|
+ message.getProperties(),
|
|
result -> {
|
|
result -> {
|
|
if (result.succeeded()) {
|
|
if (result.succeeded()) {
|
|
sink.success();
|
|
sink.success();
|
|
@@ -325,7 +331,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<Void> close() {
|
|
public Mono<Void> close() {
|
|
- if(closed){
|
|
|
|
|
|
+ if (closed) {
|
|
return Mono.empty();
|
|
return Mono.empty();
|
|
}
|
|
}
|
|
return Mono.<Void>fromRunnable(() -> {
|
|
return Mono.<Void>fromRunnable(() -> {
|
|
@@ -404,6 +410,11 @@ class VertxMqttConnection implements MqttConnection {
|
|
return print();
|
|
return print();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public MqttProperties getProperties() {
|
|
|
|
+ return message.properties();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public MqttMessage getMessage() {
|
|
public MqttMessage getMessage() {
|
|
return this;
|
|
return this;
|
|
@@ -488,6 +499,11 @@ class VertxMqttConnection implements MqttConnection {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private int nextMessageId() {
|
|
|
|
+ this.messageIdCounter = ((this.messageIdCounter % 65535) != 0) ? this.messageIdCounter + 1 : 1;
|
|
|
|
+ return this.messageIdCounter;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public boolean equals(Object o) {
|
|
public boolean equals(Object o) {
|
|
if (this == o) return true;
|
|
if (this == o) return true;
|