zhou-hao 5 роки тому
батько
коміт
8cee5fc626

+ 23 - 28
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java

@@ -44,23 +44,31 @@ import java.util.function.Function;
 class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
 
     @Getter
-    private String id;
+    private final String id;
 
-    private TcpServer tcpServer;
+    private final TcpServer tcpServer;
 
-    private String protocol;
+    private final String protocol;
 
-    private ProtocolSupports supports;
+    private final ProtocolSupports supports;
 
-    private DeviceRegistry registry;
+    private final DeviceRegistry registry;
 
-    private DecodedClientMessageHandler clientMessageHandler;
+    private final DecodedClientMessageHandler clientMessageHandler;
 
-    private DeviceSessionManager sessionManager;
+    private final DeviceSessionManager sessionManager;
 
-    private DeviceGatewayMonitor gatewayMonitor;
+    private final DeviceGatewayMonitor gatewayMonitor;
 
-    private LongAdder counter = new LongAdder();
+    private final LongAdder counter = new LongAdder();
+
+    private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
+
+    private final FluxSink<Message> sink = processor.sink();
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
 
     public TcpServerDeviceGateway(String id,
                                   String protocol,
@@ -80,12 +88,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
     }
 
 
-    private EmitterProcessor<Message> processor = EmitterProcessor.create(false);
-
-    private FluxSink<Message> sink = processor.sink();
-
-    private AtomicBoolean started = new AtomicBoolean();
-
     public Mono<ProtocolSupport> getProtocol() {
         return supports.getProtocol(protocol);
     }
@@ -105,8 +107,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
         return DefaultNetworkType.TCP_SERVER;
     }
 
-    private List<Disposable> disposable = new CopyOnWriteArrayList<>();
-
     private void doStart() {
         if (started.getAndSet(true) || !disposable.isEmpty()) {
             return;
@@ -165,11 +165,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                 return getSession().getOperator();
                             }
                         }))
-                        .switchIfEmpty(Mono.fromRunnable(() ->
-                            log.warn("无法识别的TCP客户端[{}]消息:\n{}",
-                                clientAddr,
-                                tcpMessage
-                            )))
                         .cast(DeviceMessage.class)
                         .flatMap(message -> registry
                             .getDevice(message.getDeviceId())
@@ -182,20 +177,20 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
                                 );
                             }))
                             .flatMap(device -> {
-                                DeviceSession fSession = sessionRef.get() == null ?
-                                    sessionManager.getSession(device.getDeviceId()) :
-                                    sessionRef.get();
+                                DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
                                 //处理设备上线消息
                                 if (message instanceof DeviceOnlineMessage) {
                                     if (fSession == null) {
-                                        fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
+                                        boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
+                                        String sessionId = device.getDeviceId();
+                                        fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
                                             @Override
                                             public Mono<Boolean> send(EncodedMessage encodedMessage) {
                                                 return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
                                             }
                                         };
-                                        //保持设备一直在线.(通过短连接上报数据的场景.可以让设备一直为在线状态)
-                                        if (message.getHeader(Headers.keepOnline).orElse(false)) {
+                                        //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
+                                        if (keepOnline) {
                                             fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
                                         } else {
                                             client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));