Pārlūkot izejas kodu

增加告警抖动处理 #8

zhou-hao 5 gadi atpakaļ
vecāks
revīzija
2da42e854d

+ 27 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java

@@ -73,6 +73,11 @@ public class DeviceAlarmRule implements Serializable {
      */
     private List<Action> actions;
 
+    /**
+     * 防抖限制
+     */
+    private ShakeLimit shakeLimit;
+
 
     public void validate() {
         if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) {
@@ -286,6 +291,28 @@ public class DeviceAlarmRule implements Serializable {
     }
 
 
+    /**
+     * 抖动限制
+     * <a href="https://github.com/jetlinks/jetlinks-community/issues/8">https://github.com/jetlinks/jetlinks-community/issues/8</a>
+     *
+     * @since 1.3
+     */
+    @Getter
+    @Setter
+    public static class ShakeLimit implements Serializable {
+        private boolean enabled;
+
+        //时间限制,单位时间内发生多次告警时,只算一次。单位:秒
+        private int time;
+
+        //触发阈值,单位时间内发生n次告警,只算一次。
+        private int threshold;
+
+        //当发生第一次告警时就触发,为false时表示最后一次才触发(告警有延迟,但是可以统计出次数)
+        private boolean alarmFirst;
+
+    }
+
     @AllArgsConstructor
     @Getter
     public enum Operator {

+ 37 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java

@@ -23,7 +23,10 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
+import java.time.Duration;
 import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -157,9 +160,41 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
                 );
 
             binds.forEach(context::bind);
-            return (ql == null ? ql = createQL() : ql)
+
+            Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL() : ql)
                 .start(context)
-                .map(ReactorQLRecord::asMap)
+                .map(ReactorQLRecord::asMap);
+
+            DeviceAlarmRule.ShakeLimit shakeLimit;
+            if ((shakeLimit = rule.getShakeLimit()) != null
+                && shakeLimit.isEnabled()
+                && shakeLimit.getTime() > 0) {
+                int thresholdNumber = shakeLimit.getThreshold();
+                //打开时间窗口
+                Flux<Flux<Map<String, Object>>> window = resultFlux.window(Duration.ofSeconds(shakeLimit.getTime()));
+
+                Function<Flux<Tuple2<Long, Map<String, Object>>>, Publisher<Tuple2<Long, Map<String, Object>>>> mapper =
+                    shakeLimit.isAlarmFirst()
+                        ?
+                        group -> group
+                            .takeUntil(tp -> tp.getT1() >= thresholdNumber) //达到触发阈值
+                            .take(1) //取第一个
+                            .singleOrEmpty()
+                        :
+                        group -> group.takeLast(1).singleOrEmpty();//取最后一个
+
+                resultFlux = window
+                    .flatMap(group -> group
+                        .index((index, data) -> Tuples.of(index + 1, data))
+                        .transform(mapper)
+                        .filter(tp -> tp.getT1() >= thresholdNumber) //超过阈值告警
+                        .map(tp2 -> {
+                            tp2.getT2().put("totalAlarms", tp2.getT1());
+                            return tp2.getT2();
+                        }));
+            }
+
+            return resultFlux
                 .flatMap(map -> {
                     map.put("productId", rule.getProductId());
                     map.put("alarmId", rule.getId());