Selaa lähdekoodia

优化设备告警

zhou-hao 5 vuotta sitten
vanhempi
commit
ac316b00da

+ 100 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/DeviceMessageSendNode.java

@@ -0,0 +1,100 @@
+package org.jetlinks.community.rule.engine.nodes;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.core.device.DeviceOperator;
+import org.jetlinks.core.device.DeviceProductOperator;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.message.Headers;
+import org.jetlinks.core.message.MessageType;
+import org.jetlinks.core.message.RepayableDeviceMessage;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.executor.ExecutionContext;
+import org.jetlinks.rule.engine.api.model.NodeType;
+import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
+import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+@AllArgsConstructor
+@Component
+public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceMessageSendNode.Config> {
+
+    private final DeviceRegistry registry;
+
+    @Override
+    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Config config) {
+
+        return data -> {
+            Flux<DeviceOperator> devices = StringUtils.hasText(config.getDeviceId())
+                ? registry.getDevice(config.getDeviceId()).flux()
+                : registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices);
+
+            return devices
+                .filterWhen(DeviceOperator::isOnline)
+                .publishOn(Schedulers.parallel())
+                .flatMap(config::doSend)
+                .onErrorResume(error -> context.onError(data, error).then(Mono.empty()));
+        };
+    }
+
+    @Override
+    public String getSupportType() {
+        return "device-message-sender";
+    }
+
+    @Getter
+    @Setter
+    public static class Config implements RuleNodeConfig {
+
+        //设备ID
+        private String deviceId;
+
+        //产品ID
+        private String productId;
+
+        private Map<String, Object> message;
+
+        private boolean async;
+
+        public Publisher<?> doSend(DeviceOperator device) {
+            Map<String, Object> message = new HashMap<>(this.message);
+            message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
+            message.put("deviceId", device.getDeviceId());
+            return Mono
+                .justOrEmpty(MessageType.convertMessage(message))
+                .cast(RepayableDeviceMessage.class)
+                .doOnNext(msg -> msg.addHeader(Headers.async, async))
+                .flatMapMany(msg -> device.messageSender().send(Mono.just(msg)));
+        }
+
+        @Override
+        public void validate() {
+            if (StringUtils.isEmpty(deviceId) && StringUtils.isEmpty(productId)) {
+                throw new IllegalArgumentException("deviceId和productId不能同时为空");
+            }
+            MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式"));
+        }
+
+        @Override
+        public NodeType getNodeType() {
+            return NodeType.MAP;
+        }
+
+        @Override
+        public void setNodeType(NodeType nodeType) {
+
+        }
+    }
+
+}

+ 83 - 6
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java

@@ -3,6 +3,10 @@ package org.jetlinks.community.rule.engine.device;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.function.FunctionInvokeMessage;
+import org.jetlinks.core.message.function.FunctionParameter;
+import org.jetlinks.core.message.property.ReadPropertyMessage;
 import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
 import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
