|
@@ -114,7 +114,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
disposable.add(tcpServer
|
|
disposable.add(tcpServer
|
|
.handleConnection()
|
|
.handleConnection()
|
|
.flatMap(client -> {
|
|
.flatMap(client -> {
|
|
- InetSocketAddress clientAddr=client.getRemoteAddress();
|
|
|
|
|
|
+ InetSocketAddress clientAddr = client.getRemoteAddress();
|
|
counter.increment();
|
|
counter.increment();
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
gatewayMonitor.totalConnection(counter.intValue());
|
|
client.onDisconnect(() -> {
|
|
client.onDisconnect(() -> {
|
|
@@ -123,7 +123,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
});
|
|
});
|
|
AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
|
AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
|
- DeviceSession session = sessionManager.getSession(client.getId());
|
|
|
|
|
|
+ AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
|
|
return client.subscribe()
|
|
return client.subscribe()
|
|
.filter(r -> started.get())
|
|
.filter(r -> started.get())
|
|
.doOnNext(r -> gatewayMonitor.receivedMessage())
|
|
.doOnNext(r -> gatewayMonitor.receivedMessage())
|
|
@@ -139,7 +139,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
@Override
|
|
@Override
|
|
public DeviceSession getSession() {
|
|
public DeviceSession getSession() {
|
|
//session还未注册
|
|
//session还未注册
|
|
- if (session == null) {
|
|
|
|
|
|
+ if (sessionRef.get() == null) {
|
|
return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
|
|
return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
|
|
@Override
|
|
@Override
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
@@ -152,7 +152,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
}
|
|
}
|
|
};
|
|
};
|
|
}
|
|
}
|
|
- return session;
|
|
|
|
|
|
+ return sessionRef.get();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -179,9 +179,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
.flatMap(device -> {
|
|
.flatMap(device -> {
|
|
//处理设备上线消息
|
|
//处理设备上线消息
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
if (message instanceof DeviceOnlineMessage) {
|
|
- DeviceSession fSession = session == null ?
|
|
|
|
|
|
+ DeviceSession fSession = sessionRef.get() == null ?
|
|
sessionManager.getSession(device.getDeviceId()) :
|
|
sessionManager.getSession(device.getDeviceId()) :
|
|
- session;
|
|
|
|
|
|
+ sessionRef.get();
|
|
|
|
|
|
if (fSession == null) {
|
|
if (fSession == null) {
|
|
fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
|
|
fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
|
|
@@ -194,8 +194,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
if (message.getHeader(Headers.keepOnline).orElse(false)) {
|
|
if (message.getHeader(Headers.keepOnline).orElse(false)) {
|
|
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
|
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
|
} else {
|
|
} else {
|
|
- client.onDisconnect(() -> sessionManager.unregister(client.getId()));
|
|
|
|
|
|
+ client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|
|
}
|
|
}
|
|
|
|
+ sessionRef.set(fSession);
|
|
sessionManager.register(fSession);
|
|
sessionManager.register(fSession);
|
|
}
|
|
}
|
|
if (keepaliveTimeout.get() != null) {
|
|
if (keepaliveTimeout.get() != null) {
|
|
@@ -223,7 +224,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|
return Mono.empty();
|
|
return Mono.empty();
|
|
}
|
|
}
|
|
));
|
|
));
|
|
- }).subscribe());
|
|
|
|
|
|
+ })
|
|
|
|
+ .subscribe());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|