|
@@ -3,6 +3,7 @@ package org.jetlinks.community.network.tcp.device;
|
|
|
import io.netty.buffer.ByteBufUtil;
|
|
|
import lombok.Getter;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.hswebframework.web.logger.ReactiveLogger;
|
|
|
import org.jetlinks.core.ProtocolSupport;
|
|
|
import org.jetlinks.core.ProtocolSupports;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
@@ -113,7 +114,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
|
|
|
disposable.add(tcpServer
|
|
|
.handleConnection()
|
|
|
- .flatMap(client -> {
|
|
|
+ .subscribe(client -> {
|
|
|
InetSocketAddress clientAddr = client.getRemoteAddress();
|
|
|
counter.increment();
|
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
@@ -124,9 +125,13 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
});
|
|
|
AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
|
|
AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
|
|
|
- return client.subscribe()
|
|
|
+ client.subscribe()
|
|
|
.filter(r -> started.get())
|
|
|
- .doOnNext(r -> gatewayMonitor.receivedMessage())
|
|
|
+ .takeWhile(r -> disposable != null)
|
|
|
+ .doOnNext(r -> {
|
|
|
+ log.debug("收到TCP报文:\n{}", r);
|
|
|
+ gatewayMonitor.receivedMessage();
|
|
|
+ })
|
|
|
.flatMap(tcpMessage -> getProtocol()
|
|
|
.flatMap(pt -> pt.getMessageCodec(getTransport()))
|
|
|
.flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
|
|
@@ -161,9 +166,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
}
|
|
|
}))
|
|
|
.switchIfEmpty(Mono.fromRunnable(() ->
|
|
|
- log.warn("无法识别的TCP客户端[{}]消息:[{}]",
|
|
|
+ log.warn("无法识别的TCP客户端[{}]消息:\n{}",
|
|
|
clientAddr,
|
|
|
- ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
|
+ tcpMessage
|
|
|
)))
|
|
|
.cast(DeviceMessage.class)
|
|
|
.flatMap(message -> registry
|
|
@@ -180,7 +185,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
DeviceSession fSession = sessionRef.get() == null ?
|
|
|
sessionManager.getSession(device.getDeviceId()) :
|
|
|
sessionRef.get();
|
|
|
-
|
|
|
//处理设备上线消息
|
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
|
if (fSession == null) {
|
|
@@ -220,16 +224,15 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
}
|
|
|
return clientMessageHandler.handleMessage(device, message);
|
|
|
}))
|
|
|
- .onErrorResume((err) -> {
|
|
|
- log.error("处理TCP[{}]消息[{}]失败",
|
|
|
- clientAddr,
|
|
|
- ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
|
- , err);
|
|
|
- return Mono.empty();
|
|
|
- }
|
|
|
- ));
|
|
|
- })
|
|
|
- .subscribe());
|
|
|
+ .doOnEach(ReactiveLogger.onError(err ->
|
|
|
+ log.error("处理TCP[{}]消息失败:\n{}",
|
|
|
+ clientAddr,
|
|
|
+ tcpMessage
|
|
|
+ , err))))
|
|
|
+ .onErrorResume((err) -> Mono.empty())
|
|
|
+ .subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
|
|
|
+ .subscribe();
|
|
|
+ }));
|
|
|
}
|
|
|
|
|
|
@Override
|