Explorar o código

优化告警防抖

zhouhao %!s(int64=5) %!d(string=hai) anos
pai
achega
842ab4671e

+ 13 - 17
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java

@@ -205,24 +205,20 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 && shakeLimit.isEnabled()
                 && shakeLimit.isEnabled()
                 && shakeLimit.getTime() > 0) {
                 && shakeLimit.getTime() > 0) {
                 int thresholdNumber = shakeLimit.getThreshold();
                 int thresholdNumber = shakeLimit.getThreshold();
-                //打开时间窗口
-                Flux<Flux<Map<String, Object>>> window = resultFlux.window(Duration.ofSeconds(shakeLimit.getTime()));
-
-                Function<Flux<Tuple2<Long, Map<String, Object>>>, Publisher<Tuple2<Long, Map<String, Object>>>> mapper =
-                    shakeLimit.isAlarmFirst()
-                        ?
-                        group -> group
-                            .takeUntil(tp -> tp.getT1() >= thresholdNumber)
-                            .take(1)
-                            .singleOrEmpty()
-                        :
-                        group -> group.takeLast(1).singleOrEmpty();
-
-                resultFlux = window
+                Duration windowTime = Duration.ofSeconds(shakeLimit.getTime());
+
+                resultFlux = resultFlux
+                    .as(flux ->
+                        StringUtils.hasText(rule.getDeviceId())
+                            ? flux.window(windowTime)//规则已经指定了固定的设备,直接开启时间窗口就行
+                            : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
+                            .groupBy(map -> String.valueOf(map.get("deviceId")))
+                            .flatMap(group -> group.window(windowTime)))
+                    //处理每一组数据
                     .flatMap(group -> group
                     .flatMap(group -> group
-                        .index((index, data) -> Tuples.of(index + 1, data))
-                        .transform(mapper)
-                        .filter(tp -> tp.getT1() >= thresholdNumber) //超过阈值告警
+                        .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
+                        .filter(tp -> tp.getT1() >= thresholdNumber)//超过阈值告警
+                        .as(flux -> shakeLimit.isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
                         .map(tp2 -> {
                         .map(tp2 -> {
                             tp2.getT2().put("totalAlarms", tp2.getT1());
                             tp2.getT2().put("totalAlarms", tp2.getT1());
                             return tp2.getT2();
                             return tp2.getT2();