|
@@ -176,15 +176,33 @@ public class DeviceGatewayHelper {
|
|
|
return Mono.empty();
|
|
|
}))
|
|
|
.flatMap(device -> {
|
|
|
+ //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
|
|
|
+ if (message.getHeader(Headers.ignoreSession).orElse(false)) {
|
|
|
+ if (!isDoRegister(message)) {
|
|
|
+ return messageHandler
|
|
|
+ .handleMessage(device, message)
|
|
|
+ .thenReturn(device);
|
|
|
+ }
|
|
|
+ return Mono.just(device);
|
|
|
+ }
|
|
|
+ //session已经存在了,可能是并发创建.
|
|
|
+ DeviceSession trySession = sessionManager.getSession(deviceId);
|
|
|
+ if (trySession != null) {
|
|
|
+ trySession.keepAlive();
|
|
|
+ return Mono.just(device);
|
|
|
+ }
|
|
|
DeviceSession newSession = sessionBuilder.apply(device);
|
|
|
if (null != newSession) {
|
|
|
- //保持会话,在低功率设备上,可能无法保持mqtt长连接.
|
|
|
+ //保持会话,在低功率设备上,可能无法保持长连接.
|
|
|
if (message.getHeader(Headers.keepOnline).orElse(false)) {
|
|
|
int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
|
|
|
newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
|
|
|
}
|
|
|
+ //注册会话
|
|
|
sessionManager.register(newSession);
|
|
|
+ //执行自定义会话回调
|
|
|
sessionConsumer.accept(newSession);
|
|
|
+ //保活
|
|
|
newSession.keepAlive();
|
|
|
if (!(message instanceof DeviceRegisterMessage) &&
|
|
|
!(message instanceof DeviceOnlineMessage)) {
|