Browse Source

优化topic构造

zhou-hao 4 years ago
parent
commit
7cb8233675

+ 59 - 32
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.device.message;
 
 
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.Values;
 import org.jetlinks.core.Values;
+import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.*;
@@ -12,7 +13,10 @@ import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.*;
 import org.jetlinks.core.message.property.*;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 
 
 /**
 /**
  * 将设备消息连接到消息网关
  * 将设备消息连接到消息网关
@@ -30,10 +34,17 @@ public class DeviceMessageConnector{
 
 
     private final EventBus eventBus;
     private final EventBus eventBus;
 
 
+    private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> log.error(error.getMessage(), error);
+
+    private final Function<DeviceOperator, Mono<Values>> configGetter;
+
+    private final static Values emptyValues = Values.of(Collections.emptyMap());
+
     public DeviceMessageConnector(EventBus eventBus,
     public DeviceMessageConnector(EventBus eventBus,
                                   DeviceRegistry registry) {
                                   DeviceRegistry registry) {
         this.registry = registry;
         this.registry = registry;
         this.eventBus = eventBus;
         this.eventBus = eventBus;
+        this.configGetter = operator -> operator.getSelfConfigs(appendConfigHeader).defaultIfEmpty(emptyValues);
     }
     }
 
 
     public Mono<Void> onMessage(Message message) {
     public Mono<Void> onMessage(Message message) {
@@ -41,8 +52,8 @@ public class DeviceMessageConnector{
             return Mono.empty();
             return Mono.empty();
         }
         }
         return this.getTopic(message)
         return this.getTopic(message)
-            .flatMap(topic ->eventBus.publish(topic,message).then())
-            .onErrorResume(error -> Mono.fromRunnable(() -> log.error(error.getMessage(), error)))
+            .flatMap(topic -> eventBus.publish(topic, message).then())
+            .onErrorContinue(doOnError)
             .then();
             .then();
     }
     }
 
 
@@ -57,15 +68,12 @@ public class DeviceMessageConnector{
             return registry
             return registry
                 .getDevice(deviceId)
                 .getDevice(deviceId)
                 //获取设备配置是可能存在的性能瓶颈
                 //获取设备配置是可能存在的性能瓶颈
-                .flatMap(operator -> operator.getSelfConfigs(appendConfigHeader))
-                .switchIfEmpty(Mono.fromSupplier(() -> Values.of(new HashMap<>())))
+                .flatMap(configGetter)
                 .flatMap(configs -> {
                 .flatMap(configs -> {
                     configs.getAllValues().forEach(deviceMessage::addHeader);
                     configs.getAllValues().forEach(deviceMessage::addHeader);
                     String productId = deviceMessage.getHeader("productId").map(String::valueOf).orElse("null");
                     String productId = deviceMessage.getHeader("productId").map(String::valueOf).orElse("null");
+                    String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage);
 
 
-                    String topic = String.join("",
-                        "/device", "/", productId, "/", deviceId, createDeviceMessageTopic(message)
-                    );
                     if (message instanceof ChildDeviceMessage) { //子设备消息
                     if (message instanceof ChildDeviceMessage) { //子设备消息
                         return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
                         return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
                             .thenReturn(topic);
                             .thenReturn(topic);
@@ -80,60 +88,79 @@ public class DeviceMessageConnector{
         return Mono.just("/device/unknown/message/unknown");
         return Mono.just("/device/unknown/message/unknown");
     }
     }
 
 
-    public static String createDeviceMessageTopic(Message message) {
+    public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) {
+        StringBuilder builder = new StringBuilder(64)
+            .append("/device/")
+            .append(productId)
+            .append("/")
+            .append(deviceId);
+
+        appendDeviceMessageTopic(message, builder);
+        return builder.toString();
+    }
+
+    public static void appendDeviceMessageTopic(Message message, StringBuilder builder) {
         if (message instanceof EventMessage) {   //事件
         if (message instanceof EventMessage) {   //事件
             EventMessage event = ((EventMessage) message);
             EventMessage event = ((EventMessage) message);
-            return "/message/event/".concat(event.getEvent());
+            builder.append("/message/event/").append(event.getEvent());
         } else if (message instanceof ReportPropertyMessage) {   //上报属性
         } else if (message instanceof ReportPropertyMessage) {   //上报属性
-            return "/message/property/report";
+            builder.append("/message/property/report");
         } else if (message instanceof DeviceOnlineMessage) {   //设备上线
         } else if (message instanceof DeviceOnlineMessage) {   //设备上线
-            return "/online";
+            builder.append("/online");
         } else if (message instanceof DeviceOfflineMessage) {   //设备离线
         } else if (message instanceof DeviceOfflineMessage) {   //设备离线
-            return "/offline";
+            builder.append("/offline");
         } else if (message instanceof ChildDeviceMessage) { //子设备消息
         } else if (message instanceof ChildDeviceMessage) { //子设备消息
             Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
             Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
             if (msg instanceof DeviceMessage) {
             if (msg instanceof DeviceMessage) {
-                return "/message/children/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg));
+                builder.append("/message/children/")
+                    .append(((DeviceMessage) msg).getDeviceId());
+                appendDeviceMessageTopic(msg, builder);
+            } else {
+                builder.append("/message/children");
+                appendDeviceMessageTopic(message, builder);
             }
             }
-            return "/message/children/".concat(createDeviceMessageTopic(message));
         } else if (message instanceof ChildDeviceMessageReply) { //子设备消息
         } else if (message instanceof ChildDeviceMessageReply) { //子设备消息
             Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
             Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
             if (msg instanceof DeviceMessage) {
             if (msg instanceof DeviceMessage) {
-                return "/message/children/reply/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg));
+                builder.append("/message/children/reply/")
+                    .append(((DeviceMessage) msg).getDeviceId());
+                appendDeviceMessageTopic(msg, builder);
+            } else {
+                builder.append("/message/children/reply");
+                appendDeviceMessageTopic(message, builder);
             }
             }
-            return "/message/children/reply/".concat(createDeviceMessageTopic(message));
         } else if (message instanceof ReadPropertyMessage) { //读取属性
         } else if (message instanceof ReadPropertyMessage) { //读取属性
-            return "/message/send/property/read";
+            builder.append("/message/send/property/read");
         } else if (message instanceof WritePropertyMessage) { //修改属性
         } else if (message instanceof WritePropertyMessage) { //修改属性
-            return "/message/send/property/write";
+            builder.append("/message/send/property/write");
         } else if (message instanceof FunctionInvokeMessage) { //调用功能
         } else if (message instanceof FunctionInvokeMessage) { //调用功能
-            return "/message/send/function";
+            builder.append("/message/send/function");
         } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
         } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
-            return "/message/property/read/reply";
+            builder.append("/message/property/read/reply");
         } else if (message instanceof WritePropertyMessageReply) { //修改属性回复
         } else if (message instanceof WritePropertyMessageReply) { //修改属性回复
-            return "/message/property/write/reply";
+            builder.append("/message/property/write/reply");
         } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复
         } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复
-            return "/message/function/reply";
+            builder.append("/message/function/reply");
         } else if (message instanceof DeviceRegisterMessage) { //注册
         } else if (message instanceof DeviceRegisterMessage) { //注册
-            return "/register";
+            builder.append("/register");
         } else if (message instanceof DeviceUnRegisterMessage) { //注销
         } else if (message instanceof DeviceUnRegisterMessage) { //注销
-            return "/unregister";
+            builder.append("/unregister");
         } else if (message instanceof RequestFirmwareMessage) { //拉取固件请求 since 1.3
         } else if (message instanceof RequestFirmwareMessage) { //拉取固件请求 since 1.3
-            return "/firmware/pull";
+            builder.append("/firmware/pull");
         } else if (message instanceof RequestFirmwareMessageReply) { //拉取固件响应 since 1.3
         } else if (message instanceof RequestFirmwareMessageReply) { //拉取固件响应 since 1.3
-            return "/firmware/pull/reply";
+            builder.append("/firmware/pull/reply");
         } else if (message instanceof ReportFirmwareMessage) { //上报固件信息 since 1.3
         } else if (message instanceof ReportFirmwareMessage) { //上报固件信息 since 1.3
-            return "/firmware/report";
+            builder.append("/firmware/report");
         } else if (message instanceof UpgradeFirmwareProgressMessage) { //上报固件更新进度 since 1.3
         } else if (message instanceof UpgradeFirmwareProgressMessage) { //上报固件更新进度 since 1.3
-            return "/firmware/progress";
+            builder.append("/firmware/progress");
         } else if (message instanceof UpgradeFirmwareMessage) { //推送固件更新 since 1.3
         } else if (message instanceof UpgradeFirmwareMessage) { //推送固件更新 since 1.3
-            return "/firmware/push";
+            builder.append("/firmware/push");
         } else if (message instanceof UpgradeFirmwareMessageReply) { //推送固件更新回复 since 1.3
         } else if (message instanceof UpgradeFirmwareMessageReply) { //推送固件更新回复 since 1.3
-            return "/firmware/push/reply";
+            builder.append("/firmware/push/reply");
         } else if (message instanceof DirectDeviceMessage) { //透传消息 since 1.4
         } else if (message instanceof DirectDeviceMessage) { //透传消息 since 1.4
-            return "/message/direct";
+            builder.append("/message/direct");
         } else {
         } else {
-            return "/message/unknown";
+            builder.append("/message/").append(message.getMessageType().name().toLowerCase());
         }
         }
     }
     }
 }
 }

+ 4 - 4
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java

@@ -32,10 +32,10 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
             .zipWhen(DeviceOperator::getProduct)
             .zipWhen(DeviceOperator::getProduct)
             .doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId()))
             .doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId()))
             .flatMap(tp2 -> {
             .flatMap(tp2 -> {
-                String topic = DeviceMessageConnector.createDeviceMessageTopic(message);
-                Mono<Void> publisher = eventBus
-                    .publish("/device/" + tp2.getT2().getId() + "/" + tp2.getT1().getDeviceId() + topic, message)
-                    .then();
+                String topic = DeviceMessageConnector.createDeviceMessageTopic(tp2.getT2().getId(),tp2.getT1().getDeviceId(),message);
+
+                Mono<Void> publisher = eventBus.publish(topic, message).then();
+
                 if (message instanceof ChildDeviceMessage) {
                 if (message instanceof ChildDeviceMessage) {
                     DeviceMessage msg = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
                     DeviceMessage msg = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
                     publisher = publisher.then(doPublish(registry.getDevice(msg.getDeviceId()), msg));
                     publisher = publisher.then(doPublish(registry.getDevice(msg.getDeviceId()), msg));