Forráskód Böngészése

Merge remote-tracking branch 'origin/master'

zhou-hao 5 éve
szülő
commit
a96f2f4164

+ 16 - 7
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java

@@ -30,6 +30,7 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
@@ -82,11 +83,12 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
         return counter.sum();
     }
 
+
     private void doStart() {
         if (started.getAndSet(true) || disposable != null) {
             return;
         }
-        disposable = mqttServer
+        disposable = (Disposable) mqttServer
             .handleConnection()
             .filter(conn -> {
                 if (!started.get()) {
@@ -95,11 +97,14 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                 }
                 return started.get();
             })
+
+            .publishOn(Schedulers.parallel())
             .flatMap(this::handleConnection)
             .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
+            .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()) , Integer.MAX_VALUE)
             .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
-            .subscribe(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()));
+            .subscribe();
 
     }
 
@@ -180,11 +185,16 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
     }
 
     //处理已经建立连接的MQTT连接
-    private void handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
+    private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
 
-        connection.handleMessage()
+        return connection
+            .handleMessage()
             .filter(pb -> started.get())
-            .takeWhile(pub -> disposable != null)
+            .doOnCancel(() -> {
+                //流被取消时(可能网关关闭了)断开连接
+                connection.close().subscribe();
+            })
+            .publishOn(Schedulers.parallel())
             .doOnNext(msg -> gatewayMonitor.receivedMessage())
             .flatMap(publishing ->
                 this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
@@ -197,7 +207,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                     .flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
             )
             .subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
-            .subscribe();
+            .then();
     }
 
     //解码消息并处理
@@ -221,7 +231,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
                     sink.next(msg);
                 }
                 String deviceId = msg.getDeviceId();
-
                 //返回了其他设备的消息,则自动创建会话
                 if (!deviceId.equals(operator.getDeviceId())) {
                     DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());

+ 151 - 134
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -1,12 +1,19 @@
 package org.jetlinks.community.network.tcp.device;
 
-import io.netty.buffer.ByteBufUtil;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
+import org.jetlinks.community.gateway.DeviceGateway;
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
+import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkType;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.jetlinks.community.network.tcp.server.TcpServer;
 import org.jetlinks.core.ProtocolSupport;
 import org.jetlinks.core.ProtocolSupports;
-import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.codec.DefaultTransport;
@@ -15,13 +22,6 @@ import org.jetlinks.core.message.codec.FromDeviceMessageContext;
 import org.jetlinks.core.message.codec.Transport;
 import org.jetlinks.core.server.session.DeviceSession;
 import org.jetlinks.core.server.session.DeviceSessionManager;
-import org.jetlinks.community.gateway.DeviceGateway;
-import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
-import org.jetlinks.community.gateway.monitor.GatewayMonitors;
-import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
-import org.jetlinks.community.network.DefaultNetworkType;
-import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.community.network.tcp.server.TcpServer;
 import org.jetlinks.core.server.session.KeepOnlineSession;
 import org.jetlinks.supports.server.DecodedClientMessageHandler;
 import reactor.core.Disposable;
@@ -29,12 +29,11 @@ import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
-import javax.annotation.Nonnull;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
@@ -68,7 +67,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
 
     private final AtomicBoolean started = new AtomicBoolean();
 
-    private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
+    private Disposable disposable;
 
     public TcpServerDeviceGateway(String id,
                                   String protocol,
@@ -107,129 +106,149 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
         return DefaultNetworkType.TCP_SERVER;
     }
 
