|
@@ -7,6 +7,7 @@ import org.hswebframework.web.logger.ReactiveLogger;
|
|
|
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
|
|
|
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
|
|
|
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
|
|
|
+import org.jetlinks.core.ProtocolSupport;
|
|
|
import org.jetlinks.core.device.AuthenticationResponse;
|
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
@@ -63,19 +64,24 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
private final AtomicBoolean started = new AtomicBoolean();
|
|
|
|
|
|
+ private final Mono<ProtocolSupport> supportMono;
|
|
|
+
|
|
|
private Disposable disposable;
|
|
|
|
|
|
public MqttServerDeviceGateway(String id,
|
|
|
DeviceRegistry registry,
|
|
|
DeviceSessionManager sessionManager,
|
|
|
MqttServer mqttServer,
|
|
|
- DecodedClientMessageHandler messageHandler) {
|
|
|
+ DecodedClientMessageHandler messageHandler,
|
|
|
+ Mono<ProtocolSupport> customProtocol
|
|
|
+ ) {
|
|
|
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
|
|
|
this.id = id;
|
|
|
this.registry = registry;
|
|
|
this.sessionManager = sessionManager;
|
|
|
this.mqttServer = mqttServer;
|
|
|
this.messageHandler = messageHandler;
|
|
|
+ this.supportMono = customProtocol;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -88,7 +94,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
if (started.getAndSet(true) || disposable != null) {
|
|
|
return;
|
|
|
}
|
|
|
- disposable = (Disposable) mqttServer
|
|
|
+ disposable = mqttServer
|
|
|
.handleConnection()
|
|
|
.filter(conn -> {
|
|
|
if (!started.get()) {
|
|
@@ -110,30 +116,29 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
//处理连接,并进行认证
|
|
|
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
|
|
- return Mono.justOrEmpty(connection.getAuth())
|
|
|
- //没有认证信息,则拒绝连接.
|
|
|
- .switchIfEmpty(Mono.fromRunnable(() -> {
|
|
|
- connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
|
|
|
- gatewayMonitor.rejected();
|
|
|
- }))
|
|
|
- .flatMap(auth ->
|
|
|
- registry.getDevice(connection.getClientId())
|
|
|
- .flatMap(device -> device
|
|
|
- .authenticate(new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport()))
|
|
|
- .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)))
|
|
|
- .flatMap(resp -> {
|
|
|
- String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? device.getDeviceId() : resp.getDeviceId();
|
|
|
- //认证返回了新的设备ID,则使用新的设备
|
|
|
- if (!deviceId.equals(device.getDeviceId())) {
|
|
|
- return registry
|
|
|
- .getDevice(deviceId)
|
|
|
- .map(operator -> Tuples.of(operator, resp, connection))
|
|
|
- .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)))
|
|
|
- ;
|
|
|
- }
|
|
|
- return Mono.just(Tuples.of(device, resp, connection));
|
|
|
- })
|
|
|
- ))
|
|
|
+ return Mono
|
|
|
+ .justOrEmpty(connection.getAuth())
|
|
|
+ .flatMap(auth -> {
|
|
|
+ MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport());
|
|
|
+ return supportMono
|
|
|
+ //使用自定义协议来认证
|
|
|
+ .map(support -> support.authenticate(request, registry))
|
|
|
+ .defaultIfEmpty(Mono.defer(() -> registry
|
|
|
+ .getDevice(connection.getClientId())
|
|
|
+ .flatMap(device -> device.authenticate(request))))
|
|
|
+ .flatMap(Function.identity())
|
|
|
+ .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)));
|
|
|
+ })
|
|
|
+ .flatMap(resp -> {
|
|
|
+ String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? connection.getClientId() : resp.getDeviceId();
|
|
|
+ //认证返回了新的设备ID,则使用新的设备
|
|
|
+ return registry
|
|
|
+ .getDevice(deviceId)
|
|
|
+ .map(operator -> Tuples.of(operator, resp, connection))
|
|
|
+ .switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)))
|
|
|
+ ;
|
|
|
+ })
|
|
|
+ //设备注册信息不存在,拒绝连接
|
|
|
.onErrorResume((err) -> Mono.fromRunnable(() -> {
|
|
|
gatewayMonitor.rejected();
|
|
|
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|