|
@@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
|
|
import io.netty.handler.codec.DecoderException;
|
|
import io.netty.handler.codec.DecoderException;
|
|
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
|
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
import io.vertx.core.buffer.Buffer;
|
|
import io.vertx.core.buffer.Buffer;
|
|
import io.vertx.core.net.SocketAddress;
|
|
import io.vertx.core.net.SocketAddress;
|
|
import io.vertx.mqtt.MqttEndpoint;
|
|
import io.vertx.mqtt.MqttEndpoint;
|
|
@@ -32,6 +33,7 @@ import java.time.Duration;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
import java.util.function.Consumer;
|
|
import java.util.function.Consumer;
|
|
|
|
+import java.util.function.Function;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
@@ -41,7 +43,7 @@ class VertxMqttConnection implements MqttConnection {
|
|
private long keepAliveTimeoutMs;
|
|
private long keepAliveTimeoutMs;
|
|
@Getter
|
|
@Getter
|
|
private long lastPingTime = System.currentTimeMillis();
|
|
private long lastPingTime = System.currentTimeMillis();
|
|
- private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
|
|
|
|
|
|
+ private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
|
|
private static final MqttAuth emptyAuth = new MqttAuth() {
|
|
private static final MqttAuth emptyAuth = new MqttAuth() {
|
|
@Override
|
|
@Override
|
|
public String getUsername() {
|
|
public String getUsername() {
|
|
@@ -259,19 +261,22 @@ class VertxMqttConnection implements MqttConnection {
|
|
ping();
|
|
ping();
|
|
return Mono
|
|
return Mono
|
|
.<Void>create(sink -> {
|
|
.<Void>create(sink -> {
|
|
- Buffer buffer = Buffer.buffer(message.getPayload());
|
|
|
|
- endpoint.publish(message.getTopic(),
|
|
|
|
- buffer,
|
|
|
|
- MqttQoS.valueOf(message.getQosLevel()),
|
|
|
|
- message.isDup(),
|
|
|
|
- message.isRetain(),
|
|
|
|
- result -> {
|
|
|
|
- if (result.succeeded()) {
|
|
|
|
- sink.success();
|
|
|
|
- } else {
|
|
|
|
- sink.error(result.cause());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ ByteBuf buf = message.getPayload();
|
|
|
|
+ Buffer buffer = Buffer.buffer(buf);
|
|
|
|
+ endpoint.publish(
|
|
|
|
+ message.getTopic(),
|
|
|
|
+ buffer,
|
|
|
|
+ MqttQoS.valueOf(message.getQosLevel()),
|
|
|
|
+ message.isDup(),
|
|
|
|
+ message.isRetain(),
|
|
|
|
+ result -> {
|
|
|
|
+ if (result.succeeded()) {
|
|
|
|
+ sink.success();
|
|
|
|
+ } else {
|
|
|
|
+ sink.error(result.cause());
|
|
|
|
+ }
|
|
|
|
+ ReferenceCountUtil.safeRelease(buf);
|
|
|
|
+ }
|
|
);
|
|
);
|
|
});
|
|
});
|
|
}
|
|
}
|