Ver código fonte

优化设备指令发送规则执行器

zhou-hao 4 anos atrás
pai
commit
331f31a62b

+ 108 - 33
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java

@@ -9,10 +9,9 @@ import org.hswebframework.web.utils.ExpressionUtils;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.DeviceRegistry;
-import org.jetlinks.core.message.DeviceMessageReply;
-import org.jetlinks.core.message.Headers;
-import org.jetlinks.core.message.MessageType;
-import org.jetlinks.core.message.RepayableDeviceMessage;
+import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.exception.DeviceOperationException;
+import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
@@ -33,9 +32,11 @@ import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
 
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 @Component
 @Component
@@ -56,26 +57,36 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
 
 
     class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor {
     class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor {
 
 
-        private Config config;
+        private DeviceMessageSendConfig config;
+
+        private Function<Map<String, Object>, Flux<DeviceOperator>> selector;
 
 
         public DeviceMessageSendTaskExecutor(ExecutionContext context) {
         public DeviceMessageSendTaskExecutor(ExecutionContext context) {
             super("发送设备消息", context);
             super("发送设备消息", context);
-            validate();
             reload();
             reload();
         }
         }
 
 
+        protected Flux<DeviceOperator> selectDevice(Map<String, Object> ctx) {
+            return selector.apply(ctx);
+        }
+
         @Override
         @Override
         protected Publisher<RuleData> apply(RuleData input) {
         protected Publisher<RuleData> apply(RuleData input) {
-            Flux<DeviceOperator> devices = StringUtils.hasText(config.getDeviceId())
-                ? registry.getDevice(config.getDeviceId()).flux()
-                : registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices);
             Map<String, Object> ctx = RuleDataHelper.toContextMap(input);
             Map<String, Object> ctx = RuleDataHelper.toContextMap(input);
-            return devices
-                .filterWhen(DeviceOperator::isOnline)
-                .publishOn(Schedulers.parallel())
-                .flatMap(device -> config.doSend(ctx, device))
-                .onErrorResume(error -> context.onError(error, input).then(Mono.empty()))
-                .map(reply -> input.newData(reply.toJson()))
+
+            Flux<DeviceOperator> readySendDevice =
+                "ignoreOffline".equals(config.getStateOperator())
+                    ? selectDevice(ctx).filterWhen(DeviceOperator::isOnline)
+                    : selectDevice(ctx);
+
+            return readySendDevice
+                .switchIfEmpty(context.onError(() -> new DeviceOperationException(ErrorCode.SYSTEM_ERROR, "无可用设备"), input))
+                .flatMap(device -> config
+                    .doSend(ctx, context, device, input)
+                    .onErrorResume(error -> context.onError(error, input))
+                    .subscribeOn(Schedulers.parallel())
+                )
+                .map(reply -> context.newRuleData(input.newData(reply.toJson())))
                 ;
                 ;
         }
         }
 
 
@@ -84,22 +95,37 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
             if (CollectionUtils.isEmpty(context.getJob().getConfiguration())) {
             if (CollectionUtils.isEmpty(context.getJob().getConfiguration())) {
                 throw new IllegalArgumentException("配置不能为空");
                 throw new IllegalArgumentException("配置不能为空");
             }
             }
-            Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
-            config.validate();
+            FastBeanCopier.copy(context.getJob().getConfiguration(), new DeviceMessageSendConfig()).validate();
         }
         }
 
 
         @Override
         @Override
         public void reload() {
         public void reload() {
-            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
+            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new DeviceMessageSendConfig());
+            config.validate();
+            if (StringUtils.hasText(config.deviceId)) {
+                selector = ctx -> registry.getDevice(config.getDeviceId()).flux();
+            } else if (StringUtils.hasText(config.productId)) {
+                selector = ctx -> registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices);
+            } else {
+                if (config.isFixed() && config.getMessage() != null) {
+                    selector = ctx -> registry.getDevice((String) config.getMessage().get("deviceId")).flux();
+                } else {
+                    selector = ctx -> registry
+                        .getDevice((String) ctx
+                            .getOrDefault("deviceId",
+                                          config.getMessage() == null
+                                              ? null
+                                              : config.getMessage().get("deviceId")))
+                        .flux();
+                }
+            }
         }
         }
 
 
