|
@@ -44,23 +44,31 @@ import java.util.function.Function;
|
|
|
class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
|
|
|
|
|
|
@Getter
|
|
|
- private String id;
|
|
|
+ private final String id;
|
|
|
|
|
|
- private TcpServer tcpServer;
|
|
|
+ private final TcpServer tcpServer;
|
|
|
|
|
|
- private String protocol;
|
|
|
+ private final String protocol;
|
|
|
|
|
|
- private ProtocolSupports supports;
|
|
|
+ private final ProtocolSupports supports;
|
|
|
|
|
|
- private DeviceRegistry registry;
|
|
|
+ private final DeviceRegistry registry;
|
|
|
|
|
|
- private DecodedClientMessageHandler clientMessageHandler;
|
|
|
+ private final DecodedClientMessageHandler clientMessageHandler;
|
|
|
|
|
|
- private DeviceSessionManager sessionManager;
|
|
|
+ private final DeviceSessionManager sessionManager;
|
|
|
|
|
|
- private DeviceGatewayMonitor gatewayMonitor;
|
|
|
+ private final DeviceGatewayMonitor gatewayMonitor;
|
|
|
|
|
|
- private LongAdder counter = new LongAdder();
|
|
|
+ private final LongAdder counter = new LongAdder();
|
|
|
+
|
|
|
+ private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
|
|
|
+
|
|
|
+ private final FluxSink<Message> sink = processor.sink();
|
|
|
+
|
|
|
+ private final AtomicBoolean started = new AtomicBoolean();
|
|
|
+
|
|
|
+ private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
|
|
|
|
|
|
public TcpServerDeviceGateway(String id,
|
|
|
String protocol,
|
|
@@ -80,12 +88,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
}
|
|
|
|
|
|
|
|
|
- private EmitterProcessor<Message> processor = EmitterProcessor.create(false);
|
|
|
-
|
|
|
- private FluxSink<Message> sink = processor.sink();
|
|
|
-
|
|
|
- private AtomicBoolean started = new AtomicBoolean();
|
|
|
-
|
|
|
public Mono<ProtocolSupport> getProtocol() {
|
|
|
return supports.getProtocol(protocol);
|
|
|
}
|
|
@@ -105,8 +107,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
return DefaultNetworkType.TCP_SERVER;
|
|
|
}
|
|
|
|
|
|
- private List<Disposable> disposable = new CopyOnWriteArrayList<>();
|
|
|
-
|
|
|
private void doStart() {
|
|
|
if (started.getAndSet(true) || !disposable.isEmpty()) {
|
|
|
return;
|
|
@@ -127,7 +127,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
|
|
|
client.subscribe()
|
|
|
.filter(r -> started.get())
|
|
|
- .takeWhile(r -> disposable != null)
|
|
|
+ .takeWhile(r -> !disposable.isEmpty())
|
|
|
.doOnNext(r -> {
|
|
|
log.debug("收到TCP报文:\n{}", r);
|
|
|
gatewayMonitor.receivedMessage();
|
|
@@ -165,11 +165,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
return getSession().getOperator();
|
|
|
}
|
|
|
}))
|
|
|
- .switchIfEmpty(Mono.fromRunnable(() ->
|
|
|
- log.warn("无法识别的TCP客户端[{}]消息:\n{}",
|
|
|
- clientAddr,
|
|
|
- tcpMessage
|
|
|
- )))
|
|
|
.cast(DeviceMessage.class)
|
|
|
.flatMap(message -> registry
|
|
|
.getDevice(message.getDeviceId())
|
|
@@ -182,20 +177,20 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
);
|
|
|
}))
|
|
|
.flatMap(device -> {
|
|
|
- DeviceSession fSession = sessionRef.get() == null ?
|
|
|
- sessionManager.getSession(device.getDeviceId()) :
|
|
|
- sessionRef.get();
|
|
|
+ DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
|
|
|
//处理设备上线消息
|
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
|
if (fSession == null) {
|
|
|
- fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
|
|
|
+ boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
|
|
|
+ String sessionId = device.getDeviceId();
|
|
|
+ fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
|
|
|
@Override
|
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
|
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
|
}
|
|
|
};
|
|
|
- //保持设备一直在线.(通过短连接上报数据的场景.可以让设备一直为在线状态)
|
|
|
- if (message.getHeader(Headers.keepOnline).orElse(false)) {
|
|
|
+ //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
|
|
|
+ if (keepOnline) {
|
|
|
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
|
|
} else {
|
|
|
client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|