|
@@ -30,6 +30,7 @@ import reactor.core.publisher.FluxSink;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
import javax.annotation.Nonnull;
|
|
import javax.annotation.Nonnull;
|
|
|
|
+import java.net.InetSocketAddress;
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
@@ -113,6 +114,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
disposable.add(tcpServer
|
|
disposable.add(tcpServer
|
|
.handleConnection()
|
|
.handleConnection()
|
|
.flatMap(client -> {
|
|
.flatMap(client -> {
|
|
|
|
+ InetSocketAddress clientAddr=client.getRemoteAddress();
|
|
counter.increment();
|
|
counter.increment();
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
client.onDisconnect(() -> {
|
|
client.onDisconnect(() -> {
|
|
@@ -160,7 +162,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
}))
|
|
}))
|
|
.switchIfEmpty(Mono.fromRunnable(() ->
|
|
.switchIfEmpty(Mono.fromRunnable(() ->
|
|
log.warn("无法识别的TCP客户端[{}]消息:[{}]",
|
|
log.warn("无法识别的TCP客户端[{}]消息:[{}]",
|
|
- client.getRemoteAddress(),
|
|
|
|
|
|
+ clientAddr,
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
)))
|
|
)))
|
|
.cast(DeviceMessage.class)
|
|
.cast(DeviceMessage.class)
|
|
@@ -169,7 +171,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
.switchIfEmpty(Mono.fromRunnable(() -> {
|
|
.switchIfEmpty(Mono.fromRunnable(() -> {
|
|
log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
|
|
log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
|
|
message.getDeviceId(),
|
|
message.getDeviceId(),
|
|
- client.getRemoteAddress(),
|
|
|
|
|
|
+ clientAddr,
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload()),
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload()),
|
|
message
|
|
message
|
|
);
|
|
);
|
|
@@ -206,7 +208,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
sessionManager.unregister(device.getDeviceId());
|
|
sessionManager.unregister(device.getDeviceId());
|
|
return Mono.empty();
|
|
return Mono.empty();
|
|
}
|
|
}
|
|
- message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(client.getRemoteAddress()));
|
|
|
|
|
|
+ message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(clientAddr));
|
|
|
|
|
|
if (processor.hasDownstreams()) {
|
|
if (processor.hasDownstreams()) {
|
|
sink.next(message);
|
|
sink.next(message);
|
|
@@ -215,10 +217,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
}))
|
|
}))
|
|
.onErrorContinue((err, o) ->
|
|
.onErrorContinue((err, o) ->
|
|
log.error("处理TCP[{}]消息[{}]失败",
|
|
log.error("处理TCP[{}]消息[{}]失败",
|
|
- client.getRemoteAddress(),
|
|
|
|
|
|
+ clientAddr,
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
ByteBufUtil.hexDump(tcpMessage.getPayload())
|
|
- , err))
|
|
|
|
- );
|
|
|
|
|
|
+ , err)));
|
|
}).subscribe());
|
|
}).subscribe());
|
|
}
|
|
}
|
|
|
|
|