Browse Source

增加物模型更新

zhou-hao 4 năm trước cách đây
mục cha
commit
67ffacffe0

+ 19 - 23
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -225,6 +225,8 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
             }
             appendDeviceMessageTopic(msg, builder);
         });
+        //上报了新的物模型
+        createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived");
     }
 
     private static void createFastBuilder(MessageType messageType,
@@ -247,38 +249,37 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         }
     }
 
-    protected Mono<Boolean> handleChildrenDeviceMessage(DeviceOperator device, String childrenId, Message message) {
+    protected Mono<Boolean> handleChildrenDeviceMessage(Message message) {
         if (message instanceof DeviceMessageReply) {
             return doReply(((DeviceMessageReply) message));
         }
+        //不处理子设备上下线,统一由 DeviceGatewayHelper处理
         return Mono.just(true);
     }
 
-    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessage reply) {
-        return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
+    protected Mono<Boolean> handleChildrenDeviceMessageReply(ChildDeviceMessage reply) {
+        return handleChildrenDeviceMessage(reply.getChildDeviceMessage());
     }
 
-    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessageReply reply) {
-        return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
+    protected Mono<Boolean> handleChildrenDeviceMessageReply(ChildDeviceMessageReply reply) {
+        return handleChildrenDeviceMessage(reply.getChildDeviceMessage());
     }
 
     @Override
     public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
+        Mono<Boolean> then;
+        if (message instanceof ChildDeviceMessageReply) {
+            then = handleChildrenDeviceMessageReply(((ChildDeviceMessageReply) message));
+        } else if (message instanceof ChildDeviceMessage) {
+            then = handleChildrenDeviceMessageReply(((ChildDeviceMessage) message));
+        } else if (message instanceof DeviceMessageReply) {
+            then = doReply(((DeviceMessageReply) message));
+        } else {
+            then = Mono.just(true);
+        }
         return this
             .onMessage(message)
-            .then(Mono.defer(() -> {
-                if (device != null) {
-                    if (message instanceof ChildDeviceMessageReply) {
-                        return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessageReply) message));
-                    } else if (message instanceof ChildDeviceMessage) {
-                        return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessage) message));
-                    }
-                }
-                if (message instanceof DeviceMessageReply) {
-                    return doReply(((DeviceMessageReply) message));
-                }
-                return Mono.just(true);
-            }))
+            .then(then)
             .defaultIfEmpty(false);
 
     }
@@ -289,11 +290,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         }
         return messageHandler
             .reply(reply)
-            .doOnSuccess(success -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("reply message {} complete", reply.getMessageId());
-                }
-            })
             .thenReturn(true)
             .doOnError((error) -> log.error("reply message error", error))
             ;

+ 37 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java

@@ -9,11 +9,13 @@ import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.message.*;
+import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.utils.FluxUtils;
 import org.jetlinks.community.device.entity.DeviceInstanceEntity;
 import org.jetlinks.community.device.entity.DeviceTagEntity;
 import org.jetlinks.community.device.enums.DeviceState;
 import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
@@ -182,6 +184,41 @@ public class DeviceMessageBusinessHandler {
             .then();
     }
 
+    @Subscribe("/device/*/*/metadata/derived")
+    public Mono<Void> updateMetadata(DerivedMetadataMessage message) {
+        if (message.isAll()) {
+            return updateMedata(message.getDeviceId(), message.getMetadata());
+        }
+        return Mono
+            .zip(
+                //原始物模型
+                registry
+                    .getDevice(message.getDeviceId())
+                    .flatMap(DeviceOperator::getMetadata),
+                //新的物模型
+                JetLinksDeviceMetadataCodec
+                    .getInstance()
+                    .decode(message.getMetadata()),
+                //合并在一起
+                DeviceMetadata::merge
+            )
+            //重新编码为字符串
+            .flatMap(JetLinksDeviceMetadataCodec.getInstance()::encode)
+            //更新物模型
+            .flatMap(metadata -> updateMedata(message.getDeviceId(), metadata));
+    }
+
+    private Mono<Void> updateMedata(String deviceId, String metadata) {
+        return deviceService
+            .createUpdate()
+            .set(DeviceInstanceEntity::getDeriveMetadata, metadata)
+            .where(DeviceInstanceEntity::getId, deviceId)
+            .execute()
+            .then(registry.getDevice(deviceId))
+            .flatMap(device -> device.updateMetadata(metadata))
+            .then();
+    }
+
 
     @PostConstruct
     public void init() {