|
@@ -32,6 +32,7 @@ import reactor.core.publisher.FluxSink;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.util.function.Tuples;
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
|
|
+import javax.annotation.Nonnull;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
@@ -120,11 +121,11 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
|
|
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
|
|
gatewayMonitor.rejected();
|
|
gatewayMonitor.rejected();
|
|
}))
|
|
}))
|
|
- .onErrorContinue((err, res) -> {
|
|
|
|
|
|
+ .onErrorResume((err) -> Mono.fromRunnable(() -> {
|
|
gatewayMonitor.rejected();
|
|
gatewayMonitor.rejected();
|
|
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
|
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
|
log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
|
|
log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
|
|
- }))
|
|
|
|
|
|
+ })))
|
|
.flatMap(tuple3 -> {
|
|
.flatMap(tuple3 -> {
|
|
counter.increment();
|
|
counter.increment();
|
|
DeviceOperator device = tuple3.getT1();
|
|
DeviceOperator device = tuple3.getT1();
|
|
@@ -155,7 +156,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|
}
|
|
}
|
|
return Mono.empty();
|
|
return Mono.empty();
|
|
})
|
|
})
|
|
- .onErrorContinue((err, res) -> log.error("处理MQTT连接失败", err))
|
|
|
|
|
|
+ .onErrorResume((err) -> Mono.fromRunnable(() -> log.error("处理MQTT连接失败", err)))
|
|
.subscribe(tp -> tp.getT1()
|
|
.subscribe(tp -> tp.getT1()
|
|
.handleMessage()
|
|
.handleMessage()
|
|
.filter(pb -> started.get())
|
|
.filter(pb -> started.get())
|
|
@@ -171,6 +172,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
|
+ @Nonnull
|
|
public EncodedMessage getMessage() {
|
|
public EncodedMessage getMessage() {
|
|
return publishing.getMessage();
|
|
return publishing.getMessage();
|
|
}
|
|
}
|
|
@@ -181,9 +183,10 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|
}
|
|
}
|
|
return messageHandler.handleMessage(tp.getT2(), msg);
|
|
return messageHandler.handleMessage(tp.getT2(), msg);
|
|
})
|
|
})
|
|
- .onErrorContinue((err, res) -> log.error("处理MQTT连接[{}]消息失败:{}", tp.getT2().getDeviceId(), publishing.getMessage(), err)))
|
|
|
|
- .subscribe()
|
|
|
|
- );
|
|
|
|
|
|
+ .onErrorResume((err) ->
|
|
|
|
+ Mono.fromRunnable(() -> log.error("处理MQTT连接[{}]消息失败:{}", tp.getT2().getDeviceId(), publishing.getMessage(), err))
|
|
|
|
+ ))
|
|
|
|
+ .subscribe());
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|