Prechádzať zdrojové kódy

修复多个告警条件可能不触发的问题

zhouhao 3 rokov pred
rodič
commit
1e1fb62b54

+ 75 - 66
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java

@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.id.IDGenerator;
 import org.jetlinks.community.ValueObject;
 import org.jetlinks.core.event.EventBus;
@@ -29,8 +30,10 @@ import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuples;
 
+import javax.annotation.Nonnull;
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
 @AllArgsConstructor
@@ -54,7 +57,11 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
     static class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
 
         List<String> default_columns = Arrays.asList(
-            "timestamp", "deviceId", "this.headers headers", "this.headers.deviceName deviceName"
+            "this.timestamp timestamp",
+            "this.deviceId deviceId",
+            "this.headers headers",
+            "this.headers.deviceName deviceName",
+            "this.messageType messageType"
         );
         private final EventBus eventBus;
 
@@ -62,7 +69,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
 
         private DeviceAlarmRule rule;
 
-        private ReactorQL ql;
+        private final Map<DeviceAlarmRule.Trigger, ReactorQL> triggerQL = new ConcurrentHashMap<>();
 
         DeviceAlarmTaskExecutor(ExecutionContext context,
                                 EventBus eventBus,
@@ -70,10 +77,10 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             super(context);
             this.eventBus = eventBus;
             this.scheduler = scheduler;
-            rule = createRule();
-            ql = createQL(rule);
+            init();
         }
 
+
         @Override
         public String getName() {
             return "设备告警";
@@ -96,50 +103,52 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 .subscribe();
         }
 
+        void init() {
+            rule = createRule();
+            Map<DeviceAlarmRule.Trigger, ReactorQL> ql = createQL(rule);
+            triggerQL.clear();
+            triggerQL.putAll(ql);
+        }
+
         @Override
         public void reload() {
-            rule = createRule();
-            ql = createQL(rule);
+            init();
             if (disposable != null) {
                 disposable.dispose();
             }
             disposable = doStart();
         }
 
+        @Nonnull
         private DeviceAlarmRule createRule() {
             DeviceAlarmRule rule = ValueObject
                 .of(context.getJob().getConfiguration())
                 .get("rule")
                 .map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule()))
-                .orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
+                .orElseThrow(() -> new IllegalArgumentException("error.alarm_configuration_error"));
             rule.validate();
             return rule;
         }
 
         @Override
         public void validate() {
-            DeviceAlarmRule rule = createRule();
             try {
-                createQL(rule);
+                createQL(createRule());
             } catch (Exception e) {
-                throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
+                throw new BusinessException("error.configuration_error", 500, e.getMessage(), e);
             }
         }
 
-        private ReactorQL createQL(DeviceAlarmRule rule) {
+        private ReactorQL createQL(int index, DeviceAlarmRule.Trigger trigger, DeviceAlarmRule rule) {
             List<String> columns = new ArrayList<>(default_columns);
             List<String> wheres = new ArrayList<>();
 
-            List<DeviceAlarmRule.Trigger> triggers = rule.getTriggers();
+            // select this.properties.this trigger0
+            columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + index);
+            columns.addAll(trigger.toColumns());
+            trigger.createExpression()
+                   .ifPresent(expr -> wheres.add("(" + expr + ")"));
 
-            for (int i = 0; i < triggers.size(); i++) {
-                DeviceAlarmRule.Trigger trigger = triggers.get(i);
-                // select this.properties.this trigger0
-                columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i);
-                columns.addAll(trigger.toColumns());
-                trigger.createExpression()
-                       .ifPresent(expr -> wheres.add("(" + expr + ")"));
-            }
             String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
 
             if (!wheres.isEmpty()) {
@@ -147,11 +156,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             }
 
             if (CollectionUtils.isNotEmpty(rule.getProperties())) {
-                List<String> newColumns = new ArrayList<>(Arrays.asList(
-                    "this.deviceName deviceName",
-                    "this.deviceId deviceId",
-                    "this.headers headers",
-                    "this.timestamp timestamp"));
+                List<String> newColumns = new ArrayList<>(default_columns);
                 for (DeviceAlarmRule.Property property : rule.getProperties()) {
                     if (StringUtils.isEmpty(property.getProperty())) {
                         continue;
@@ -166,7 +171,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                         newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
                     }
                 }
-                if (newColumns.size() > 4) {
+                if (newColumns.size() > default_columns.size()) {
                     sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
                 }
             }
@@ -175,43 +180,49 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             return ReactorQL.builder().sql(sql).build();
         }
 
