浏览代码

增加网关设备自动注册

zhou-hao 5 年之前
父节点
当前提交
11e60d7329

+ 48 - 31
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -1,6 +1,7 @@
 package org.jetlinks.community.device.message;
 
 import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.Values;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.event.EventMessage;
@@ -14,6 +15,7 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 import javax.annotation.Nonnull;
+import java.util.HashMap;
 import java.util.function.Function;
 
 /**
@@ -89,7 +91,6 @@ public class DeviceMessageConnector
             .then();
     }
 
-
     public Mono<String> getTopic(Message message) {
         if (message instanceof DeviceMessage) {
             DeviceMessage deviceMessage = ((DeviceMessage) message);
@@ -101,43 +102,16 @@ public class DeviceMessageConnector
                 .getDevice(deviceId)
                 //获取设备配置是可能存在的性能瓶颈
                 .flatMap(operator -> operator.getSelfConfigs(appendConfigHeader))
+                .switchIfEmpty(Mono.fromSupplier(() -> Values.of(new HashMap<>())))
                 .flatMap(configs -> {
                     configs.getAllValues().forEach(deviceMessage::addHeader);
-                    String topic;
-
-                    // TODO: 2019/12/28 自定义topic支持?
-
-                    if (message instanceof EventMessage) {   //事件
-                        EventMessage event = ((EventMessage) message);
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/event/".concat(event.getEvent());
-                    } else if (message instanceof ReportPropertyMessage) {   //上报属性
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/report";
-                    } else if (message instanceof DeviceOnlineMessage) {   //设备上线
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/online";
-                    } else if (message instanceof DeviceOfflineMessage) {   //设备离线
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/offline";
-                    } else if (message instanceof ChildDeviceMessage) { //子设备消息
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/children";
+                    String topic = "/device/".concat(deviceId).concat(createDeviceMessageTopic(message));
+                    if (message instanceof ChildDeviceMessage) { //子设备消息
                         return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
                             .thenReturn(topic);
                     } else if (message instanceof ChildDeviceMessageReply) { //子设备消息
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/children/reply";
                         return onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
                             .thenReturn(topic);
-                    } else if (message instanceof ReadPropertyMessage) { //读取属性
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/read";
-                    } else if (message instanceof WritePropertyMessage) { //修改属性
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/write";
-                    } else if (message instanceof FunctionInvokeMessage) { //调用功能
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/function/reply";
-                    } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/read/reply";
-                    } else if (message instanceof WritePropertyMessageReply) { //修改属性回复
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/write/reply";
-                    } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/function/reply";
-                    } else {
-                        topic = "/device/" + deviceMessage.getDeviceId() + "/message/unknown";
                     }
                     return Mono.just(topic);
                 });
@@ -146,6 +120,49 @@ public class DeviceMessageConnector
         return Mono.just("/device/unknown/message/unknown");
     }
 
+    public String createDeviceMessageTopic(Message message) {
+        if (message instanceof EventMessage) {   //事件
+            EventMessage event = ((EventMessage) message);
+            return "/message/event/".concat(event.getEvent());
+        } else if (message instanceof ReportPropertyMessage) {   //上报属性
+            return "/message/property/report";
+        } else if (message instanceof DeviceOnlineMessage) {   //设备上线
+            return "/online";
+        } else if (message instanceof DeviceOfflineMessage) {   //设备离线
+            return "/offline";
+        } else if (message instanceof ChildDeviceMessage) { //子设备消息
+            Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
+            if (msg instanceof DeviceMessage) {
+                return "/message/children/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg));
+            }
+            return "/message/children/".concat(createDeviceMessageTopic(message));
+        } else if (message instanceof ChildDeviceMessageReply) { //子设备消息
+            Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
+            if (msg instanceof DeviceMessage) {
+                return "/message/children/reply/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg));
+            }
+            return "/message/children/reply/".concat(createDeviceMessageTopic(message));
+        } else if (message instanceof ReadPropertyMessage) { //读取属性
+            return "/message/property/read";
+        } else if (message instanceof WritePropertyMessage) { //修改属性
+            return "/message/property/write";
+        } else if (message instanceof FunctionInvokeMessage) { //调用功能
+            return "/message/function/reply";
+        } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
+            return "/message/property/read/reply";
+        } else if (message instanceof WritePropertyMessageReply) { //修改属性回复
+            return "/message/property/write/reply";
+        } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复
+            return "/message/function/reply";
+        } else if (message instanceof DeviceRegisterMessage) { //注册
+            return "/register";
+        } else if (message instanceof DeviceUnRegisterMessage) { //注销
+            return "/unregister";
+        } else {
+            return "/message/unknown";
+        }
+    }
+
     @Nonnull
     @Override
     public Flux<MessageConnection> onConnection() {

+ 46 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -21,12 +21,11 @@ import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.core.message.DeviceOfflineMessage;
-import org.jetlinks.core.message.DeviceOnlineMessage;
+import org.jetlinks.core.message.*;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.metadata.EventMetadata;
@@ -389,4 +388,48 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
                 }));
     }
 
+
+    @Subscribe("/device/*/message/children/*/register")
+    public Mono<Void> autoBindChildrenDevice(ChildDeviceMessage message) {
+        String childId = message.getChildDeviceId();
+        Message childMessage = message.getChildDeviceMessage();
+        if (childMessage instanceof DeviceRegisterMessage) {
+            return registry.getDevice(message.getDeviceId())
+                .flatMap(DeviceOperator::getState)
+                .flatMap(state -> createUpdate()
+                    .set(DeviceInstanceEntity::getParentId, message.getDeviceId())
+                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
+                    .where(DeviceInstanceEntity::getId, childId)
+                    .execute()
+                    .then(registry
+                        .getDevice(childId)
+                        .flatMap(dev -> dev.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId())))
+                    .then());
+        }
+        return Mono.empty();
+    }
+
+    @Subscribe("/device/*/message/children/*/unregister")
+    public Mono<Void> autoUnbindChildrenDevice(ChildDeviceMessage message) {
+        String childId = message.getChildDeviceId();
+        Message childMessage = message.getChildDeviceMessage();
+        if (childMessage instanceof DeviceUnRegisterMessage) {
+
+            return registry.getDevice(childId)
+                .flatMap(dev -> dev
+                    .removeConfig(DeviceConfigKey.parentGatewayId.getKey())
+                    .then(dev.checkState()))
+                .flatMap(state -> createUpdate()
+                    .setNull(DeviceInstanceEntity::getParentId)
+                    .set(DeviceInstanceEntity::getState, DeviceState.of(state))
+                    .where(DeviceInstanceEntity::getId, childId)
+                    .execute()
+                    .then());
+
+
+        }
+        return Mono.empty();
+    }
+
+
 }