@@ -10,8 +14,7 @@ import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
 import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -76,6 +79,13 @@ public class DeviceAlarmRule implements Serializable {
     private List<Operation> operations;
 
 
+    public void validate() {
+        if (org.apache.commons.collections.CollectionUtils.isEmpty(getConditions())) {
+            throw new IllegalArgumentException("conditions不能为空");
+        }
+
+    }
+
     public List<String> getPlainColumns() {
         Stream<String> conditionColumns = conditions
             .stream()
@@ -134,6 +144,16 @@ public class DeviceAlarmRule implements Serializable {
             public String getTopic(String productId, String deviceId, String key) {
                 return String.format(getTopicTemplate(), productId, StringUtils.isEmpty(deviceId) ? "*" : deviceId);
             }
+
+            @Override
+            public Optional<DeviceMessage> createMessage(Condition condition) {
+                ReadPropertyMessage readPropertyMessage = new ReadPropertyMessage();
+
+                String property = StringUtils.hasText(condition.getModelId()) ? condition.getModelId() : condition.getKey();
+
+                readPropertyMessage.setProperties(new ArrayList<>(Collections.singletonList(property)));
+                return Optional.of(readPropertyMessage);
+            }
         },
         //事件
         event("/device/%s/%s/message/event/%s", "this.data.") {
@@ -141,23 +161,72 @@ public class DeviceAlarmRule implements Serializable {
             public String getTopic(String productId, String deviceId, String property) {
                 return String.format(getTopicTemplate(), productId, StringUtils.isEmpty(deviceId) ? "*" : deviceId, property);
             }
+        },
+        //功能调用回复
+        function("/device/%s/%s/message/function/reply", "this.output") {
+            @Override
+            public String getTopic(String productId, String deviceId, String property) {
+                return String.format(getTopicTemplate(), productId, StringUtils.isEmpty(deviceId) ? "*" : deviceId);
+            }
+
+            @Override
+            public Optional<DeviceMessage> createMessage(Condition condition) {
+                FunctionInvokeMessage message = new FunctionInvokeMessage();
+                message.setFunctionId(condition.getModelId());
+                message.setInputs(condition.getParameters());
+                message.setTimestamp(System.currentTimeMillis());
+                return Optional.of(message);
+            }
         };
 
-        private String topicTemplate;
+        private final String topicTemplate;
 
-        private String propertyPrefix;
+        private final String propertyPrefix;
 
         public abstract String getTopic(String productId, String deviceId, String key);
+
+        public Optional<DeviceMessage> createMessage(Condition condition) {
+            return Optional.empty();
+        }
+    }
+
+    @Getter
+    @AllArgsConstructor
+    public enum ConditionType implements Serializable {
+        //设备消息
+        message(Arrays.asList(
+            MessageType.online,
+            MessageType.offline,
+            MessageType.properties,
+            MessageType.event
+        )),
+        //定时,定时获取只支持获取设备属性和调用功能.
+        timer(Arrays.asList(
+            MessageType.properties,
+            MessageType.function
+        ));
+
+        final List<MessageType> supportMessageTypes;
+
     }
 
     @Getter
     @Setter
     public static class Condition implements Serializable {
 
+        //条件类型,定时
+        private ConditionType trigger = ConditionType.message;
+
+        //trigger为定时任务时的cron表达式
+        private String cron;
+
+        //trigger为定时任务并且消息类型为功能调用时
+        private List<FunctionParameter> parameters;
+
         //物模型属性或者事件的标识 如: fire_alarm
         private String modelId;
 
-        //过滤条件key 如: temperature.value = ?
+        //过滤条件key 如: temperature
         private String key;
 
         //过滤条件值
@@ -173,6 +242,10 @@ public class DeviceAlarmRule implements Serializable {
         public String createExpression(MessageType type) {
             return type.getPropertyPrefix() + (key.trim()) + " " + operator.symbol + " ? ";
         }
+
+        public Object convertValue(){
+            return operator.convert(value);
+        }
     }
 
 
@@ -186,8 +259,11 @@ public class DeviceAlarmRule implements Serializable {
         gte(">="),
         lte("<="),
         like("like");
-        private String symbol;
+        private final String symbol;
 
+        public Object convert(String value) {
+            return value;
+        }
     }
 
     @Getter
@@ -202,4 +278,5 @@ public class DeviceAlarmRule implements Serializable {
             return property.concat(" ").concat(StringUtils.hasText(alias) ? alias : property);
         }
     }
+
 }

+ 4 - 4
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java

@@ -32,16 +32,16 @@ import java.util.stream.Collectors;
 @AllArgsConstructor
 public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceAlarmRuleNode.Config> {
 
-    private MessageGateway messageGateway;
+    private final MessageGateway messageGateway;
 
     @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
+    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) {
 
         return Mono::just;
     }
 
     @Override
-    protected void onStarted(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
+    protected void onStarted(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) {
         context.onStop(
             config.doSubscribe(messageGateway)
                 .flatMap(result -> {
@@ -101,7 +101,7 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
             for (DeviceAlarmRule.Condition condition : rule.getConditions()) {
                 String topic = rule.getType().getTopic(rule.getProductId(), rule.getDeviceId(), condition.getModelId());
                 topics.add(topic);
-                binds.add(condition.getValue());
+                binds.add(condition.convertValue());
             }
             List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
 

+ 54 - 4
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java

@@ -3,8 +3,10 @@ package org.jetlinks.community.rule.engine.model;
 import com.alibaba.fastjson.JSON;
 import org.apache.commons.collections.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
 import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
+import org.jetlinks.community.rule.engine.nodes.DeviceMessageSendNode;
 import org.jetlinks.rule.engine.api.cluster.RunMode;
 import org.jetlinks.rule.engine.api.model.RuleLink;
 import org.jetlinks.rule.engine.api.model.RuleModel;
@@ -13,11 +15,15 @@ import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
 import org.springframework.stereotype.Component;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
 
 @Component
 public class DeviceAlarmModelParser implements RuleModelParserStrategy {
 
     public static String format = "device_alarm";
+
     @Override
     public String getFormat() {
         return format;
@@ -25,18 +31,62 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
 
     @Override
     public RuleModel parse(String modelDefineString) {
-        DeviceAlarmEntity rule = FastBeanCopier.copy(JSON.parseObject(modelDefineString),DeviceAlarmEntity::new);
+        DeviceAlarmEntity rule = FastBeanCopier.copy(JSON.parseObject(modelDefineString), DeviceAlarmEntity::new);
 
         RuleModel model = new RuleModel();
         model.setId("device_alarm:".concat(rule.getId()));
         model.setName(rule.getName());
         model.setRunMode(RunMode.CLUSTER);
 
+        DeviceAlarmRule alarmRule = rule.getAlarmRule();
+        alarmRule.validate();
+
+        //处理定时触发
+        {
+            List<DeviceAlarmRule.Condition> timerConditions = alarmRule.getConditions().stream()
+                .filter(condition -> condition.getTrigger() == DeviceAlarmRule.ConditionType.timer)
+                .collect(Collectors.toList());
+            int index = 0;
+            for (DeviceAlarmRule.Condition timerCondition : timerConditions) {
+                DeviceMessage msg = alarmRule.getType().createMessage(timerCondition).orElse(null);
+                if (msg == null) {
+                    throw new UnsupportedOperationException("不支持定时条件类型:" + alarmRule.getType());
+                }
+                RuleNodeModel timer = new RuleNodeModel();
+                timer.setId("timer:" + (++index));
+                timer.setName("定时发送设备消息");
+                timer.setExecutor("timer");
+                timer.setConfiguration(Collections.singletonMap("cron", timerCondition.getCron()));
+
+                DeviceMessageSendNode.Config senderConfig = new DeviceMessageSendNode.Config();
+                senderConfig.setAsync(true);
+                senderConfig.setDeviceId(alarmRule.getDeviceId());
+                senderConfig.setProductId(alarmRule.getProductId());
+                senderConfig.setMessage(msg.toJson());
+
+                RuleNodeModel messageSender = new RuleNodeModel();
+                messageSender.setId("message-sender:" + (++index));
+                messageSender.setName("定时发送设备消息");
+                messageSender.setExecutor("device-message-sender");
+                messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>()));
+
+                RuleLink link = new RuleLink();
+                link.setId(timer.getId().concat(":").concat(messageSender.getId()));
+                link.setName("执行动作:" + index);
+                link.setSource(timer);
+                link.setTarget(messageSender);
+                timer.getOutputs().add(link);
+                messageSender.getInputs().add(link);
+                model.getNodes().add(timer);
+                model.getNodes().add(messageSender);
+            }
+        }
+
         RuleNodeModel conditionNode = new RuleNodeModel();
         conditionNode.setId("conditions");
         conditionNode.setName("预警条件");
         conditionNode.setExecutor("device_alarm");
-        conditionNode.setConfiguration(Collections.singletonMap("rule",rule.getAlarmRule()));
+        conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getAlarmRule()));
         model.getNodes().add(conditionNode);
         if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getOperations())) {
             int index = 0;
@@ -47,9 +97,9 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
                 action.setExecutor(operation.getExecutor());
                 action.setConfiguration(operation.getConfiguration());
 
-                RuleLink link=new RuleLink();
+                RuleLink link = new RuleLink();
                 link.setId(action.getId().concat(":").concat(conditionNode.getId()));
-                link.setName("执行动作:"+index);
+                link.setName("执行动作:" + index);
                 link.setSource(conditionNode);
                 link.setTarget(action);
                 model.getNodes().add(action);