+
+        private Map<DeviceAlarmRule.Trigger, ReactorQL> createQL(DeviceAlarmRule rule) {
+            Map<DeviceAlarmRule.Trigger, ReactorQL> qlMap = new HashMap<>();
+            int index = 0;
+            for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
+                qlMap.put(trigger, createQL(index++, trigger, rule));
+            }
+            return qlMap;
+        }
+
         public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
-            Set<String> topics = new HashSet<>();
 
-            List<Object> binds = new ArrayList<>();
 
+            List<Flux<? extends Map<String, Object>>> inputs = new ArrayList<>();
+            int index = 0;
             for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
-                binds.addAll(trigger.toFilterBinds());
+
+                ReactorQL ql = triggerQL.get(trigger);
+                if (ql == null) {
+                    continue;
+                }
+                Flux<? extends Map<String, Object>> datasource;
                 //since 1.11 定时触发的不从eventBus订阅
                 if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) {
-                    continue;
+                    //从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
+                    datasource = context
+                        .getInput()
+                        .accept()
+                        .flatMap(RuleData::dataToMap);
                 }
-
-                String topic = trigger
-                    .getType()
-                    .getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
-                topics.add(topic);
-            }
-
-            List<Flux<? extends Map<String, Object>>> inputs = new ArrayList<>();
-
-            //从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
-            inputs.add(
-                context
-                    .getInput()
-                    .accept()
-                    .flatMap(RuleData::dataToMap)
-            );
-
-            //从事件总线订阅数据进行处理
-            if (!topics.isEmpty()) {
-                Subscription subscription = Subscription.of(
-                    "device_alarm:" + rule.getId(),
-                    topics.toArray(new String[0]),
-                    Subscription.Feature.local
-                );
-                inputs.add(
-                    eventBus
+                //从事件总线中订阅数据
+                else {
+                    String topic = trigger
+                        .getType()
+                        .getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
+
+                    //从事件总线订阅数据进行处理
+                    Subscription subscription = Subscription.of(
+                        "device_alarm:" + rule.getId() + ":" + index++,
+                        topic,
+                        Subscription.Feature.local
+                    );
+                    datasource = eventBus
                         .subscribe(subscription, DeviceMessage.class)
                         .map(Jsonable::toJson)
                         .doOnNext(json -> {
@@ -224,17 +235,16 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                             json.put("productId", rule.getProductId());
                             json.put("alarmId", rule.getId());
                             json.put("alarmName", rule.getName());
-                        })
-                );
-            }
-            ReactorQLContext context = ReactorQLContext
-                .ofDatasource(ignore -> Flux.merge(inputs));
+                        });
 
-            binds.forEach(context::bind);
+                }
 
-            Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
-                .start(context)
-                .map(ReactorQLRecord::asMap);
+                ReactorQLContext qlContext = ReactorQLContext.ofDatasource((t) -> datasource);
+                trigger.toFilterBinds().forEach(qlContext::bind);
+                inputs.add(ql.start(qlContext).map(ReactorQLRecord::asMap));
+            }
+
+            Flux<Map<String, Object>> resultFlux = Flux.merge(inputs);
 
             ShakeLimit shakeLimit;
             if ((shakeLimit = rule.getShakeLimit()) != null) {
@@ -296,7 +306,6 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                             map.get("deviceId"),
                             rule.getId()), map)
                         .thenReturn(map);
-
                 });
         }
     }