-    private void doStart() {
-        if (started.getAndSet(true) || !disposable.isEmpty()) {
-            return;
+
+    class TcpConnection {
+        final TcpClient client;
+        final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
+        final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
+        final InetSocketAddress address;
+
+        TcpConnection(TcpClient client) {
+            this.client = client;
+            this.address = client.getRemoteAddress();
+            gatewayMonitor.totalConnection(counter.sum());
+            client.onDisconnect(() -> {
+                counter.decrement();
+                gatewayMonitor.disconnected();
+                gatewayMonitor.totalConnection(counter.sum());
+            });
+
+            DeviceSession session = sessionManager.getSession(client.getId());
+            if (session == null) {
+                session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
+                    @Override
+                    public Mono<Boolean> send(EncodedMessage encodedMessage) {
+                        return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
+                    }
+
+                    @Override
+                    public void setKeepAliveTimeout(Duration timeout) {
+                        keepaliveTimeout.set(timeout);
+                    }
+
+                    @Override
+                    public Optional<InetSocketAddress> getClientAddress() {
+                        return Optional.of(address);
+                    }
+                };
+            }
+
+            sessionRef.set(session);
+
         }
 
-        disposable.add(tcpServer
-            .handleConnection()
-            .subscribe(client -> {
-                InetSocketAddress clientAddr = client.getRemoteAddress();
-                counter.increment();
-                gatewayMonitor.totalConnection(counter.intValue());
-                client.onDisconnect(() -> {
-                    counter.decrement();
-                    gatewayMonitor.disconnected();
-                    gatewayMonitor.totalConnection(counter.sum());
-                });
-                AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
-                AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
-                client.subscribe()
-                    .filter(r -> started.get())
-                    .takeWhile(r -> !disposable.isEmpty())
-                    .doOnNext(r -> {
-                        log.debug("收到TCP报文:\n{}", r);
-                        gatewayMonitor.receivedMessage();
-                    })
-                    .flatMap(tcpMessage -> getProtocol()
-                        .flatMap(pt -> pt.getMessageCodec(getTransport()))
-                        .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
-                            @Override
-                            @Nonnull
-                            public EncodedMessage getMessage() {
-                                return tcpMessage;
-                            }
+        Mono<Void> accept() {
+            return client
+                .subscribe()
+                .filter(tcp -> started.get())
+                .doOnCancel(client::shutdown)
+                .flatMap(this::handleTcpMessage)
+                .onErrorContinue((err, ignore) -> log.error(err.getMessage(), err))
+                .then();
+        }
 
-                            @Override
-                            public DeviceSession getSession() {
-                                //session还未注册
-                                if (sessionRef.get() == null) {
-                                    return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
-                                        @Override
-                                        public Mono<Boolean> send(EncodedMessage encodedMessage) {
-                                            return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
-                                        }
-
-                                        @Override
-                                        public void setKeepAliveTimeout(Duration timeout) {
-                                            keepaliveTimeout.set(timeout);
-                                        }
-                                    };
-                                }
-                                return sessionRef.get();
-                            }
+        Mono<Void> handleTcpMessage(TcpMessage message) {
+            return getProtocol()
+                .flatMap(pt -> pt.getMessageCodec(getTransport()))
+                .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message)))
+                .cast(DeviceMessage.class)
+                .flatMap(this::handleDeviceMessage)
+                .doOnEach(ReactiveLogger.onError(err ->
+                    log.error("处理TCP[{}]消息失败:\n{}",
+                        address,
+                        message
+                        , err)))
+                .onErrorResume((err) -> Mono.fromRunnable(client::reset))
+                .then();
+        }
 
-                            @Override
-                            public DeviceOperator getDevice() {
-                                return getSession().getOperator();
-                            }
-                        }))
-                        .cast(DeviceMessage.class)
-                        .flatMap(message -> registry
-                            .getDevice(message.getDeviceId())
-                            .switchIfEmpty(Mono.fromRunnable(() -> {
-                                log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
-                                    message.getDeviceId(),
-                                    clientAddr,
-                                    ByteBufUtil.hexDump(tcpMessage.getPayload()),
-                                    message
-                                );
-                            }))
-                            .flatMap(device -> {
-                                DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
-                                //处理设备上线消息
-                                if (message instanceof DeviceOnlineMessage) {
-                                    if (fSession == null) {
-                                        boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
-                                        String sessionId = device.getDeviceId();
-                                        fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
-                                            @Override
-                                            public Mono<Boolean> send(EncodedMessage encodedMessage) {
-                                                return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
-                                            }
-                                        };
-                                        //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
-                                        if (keepOnline) {
-                                            fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
-                                        } else {
-                                            client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
-                                        }
-                                        sessionRef.set(fSession);
-                                        sessionManager.register(fSession);
-                                    }
-                                    fSession.keepAlive();
-                                    if (keepaliveTimeout.get() != null) {
-                                        fSession.setKeepAliveTimeout(keepaliveTimeout.get());
-                                    }
-                                    return Mono.empty();
-                                }
-                                if (fSession != null) {
-                                    fSession.keepAlive();
+        Mono<Void> handleDeviceMessage(DeviceMessage message) {
+            return registry
+                .getDevice(message.getDeviceId())
+                .switchIfEmpty(Mono.defer(() -> {
+                    if (processor.hasDownstreams()) {
+                        sink.next(message);
+                    }
+                    if (message instanceof DeviceRegisterMessage) {
+                        return clientMessageHandler
+                            .handleMessage(null, message)
+                            .then(Mono.empty());
+                    } else {
+                        log.warn("无法从tcp[{}]消息中获取设备信息:{}",address, message);
+                        return Mono.empty();
+                    }
+                }))
+                .flatMap(device -> {
+                    DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
+                    //处理设备上线消息
+                    if (message instanceof DeviceOnlineMessage) {
+                        if (fSession == null) {
+                            boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
+                            String sessionId = device.getDeviceId();
+                            fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
+                                @Override
+                                public Mono<Boolean> send(EncodedMessage encodedMessage) {
+                                    return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
                                 }
-                                //设备下线
-                                if (message instanceof DeviceOfflineMessage) {
-                                    sessionManager.unregister(device.getDeviceId());
-                                    return Mono.empty();
-                                }
-                                message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(clientAddr));
+                            };
+                            //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
+                            if (keepOnline) {
+                                fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
+                            } else {
+                                client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
+                            }
+                            sessionRef.set(fSession);
+                            sessionManager.register(fSession);
+                        }
+                        fSession.keepAlive();
+                        if (keepaliveTimeout.get() != null) {
+                            fSession.setKeepAliveTimeout(keepaliveTimeout.get());
+                        }
+                        return Mono.empty();
+                    }
+                    if (fSession != null) {
+                        fSession.keepAlive();
+                    }
+                    //设备下线
+                    if (message instanceof DeviceOfflineMessage) {
+                        sessionManager.unregister(device.getDeviceId());
+                        return Mono.empty();
+                    }
+                    message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(address));
+                    if (processor.hasDownstreams()) {
+                        sink.next(message);
+                    }
+                    return clientMessageHandler.handleMessage(device, message);
+                })
+                .then()
+                ;
+        }
+    }
 
