|
@@ -4,26 +4,32 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
|
|
import lombok.Getter;
|
|
import lombok.Getter;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
import org.hswebframework.web.logger.ReactiveLogger;
|
|
-import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
|
|
|
|
-import org.jetlinks.community.gateway.monitor.GatewayMonitors;
|
|
|
|
-import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
|
|
|
|
import org.jetlinks.core.ProtocolSupport;
|
|
import org.jetlinks.core.ProtocolSupport;
|
|
import org.jetlinks.core.device.AuthenticationResponse;
|
|
import org.jetlinks.core.device.AuthenticationResponse;
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
import org.jetlinks.core.device.DeviceOperator;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
import org.jetlinks.core.device.DeviceRegistry;
|
|
import org.jetlinks.core.device.MqttAuthenticationRequest;
|
|
import org.jetlinks.core.device.MqttAuthenticationRequest;
|
|
-import org.jetlinks.core.message.*;
|
|
|
|
-import org.jetlinks.core.message.codec.*;
|
|
|
|
|
|
+import org.jetlinks.core.message.CommonDeviceMessage;
|
|
|
|
+import org.jetlinks.core.message.CommonDeviceMessageReply;
|
|
|
|
+import org.jetlinks.core.message.DeviceMessage;
|
|
|
|
+import org.jetlinks.core.message.Message;
|
|
|
|
+import org.jetlinks.core.message.codec.DefaultTransport;
|
|
|
|
+import org.jetlinks.core.message.codec.FromDeviceMessageContext;
|
|
|
|
+import org.jetlinks.core.message.codec.MqttMessage;
|
|
|
|
+import org.jetlinks.core.message.codec.Transport;
|
|
import org.jetlinks.core.server.session.DeviceSession;
|
|
import org.jetlinks.core.server.session.DeviceSession;
|
|
import org.jetlinks.core.server.session.DeviceSessionManager;
|
|
import org.jetlinks.core.server.session.DeviceSessionManager;
|
|
|
|
+import org.jetlinks.core.server.session.ReplaceableDeviceSession;
|
|
import org.jetlinks.community.gateway.DeviceGateway;
|
|
import org.jetlinks.community.gateway.DeviceGateway;
|
|
|
|
+import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
|
|
|
|
+import org.jetlinks.community.gateway.monitor.GatewayMonitors;
|
|
|
|
+import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
|
|
import org.jetlinks.community.network.DefaultNetworkType;
|
|
import org.jetlinks.community.network.DefaultNetworkType;
|
|
import org.jetlinks.community.network.NetworkType;
|
|
import org.jetlinks.community.network.NetworkType;
|
|
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
|
|
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
|
|
import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
|
import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
|
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
|
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
|
-import org.jetlinks.core.server.session.KeepOnlineSession;
|
|
|
|
-import org.jetlinks.core.server.session.ReplaceableDeviceSession;
|
|
|
|
|
|
+import org.jetlinks.community.network.utils.DeviceGatewayHelper;
|
|
import org.jetlinks.supports.server.DecodedClientMessageHandler;
|
|
import org.jetlinks.supports.server.DecodedClientMessageHandler;
|
|
import org.springframework.util.StringUtils;
|
|
import org.springframework.util.StringUtils;
|
|
import reactor.core.Disposable;
|
|
import reactor.core.Disposable;
|
|
@@ -35,8 +41,6 @@ import reactor.core.scheduler.Schedulers;
|
|
import reactor.util.function.Tuple3;
|
|
import reactor.util.function.Tuple3;
|
|
import reactor.util.function.Tuples;
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
-import javax.annotation.Nonnull;
|
|
|
|
-import java.time.Duration;
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.concurrent.atomic.LongAdder;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
@@ -69,13 +73,14 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
private Disposable disposable;
|
|
private Disposable disposable;
|
|
|
|
|
|
|
|
+ private final DeviceGatewayHelper helper;
|
|
|
|
+
|
|
public MqttServerDeviceGateway(String id,
|
|
public MqttServerDeviceGateway(String id,
|
|
DeviceRegistry registry,
|
|
DeviceRegistry registry,
|
|
DeviceSessionManager sessionManager,
|
|
DeviceSessionManager sessionManager,
|
|
MqttServer mqttServer,
|
|
MqttServer mqttServer,
|
|
DecodedClientMessageHandler messageHandler,
|
|
DecodedClientMessageHandler messageHandler,
|
|
- Mono<ProtocolSupport> customProtocol
|
|
|
|
- ) {
|
|
|
|
|
|
+ Mono<ProtocolSupport> customProtocol) {
|
|
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
|
|
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
|
|
this.id = id;
|
|
this.id = id;
|
|
this.registry = registry;
|
|
this.registry = registry;
|
|
@@ -83,6 +88,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
this.mqttServer = mqttServer;
|
|
this.mqttServer = mqttServer;
|
|
this.messageHandler = messageHandler;
|
|
this.messageHandler = messageHandler;
|
|
this.supportMono = customProtocol;
|
|
this.supportMono = customProtocol;
|
|
|
|
+ this.helper = new DeviceGatewayHelper(registry, sessionManager, messageHandler);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -115,11 +121,11 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
//处理连接,并进行认证
|
|
//处理连接,并进行认证
|
|
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
|
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
|
-
|
|
|
|
return Mono
|
|
return Mono
|
|
.justOrEmpty(connection.getAuth())
|
|
.justOrEmpty(connection.getAuth())
|
|
.flatMap(auth -> {
|
|
.flatMap(auth -> {
|
|
- MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport());
|
|
|
|
|
|
+ MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth
|
|
|
|
+ .getPassword(), getTransport());
|
|
return supportMono
|
|
return supportMono
|
|
//使用自定义协议来认证
|
|
//使用自定义协议来认证
|
|
.map(support -> support.authenticate(request, registry))
|
|
.map(support -> support.authenticate(request, registry))
|
|
@@ -175,7 +181,11 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
gatewayMonitor.disconnected();
|
|
gatewayMonitor.disconnected();
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
gatewayMonitor.totalConnection(counter.sum());
|
|
});
|
|
});
|
|
- return Tuples.of(connection.accept(), device, newSession);
|
|
|
|
|
|
+ try {
|
|
|
|
+ return Tuples.of(connection.accept(), device, newSession);
|
|
|
|
+ } catch (IllegalStateException ignore) {
|
|
|
|
+
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
|
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
|
gatewayMonitor.rejected();
|
|
gatewayMonitor.rejected();
|
|
@@ -203,9 +213,9 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
.publishOn(Schedulers.parallel())
|
|
.publishOn(Schedulers.parallel())
|
|
.doOnNext(msg -> gatewayMonitor.receivedMessage())
|
|
.doOnNext(msg -> gatewayMonitor.receivedMessage())
|
|
.flatMap(publishing ->
|
|
.flatMap(publishing ->
|
|
- this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
|
|
|
- //ack
|
|
|
|
- .doOnSuccess(s -> publishing.acknowledge())
|
|
|
|
|
|
+ this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
|
|
|
+ //ack
|
|
|
|
+ .doOnSuccess(s -> publishing.acknowledge())
|
|
)
|
|
)
|
|
//合并遗言消息
|
|
//合并遗言消息
|
|
.mergeWith(
|
|
.mergeWith(
|
|
@@ -224,30 +234,25 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
return operator
|
|
return operator
|
|
.getProtocol()
|
|
.getProtocol()
|
|
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
|
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
|
- .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message,registry)))
|
|
|
|
|
|
+ .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
|
|
.cast(DeviceMessage.class)
|
|
.cast(DeviceMessage.class)
|
|
.flatMap(msg -> {
|
|
.flatMap(msg -> {
|
|
|
|
+ if (messageProcessor.hasDownstreams()) {
|
|
|
|
+ sink.next(msg);
|
|
|
|
+ }
|
|
if (msg instanceof CommonDeviceMessage) {
|
|
if (msg instanceof CommonDeviceMessage) {
|
|
CommonDeviceMessage _msg = ((CommonDeviceMessage) msg);
|
|
CommonDeviceMessage _msg = ((CommonDeviceMessage) msg);
|
|
if (StringUtils.isEmpty(_msg.getDeviceId())) {
|
|
if (StringUtils.isEmpty(_msg.getDeviceId())) {
|
|
_msg.setDeviceId(operator.getDeviceId());
|
|
_msg.setDeviceId(operator.getDeviceId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- String deviceId = msg.getDeviceId();
|
|
|
|
- if (!StringUtils.isEmpty(deviceId)) {
|
|
|
|
- //返回了其他设备的消息,则自动创建会话
|
|
|
|
- if (!deviceId.equals(operator.getDeviceId())) {
|
|
|
|
- DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
|
|
|
|
- if (anotherSession == null) {
|
|
|
|
- return registry
|
|
|
|
- .getDevice(msg.getDeviceId())
|
|
|
|
- .map(device -> handleMessage(device.getDeviceId(), device, msg, session))
|
|
|
|
- .defaultIfEmpty(Mono.defer(()->handleMessage(msg.getDeviceId(), operator, msg, session)))
|
|
|
|
- .flatMap(Function.identity());
|
|
|
|
- }
|
|
|
|
|
|
+ if (msg instanceof CommonDeviceMessageReply) {
|
|
|
|
+ CommonDeviceMessageReply<?> _msg = ((CommonDeviceMessageReply<?>) msg);
|
|
|
|
+ if (StringUtils.isEmpty(_msg.getDeviceId())) {
|
|
|
|
+ _msg.setDeviceId(operator.getDeviceId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return handleMessage(deviceId, operator, msg, session);
|
|
|
|
|
|
+ return handleMessage(operator, msg, connection);
|
|
})
|
|
})
|
|
.then()
|
|
.then()
|
|
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
|
|
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
|
|
@@ -255,76 +260,27 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
;
|
|
;
|
|
}
|
|
}
|
|
|
|
|
|
- protected Mono<Boolean> handleOnlineOffline(DeviceMessage message, MqttConnectionSession firstSession) {
|
|
|
|
- if (message == null || message.getDeviceId() == null) {
|
|
|
|
- return Mono.just(false);
|
|
|
|
- }
|
|
|
|
- String deviceId = message.getDeviceId();
|
|
|
|
- Mono<Boolean> then = Mono.empty();
|
|
|
|
- if (message instanceof ChildDeviceMessage) {
|
|
|
|
- then = handleOnlineOffline((DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(), firstSession);
|
|
|
|
- } else if (message instanceof ChildDeviceMessageReply) {
|
|
|
|
- then = handleOnlineOffline((DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage(), firstSession);
|
|
|
|
- } else if (message instanceof DeviceOnlineMessage) {
|
|
|
|
- then = Mono.just(true);
|
|
|
|
- } else if (message instanceof DeviceOfflineMessage) {
|
|
|
|
- sessionManager.unregister(deviceId);
|
|
|
|
- return Mono.just(true);
|
|
|
|
- }
|
|
|
|
- if (firstSession.isAlive()) {
|
|
|
|
- DeviceSession[] managedSession = new DeviceSession[]{
|
|
|
|
- sessionManager.getSession(deviceId)
|
|
|
|
- };
|
|
|
|
- //session 不存在,可能是同一个mqtt返回多个设备消息
|
|
|
|
- if (managedSession[0] == null) {
|
|
|
|
- then = registry
|
|
|
|
- .getDevice(deviceId)
|
|
|
|
- .doOnNext(device -> sessionManager
|
|
|
|
- .register(managedSession[0] =
|
|
|
|
- new MqttConnectionSession(deviceId,
|
|
|
|
- device,
|
|
|
|
- getTransport(),
|
|
|
|
- firstSession.getConnection(),
|
|
|
|
- gatewayMonitor)))
|
|
|
|
- .then(then);
|
|
|
|
- } else {
|
|
|
|
- managedSession[0] = firstSession;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- //保持会话,在低功率设备上,可能无法保持mqtt长连接.
|
|
|
|
- if (message.getHeader(Headers.keepOnline).orElse(false)) {
|
|
|
|
- then = Mono
|
|
|
|
- .fromRunnable(() -> {
|
|
|
|
- DeviceSession session = managedSession[0];
|
|
|
|
- if (!session.isWrapFrom(KeepOnlineSession.class)) {
|
|
|
|
- int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
|
|
|
|
- KeepOnlineSession keepOnlineSession = new KeepOnlineSession(session, Duration.ofSeconds(timeout));
|
|
|
|
- //替换会话
|
|
|
|
- session = sessionManager.replace(session, keepOnlineSession);
|
|
|
|
- session.keepAlive();
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
- .then(then);
|
|
|
|
- }else if(managedSession[0]!=null) {
|
|
|
|
- managedSession[0].keepAlive();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return then;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private Mono<Void> handleMessage(String deviceId,
|
|
|
|
- DeviceOperator device,
|
|
|
|
|
|
+ private Mono<Void> handleMessage(DeviceOperator mainDevice,
|
|
DeviceMessage message,
|
|
DeviceMessage message,
|
|
- MqttConnectionSession firstSession) {
|
|
|
|
- if (messageProcessor.hasDownstreams()) {
|
|
|
|
- sink.next(message);
|
|
|
|
|
|
+ MqttConnection connection) {
|
|
|
|
+ if (!connection.isAlive()) {
|
|
|
|
+ return messageHandler
|
|
|
|
+ .handleMessage(mainDevice, message)
|
|
|
|
+ .then();
|
|
}
|
|
}
|
|
-
|
|
|
|
- return handleOnlineOffline(message, firstSession)
|
|
|
|
- //只有empty才转发消息
|
|
|
|
- .switchIfEmpty(messageHandler.handleMessage(device, message))
|
|
|
|
- .then();
|
|
|
|
|
|
+ return helper.handleDeviceMessage(message,
|
|
|
|
+ device -> new MqttConnectionSession(device.getDeviceId(),
|
|
|
|
+ device,
|
|
|
|
+ getTransport(),
|
|
|
|
+ connection,
|
|
|
|
+ gatewayMonitor),
|
|
|
|
+ session -> {
|
|
|
|
+
|
|
|
|
+ },
|
|
|
|
+ () -> {
|
|
|
|
+ log.warn("无法从MQTT[{}]消息中获取设备信息:{}", connection.getClientId(), message);
|
|
|
|
+ })
|
|
|
|
+ .then();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -339,8 +295,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Flux<Message> onMessage() {
|
|
public Flux<Message> onMessage() {
|
|
- return messageProcessor
|
|
|
|
- .map(Function.identity());
|
|
|
|
|
|
+ return messageProcessor;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|