-
     }
     }
 
 
-
     @Getter
     @Getter
     @Setter
     @Setter
-    public static class Config {
+    public static class DeviceMessageSendConfig {
 
 
         //设备ID
         //设备ID
         private String deviceId;
         private String deviceId;
@@ -107,21 +133,55 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
         //产品ID
         //产品ID
         private String productId;
         private String productId;
 
 
+        //消息来源: pre-node(上游节点),fixed(固定消息)
+        private String from;
+
+        private Duration timeout = Duration.ofSeconds(10);
+
         private Map<String, Object> message;
         private Map<String, Object> message;
 
 
         private boolean async;
         private boolean async;
 
 
+        private String waitType = "sync";
+
+        private String stateOperator = "ignoreOffline";
+
+        public Map<String, Object> toMap() {
+            Map<String, Object> conf = FastBeanCopier.copy(this, new HashMap<>());
+            conf.put("timeout", timeout.toString());
+            return conf;
+        }
+
         @SuppressWarnings("all")
         @SuppressWarnings("all")
-        public Publisher<DeviceMessageReply> doSend(Map<String, Object> ctx, DeviceOperator device) {
-            Map<String, Object> message = new HashMap<>(this.message);
+        public Flux<DeviceMessage> doSend(Map<String, Object> ctx,
+                                          ExecutionContext context,
+                                          DeviceOperator device,
+                                          RuleData input) {
+            Map<String, Object> message = new HashMap<>("pre-node".equals(from) ? ctx : this.message);
             message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
             message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
             message.put("deviceId", device.getDeviceId());
             message.put("deviceId", device.getDeviceId());
+            message.put("timestamp", System.currentTimeMillis());
             return Mono
             return Mono
                 .justOrEmpty(MessageType.convertMessage(message))
                 .justOrEmpty(MessageType.convertMessage(message))
-                .cast(RepayableDeviceMessage.class)
+                .switchIfEmpty(context.onError(() -> new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE), input))
+                .cast(DeviceMessage.class)
                 .map(msg -> applyMessageExpression(ctx, msg))
                 .map(msg -> applyMessageExpression(ctx, msg))
-                .doOnNext(msg -> msg.addHeader(Headers.async, async))
-                .flatMapMany(msg -> device.messageSender().send(Mono.just(msg)));
+                .doOnNext(msg -> msg
+                    .addHeader(Headers.async, async || !"sync".equals(waitType))
+                    .addHeader(Headers.sendAndForget, "forget".equals(waitType))
+                    .addHeader(Headers.timeout, timeout.toMillis()))
+                .flatMapMany(msg -> "forget".equals(waitType)
+                    ? device.messageSender().send(msg).then(Mono.empty())
+                    : device.messageSender()
+                            .send(msg)
+                            .onErrorResume(err -> {
+                                //失败尝试转为消息回复
+                                if (msg instanceof RepayableDeviceMessage) {
+                                    return Mono.just(((RepayableDeviceMessage<?>) msg).newReply().error(err));
+                                }
+                                return Mono.error(err);
+                            })
+                );
         }
         }
 
 
         private ReadPropertyMessage applyMessageExpression(Map<String, Object> ctx, ReadPropertyMessage message) {
         private ReadPropertyMessage applyMessageExpression(Map<String, Object> ctx, ReadPropertyMessage message) {
@@ -129,7 +189,10 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
 
 
             if (!CollectionUtils.isEmpty(properties)) {
             if (!CollectionUtils.isEmpty(properties)) {
                 message.setProperties(
                 message.setProperties(
-                    properties.stream().map(prop -> ExpressionUtils.analytical(prop, ctx, "spel")).collect(Collectors.toList())
+                    properties
+                        .stream()
+                        .map(prop -> ExpressionUtils.analytical(prop, ctx, "spel"))
+                        .collect(Collectors.toList())
                 );
                 );
             }
             }
 
 
@@ -141,7 +204,8 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
 
 
             if (!CollectionUtils.isEmpty(properties)) {
             if (!CollectionUtils.isEmpty(properties)) {
                 message.setProperties(
                 message.setProperties(
-                    properties.entrySet()
+                    properties
+                        .entrySet()
                         .stream()
                         .stream()
                         .map(prop -> Tuples.of(prop.getKey(), ExpressionUtils.analytical(String.valueOf(prop.getValue()), ctx, "spel")))
                         .map(prop -> Tuples.of(prop.getKey(), ExpressionUtils.analytical(String.valueOf(prop.getValue()), ctx, "spel")))
                         .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
                         .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
@@ -156,14 +220,17 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
 
 
             if (!CollectionUtils.isEmpty(inputs)) {
             if (!CollectionUtils.isEmpty(inputs)) {
                 for (FunctionParameter input : inputs) {
                 for (FunctionParameter input : inputs) {
-                    input.setValue(ExpressionUtils.analytical(String.valueOf(input.getValue()), ctx, "spel"));
+                    String stringVal = String.valueOf(input.getValue());
+                    if (stringVal.contains("$")) {
+                        input.setValue(ExpressionUtils.analytical(stringVal, ctx, "spel"));
+                    }
                 }
                 }
             }
             }
 
 
             return message;
             return message;
         }
         }
 
 
-        private RepayableDeviceMessage<?> applyMessageExpression(Map<String, Object> ctx, RepayableDeviceMessage<?> message) {
+        private DeviceMessage applyMessageExpression(Map<String, Object> ctx, DeviceMessage message) {
             if (message instanceof ReadPropertyMessage) {
             if (message instanceof ReadPropertyMessage) {
                 return applyMessageExpression(ctx, ((ReadPropertyMessage) message));
                 return applyMessageExpression(ctx, ((ReadPropertyMessage) message));
             }
             }
@@ -176,11 +243,19 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
             return message;
             return message;
         }
         }
 
 
+        private boolean isFixed() {
+            return "fixed".equals(from);
+        }
+
+        private boolean isPreNode() {
+            return "pre-node".equals(from);
+        }
+
+
         public void validate() {
         public void validate() {
-            if (StringUtils.isEmpty(deviceId) && StringUtils.isEmpty(productId)) {
-                throw new IllegalArgumentException("deviceId和productId不能同时为空");
+            if ("fixed".equals(from)) {
+                MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式"));
             }
             }
-            MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式"));
         }
         }
 
 
     }
     }

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java

@@ -57,7 +57,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
                 timer.setExecutor("timer");
                 timer.setExecutor("timer");
                 timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
                 timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
 
 
-                DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
+                DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig senderConfig = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig();
                 senderConfig.setAsync(true);
                 senderConfig.setAsync(true);
                 senderConfig.setDeviceId(alarmRule.getDeviceId());
                 senderConfig.setDeviceId(alarmRule.getDeviceId());
                 senderConfig.setProductId(alarmRule.getProductId());
                 senderConfig.setProductId(alarmRule.getProductId());
@@ -67,7 +67,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
                 messageSender.setId("message-sender:" + (++index));
                 messageSender.setId("message-sender:" + (++index));
                 messageSender.setName("定时发送设备消息");
                 messageSender.setName("定时发送设备消息");
                 messageSender.setExecutor("device-message-sender");
                 messageSender.setExecutor("device-message-sender");
-                messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>()));
+                messageSender.setConfiguration(senderConfig.toMap());
 
 
                 RuleLink link = new RuleLink();
                 RuleLink link = new RuleLink();
                 link.setId(timer.getId().concat(":").concat(messageSender.getId()));
                 link.setId(timer.getId().concat(":").concat(messageSender.getId()));