-                                if (processor.hasDownstreams()) {
-                                    sink.next(message);
-                                }
-                                return clientMessageHandler.handleMessage(device, message);
-                            }))
-                        .doOnEach(ReactiveLogger.onError(err ->
-                            log.error("处理TCP[{}]消息失败:\n{}",
-                                clientAddr,
-                                tcpMessage
-                                , err)))
-                        .onErrorResume((err) -> Mono.fromRunnable(client::reset))
-                    )
-                    .onErrorResume((err) -> Mono.fromRunnable(client::reset))
-                    .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
-                    .subscribe();
-            }));
+    private void doStart() {
+        if (started.getAndSet(true) || disposable != null) {
+            return;
+        }
+        disposable = tcpServer
+            .handleConnection()
+            .publishOn(Schedulers.parallel())
+            .flatMap(client -> new TcpConnection(client).accept(), Integer.MAX_VALUE)
+            .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
+            .subscribe(
+                ignore -> {
+                },
+                error -> log.error(error.getMessage(), error)
+            );
     }
 
     @Override
@@ -251,10 +270,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
     public Mono<Void> shutdown() {
         return Mono.fromRunnable(() -> {
             started.set(false);
-
-            disposable.forEach(Disposable::dispose);
-
-            disposable.clear();
+            disposable.dispose();
+            disposable = null;
         });
     }