Browse Source

优化设备告警

zhou-hao 5 years ago
parent
commit
8c54ef4593

+ 14 - 45
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java

@@ -7,9 +7,7 @@ import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.function.FunctionParameter;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessage;
-import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
-import org.jetlinks.rule.engine.api.model.RuleNodeModel;
-import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
+import org.jetlinks.community.rule.engine.model.Action;
 import org.springframework.scheduling.support.CronSequenceGenerator;
 import org.springframework.scheduling.support.CronSequenceGenerator;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import org.springframework.util.StringUtils;
@@ -83,40 +81,6 @@ public class DeviceAlarmRule implements Serializable {
         getTriggers().forEach(Trigger::validate);
         getTriggers().forEach(Trigger::validate);
     }
     }
 
 
-    public List<String> getPlainColumns() {
-        Stream<String> conditionColumns = triggers
-            .stream()
-            .flatMap(trigger -> trigger.getColumns().stream());
-
-        if (CollectionUtils.isEmpty(properties)) {
-            return conditionColumns.collect(Collectors.toList());
-        }
-        return Stream.concat(conditionColumns, properties
-            .stream()
-            .map(Property::toString))
-            .collect(Collectors.toList());
-    }
-
-    @Getter
-    @Setter
-    public static class Action implements Serializable {
-
-        /**
-         * 执行器
-         *
-         * @see RuleNodeModel#getExecutor()
-         * @see ExecutableRuleNodeFactoryStrategy#getSupportType()
-         */
-        private String executor;
-
-        /**
-         * 执行器配置
-         *
-         * @see RuleNodeModel#getConfiguration()
-         * @see RuleNodeConfiguration
-         */
-        private Map<String, Object> configuration;
-    }
 
 
     @AllArgsConstructor
     @AllArgsConstructor
     @Getter
     @Getter
@@ -161,7 +125,7 @@ public class DeviceAlarmRule implements Serializable {
             }
             }
         },
         },
         //功能调用回复
         //功能调用回复
