|
@@ -101,7 +101,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
.publishOn(Schedulers.parallel())
|
|
|
.flatMap(this::handleConnection)
|
|
|
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
|
|
- .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()) , Integer.MAX_VALUE)
|
|
|
+ .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
|
|
|
.onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
|
|
|
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
|
|
.subscribe();
|
|
@@ -231,24 +231,26 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
sink.next(msg);
|
|
|
}
|
|
|
String deviceId = msg.getDeviceId();
|
|
|
- //返回了其他设备的消息,则自动创建会话
|
|
|
- if (!deviceId.equals(operator.getDeviceId())) {
|
|
|
- DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
|
|
|
- if (anotherSession == null) {
|
|
|
-
|
|
|
- connection.onClose(c -> sessionManager.unregister(deviceId));
|
|
|
-
|
|
|
- return registry
|
|
|
- .getDevice(msg.getDeviceId())
|
|
|
- .doOnNext(device -> sessionManager.register(
|
|
|
- new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) {
|
|
|
- @Override
|
|
|
- public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
|
- return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
|
|
|
- }
|
|
|
- }))
|
|
|
- .then(messageHandler.handleMessage(operator, msg))
|
|
|
- ;
|
|
|
+ if (!StringUtils.isEmpty(deviceId)) {
|
|
|
+ //返回了其他设备的消息,则自动创建会话
|
|
|
+ if (!deviceId.equals(operator.getDeviceId())) {
|
|
|
+ DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
|
|
|
+ if (anotherSession == null) {
|
|
|
+
|
|
|
+ connection.onClose(c -> sessionManager.unregister(deviceId));
|
|
|
+
|
|
|
+ return registry
|
|
|
+ .getDevice(msg.getDeviceId())
|
|
|
+ .doOnNext(device -> sessionManager.register(
|
|
|
+ new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) {
|
|
|
+ @Override
|
|
|
+ public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
|
|
+ return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
|
|
|
+ }
|
|
|
+ }))
|
|
|
+ .then(messageHandler.handleMessage(operator, msg))
|
|
|
+ ;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
//丢给默认的消息处理逻辑
|