Ver código fonte

优化session

zhou-hao 5 anos atrás
pai
commit
3d5fdf5755

+ 19 - 5
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttConnectionSession.java

@@ -9,6 +9,10 @@ import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import org.jetlinks.community.network.mqtt.server.MqttConnection;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Optional;
+
 public class MqttConnectionSession implements DeviceSession {
 public class MqttConnectionSession implements DeviceSession {
 
 
     @Getter
     @Getter
@@ -23,11 +27,11 @@ public class MqttConnectionSession implements DeviceSession {
     @Getter
     @Getter
     private MqttConnection connection;
     private MqttConnection connection;
 
 
-    public MqttConnectionSession(String id,DeviceOperator operator,Transport transport,MqttConnection connection){
-        this.id=id;
-        this.operator=operator;
-        this.transport=transport;
-        this.connection=connection;
+    public MqttConnectionSession(String id, DeviceOperator operator, Transport transport, MqttConnection connection) {
+        this.id = id;
+        this.operator = operator;
+        this.transport = transport;
+        this.connection = connection;
     }
     }
 
 
     private long connectTime = System.currentTimeMillis();
     private long connectTime = System.currentTimeMillis();
@@ -60,7 +64,12 @@ public class MqttConnectionSession implements DeviceSession {
 
 
     @Override
     @Override
     public void ping() {
     public void ping() {
+        connection.keepAlive();
+    }
 
 
+    @Override
+    public void setKeepAliveTimeout(Duration timeout) {
+        connection.setKeepAliveTimeout(timeout);
     }
     }
 
 
     @Override
     @Override
@@ -70,6 +79,11 @@ public class MqttConnectionSession implements DeviceSession {
 
 
     @Override
     @Override
     public void onClose(Runnable call) {
     public void onClose(Runnable call) {
+        connection.onClose(c -> call.run());
+    }
 
 
+    @Override
+    public Optional<InetSocketAddress> getClientAddress() {
+        return Optional.ofNullable(connection.getClientAddress());
     }
     }
 }
 }

+ 8 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttConnection.java

@@ -6,6 +6,8 @@ import org.jetlinks.core.server.mqtt.MqttAuth;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 
 
@@ -117,4 +119,10 @@ public interface MqttConnection {
     Mono<Void> close();
     Mono<Void> close();
 
 
     long getLastPingTime();
     long getLastPingTime();
+
+    void keepAlive();
+
+    void setKeepAliveTimeout(Duration duration);
+
+    InetSocketAddress getClientAddress();
 }
 }

+ 27 - 3
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java

@@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.SocketAddress;
 import io.vertx.mqtt.MqttEndpoint;
 import io.vertx.mqtt.MqttEndpoint;
 import io.vertx.mqtt.MqttTopicSubscription;
 import io.vertx.mqtt.MqttTopicSubscription;
 import io.vertx.mqtt.messages.MqttPublishMessage;
 import io.vertx.mqtt.messages.MqttPublishMessage;
@@ -26,7 +27,9 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nonnull;
+import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Optional;
 import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Function;
@@ -36,7 +39,7 @@ import java.util.stream.Collectors;
 class VertxMqttConnection implements MqttConnection {
 class VertxMqttConnection implements MqttConnection {
 
 
     private MqttEndpoint endpoint;
     private MqttEndpoint endpoint;
-    private long keepAliveTimeout;
+    private long keepAliveTimeoutMs;
     @Getter
     @Getter
     private long lastPingTime = System.currentTimeMillis();
     private long lastPingTime = System.currentTimeMillis();
     private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
     private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
@@ -50,7 +53,7 @@ class VertxMqttConnection implements MqttConnection {
 
 
     public VertxMqttConnection(MqttEndpoint endpoint) {
     public VertxMqttConnection(MqttEndpoint endpoint) {
         this.endpoint = endpoint;
         this.endpoint = endpoint;
-        this.keepAliveTimeout = (endpoint.keepAliveTimeSeconds() + 10) * 1000L;
+        this.keepAliveTimeoutMs = (endpoint.keepAliveTimeSeconds() + 10) * 1000L;
     }
     }
 
 
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
     private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
@@ -58,6 +61,7 @@ class VertxMqttConnection implements MqttConnection {
         subscription.onComplete();
         subscription.onComplete();
         unsubscription.onComplete();
         unsubscription.onComplete();
         messageProcessor.onComplete();
         messageProcessor.onComplete();
+
     };
     };
 
 
     private Consumer<MqttConnection> disconnectConsumer = defaultListener;
     private Consumer<MqttConnection> disconnectConsumer = defaultListener;
@@ -113,6 +117,11 @@ class VertxMqttConnection implements MqttConnection {
         return this;
         return this;
     }
     }
 
 
+    @Override
+    public void keepAlive() {
+        ping();
+    }
+
     void ping() {
     void ping() {
         lastPingTime = System.currentTimeMillis();
         lastPingTime = System.currentTimeMillis();
     }
     }
@@ -184,6 +193,20 @@ class VertxMqttConnection implements MqttConnection {
             });
             });
     }
     }
 
 