-        function("/device/%s/%s/message/function/reply", "this.output") {
+        function("/device/%s/%s/message/function/reply", "this.output.") {
             @Override
             @Override
             public String getTopic(String productId, String deviceId, String property) {
             public String getTopic(String productId, String deviceId, String property) {
                 return String.format(getTopicTemplate(), productId, StringUtils.isEmpty(deviceId) ? "*" : deviceId);
                 return String.format(getTopicTemplate(), productId, StringUtils.isEmpty(deviceId) ? "*" : deviceId);
@@ -228,10 +192,15 @@ public class DeviceAlarmRule implements Serializable {
         private List<ConditionFilter> filters;
         private List<ConditionFilter> filters;
 
 
         public Set<String> getColumns() {
         public Set<String> getColumns() {
-            return filters == null
-                ? Collections.emptySet()
-                : filters.stream()
-                .map(filter -> filter.getColumn(type))
+
+            return Stream.concat(
+                (StringUtils.hasText(modelId)
+                    ? Collections.singleton(type.getPropertyPrefix() + "this['" + modelId + "'] \"" + modelId + "\"")
+                    : Collections.<String>emptySet()).stream(),
+                (CollectionUtils.isEmpty(filters)
+                    ? Stream.<ConditionFilter>empty()
+                    : filters.stream())
+                    .map(filter -> filter.getColumn(type)))
                 .collect(Collectors.toSet());
                 .collect(Collectors.toSet());
         }
         }
 
 
@@ -291,7 +260,7 @@ public class DeviceAlarmRule implements Serializable {
         private Operator operator = Operator.eq;
         private Operator operator = Operator.eq;
 
 
         public String getColumn(MessageType type) {
         public String getColumn(MessageType type) {
-            return type.getPropertyPrefix() + (key.trim()) + " " + (key.trim());
+            return type.getPropertyPrefix() + "this['" + (key.trim()) + "'] \"" + (key.trim()) + "\"";
         }
         }
 
 
         public String createExpression(MessageType type) {
         public String createExpression(MessageType type) {
@@ -299,7 +268,7 @@ public class DeviceAlarmRule implements Serializable {
             if (key.contains("(") || key.startsWith("this")) {
             if (key.contains("(") || key.startsWith("this")) {
                 return key;
                 return key;
             }
             }
-            return type.getPropertyPrefix() + (key.trim()) + " " + operator.symbol + " ? ";
+            return type.getPropertyPrefix() + "this['" + (key.trim()) + "'] " + operator.symbol + " ? ";
         }
         }
 
 
         public Object convertValue() {
         public Object convertValue() {
@@ -343,7 +312,7 @@ public class DeviceAlarmRule implements Serializable {
 
 
         @Override
         @Override
         public String toString() {
         public String toString() {
-            return property.concat(" ").concat(StringUtils.hasText(alias) ? alias : property);
+            return property.concat(" \"").concat(StringUtils.hasText(alias) ? alias : property).concat("\"");
         }
         }
     }
     }
 
 

+ 35 - 8
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java

@@ -35,7 +35,7 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
     private final MessageGateway messageGateway;
     private final MessageGateway messageGateway;
 
 
     @Override
     @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
+    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context,DeviceAlarmRuleNode.Config config) {
 
 
         return Mono::just;
         return Mono::just;
     }
     }
@@ -70,30 +70,57 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
 
 
         private DeviceAlarmRule rule;
         private DeviceAlarmRule rule;
 
 
+        private ReactorQL ql;
+
         @Override
         @Override
         public void validate() {
         public void validate() {
             if (CollectionUtils.isEmpty(rule.getTriggers())) {
             if (CollectionUtils.isEmpty(rule.getTriggers())) {
                 throw new IllegalArgumentException("预警条件不能为空");
                 throw new IllegalArgumentException("预警条件不能为空");
             }
             }
+            try {
+                ql = createQL();
+            } catch (Exception e) {
+                throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
+            }
         }
         }
 
 
         private ReactorQL createQL() {
         private ReactorQL createQL() {
             List<String> columns = new ArrayList<>(default_columns);
             List<String> columns = new ArrayList<>(default_columns);
             List<String> wheres = new ArrayList<>();
             List<String> wheres = new ArrayList<>();
-            columns.addAll(rule.getPlainColumns());
 
 
-            for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
+            List<DeviceAlarmRule.Trigger> triggers = rule.getTriggers();
+
+            for (int i = 0; i < triggers.size(); i++) {
+                DeviceAlarmRule.Trigger trigger = triggers.get(i);
+                // select this.properties.this trigger0
+                columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i);
+                columns.addAll(trigger.getColumns());
                 trigger.createExpression()
                 trigger.createExpression()
                     .ifPresent(expr -> wheres.add("(" + expr + ")"));
                     .ifPresent(expr -> wheres.add("(" + expr + ")"));
             }
             }
-
-            String sql = "select " + String.join(",", columns) + " from msg ";
+            String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
 
 
             if (!wheres.isEmpty()) {
             if (!wheres.isEmpty()) {
-                sql = "where " + String.join(" or ", wheres);
+                sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
             }
             }
 
 
-            log.debug("create device alarm sql : {}", sql);
+            if (CollectionUtils.isNotEmpty(rule.getProperties())) {
+                List<String> newColumns = new ArrayList<>(Arrays.asList(
+                    "this.deviceName deviceName",
+                    "this.deviceId deviceId",
+                    "this.timestamp timestamp"));
+                for (DeviceAlarmRule.Property property : rule.getProperties()) {
+                    if (StringUtils.isEmpty(property.getProperty())) {
+                        continue;
+                    }
+                    String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
+                    newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
+                }
+                if (newColumns.size() > 3) {
+                    sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
+                }
+            }
+            log.debug("create device alarm sql : \n{}", sql);
             return ReactorQL.builder().sql(sql).build();
             return ReactorQL.builder().sql(sql).build();
         }
         }
 
 
@@ -128,7 +155,7 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
                 );
                 );
 
 
             binds.forEach(context::bind);
             binds.forEach(context::bind);
-            return createQL()
+            return (ql == null ? ql = createQL() : ql)
                 .start(context)
                 .start(context)
                 .map(ReactorQLRecord::asMap)
                 .map(ReactorQLRecord::asMap)
                 .flatMap(map -> {
                 .flatMap(map -> {

+ 32 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/Action.java

@@ -0,0 +1,32 @@
+package org.jetlinks.community.rule.engine.model;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
+import org.jetlinks.rule.engine.api.model.RuleNodeModel;
+import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Getter
+@Setter
+public class Action implements Serializable {
+    private static final long serialVersionUID = -6849794470754667710L;
+
+    /**
+     * 执行器
+     *
+     * @see RuleNodeModel#getExecutor()
+     * @see ExecutableRuleNodeFactoryStrategy#getSupportType()
+     */
+    private String executor;
+
+    /**
+     * 执行器配置
+     *
+     * @see RuleNodeModel#getConfiguration()
+     * @see RuleNodeConfiguration
+     */
+    private Map<String, Object> configuration;
+}