|
@@ -194,33 +194,32 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
|
.flatMap(device -> {
|
|
|
DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
|
|
|
//处理设备上线消息
|
|
|
- if (message instanceof DeviceOnlineMessage) {
|
|
|
- if (fSession == null) {
|
|
|
- boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
|
|
|
- String sessionId = device.getDeviceId();
|
|
|
- fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
|
|
|
- @Override
|
|
|
- public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
|
- return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
|
- }
|
|
|
- };
|
|
|
- //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
|
|
|
- if (keepOnline) {
|
|
|
- fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
|
|
- } else {
|
|
|
- client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|
|
|
+ if (fSession == null) {
|
|
|
+ boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
|
|
|
+ String sessionId = device.getDeviceId();
|
|
|
+ fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
|
+ return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
|
|
}
|
|
|
- sessionRef.set(fSession);
|
|
|
- sessionManager.register(fSession);
|
|
|
- }
|
|
|
- fSession.keepAlive();
|
|
|
- if (keepaliveTimeout.get() != null) {
|
|
|
- fSession.setKeepAliveTimeout(keepaliveTimeout.get());
|
|
|
+ };
|
|
|
+ //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
|
|
|
+ if (keepOnline) {
|
|
|
+ fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
|
|
+ } else {
|
|
|
+ client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|
|
|
}
|
|
|
- return Mono.empty();
|
|
|
+ sessionRef.set(fSession);
|
|
|
+ sessionManager.register(fSession);
|
|
|
}
|
|
|
- if (fSession != null) {
|
|
|
- fSession.keepAlive();
|
|
|
+ fSession.keepAlive();
|
|
|
+ Duration timeout = message.getHeader(Headers.keepOnlineTimeoutSeconds).map(Duration::ofSeconds).orElse(keepaliveTimeout.get());
|
|
|
+ if (timeout != null) {
|
|
|
+ fSession.setKeepAliveTimeout(timeout);
|
|
|
+ }
|
|
|
+ fSession.keepAlive();
|
|
|
+ if (message instanceof DeviceOnlineMessage) {
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
//设备下线
|
|
|
if (message instanceof DeviceOfflineMessage) {
|