+    @Override
+    public void setKeepAliveTimeout(Duration duration) {
+        keepAliveTimeoutMs = duration.toMillis();
+    }
+
+    @Override
+    public InetSocketAddress getClientAddress() {
+
+        SocketAddress address = endpoint.remoteAddress();
+        if (address != null) {
+            return new InetSocketAddress(address.host(), address.port());
+        }
+        return null;
+    }
 
 
     @Override
     @Override
     public String getClientId() {
     public String getClientId() {
@@ -236,7 +259,7 @@ class VertxMqttConnection implements MqttConnection {
 
 
     @Override
     @Override
     public boolean isAlive() {
     public boolean isAlive() {
-        return endpoint.isConnected() && ((System.currentTimeMillis() - lastPingTime) < keepAliveTimeout);
+        return endpoint.isConnected() && (keepAliveTimeoutMs < 0 || ((System.currentTimeMillis() - lastPingTime) < keepAliveTimeoutMs));
     }
     }
 
 
     @Override
     @Override
@@ -255,6 +278,7 @@ class VertxMqttConnection implements MqttConnection {
         }
         }
         closed = true;
         closed = true;
         disconnectConsumer.accept(this);
         disconnectConsumer.accept(this);
+        disconnectConsumer = defaultListener;
     }
     }
 
 
     @AllArgsConstructor
     @AllArgsConstructor

+ 7 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java

@@ -6,6 +6,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 
 
 /**
 /**
  * TCP 客户端
  * TCP 客户端
@@ -43,4 +44,10 @@ public interface TcpClient extends Network {
      */
      */
     void keepAlive();
     void keepAlive();
 
 
+    /**
+     * 设置客户端心跳超时时间
+     *
+     * @param timeout 超时时间
+     */
+    void setKeepAliveTimeout(Duration timeout);
 }
 }

+ 7 - 2
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java

@@ -33,7 +33,7 @@ public class VertxTcpClient extends AbstractTcpClient {
     private String id;
     private String id;
 
 
     @Setter
     @Setter
-    private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
+    private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
 
 
     private volatile long lastKeepAliveTime = System.currentTimeMillis();
     private volatile long lastKeepAliveTime = System.currentTimeMillis();
 
 
@@ -44,9 +44,14 @@ public class VertxTcpClient extends AbstractTcpClient {
         lastKeepAliveTime = System.currentTimeMillis();
         lastKeepAliveTime = System.currentTimeMillis();
     }
     }
 
 
+    @Override
+    public void setKeepAliveTimeout(Duration timeout) {
+        keepAliveTimeoutMs = timeout.toMillis();
+    }
+
     @Override
     @Override
     public boolean isAlive() {
     public boolean isAlive() {
-        return socket != null && (System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeout);
+        return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs);
     }
     }
 
 
     @Override
     @Override

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java

@@ -59,7 +59,7 @@ public class VertxTcpClientProvider implements NetworkProvider<TcpClientProperti
         client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
         client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), Values.of(properties.getParserConfiguration())));
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         NetClient netClient = vertx.createNetClient(properties.getOptions());
         client.setClient(netClient);
         client.setClient(netClient);
-        client.setKeepAliveTimeout(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
+        client.setKeepAliveTimeoutMs(properties.getLong("keepAliveTimeout").orElse(Duration.ofMinutes(10).toMillis()));
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
         netClient.connect(properties.getPort(), properties.getHost(), result -> {
             if (result.succeeded()) {
             if (result.succeeded()) {
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());
                 log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort());

+ 15 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpDeviceSession.java

@@ -9,6 +9,10 @@ import org.jetlinks.community.network.tcp.TcpMessage;
 import org.jetlinks.community.network.tcp.client.TcpClient;
 import org.jetlinks.community.network.tcp.client.TcpClient;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Optional;
+
 class TcpDeviceSession implements DeviceSession {
 class TcpDeviceSession implements DeviceSession {
 
 
     @Getter
     @Getter
@@ -64,11 +68,22 @@ class TcpDeviceSession implements DeviceSession {
         client.keepAlive();
         client.keepAlive();
     }
     }
 
 
+
+    @Override
+    public void setKeepAliveTimeout(Duration timeout) {
+        client.setKeepAliveTimeout(timeout);
+    }
+
     @Override
     @Override
     public boolean isAlive() {
     public boolean isAlive() {
         return client.isAlive();
         return client.isAlive();
     }
     }
 
 
+    @Override
+    public Optional<InetSocketAddress> getClientAddress() {
+        return Optional.ofNullable(client.getRemoteAddress());
+    }
+
     @Override
     @Override
     public void onClose(Runnable call) {
     public void onClose(Runnable call) {
         client.onDisconnect(call);
         client.onDisconnect(call);

+ 1 - 1
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java

@@ -57,7 +57,7 @@ public class VertxTcpServer extends AbstractTcpServer implements TcpServer {
 
 
     protected void acceptTcpConnection(NetSocket socket) {
     protected void acceptTcpConnection(NetSocket socket) {
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
         VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress());
-        client.setKeepAliveTimeout(keepAliveTimeout);
+        client.setKeepAliveTimeoutMs(keepAliveTimeout);
         try {
         try {
             socket.exceptionHandler(err -> {
             socket.exceptionHandler(err -> {
                 log.error("tcp server client [{}] error", socket.remoteAddress(), err);
                 log.error("tcp server client [{}] error", socket.remoteAddress(), err);