소스 검색

重构场景联动

liujq 2 년 전
부모
커밋
6607301e59
40개의 변경된 파일1585개의 추가작업 그리고 272개의 파일을 삭제
  1. 2 2
      docker/dev-env/docker-compose.yml
  2. 4 4
      jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java
  3. 24 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/RuleEngineConstants.java
  4. 9 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java
  5. 44 9
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java
  6. 2 1
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java
  7. 37 11
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java
  8. 20 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmConstants.java
  9. 25 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmData.java
  10. 110 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmRuleHandler.java
  11. 2 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java
  12. 1 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java
  13. 90 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java
  14. 459 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmRuleHandler.java
  15. 1 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java
  16. 4 3
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java
  17. 2 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java
  18. 2 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java
  19. 34 6
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java
  20. 24 5
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java
  21. 52 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRuleBindEntity.java
  22. 8 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/AlarmMode.java
  23. 13 13
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java
  24. 7 7
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java
  25. 116 30
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java
  26. 6 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java
  27. 130 94
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java
  28. 88 56
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java
  29. 2 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java
  30. 2 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java
  31. 20 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java
  32. 43 7
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java
  33. 10 9
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java
  34. 2 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java
  35. 17 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/AlarmRuleBindService.java
  36. 0 1
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java
  37. 58 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/AlarmBindRuleTerm.java
  38. 58 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/RuleBindAlarmTerm.java
  39. 52 0
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRuleBindController.java
  40. 5 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java

+ 2 - 2
docker/dev-env/docker-compose.yml

@@ -6,7 +6,7 @@ services:
     ports:
       - "6379:6379"
     volumes:
-      - "redis-volume:/data"
+      - "./data/redis:/data"
     command: redis-server --appendonly yes
     environment:
       - TZ=Asia/Shanghai
@@ -40,7 +40,7 @@ services:
     ports:
       - "5432:5432"
     volumes:
-      - "postgres-volume:/var/lib/postgresql/data"
+      - "./data/pg:/var/lib/postgresql/data"
     environment:
       POSTGRES_PASSWORD: jetlinks
       POSTGRES_DB: jetlinks

+ 4 - 4
jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java

@@ -79,8 +79,8 @@ class DefaultRelationOperation implements RelationOperation {
     private RelatedObject toObject(RelatedEntity entity) {
         if (reverse) {
             return new DefaultRelatedObject(
-                entity.getRelatedType(),
-                entity.getRelatedId(),
+                entity.getObjectType(),
+                entity.getObjectId(),
                 type,
                 id,
                 entity.getRelation(),
@@ -88,10 +88,10 @@ class DefaultRelationOperation implements RelationOperation {
                 objectProvider);
         }
         return new DefaultRelatedObject(
-            type,
-            id,
             entity.getRelatedType(),
             entity.getRelatedId(),
+            type,
+            id,
             entity.getRelation(),
             relatedRepository,
             objectProvider);

+ 24 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/RuleEngineConstants.java

@@ -0,0 +1,24 @@
+package org.jetlinks.community.rule.engine;
+
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+
+import java.util.Optional;
+
+public interface RuleEngineConstants {
+
+    String ruleCreatorIdKey = "creatorId";
+
+    String ruleName = "name";
+
+    static Optional<String> getCreatorId(ExecutionContext context) {
+        return context.getJob()
+                      .getRuleConfiguration(ruleCreatorIdKey)
+                      .map(String::valueOf);
+    }
+
+    static Optional<String> getRuleName(ExecutionContext context) {
+        return context.getJob()
+                      .getRuleConfiguration(ruleName)
+                      .map(String::valueOf);
+    }
+}

+ 9 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java

@@ -7,6 +7,8 @@ import reactor.core.publisher.Mono;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 public interface DeviceSelectorProvider extends Ordered {
 
@@ -21,6 +23,13 @@ public interface DeviceSelectorProvider extends Ordered {
                                                                        Map<String,Object> ctx,
                                                                        NestConditional<T> conditional);
 
+
+    default <T extends Conditional<T>> Function<Map<String, Object>, Mono<NestConditional<T>>> createLazy(
+        DeviceSelectorSpec source,
+        Supplier<NestConditional<T>> conditionalSupplier) {
+        return ctx -> applyCondition(source, ctx, conditionalSupplier.get());
+    }
+
     @Override
     default int getOrder() {
         return 0;

+ 44 - 9
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java

@@ -12,6 +12,12 @@ public class DeviceSelectorProviders {
 
     private final static Map<String, DeviceSelectorProvider> providers = new LinkedHashMap<>();
 
+    public static final String
+        PROVIDER_FIXED = "fixed",
+        PROVIDER_CONTEXT = "context",
+        PROVIDER_PRODUCT = "product",
+        PROVIDER_TAG = "tag";
+
     static {
 
         register(SimpleDeviceSelectorProvider
@@ -19,21 +25,44 @@ public class DeviceSelectorProviders {
                          "all", "全部设备",
                          (args, query) -> query));
 
-        register(SimpleDeviceSelectorProvider
-                     .of(
-                         "fixed", "固定设备",
-                         (args, query) -> query.in("id", args)));
+        { //固定设备,fixed和context作用和效果完全相同,只是为了前端方便区分不同的操作
+
+              /*
+            选择固定的设备
+            {
+             "selector":"fixed",
+             "selectorValues":[ {"value":"deviceId","name":"设备名称"} ],
+            }
+             */
+            register(SimpleDeviceSelectorProvider
+                         .of(
+                             PROVIDER_FIXED, "固定设备",
+                             (args, query) -> query.in("id", args)));
+            /*
+            根据上下文变量选择设备
+            {
+             "selector":"context",
+             "source":"upper",
+             "upperKey":"deviceId"
+            }
+             */
+            register(SimpleDeviceSelectorProvider
+                         .of(
+                             PROVIDER_CONTEXT, "内置参数",
+                             (args, query) -> query.in("id", args)));
+        }
+
 
         register(SimpleDeviceSelectorProvider
                      .of("state", "按状态",
                          (args, query) -> query.in("state", args)));
 
         register(SimpleDeviceSelectorProvider
-                     .of("product", "按产品",
+                     .of(PROVIDER_PRODUCT, "按产品",
                          (args, query) -> query.in("productId", args)));
 
         register(SimpleDeviceSelectorProvider
-                     .of("tag", "按标签",
+                     .of(PROVIDER_TAG, "按标签",
                          (args, query) -> {
                              if (args.size() == 1) {
                                  return query.accept("id",
@@ -69,9 +98,15 @@ public class DeviceSelectorProviders {
 
     }
 
+    //判断是否为固定设备选择器,固定设备选择器不需要执行查询库,性能更高
+    public static boolean isFixed(DeviceSelectorSpec spec) {
+        return PROVIDER_FIXED.equals(spec.getSelector()) ||
+            PROVIDER_CONTEXT.equals(spec.getSelector());
+    }
+
     public static DeviceSelectorSpec fixed(Object value) {
         DeviceSelectorSpec spec = new DeviceSelectorSpec();
-        spec.setSelector("fixed");
+        spec.setSelector(PROVIDER_CONTEXT);
         spec.setSource(VariableSource.Source.fixed);
         spec.setValue(value);
         return spec;
@@ -79,7 +114,7 @@ public class DeviceSelectorProviders {
 
     public static DeviceSelectorSpec product(String productId) {
         DeviceSelectorSpec spec = new DeviceSelectorSpec();
-        spec.setSelector("product");
+        spec.setSelector(PROVIDER_PRODUCT);
         spec.setSource(VariableSource.Source.fixed);
         spec.setValue(productId);
         return spec;
@@ -118,4 +153,4 @@ public class DeviceSelectorProviders {
         return new ArrayList<>(providers.values());
     }
 
-}
+}

+ 2 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java

@@ -122,7 +122,7 @@ public class DeviceSelectorSpec extends VariableSource {
         if (CollectionUtils.isNotEmpty(selectorValues)) {
             return Flux
                 .fromIterable(selectorValues)
-                .map(SelectorValue::getValue);
+                .mapNotNull(SelectorValue::getValue);
         }
         return super.resolve(context);
     }
@@ -148,3 +148,4 @@ public class DeviceSelectorSpec extends VariableSource {
             .flatMap(DeviceProductOperator::getMetadata);
     }
 }
+

+ 37 - 11
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java

@@ -1,6 +1,8 @@
 package org.jetlinks.community.device.function;
 
 import lombok.AllArgsConstructor;
+import org.hswebframework.ezorm.core.NestConditional;
+import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -13,11 +15,26 @@ import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
 import org.jetlinks.reactor.ql.ReactorQL;
 import org.jetlinks.reactor.ql.ReactorQLContext;
 import org.jetlinks.reactor.ql.ReactorQLRecord;
+import org.jetlinks.reactor.ql.feature.FromFeature;
+import org.springframework.data.util.Lazy;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import java.util.Map;
+import java.util.function.Function;
 
 /**
+ * 基于ReactorQL的设备选择器,通过自定义{@link FromFeature}来实现设备数据源.
+ * <pre>
+ * in_gourp('groupId') 在指定的设备分组中
+ * in_group_tree('groupId') 在指定分组中(包含下级分组)
+ * same_group('deviceId') 在指定设备的相同分组中
+ * product('productId') 指定产品ID对应的设备
+ * tag('tag1Key','tag1Value','tag2Key','tag2Value') 按指定的标签获取
+ * state('online') 按指定的状态获取
+ * in_tenant('租户ID') 在指定租户中的设备
+ * </pre>
+ *
  * @author zhouhao
  * @since 2.0
  */
@@ -28,6 +45,7 @@ public class ReactorQLDeviceSelectorBuilder implements DeviceSelectorBuilder {
 
     private final ReactiveRepository<DeviceInstanceEntity, String> deviceRepository;
 
+
     @Override
     @SuppressWarnings("all")
     public DeviceSelector createSelector(DeviceSelectorSpec spec) {
@@ -35,17 +53,25 @@ public class ReactorQLDeviceSelectorBuilder implements DeviceSelectorBuilder {
             .getProvider(spec.getSelector())
             .orElseThrow(() -> new UnsupportedOperationException("unsupported selector:" + spec.getSelector()));
 
-        return context -> provider
-            .applyCondition(spec,
-                            context,
-                            deviceRepository
-                                .createQuery()
-                                .select(DeviceInstanceEntity::getId)
-                                .nest())
-            .flatMapMany(ctd -> ctd
-                .end()
-                .fetch()
-                .map(DeviceInstanceEntity::getId))
+        //固定设备,直接获取,避免查询数据库性能低.
+        if (DeviceSelectorProviders.isFixed(spec)) {
+            return ctx -> {
+                return spec
+                    .resolveSelectorValues(ctx)
+                    .map(String::valueOf)
+                    .flatMap(registry::getDevice);
+            };
+        }
+        Function<Map<String, Object>, Mono<NestConditional<ReactiveQuery<DeviceInstanceEntity>>>> lazy = provider
+            .createLazy(spec,
+                        Lazy.of(() -> deviceRepository
+                            .createQuery()
+                            .select(DeviceInstanceEntity::getId)
+                            .nest()));
+
+        return context -> lazy
+            .apply(context)
+            .flatMapMany(ctd -> ctd.end().fetch().map(DeviceInstanceEntity::getId))
             .flatMap(registry::getDevice);
     }
 

+ 20 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmConstants.java

@@ -0,0 +1,20 @@
+package org.jetlinks.community.rule.engine.alarm;
+
+public interface AlarmConstants {
+
+    interface ConfigKey {
+        String alarmConfigId = "alarmConfigId";
+        String alarming = "alarming";
+        String firstAlarm = "firstAlarm";
+        String alarmName = "name";
+        String level = "level";
+        String ownerId = "ownerId";
+        String targetType = "targetType";
+        String state = "state";
+        String alarmTime = "alarmTime";
+        String lastAlarmTime = "lastAlarmTime";
+        String targetId = "targetId";
+        String targetName = "targetName";
+
+    }
+}

+ 25 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmData.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.rule.engine.alarm;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@Getter
+@Setter
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+public class AlarmData implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private String alarmConfigId;
+    private String alarmName;
+
+    private String ruleId;
+    private String ruleName;
+
+    private Map<String, Object> output;
+}

+ 110 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmRuleHandler.java

@@ -0,0 +1,110 @@
+package org.jetlinks.community.rule.engine.alarm;
+
+import com.google.common.collect.Maps;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.rule.engine.alarm.AlarmConstants.ConfigKey;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import reactor.core.publisher.Flux;
+
+import java.util.Map;
+
+/**
+ * 告警规则数据处理器,当场景规则中配置的告警动作被执行时,将调用此处理器的相关方法.
+ *
+ * @author zhouhao
+ * @since 2.0
+ * @see AlarmTaskExecutorProvider
+ */
+public interface AlarmRuleHandler {
+
+    /**
+     * 触发告警
+     *
+     * @param context 告警规则上下文
+     * @param data    告警数据
+     * @return 处理结果
+     * @see org.jetlinks.community.rule.engine.enums.AlarmMode#trigger
+     */
+    Flux<Result> triggered(ExecutionContext context, RuleData data);
+
+    /**
+     * 解除告警
+     *
+     * @param context 告警规则上下文
+     * @param data    告警数据
+     * @return 处理结果
+     * @see org.jetlinks.community.rule.engine.enums.AlarmMode#relieve
+     */
+    Flux<Result> relieved(ExecutionContext context, RuleData data);
+
+    @Getter
+    @Setter
+    @AllArgsConstructor(staticName = "of")
+    @NoArgsConstructor
+    class Result {
+
+        @Schema(description = "告警配置ID")
+        private String alarmConfigId;
+
+        @Schema(description = "告警名称")
+        private String alarmName;
+
+        @Schema(description = "当前是否正在告警")
+        private boolean alarming;
+
+        @Schema(description = "当前首次触发")
+        private boolean firstAlarm;
+
+        @Schema(description = "告警级别")
+        private int level;
+
+        @Schema(description = "上一次告警时间")
+        private long lastAlarmTime;
+
+        @Schema(description = "首次告警或者解除告警后的再一次告警时间.")
+        private long alarmTime;
+
+        @Schema(description = "告警目标类型")
+        private String targetType;
+
+        @Schema(description = "告警目标ID")
+        private String targetId;
+
+        @Schema(description = "告警目标名称")
+        private String targetName;
+
+
+        public Result copyWith(AlarmTargetInfo targetInfo) {
+            Result result = FastBeanCopier.copy(this, new Result());
+            result.setTargetType(targetInfo.getTargetType());
+            result.setTargetId(targetInfo.getTargetId());
+            result.setTargetName(targetInfo.getTargetName());
+            return result;
+        }
+
+
+        public Map<String, Object> toMap() {
+            Map<String, Object> map = Maps.newHashMapWithExpectedSize(16);
+
+            map.put(ConfigKey.alarmConfigId, alarmConfigId);
+            map.put(ConfigKey.alarmName, alarmName);
+            map.put(ConfigKey.alarming, alarming);
+            map.put(ConfigKey.firstAlarm, firstAlarm);
+            map.put(ConfigKey.level, level);
+            map.put(ConfigKey.alarmTime, alarmTime);
+            map.put(ConfigKey.lastAlarmTime, lastAlarmTime);
+            map.put(ConfigKey.targetType, targetType);
+            map.put(ConfigKey.targetId, targetId);
+            map.put(ConfigKey.targetName, targetName);
+
+            return map;
+        }
+    }
+
+}

+ 2 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java

@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 @Component
 @AllArgsConstructor
+@Deprecated
 public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
 
     private final EventBus eventBus;
@@ -63,7 +64,7 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
                 .fromIterable(alarmConfigMap.values())
                 .flatMap(alarmConfig -> AlarmTarget
                     .of(alarmConfig.getTargetType())
-                    .convert(data)
+                    .convert(AlarmData.of(alarmConfig.getId(), alarmConfig.getName(), data.getRule().getId(), data.getRule().getName(), data.getOutput()))
                     .flatMap(targetInfo -> {
                         AlarmRecordEntity record = ofRecord(targetInfo, alarmConfig);
                         //修改告警记录

+ 1 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java

@@ -1,6 +1,5 @@
 package org.jetlinks.community.rule.engine.alarm;
 
-import org.jetlinks.community.rule.engine.scene.SceneData;
 import reactor.core.publisher.Flux;
 
 /**
@@ -15,7 +14,7 @@ public interface AlarmTarget {
 
     String getName();
 
-    Flux<AlarmTargetInfo> convert(SceneData data);
+    Flux<AlarmTargetInfo> convert(AlarmData data);
 
     static AlarmTarget of(String type) {
         return AlarmTargetSupplier

+ 90 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java

@@ -0,0 +1,90 @@
+package org.jetlinks.community.rule.engine.alarm;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.validator.ValidatorUtils;
+import org.jetlinks.community.rule.engine.enums.AlarmMode;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import javax.validation.constraints.NotNull;
+import java.io.Serializable;
+import java.util.function.Function;
+
+@AllArgsConstructor
+@Component
+public class AlarmTaskExecutorProvider implements TaskExecutorProvider {
+    public static final String executor = "alarm";
+
+    private final AlarmRuleHandler alarmHandler;
+
+    @Override
+    public String getExecutor() {
+        return executor;
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new AlarmTaskExecutor(context, alarmHandler));
+    }
+
+    static class AlarmTaskExecutor extends FunctionTaskExecutor {
+
+        private final AlarmRuleHandler handler;
+
+        private Function<RuleData, Flux<AlarmRuleHandler.Result>> executor;
+
+        private Config config;
+
+        public AlarmTaskExecutor(ExecutionContext context, AlarmRuleHandler handler) {
+            super("告警", context);
+            this.handler = handler;
+            reload();
+        }
+
+        @Override
+        public String getName() {
+            return config.getMode() == AlarmMode.relieve
+                ? "解除告警" : "触发告警";
+        }
+
+        @Override
+        protected Publisher<RuleData> apply(RuleData input) {
+            return executor
+                .apply(input)
+                .map(result -> context.newRuleData(input.newData(result.toMap())));
+        }
+
+        @Override
+        public void reload() {
+            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
+            ValidatorUtils.tryValidate(config);
+            if (config.mode == AlarmMode.relieve) {
+                executor = input -> handler.relieved(context, input);
+            } else {
+                executor = input -> handler.triggered(context, input);
+            }
+        }
+    }
+
+
+    @Getter
+    @Setter
+    public static class Config implements Serializable {
+        @NotNull
+        @Schema(description = "告警方式")
+        private AlarmMode mode;
+
+
+    }
+}

+ 459 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmRuleHandler.java

@@ -0,0 +1,459 @@
+package org.jetlinks.community.rule.engine.alarm;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
+import org.hswebframework.web.authorization.DefaultDimensionType;
+import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.crud.events.EntityCreatedEvent;
+import org.hswebframework.web.crud.events.EntityDeletedEvent;
+import org.hswebframework.web.crud.events.EntityModifyEvent;
+import org.hswebframework.web.crud.events.EntitySavedEvent;
+import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.community.rule.engine.RuleEngineConstants;
+import org.jetlinks.core.config.ConfigStorage;
+import org.jetlinks.core.config.ConfigStorageManager;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.utils.CompositeSet;
+import org.jetlinks.core.utils.Reactors;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
+import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
+import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity;
+import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity;
+import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
+import org.jetlinks.community.rule.engine.enums.AlarmState;
+import org.jetlinks.community.rule.engine.scene.SceneRule;
+import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
+import org.jetlinks.community.rule.engine.service.AlarmRecordService;
+import org.jetlinks.community.topic.Topics;
+import org.jetlinks.community.utils.ObjectMappers;
+import org.jetlinks.reactor.ql.utils.CastUtils;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@AllArgsConstructor
+@Component
+public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRunner {
+
+    private static final Set<String> configInfoKey = new HashSet<>(
+        Arrays.asList(
+            AlarmConstants.ConfigKey.alarmConfigId,
+            AlarmConstants.ConfigKey.alarmName,
+            AlarmConstants.ConfigKey.level,
+            AlarmConstants.ConfigKey.alarmTime,
+            AlarmConstants.ConfigKey.lastAlarmTime,
+            AlarmConstants.ConfigKey.targetType,
+            AlarmConstants.ConfigKey.state,
+            AlarmConstants.ConfigKey.ownerId
+        ));
+
+    private final Map<Tuple2<String, Integer>, Set<String>> ruleAlarmBinds = new ConcurrentHashMap<>();
+
+    private final AlarmRecordService alarmRecordService;
+    private final AlarmHistoryService historyService;
+    private final ConfigStorageManager storageManager;
+    private final ApplicationEventPublisher eventPublisher;
+
+    private final EventBus eventBus;
+
+    public final ReactiveRepository<AlarmRuleBindEntity, String> bindRepository;
+
+    @Override
+    public Flux<Result> triggered(ExecutionContext context, RuleData data) {
+        return this
+            .parseAlarmInfo(context, data)
+            .flatMap(this::triggerAlarm);
+    }
+
+    @Override
+    public Flux<Result> relieved(ExecutionContext context, RuleData data) {
+        return this
+            .parseAlarmInfo(context, data)
+            .flatMap(this::relieveAlarm);
+    }
+
+    private Flux<AlarmInfo> parseAlarmInfo(ExecutionContext context, RuleData data) {
+        if (ruleAlarmBinds.isEmpty()) {
+            return Flux.empty();
+        }
+        //节点所在的条件分支索引
+        int branchIndex = context
+            .getJob()
+            .getConfiguration(SceneRule.ACTION_KEY_BRANCH_INDEX)
+            .map(idx -> CastUtils.castNumber(idx).intValue())
+            .orElse(AlarmRuleBindEntity.ANY_BRANCH_INDEX);
+
+        Set<String> alarmId = getBoundAlarmId(context.getInstanceId(), branchIndex);
+
+        if (CollectionUtils.isEmpty(alarmId)) {
+            return Flux.empty();
+        }
+
+        Map<String, Object> contextMap = RuleDataHelper.toContextMap(data);
+        return Flux
+            .fromIterable(alarmId)
+            .flatMap(this::getAlarmStorage)
+            .flatMap(store -> parseAlarm(context, store, contextMap));
+    }
+
+    private Set<String> getBoundAlarmId(String ruleId, int branchIndex) {
+        //指定和特定分支绑定的告警
+        Set<String> specific = ruleAlarmBinds.get(Tuples.of(ruleId, branchIndex));
+
+        //未指定特定分支的告警
+        Set<String> any = ruleAlarmBinds.get(Tuples.of(ruleId, AlarmRuleBindEntity.ANY_BRANCH_INDEX));
+
+        //没有任何告警绑定了规则
+        if (CollectionUtils.isEmpty(specific) && CollectionUtils.isEmpty(any)) {
+            return Collections.emptySet();
+        }
+
+        //只有特定分支
+        if (CollectionUtils.isNotEmpty(specific) && CollectionUtils.isEmpty(any)) {
+            return specific;
+        }
+        //只有任意规则
+        else if (CollectionUtils.isEmpty(specific) && CollectionUtils.isNotEmpty(any)) {
+            return any;
+        } else {
+            return new CompositeSet<>(specific, any);
+        }
+    }
+
+    private AlarmRecordEntity ofRecord(Result result) {
+        AlarmRecordEntity entity = new AlarmRecordEntity();
+        entity.setAlarmConfigId(result.getAlarmConfigId());
+        entity.setState(AlarmRecordState.warning);
+        entity.setAlarmTime(System.currentTimeMillis());
+        entity.setLevel(result.getLevel());
+        entity.setTargetType(result.getTargetType());
+        entity.setTargetName(result.getTargetName());
+        entity.setTargetId(result.getTargetId());
+        entity.setAlarmName(result.getAlarmName());
+        entity.generateId();
+        return entity;
+    }
+
+    private Flux<AlarmInfo> parseAlarm(ExecutionContext context, ConfigStorage alarm, Map<String, Object> contextMap) {
+        return this
+            .getAlarmInfo(alarm)
+            .flatMapMany(result -> {
+
+                String ruleName = RuleEngineConstants
+                    .getRuleName(context)
+                    .orElse(result.getAlarmName());
+
+                AlarmData alarmData = AlarmData.of(
+                    result.getAlarmConfigId(),
+                    result.getAlarmName(),
+                    context.getInstanceId(),
+                    ruleName,
+                    contextMap);
+
+                result.setData(alarmData);
+
+                return AlarmTarget
+                    .of(result.getTargetType())
+                    .convert(alarmData)
+                    .map(result::copyWith);
+            });
+    }
+
+    private Mono<AlarmInfo> relieveAlarm(AlarmInfo result) {
+        AlarmRecordEntity record = ofRecord(result);
+
+        //更新告警状态.
+        return alarmRecordService
+            .createUpdate()
+            .set(AlarmRecordEntity::getState, AlarmRecordState.normal)
+            .set(AlarmRecordEntity::getHandleTime, System.currentTimeMillis())
+            .where(AlarmRecordEntity::getId, record.getId())
+            .and(AlarmRecordEntity::getState, AlarmRecordState.warning)
+            .execute()
+            .map(total -> {
+
+                //如果有数据被更新说明是正在告警中
+                result.setAlarming(total > 0);
+
+                return result;
+            });
+
+    }
+
+    private Mono<AlarmInfo> triggerAlarm(AlarmInfo result) {
+        AlarmRecordEntity record = ofRecord(result);
+
+        //更新告警状态.
+        return alarmRecordService
+            .createUpdate()
+            .set(record)
+            .where(AlarmRecordEntity::getId, record.getId())
+            .and(AlarmRecordEntity::getState, AlarmRecordState.warning)
+            .execute()
+            //更新数据库报错,依然尝试触发告警!
+            .onErrorResume(err -> {
+                log.error("trigger alarm error", err);
+                return Reactors.ALWAYS_ZERO;
+            })
+            .flatMap(total -> {
+                AlarmHistoryInfo historyInfo = createHistory(record, result);
+                //更新结果返回0 说明是新产生的告警数据
+                if (total == 0) {
+                    result.setFirstAlarm(true);
+                    result.setAlarming(false);
+                    result.setAlarmTime(record.getAlarmTime());
+
+                    return alarmRecordService
+                        .save(record)
+                        .then(historyService.save(historyInfo))
+                        .then(publishAlarmRecord(historyInfo))
+                        .then(publishEvent(historyInfo))
+                        .then(saveAlarmCache(result, record));
+                }
+                result.setFirstAlarm(false);
+                result.setAlarming(true);
+
+                return historyService
+                    .save(historyInfo)
+                    .then(publishEvent(historyInfo))
+                    .then(saveAlarmCache(result, record));
+            });
+    }
+
+    private Mono<Void> publishEvent(AlarmHistoryInfo historyInfo) {
+        return Mono.fromRunnable(() -> eventPublisher.publishEvent(historyInfo));
+    }
+
+    private AlarmHistoryInfo createHistory(AlarmRecordEntity record, AlarmInfo alarmInfo) {
+        AlarmHistoryInfo info = new AlarmHistoryInfo();
+        info.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
+        info.setAlarmConfigId(record.getAlarmConfigId());
+        info.setAlarmConfigName(record.getAlarmName());
+        info.setAlarmRecordId(record.getId());
+        info.setLevel(record.getLevel());
+        info.setAlarmTime(record.getAlarmTime());
+        info.setTargetName(record.getTargetName());
+        info.setTargetId(record.getTargetId());
+        info.setTargetType(record.getTargetType());
+        info.setAlarmInfo(ObjectMappers.toJsonString(alarmInfo.getData().getOutput()));
+        return info;
+    }
+
+
+    public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo) {
+        String topic = Topics.alarm(historyInfo.getTargetType(), historyInfo.getTargetId(), historyInfo.getAlarmConfigId());
+        return eventBus.publish(topic, historyInfo).then();
+    }
+
+    private Mono<AlarmInfo> saveAlarmCache(AlarmInfo result,
+                                           AlarmRecordEntity record) {
+
+
+        return this
+            .getAlarmStorage(result.getAlarmConfigId())
+            .flatMap(store -> {
+                Mono<Void> save = store.setConfig("lastAlarmTime", record.getAlarmTime()).then();
+
+                if (!result.isAlarming()) {
+                    save = save.then(store.setConfig("alarmTime", record.getAlarmTime()).then());
+                }
+
+                return save;
+            })
+            .thenReturn(result);
+    }
+
+    private Mono<AlarmInfo> getAlarmInfo(ConfigStorage alarm) {
+        return alarm
+            .getConfigs(configInfoKey)
+            .mapNotNull(values -> {
+                //告警禁用了
+                if (values
+                    .getString(AlarmConstants.ConfigKey.state, AlarmState.enabled.name())
+                    .equals(AlarmState.disabled.name())) {
+                    return null;
+                }
+
+                AlarmInfo result = FastBeanCopier.copy(values.getAllValues(), new AlarmInfo());
+
+                if (result.getAlarmConfigId() == null ||
+                    result.getAlarmName() == null) {
+                    //缓存丢失了?从数据库里获取?
+                    return null;
+                }
+
+                return result;
+            });
+    }
+
+    private Mono<ConfigStorage> getAlarmStorage(String alarmId) {
+        return storageManager.getStorage("alarm:" + alarmId);
+    }
+
+
+    /*  处理告警配置缓存事件 */
+
+    static final String TOPIC_ALARM_CONFIG_SAVE = "/_sys/device-alarm-config/save";
+    static final String TOPIC_ALARM_CONFIG_DELETE = "/_sys/device-alarm-rule/del";
+
+    @EventListener
+    public void handleConfigEvent(EntitySavedEvent<AlarmConfigEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e))
+        );
+    }
+
+    @EventListener
+    public void handleConfigEvent(EntityCreatedEvent<AlarmConfigEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e))
+        );
+    }
+
+    @EventListener
+    public void handleConfigEvent(EntityModifyEvent<AlarmConfigEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getAfter())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e))
+        );
+    }
+
+    @EventListener
+    public void handleConfigEvent(EntityDeletedEvent<AlarmConfigEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_DELETE, e))
+        );
+    }
+
+
+    @Subscribe(value = TOPIC_ALARM_CONFIG_SAVE, features = {Subscription.Feature.local, Subscription.Feature.broker})
+    public Mono<Void> handleAlarmConfig(AlarmConfigEntity entity) {
+        return this
+            .getAlarmStorage(entity.getId())
+            .flatMap(store -> store.setConfigs(entity.toConfigMap()))
+            .then();
+    }
+
+    @Subscribe(value = TOPIC_ALARM_CONFIG_DELETE, features = {Subscription.Feature.local, Subscription.Feature.broker})
+    public Mono<Void> removeAlarmConfig(AlarmConfigEntity entity) {
+        return this
+            .getAlarmStorage(entity.getId())
+            .flatMap(ConfigStorage::clear)
+            .then();
+    }
+
+
+    /*  处理告警和规则绑定事件 */
+    static final String TOPIC_ALARM_RULE_BIND = "/_sys/device-alarm-rule/bind";
+    static final String TOPIC_ALARM_RULE_UNBIND = "/_sys/device-alarm-rule/unbind";
+
+
+    @EventListener
+    public void handleBindEvent(EntitySavedEvent<AlarmRuleBindEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_BIND, e))
+        );
+    }
+
+    @EventListener
+    public void handleBindEvent(EntityCreatedEvent<AlarmRuleBindEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_BIND, e))
+        );
+    }
+
+    @EventListener
+    public void handleBindEvent(EntityDeletedEvent<AlarmRuleBindEntity> event) {
+        event.async(
+            Flux.fromIterable(event.getEntity())
+                .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_UNBIND, e))
+        );
+    }
+
+    @Subscribe(value = TOPIC_ALARM_RULE_UNBIND, features = {Subscription.Feature.local, Subscription.Feature.broker})
+    public void handleUnBind(AlarmRuleBindEntity entity) {
+        Integer index = entity.getBranchIndex();
+        if (index == null) {
+            index = AlarmRuleBindEntity.ANY_BRANCH_INDEX;
+        }
+
+        ruleAlarmBinds
+            .compute(Tuples.of(entity.getRuleId(), index), (key, value) -> {
+                if (value == null) {
+                    return null;
+                }
+                value.remove(entity.getAlarmId());
+                if (value.isEmpty()) {
+                    return null;
+                }
+                return value;
+            });
+    }
+
+    @Subscribe(value = TOPIC_ALARM_RULE_BIND, features = {Subscription.Feature.local, Subscription.Feature.broker})
+    public void handleBind(AlarmRuleBindEntity entity) {
+        Integer index = entity.getBranchIndex();
+        if (index == null) {
+            index = AlarmRuleBindEntity.ANY_BRANCH_INDEX;
+        }
+        ruleAlarmBinds
+            .computeIfAbsent(Tuples.of(entity.getRuleId(), index), ignore -> ConcurrentHashMap.newKeySet())
+            .add(entity.getAlarmId());
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        //启动时加载绑定配置
+        bindRepository
+            .createQuery()
+            .fetch()
+            .doOnNext(this::handleBind)
+            .subscribe();
+
+    }
+
+    @Getter
+    @Setter
+    public static class AlarmInfo extends Result {
+        /**
+         * 告警所有者用户ID,表示告警是属于哪个用户的,用于进行数据权限控制
+         */
+        private String ownerId;
+
+        private AlarmData data;
+
+        @Override
+        public AlarmInfo copyWith(AlarmTargetInfo targetInfo) {
+            AlarmInfo result = FastBeanCopier.copy(this, new AlarmInfo());
+            result.setTargetType(targetInfo.getTargetType());
+            result.setTargetId(targetInfo.getTargetId());
+            result.setTargetName(targetInfo.getTargetName());
+            return result;
+        }
+    }
+
+}

+ 1 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java

@@ -23,7 +23,7 @@ public class DeviceAlarmTarget implements AlarmTarget {
     }
 
     @Override
-    public Flux<AlarmTargetInfo> convert(SceneData data) {
+    public Flux<AlarmTargetInfo> convert(AlarmData data) {
         Map<String, Object> output = data.getOutput();
         String deviceId = CastUtils.castString(output.get("deviceId"));
         String deviceName = CastUtils.castString(output.getOrDefault("deviceName", deviceId));

+ 4 - 3
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java

@@ -20,11 +20,12 @@ public class OtherAlarmTarget implements AlarmTarget {
     }
 
     @Override
-    public Flux<AlarmTargetInfo> convert(SceneData data) {
+    public Flux<AlarmTargetInfo> convert(AlarmData data) {
         return Flux.just(AlarmTargetInfo
-                             .of(data.getRule().getId(),
-                                 data.getRule().getName(),
+                             .of(data.getRuleId(),
+                                 data.getRuleName(),
                                  getType()));
     }
 
+
 }

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java

@@ -1,6 +1,6 @@
 package org.jetlinks.community.rule.engine.alarm;
 
-import org.jetlinks.community.rule.engine.scene.SceneData;
+
 import org.jetlinks.reactor.ql.utils.CastUtils;
 import reactor.core.publisher.Flux;
 
@@ -23,7 +23,7 @@ public class ProductAlarmTarget implements AlarmTarget {
     }
 
     @Override
-    public Flux<AlarmTargetInfo> convert(SceneData data) {
+    public Flux<AlarmTargetInfo> convert(AlarmData data) {
         Map<String, Object> output = data.getOutput();
         String productId = CastUtils.castString(output.get("productId"));
         String productName = CastUtils.castString(output.getOrDefault("productName", productId));

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java

@@ -18,9 +18,9 @@ public class RuleEngineManagerConfiguration {
 
     @Bean
     public SceneTaskExecutorProvider sceneTaskExecutorProvider(EventBus eventBus,
-                                                               ObjectProvider<SceneFilter> filters) {
+                                                                ObjectProvider<SceneFilter> filters) {
         return new SceneTaskExecutorProvider(eventBus,
-                                             SceneFilter.composite(filters));
+                                              SceneFilter.composite(filters));
     }
 
     @Configuration(proxyBeanMethods = false)

+ 34 - 6
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java

@@ -9,14 +9,18 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
+import org.hswebframework.web.api.crud.entity.RecordModifierEntity;
 import org.hswebframework.web.crud.annotation.EnableEntityEvent;
 import org.hswebframework.web.crud.generator.Generators;
+import org.jetlinks.community.rule.engine.alarm.AlarmConstants;
 import org.jetlinks.community.rule.engine.enums.AlarmState;
 import org.jetlinks.community.rule.engine.scene.TriggerType;
 
 import javax.persistence.Column;
 import javax.persistence.Index;
 import javax.persistence.Table;
+import java.util.HashMap;
+import java.util.Map;
 
 @Getter
 @Setter
@@ -25,7 +29,7 @@ import javax.persistence.Table;
 })
 @Comment("告警配置表")
 @EnableEntityEvent
-public class AlarmConfigEntity extends GenericEntity<String> implements RecordCreationEntity {
+public class AlarmConfigEntity extends GenericEntity<String> implements RecordCreationEntity, RecordModifierEntity {
 
     @Column(length = 64, nullable = false)
     @Schema(description = "名称")
@@ -39,32 +43,33 @@ public class AlarmConfigEntity extends GenericEntity<String> implements RecordCr
     @Schema(description = "告警级别")
     private Integer level;
 
-    @Column(length = 128, nullable = false)
+    @Column(length = 128)
     @Schema(description = "关联场景名称")
     private String sceneName;
 
-    @Column(length = 64, nullable = false)
+    @Column(length = 64)
     @Schema(description = "关联场景Id")
     private String sceneId;
 
     @Column(length = 32, nullable = false)
     @EnumCodec
     @ColumnType(javaType = String.class)
-    @DefaultValue("enabled")
+    @DefaultValue("disabled")
     @Schema(description = "状态")
     private AlarmState state;
 
-    @Column(length = 32, nullable = false)
+    @Column(length = 32)
     @EnumCodec
     @ColumnType(javaType = String.class)
     @Schema(description = "场景触发类型")
+    @Deprecated
     private TriggerType sceneTriggerType;
 
     @Column(length = 256)
     @Schema(description = "说明")
     private String description;
 
-    @Column(updatable = false)
+    @Column(length = 64, updatable = false)
     @Schema(
         description = "创建者ID(只读)"
         , accessMode = Schema.AccessMode.READ_ONLY
@@ -78,4 +83,27 @@ public class AlarmConfigEntity extends GenericEntity<String> implements RecordCr
         , accessMode = Schema.AccessMode.READ_ONLY
     )
     private Long createTime;
+
+    @Column(length = 64)
+    @Schema(description = "更新者ID", accessMode = Schema.AccessMode.READ_ONLY)
+    private String modifierId;
+
+    @Column
+    @DefaultValue(generator = Generators.CURRENT_TIME)
+    @Schema(description = "更新时间")
+    private Long modifyTime;
+
+
+    public Map<String, Object> toConfigMap() {
+        Map<String, Object> configs = new HashMap<>();
+
+        configs.put(AlarmConstants.ConfigKey.alarmConfigId, getId());
+        configs.put(AlarmConstants.ConfigKey.alarmName, getName());
+        configs.put(AlarmConstants.ConfigKey.level, getLevel());
+        configs.put(AlarmConstants.ConfigKey.ownerId, getModifierId() == null ? getCreatorId() : getModifierId());
+        configs.put(AlarmConstants.ConfigKey.targetType, getTargetType());
+        configs.put(AlarmConstants.ConfigKey.state, getState().name());
+
+        return configs;
+    }
 }

+ 24 - 5
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java

@@ -25,18 +25,22 @@ public class AlarmRecordEntity extends GenericEntity<String> {
     @Schema(description = "告警配置ID")
     private String alarmConfigId;
 
-    @Column(length = 64, nullable = false, updatable = false)
+    @Column(length = 64, nullable = false)
     @Schema(description = "告警配置名称")
     private String alarmName;
 
-    @Column
+    @Column(length = 32, updatable = false)
     @Schema(description = "告警目标类型")
     private String targetType;
 
-    @Column
+    @Column(length = 64, updatable = false)
     @Schema(description = "告警目标Id")
     private String targetId;
 
+    @Column(length = 64, updatable = false)
+    @Schema(description = "告警目标Key")
+    private String targetKey;
+
     @Column
     @Schema(description = "告警目标名称")
     private String targetName;
@@ -45,6 +49,10 @@ public class AlarmRecordEntity extends GenericEntity<String> {
     @Schema(description = "最近一次告警时间")
     private Long alarmTime;
 
+    @Column
+    @Schema(description = "处理时间")
+    private Long handleTime;
+
     @Column
     @Schema(description = "告警级别")
     private Integer level;
@@ -60,14 +68,25 @@ public class AlarmRecordEntity extends GenericEntity<String> {
     @Schema(description = "说明")
     private String description;
 
+    public String getTargetKey() {
+        if (targetKey == null) {
+            generateKey();
+        }
+        return targetKey;
+    }
+
+    public void generateKey() {
+        setTargetKey(generateId(targetId, targetType));
+    }
 
     public void generateId() {
         setId(generateId(targetId, targetType, alarmConfigId));
     }
 
-    public static String generateId(String targetId, String targetType, String alarmConfigId) {
-        return DigestUtils.md5Hex(String.join("-", targetId, targetType, alarmConfigId));
+    public static String generateId(String... args) {
+        return DigestUtils.md5Hex(String.join("-", args));
     }
 
 
 }
+

+ 52 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRuleBindEntity.java

@@ -0,0 +1,52 @@
+package org.jetlinks.community.rule.engine.entity;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.crud.annotation.EnableEntityEvent;
+import org.hswebframework.web.utils.DigestUtils;
+import org.springframework.util.StringUtils;
+
+import javax.persistence.Column;
+import javax.persistence.Index;
+import javax.persistence.Table;
+import javax.validation.constraints.NotBlank;
+
+@Table(name = "s_alarm_rule_bind", indexes = {
+    @Index(name = "idx_alarm_rule_aid", columnList = "alarmId"),
+    @Index(name = "idx_alarm_rule_rid", columnList = "ruleId"),
+})
+@Getter
+@Setter
+@Schema(description = "告警规则绑定信息")
+@EnableEntityEvent
+public class AlarmRuleBindEntity extends GenericEntity<String> {
+
+    public static final int ANY_BRANCH_INDEX = -1;
+
+    @Column(nullable = false, updatable = false)
+    @NotBlank
+    @Schema(description = "告警ID")
+    private String alarmId;
+
+    @Column(nullable = false, updatable = false)
+    @NotBlank
+    @Schema(description = "场景规则ID")
+    private String ruleId;
+
+    @Column(nullable = false, updatable = false)
+    @Schema(description = "规则条件分支ID")
+    @DefaultValue("-1")
+    private Integer branchIndex;
+
+    @Override
+    public String getId() {
+        if (StringUtils.hasText(super.getId())) {
+            return super.getId();
+        }
+        setId(DigestUtils.md5Hex(String.join("|", alarmId, ruleId, String.valueOf(branchIndex))));
+        return super.getId();
+    }
+}

+ 8 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/AlarmMode.java

@@ -0,0 +1,8 @@
+package org.jetlinks.community.rule.engine.enums;
+
+public enum AlarmMode {
+    //触发告警
+    trigger,
+    //解除告警
+    relieve
+}

+ 13 - 13
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java

@@ -135,13 +135,13 @@ public class DeviceOperation {
             terms.addAll(
                 this.createTerm(
                     metadata.getEvent(eventId)
-                            .<List<PropertyMetadata>>map(event -> Collections
-                                .singletonList(
-                                    of("data",
-                                       event.getName(),
-                                       event.getType())
-                                ))
-                            .orElse(Collections.emptyList()),
+                        .<List<PropertyMetadata>>map(event -> Collections
+                            .singletonList(
+                                of("data",
+                                   event.getName(),
+                                   event.getType())
+                            ))
+                        .orElse(Collections.emptyList()),
                     (property, column) -> column.setChildren(createTermColumn("event", property, false))));
         }
         //调用功能
@@ -149,12 +149,12 @@ public class DeviceOperation {
             terms.addAll(
                 this.createTerm(
                     metadata.getFunction(functionId)
-                            .<List<PropertyMetadata>>map(meta -> Collections.singletonList(
-                                of("output",
-                                   meta.getName(),
-                                   meta.getOutput()))
-                            )
-                            .orElse(Collections.emptyList()),
+                        .<List<PropertyMetadata>>map(meta -> Collections.singletonList(
+                            of("output",
+                               meta.getName(),
+                               meta.getOutput()))
+                        )
+                        .orElse(Collections.emptyList()),
                     (property, column) -> column.setChildren(createTermColumn("function", property, false))));
         }
 

+ 7 - 7
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java

@@ -25,6 +25,7 @@ import org.jetlinks.community.rule.engine.scene.term.TermColumn;
 import org.jetlinks.community.rule.engine.scene.term.TermTypeSupport;
 import org.jetlinks.community.rule.engine.scene.term.TermTypes;
 import org.jetlinks.community.rule.engine.scene.value.TermValue;
+import org.jetlinks.reactor.ql.DefaultReactorQLContext;
 import org.jetlinks.reactor.ql.ReactorQL;
 import org.jetlinks.reactor.ql.ReactorQLContext;
 import org.jetlinks.rule.engine.api.model.RuleModel;
@@ -56,6 +57,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
     @NotNull(message = "error.scene_rule_trigger_device_operation_cannot_be_null")
     private DeviceOperation operation;
 
+    @Schema(description = "拓展信息")
+    private Map<String,Object> options;
+
     public SqlRequest createSql(List<Term> terms) {
         return createSql(terms, true);
     }
@@ -170,16 +174,12 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
                 .builder()
                 .sql(sql)
                 .build();
-            Object[] args = request.getParameters();
+            List<Object> args = Arrays.asList(request.getParameters());
             String sqlString = request.toNativeSql();
             return new Function<Map<String, Object>, Mono<Boolean>>() {
                 @Override
                 public Mono<Boolean> apply(Map<String, Object> map) {
-                    ReactorQLContext context = ReactorQLContext.ofDatasource((t) -> Flux.just(map));
-                    for (Object arg : args) {
-                        context.bind(arg);
-                    }
-
+                    ReactorQLContext context = new DefaultReactorQLContext((t) -> Flux.just(map), args);
                     return ql
                         .start(context)
                         .hasElements();
@@ -361,7 +361,7 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
 
     public List<Variable> createDefaultVariable() {
         return Arrays.asList(
-            Variable.of("deviceId", "设备ID"),
+            Variable.of("deviceId", "设备ID").withOption(Variable.OPTION_PRODUCT_ID,productId),
             Variable.of("deviceName", "设备名称"),
             Variable.of("productId", "产品ID"),
             Variable.of("productName", "产品名称")

+ 116 - 30
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java

@@ -3,10 +3,12 @@ package org.jetlinks.community.rule.engine.scene;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 import org.hswebframework.ezorm.core.param.Term;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.i18n.LocaleUtils;
+import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.MessageType;
@@ -18,6 +20,7 @@ import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.core.metadata.FunctionMetadata;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.types.BooleanType;
+import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.community.relation.utils.VariableSource;
 import org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider;
@@ -27,8 +30,10 @@ import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
 import org.jetlinks.community.rule.engine.scene.term.TermTypes;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 import java.io.Serializable;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
@@ -46,6 +51,7 @@ import static org.hswebframework.web.i18n.LocaleUtils.*;
 public class SceneAction implements Serializable {
 
     @Schema(description = "执行器类型")
+    @NotNull
     private Executor executor;
 
     @Schema(description = "执行器类型为[notify]时不能为空")
@@ -57,23 +63,55 @@ public class SceneAction implements Serializable {
     @Schema(description = "执行器类型为[device]时不能为空")
     private Device device;
 
+    @Schema(description = "执行器类型为[alarm]时不能为空")
+    private Alarm alarm;
+
     @Schema(description = "输出过滤条件,串行执行动作时,满足条件才会进入下一个节点")
     private List<Term> terms;
 
-    public Flux<Variable> createVariables(DeviceRegistry registry, int index) {
+    @Schema(description = "拓展信息")
+    private Map<String,Object> options;
+
+    public Flux<Variable> createVariables(DeviceRegistry registry, Integer branchIndex, int index) {
         //设备
         if (executor == Executor.device && device != null) {
             return device
                 .getDeviceMetadata(registry, device.productId)
-                .flatMapMany(metadata -> currentReactive()
-                    .flatMapIterable(locale ->
-                                         doWith(metadata,
-                                                locale,
-                                                (m, l) -> device.createVariables(metadata, index))));
+                .map(metadata -> createVariable(branchIndex, index, device.createVariables(metadata)))
+                .flux()
+                .as(LocaleUtils::transform);
+        }
+        if (executor == Executor.alarm && alarm != null) {
+            return Mono
+                .fromSupplier(() -> createVariable(branchIndex, index, alarm.createVariables()))
+                .flux()
+                .as(LocaleUtils::transform);
         }
         return Flux.empty();
     }
 
+    private Variable createVariable(Integer branchIndex, int actionIndex, List<Variable> children) {
+        int humanIndex = actionIndex + 1;
+
+        String varId = "action_" + humanIndex;
+
+        if (branchIndex != null) {
+            varId = "branch_" + branchIndex + "_" + varId;
+        }
+
+        String message = resolveMessage(
+            "message.action_var_index",
+            String.format("动作[%s]", humanIndex),
+            humanIndex
+        );
+
+
+        Variable variable = Variable.of(varId, message);
+        variable.setChildren(children);
+
+        return variable;
+    }
+
     public static SceneAction notify(String notifyType,
                                      String notifierId,
                                      String templateId,
@@ -114,17 +152,27 @@ public class SceneAction implements Serializable {
                 node.setConfiguration(config);
                 return;
             }
+            case alarm:
+                node.setExecutor(AlarmTaskExecutorProvider.executor);
+                node.setConfiguration(FastBeanCopier.copy(alarm, new HashMap<>()));
+                return;
             //设备指令
             case device: {
                 DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig config = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig();
 
                 config.setMessage(device.message);
-                config.setSelectorSpec(
-                    DeviceSelectorProviders.composite(
-                        //先选择产品下的设备
-                        DeviceSelectorProviders.product(device.productId),
-                        FastBeanCopier.copy(device, new DeviceSelectorSpec())
-                    ));
+
+                if (DeviceSelectorProviders.isFixed(device)) {
+                    config.setSelectorSpec(FastBeanCopier.copy(device, new DeviceSelectorSpec()));
+                } else {
+                    config.setSelectorSpec(
+                        DeviceSelectorProviders.composite(
+                            //先选择产品下的设备
+                            DeviceSelectorProviders.product(device.productId),
+                            FastBeanCopier.copy(device, new DeviceSelectorSpec())
+                        ));
+                }
+
                 config.setFrom("fixed");
                 config.setStateOperator("direct");
                 config.setProductId(device.productId);
@@ -153,8 +201,7 @@ public class SceneAction implements Serializable {
         private Map<String, Object> message;
 
 
-        public List<Variable> createVariables(DeviceMetadata metadata,
-                                              int actionIndex) {
+        public List<Variable> createVariables(DeviceMetadata metadata) {
             DeviceMessage message = MessageType
                 .convertMessage(this.message)
                 .filter(DeviceMessage.class::isInstance)
@@ -164,25 +211,28 @@ public class SceneAction implements Serializable {
                 return Collections.emptyList();
             }
             List<Variable> variables = new ArrayList<>();
-            int humanIndex = actionIndex + 1;
-
-            Variable action = Variable
-                .of("action_" + humanIndex, resolveMessage(
-                    "message.action_var_index",
-                    String.format("动作[%s]", humanIndex),
-                    humanIndex
-                ));
 
             //下发指令是否成功
             variables.add(Variable
                               .of("success",
                                   resolveMessage(
                                       "message.action_execute_success",
-                                      "执行是否成功",
-                                      humanIndex
+                                      "执行是否成功"
                                   ))
                               .withType(BooleanType.ID));
 
+            //设备ID
+            variables.add(Variable
+                              .of("deviceId",
+                                  resolveMessage(
+                                      "message.device_id",
+                                      "设备ID"
+                                  ))
+                              .withType(BooleanType.ID)
+                              //标识变量属于哪个产品
+                              .withOption(Variable.OPTION_PRODUCT_ID, productId)
+            );
+
             if (message instanceof ReadPropertyMessage) {
                 List<String> properties = ((ReadPropertyMessage) message).getProperties();
                 for (String property : properties) {
@@ -220,8 +270,7 @@ public class SceneAction implements Serializable {
 
                 }
             }
-            action.setChildren(variables);
-            return Collections.singletonList(action);
+            return variables;
 
         }
     }
@@ -272,7 +321,9 @@ public class SceneAction implements Serializable {
 
     @Getter
     @Setter
-    public static class Delay {
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class Delay implements Serializable {
         @Schema(description = "延迟时间")
         private int time;
 
@@ -282,7 +333,7 @@ public class SceneAction implements Serializable {
 
     @Getter
     @Setter
-    public static class Notify {
+    public static class Notify implements Serializable {
         @Schema(description = "通知类型")
         @NotBlank(message = "error.scene_rule_actions_notify_type_cannot_be_empty")
         private String notifyType;
@@ -303,6 +354,40 @@ public class SceneAction implements Serializable {
         private Map<String, Object> variables;
     }
 
+
+    @Getter
+    @Setter
+    public static class Alarm extends AlarmTaskExecutorProvider.Config {
+
+        /**
+         * @see org.jetlinks.community.rule.engine.alarm.AlarmRuleHandler.Result
+         */
+        public List<Variable> createVariables() {
+
+            List<Variable> variables = new ArrayList<>();
+
+            variables.add(
+                Variable.of("alarmName",
+                            LocaleUtils.resolveMessage("message.alarm_config_name", "告警配置名称"))
+            );
+
+            variables.add(
+                Variable.of("level",
+                            LocaleUtils.resolveMessage("message.alarm_level", "告警级别"))
+                        .withType(IntType.ID)
+            );
+
+            variables.add(
+                Variable.of("alarming",
+                            LocaleUtils.resolveMessage("message.is_alarming", "是否正在告警"))
+                        .withType(BooleanType.ID)
+            );
+
+            return variables;
+        }
+    }
+
+
     @Getter
     @AllArgsConstructor
     public enum DelayUnit {
@@ -316,7 +401,8 @@ public class SceneAction implements Serializable {
     public enum Executor {
         notify,
         delay,
-        device
+        device,
+        alarm
     }
 
-}
+}

+ 6 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java

@@ -13,6 +13,11 @@ import java.util.List;
 @Setter
 public class SceneConditionAction implements Serializable {
 
+    /**
+     * @see org.jetlinks.community.rule.engine.scene.term.TermColumn
+     * @see org.jetlinks.community.rule.engine.scene.term.TermType
+     * @see org.jetlinks.community.rule.engine.scene.value.TermValue
+     */
     @Schema(description = "条件")
     private List<Term> when;
 
@@ -20,6 +25,6 @@ public class SceneConditionAction implements Serializable {
     private ShakeLimit shakeLimit;
 
     @Schema(description = "满足条件时执行的动作")
-    private SceneActions then;
+    private List<SceneActions> then;
 
 }

+ 130 - 94
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java

@@ -18,6 +18,7 @@ import org.jetlinks.community.rule.engine.commons.ShakeLimit;
 import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
 import org.jetlinks.community.rule.engine.scene.term.TermColumn;
 import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
+import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.model.RuleLink;
 import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@@ -27,19 +28,23 @@ import reactor.core.Disposables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
+import reactor.function.Function3;
 import reactor.util.concurrent.Queues;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
 import java.io.Serializable;
 import java.util.*;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 
 @Getter
 @Setter
 public class SceneRule implements Serializable {
 
+    public static final String ACTION_KEY_BRANCH_INDEX = "_branchIndex";
+    public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex";
+    public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex";
+
     @Schema(description = "告警ID")
     @NotBlank(message = "error.scene_rule_id_cannot_be_blank")
     private String id;
@@ -52,6 +57,11 @@ public class SceneRule implements Serializable {
     @NotNull(message = "error.scene_rule_trigger_cannot_be_null")
     private Trigger trigger;
 
+    /**
+     * @see org.jetlinks.community.rule.engine.scene.term.TermColumn
+     * @see org.jetlinks.community.rule.engine.scene.term.TermType
+     * @see org.jetlinks.community.rule.engine.scene.value.TermValue
+     */
     @Schema(description = "触发条件")
     private List<Term> terms;
 
@@ -64,6 +74,9 @@ public class SceneRule implements Serializable {
     @Schema(description = "动作分支")
     private List<SceneConditionAction> branches;
 
+    @Schema(description = "扩展配置")
+    private Map<String, Object> options;
+
     @Schema(description = "说明")
     private String description;
 
@@ -141,6 +154,7 @@ public class SceneRule implements Serializable {
 
     public Flux<Variable> createVariables(List<TermColumn> columns,
                                           Integer branchIndex,
+                                          Integer branchGroupIndex,
                                           Integer actionIndex,
                                           DeviceRegistry registry) {
         Flux<Variable> variables = createSceneVariables(columns);
@@ -149,19 +163,20 @@ public class SceneRule implements Serializable {
         if (branchIndex == null && !parallel && actionIndex != null && CollectionUtils.isNotEmpty(actions)) {
 
             for (int i = 0; i < Math.min(actions.size(), actionIndex + 1); i++) {
-                variables = variables.concatWith(actions.get(i).createVariables(registry, i));
+                variables = variables.concatWith(actions.get(i).createVariables(registry, branchIndex, i));
             }
         }
         //分支条件
-        if (branchIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) {
+        if (branchIndex != null && branchGroupIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) {
             SceneConditionAction branch = branches.get(branchIndex);
+            SceneActions then = branch.getThen() != null && branch.getThen().size() > branchGroupIndex
+                ? branch.getThen().get(branchGroupIndex) : null;
             List<SceneAction> actionList;
-            if (branch.getThen() != null && !branch.getThen().isParallel() &&
-
-                CollectionUtils.isNotEmpty(actionList = branch.getThen().getActions())) {
+            if (then != null && !then.isParallel() &&
+                CollectionUtils.isNotEmpty(actionList = then.getActions())) {
 
                 for (int i = 0; i < Math.min(actionList.size(), actionIndex + 1); i++) {
-                    variables = variables.concatWith(actionList.get(i).createVariables(registry, i));
+                    variables = variables.concatWith(actionList.get(i).createVariables(registry, branchIndex, i));
                 }
 
             }
@@ -171,8 +186,12 @@ public class SceneRule implements Serializable {
             .doOnNext(Variable::refactorPrefix);
     }
 
+    private String createBranchActionId(int branchIndex, int groupId, int actionIndex) {
+        return "branch_" + branchIndex + "_group_" + groupId + "_action_" + actionIndex;
+    }
+
     public Disposable createBranchHandler(Flux<Map<String, Object>> sourceData,
-                                          BiFunction<String, Map<String, Object>, Mono<Void>> output) {
+                                          Function3<Integer, String, Map<String, Object>, Mono<Void>> output) {
         if (CollectionUtils.isEmpty(branches)) {
             return Disposables.disposed();
         }
@@ -186,65 +205,72 @@ public class SceneRule implements Serializable {
             //执行条件
             Function<Map<String, Object>, Mono<Boolean>> filter = createFilter(branch.getWhen());
             //满足条件后的输出操作
-            Function<Map<String, Object>, Mono<Void>> out;
-
-            SceneActions then = branch.getThen();
-            //执行动作
-            if (then != null && CollectionUtils.isNotEmpty(then.getActions())) {
-
-                int size = then.getActions().size();
-                //串行,只传递到第一个动作
-                if (!then.isParallel() || size == 1) {
-                    String nodeId = "branch_" + _branchIndex + "_action_1";
-                    out = data -> output.apply(nodeId, data);
-                } else {
-                    //多个并行执行动作
-                    String[] nodeIds = new String[size];
-                    for (int i = 0; i < nodeIds.length; i++) {
-                        nodeIds[0] = "branch_" + _branchIndex + "_action_" + (i + 1);
+            List<Function<Map<String, Object>, Mono<Void>>> outs = new ArrayList<>();
+
+            List<SceneActions> groups = branch.getThen();
+            int thenIndex = 0;
+            if (CollectionUtils.isNotEmpty(groups)) {
+                thenIndex++;
+
+                for (SceneActions then : groups) {
+                    Function<Map<String, Object>, Mono<Void>> out;
+
+                    int size = then.getActions().size();
+                    //串行,只传递到第一个动作
+                    if (!then.isParallel() || size == 1) {
+                        String nodeId = createBranchActionId(_branchIndex, thenIndex, 1);
+                        out = data -> output.apply(_branchIndex, nodeId, data);
+                    } else {
+                        //多个并行执行动作
+                        String[] nodeIds = new String[size];
+                        for (int i = 0; i < nodeIds.length; i++) {
+                            nodeIds[0] = createBranchActionId(_branchIndex, thenIndex, 1 + (i + 1));
+                        }
+                        Flux<String> nodeIdFlux = Flux.fromArray(nodeIds);
+                        //并行
+                        out = data -> nodeIdFlux
+                            .flatMap(nodeId -> output.apply(_branchIndex, nodeId, data))
+                            .then();
                     }
-                    Flux<String> nodeIdFlux = Flux.fromArray(nodeIds);
-                    //并行
-                    out = data -> nodeIdFlux
-                        .flatMap(nodeId -> output.apply(nodeId, data))
-                        .then();
-                }
-                //防抖
-                ShakeLimit shakeLimit = branch.getShakeLimit();
-                if (shakeLimit != null && shakeLimit.isEnabled()) {
-
-                    Sinks.Many<Map<String, Object>> sinks = Sinks
-                        .many()
-                        .unicast()
-                        .onBackpressureBuffer(Queues.<Map<String, Object>>unboundedMultiproducer().get());
-
-                    //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略
-                    ShakeLimitGrouping<Map<String, Object>> grouping = createGrouping();
-
-                    Function<Map<String, Object>, Mono<Void>> handler = out;
-
-                    disposable.add(
-                        shakeLimit
-                            .transfer(sinks.asFlux(),
-                                      (duration, stream) ->
-                                          grouping
-                                              .group(stream)//先按自定义分组再按事件窗口进行分组
-                                              .flatMap(group -> group.window(duration), Integer.MAX_VALUE),
-                                      (map, total) -> map.put("_total", total))
-                            .flatMap(handler)
-                            .subscribe()
-                    );
-                    //输出到sink进行防抖控制
-                    out = data -> {
-                        sinks.emitNext(data, Reactors.emitFailureHandler());
-                        return Mono.empty();
-                    };
+                    //防抖
+                    ShakeLimit shakeLimit = branch.getShakeLimit();
+                    if (shakeLimit != null && shakeLimit.isEnabled()) {
+
+                        Sinks.Many<Map<String, Object>> sinks = Sinks
+                            .many()
+                            .unicast()
+                            .onBackpressureBuffer(Queues.<Map<String, Object>>unboundedMultiproducer().get());
+
+                        //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略
+                        ShakeLimitGrouping<Map<String, Object>> grouping = createGrouping();
+
+                        Function<Map<String, Object>, Mono<Void>> handler = out;
+
+                        disposable.add(
+                            shakeLimit
+                                .transfer(sinks.asFlux(),
+                                          (duration, stream) ->
+                                              grouping
+                                                  .group(stream)//先按自定义分组再按事件窗口进行分组
+                                                  .flatMap(group -> group.window(duration), Integer.MAX_VALUE),
+                                          (map, total) -> map.put("_total", total))
+                                .flatMap(handler)
+                                .subscribe()
+                        );
+                        //输出到sink进行防抖控制
+                        out = data -> {
+                            sinks.emitNext(data, Reactors.emitFailureHandler());
+                            return Mono.empty();
+                        };
+                    }
+                    outs.add(out);
                 }
-            } else {
-                out = ignore -> Mono.empty();
             }
 
-            Function<Map<String, Object>, Mono<Void>> fOut = out;
+
+            Flux<Function<Map<String, Object>, Mono<Void>>> outFlux = Flux.fromIterable(outs);
+
+            Function<Map<String, Object>, Mono<Void>> fOut = out -> outFlux.flatMap(fun -> fun.apply(out)).then();
 
 
             Function<Map<String, Object>, Mono<Boolean>> handler =
@@ -363,40 +389,52 @@ public class SceneRule implements Serializable {
             for (SceneConditionAction branch : branches) {
                 branchIndex++;
 
-                SceneActions actions = branch.getThen();
-                if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) {
-                    int actionIndex = 1;
-                    RuleNodeModel preNode = null;
-                    SceneAction preAction = null;
-                    for (SceneAction action : actions.getActions()) {
-                        RuleNodeModel actionNode = new RuleNodeModel();
-                        actionNode.setId("branch_" + branchIndex + "_action_" + actionIndex);
-                        actionNode.setName("条件_" + branchIndex + "_动作_" + actionIndex);
-
-                        action.applyNode(actionNode);
-                        //串行
-                        if (!actions.isParallel()) {
-                            //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用
-                            actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER, true);
-                            actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER_KEY, actionNode.getId());
-
-                            if (preNode != null) {
-                                //上一个节点->当前动作节点
-                                RuleLink link = model.link(preNode, actionNode);
-                                //设置上一个节点到此节点的输出条件
-                                if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
-                                    link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
+                List<SceneActions> group = branch.getThen();
+
+                if (CollectionUtils.isNotEmpty(group)) {
+                    int groupIndex = 0;
+                    for (SceneActions actions : group) {
+                        groupIndex++;
+                        if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) {
+                            int actionIndex = 1;
+                            RuleNodeModel preNode = null;
+                            SceneAction preAction = null;
+                            for (SceneAction action : actions.getActions()) {
+                                RuleNodeModel actionNode = new RuleNodeModel();
+                                actionNode.setId(createBranchActionId(branchIndex, groupIndex, actionIndex));
+                                actionNode.setName("条件" + branchIndex + "_分组" + groupIndex + "_动作" + actionIndex);
+
+                                action.applyNode(actionNode);
+                                //串行
+                                if (!actions.isParallel()) {
+                                    //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用
+                                    actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER, true);
+                                    actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER_KEY, actionNode.getId());
+                                    actionNode.addConfiguration(ACTION_KEY_BRANCH_INDEX, branchIndex);
+                                    actionNode.addConfiguration(ACTION_KEY_GROUP_INDEX, groupIndex);
+                                    actionNode.addConfiguration(ACTION_KEY_ACTION_INDEX, actionIndex);
+
+                                    if (preNode != null) {
+                                        //上一个节点->当前动作节点
+                                        RuleLink link = model.link(preNode, actionNode);
+                                        //设置上一个节点到此节点的输出条件
+                                        if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
+                                            link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
+                                        }
+                                    }
+
+                                    preNode = actionNode;
                                 }
-                            }
 
-                            preNode = actionNode;
+                                model.getNodes().add(actionNode);
+                                preAction = action;
+                                actionIndex++;
+                            }
                         }
-
-                        model.getNodes().add(actionNode);
-                        preAction = action;
-                        actionIndex++;
                     }
                 }
+
+
             }
         }
 
@@ -406,7 +444,5 @@ public class SceneRule implements Serializable {
 
     public void validate() {
         ValidatorUtils.tryValidate(this);
-        trigger.validate();
-
     }
 }

+ 88 - 56
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java

@@ -8,6 +8,7 @@ import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.id.IDGenerator;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.trace.TraceHolder;
 import org.jetlinks.core.utils.FluxUtils;
 import org.jetlinks.community.PropertyConstants;
 import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
@@ -24,14 +25,15 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 @Slf4j
 @AllArgsConstructor
 public class SceneTaskExecutorProvider implements TaskExecutorProvider {
 
+    private static final int BACKPRESSURE_BUFFER_MAX_SIZE =
+        Integer.getInteger("scene.backpressure-buffer-size", 10_0000);
+
     public static final String EXECUTOR = "scene";
 
     private final EventBus eventBus;
@@ -86,49 +88,13 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
             this.rule = sceneRule;
         }
 
-        private Disposable init() {
-            if (disposable != null) {
-                disposable.dispose();
-            }
-            boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches());
-
-            SqlRequest request = rule.createSql(!useBranch);
-
-            //不是通过SQL来处理数据
-            if (request.isEmpty()) {
-                return context
-                    .getInput()
-                    .accept()
-                    .flatMap(this::handleOutput)
-                    .subscribe();
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql());
-            }
-            //数据源
-            ReactorQLContext qlContext = ReactorQLContext
+        private ReactorQLContext createReactorQLContext() {
+            return ReactorQLContext
                 .ofDatasource(table -> {
-                    //来自上游(定时等)
                     if (table.startsWith("/")) {
                         //来自事件总线
                         return this
-                            .refactorTopic(table)
-                            .flatMapMany(topics -> eventBus
-                                .subscribe(
-                                    Subscription
-                                        .builder()
-                                        .justLocal()
-                                        .topics(topics)
-                                        .subscriberId("scene:" + rule.getId())
-                                        .build()))
-                            .<Map<String, Object>>handle((topicPayload, synchronousSink) -> {
-                                String topic = topicPayload.getTopic();
-                                try {
-                                    synchronousSink.next(topicPayload.bodyToJson(true));
-                                } catch (Throwable err) {
-                                    log.warn("decode payload error {}", topic, err);
-                                }
-                            })
+                            .subscribe(table)
                             //有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
                             .as(FluxUtils.distinct(map -> {
                                 Object id = map.get(PropertyConstants.uid.getKey());
@@ -136,26 +102,68 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
                                     id = IDGenerator.SNOW_FLAKE_STRING.generate();
                                 }
                                 return id;
-                            }, Duration.ofSeconds(5)));
+                            }, Duration.ofSeconds(1)));
                     } else {
+                        //来自上游(定时等)
                         return context
                             .getInput()
                             .accept()
                             .flatMap(RuleData::dataToMap);
                     }
                 });
+        }
 
-            //sql参数
-            for (Object parameter : request.getParameters()) {
-                qlContext.bind(parameter);
+        private Disposable init() {
+            if (disposable != null) {
+                disposable.dispose();
             }
+            boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches());
 
-            Flux<Map<String, Object>> source = ReactorQL
-                .builder()
-                .sql(request.getSql())
-                .build()
-                .start(qlContext)
-                .map(ReactorQLRecord::asMap);
+            SqlRequest request = rule.createSql(!useBranch);
+            Flux<Map<String, Object>> source;
+
+            //不是通过SQL来处理数据
+            if (request.isEmpty()) {
+                source = context
+                    .getInput()
+                    .accept()
+                    .flatMap(RuleData::dataToMap);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql());
+                }
+
+                ReactorQLContext qlContext = createReactorQLContext();
+
+                //sql参数
+                for (Object parameter : request.getParameters()) {
+                    qlContext.bind(parameter);
+                }
+                source = ReactorQL
+                    .builder()
+                    .sql(request.getSql())
+                    .build()
+                    .start(qlContext)
+                    .map(ReactorQLRecord::asMap);
+            }
+
+            // 分支条件
+            if (useBranch) {
+                return rule
+                    .createBranchHandler(
+                        source,
+                        (idx,nodeId, data) -> {
+                            if (log.isDebugEnabled()) {
+                                log.debug("scene [{}] branch [{}] execute", rule.getId(), nodeId);
+                            }
+                            RuleData ruleData = context.newRuleData(data);
+                            return context
+                                .getOutput()
+                                .write(nodeId, ruleData)
+                                .onErrorResume(err -> context.onError(err, ruleData))
+                                .as(tracer());
+                        });
+            }
 
             //防抖
             Trigger.GroupShakeLimit shakeLimit = rule.getTrigger().getShakeLimit();
@@ -176,10 +184,30 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
                 .subscribe();
         }
 
-        private Mono<List<String>> refactorTopic(String topic) {
-            //todo 根据权限对topic进行重构
-
-            return Mono.just(Collections.singletonList(topic));
+        private Flux<Map<String, Object>> subscribe(String topic) {
+            return eventBus
+                .subscribe(
+                    Subscription
+                        .builder()
+                        .justLocal()
+                        .topics(topic)
+                        .subscriberId("scene:" + rule.getId())
+                        .build())
+                .<Map<String, Object>>handle((topicPayload, synchronousSink) -> {
+                    try {
+                        synchronousSink.next(topicPayload.bodyToJson(true));
+                    } catch (Throwable err) {
+                        log.warn("decode payload error {}", topicPayload.getTopic(), err);
+                    }
+                })
+                //有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
+                .as(FluxUtils.distinct(map -> {
+                    Object id = map.get(PropertyConstants.uid.getKey());
+                    if (null == id) {
+                        id = IDGenerator.SNOW_FLAKE_STRING.generate();
+                    }
+                    return id;
+                }, Duration.ofSeconds(5)));
         }
 
         private Mono<Void> handleOutput(RuleData data) {
@@ -198,7 +226,11 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
                         .filter(sceneData)
                         .defaultIfEmpty(true);
                 })
-                .flatMap(map -> context.getOutput().write(data.newData(map)))
+                .flatMap(map -> context
+                    .getOutput()
+                    .write(data.newData(map))
+                    .as(tracer())
+                    .contextWrite(ctx -> TraceHolder.readToContext(ctx, map)))
                 .onErrorResume(err -> context.onError(err, data))
                 .then();
 

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java

@@ -71,8 +71,8 @@ public class SceneUtils {
             if (term != null) {
                 //有条件的数据会有别名 以_分隔
                 variables.add(Variable
-                    .of(column.getVariable("_"), variableName)
-                    .withType(column.getDataType()));
+                                  .of(column.getVariable("_"), variableName)
+                                  .withType(column.getDataType()));
                 List<TermValue> termValues = TermValue.of(term);
                 String property = column.getPropertyOrNull();
                 for (TermValue termValue : termValues) {

+ 2 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java

@@ -89,6 +89,8 @@ public class Trigger implements Serializable {
             timerNode.setId("scene:timer");
             timerNode.setName("定时触发场景");
             timerNode.setExecutor("timer");
+            //使用最小负载节点来执行定时
+            // timerNode.setSchedulingRule(SchedulerSelectorStrategy.minimumLoad());
             timerNode.setConfiguration(FastBeanCopier.copy(timer, new HashMap<>()));
             model.getNodes().add(timerNode);
             //定时->场景

+ 20 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java

@@ -9,11 +9,14 @@ import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.community.rule.engine.scene.term.TermType;
 import org.springframework.util.StringUtils;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Getter
 @Setter
 public class Variable {
+    public static final String OPTION_PRODUCT_ID = "productId";
     @Schema(description = "变量ID")
     private String id;
 
@@ -35,6 +38,23 @@ public class Variable {
     @Schema(description = "子级变量")
     private List<Variable> children;
 
+    @Schema(description = "其他配置")
+    private Map<String, Object> options;
+
+    public synchronized Map<String, Object> safeOptions() {
+        return options == null ? options = new HashMap<>() : options;
+    }
+
+    public Variable withOption(String key, Object value) {
+        safeOptions().put(key, value);
+        return this;
+    }
+
+    public Variable withOptions(Map<String, Object> options) {
+        safeOptions().putAll(options);
+        return this;
+    }
+
     public Variable withType(String type) {
         this.type = type;
         return this;

+ 43 - 7
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java

@@ -36,13 +36,31 @@ public enum FixedTermTypeSupport implements TermTypeSupport {
             return val;
         }
     },
-    in("在...之中", "in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID, ArrayType.ID) {
+    in("在...之中", "in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) {
         @Override
         protected Object convertValue(Object val) {
             return val;
         }
     },
-    nin("不在...之中", "not in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID, ArrayType.ID) {
+    nin("不在...之中", "nin", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) {
+        @Override
+        protected Object convertValue(Object val) {
+            return val;
+        }
+    },
+    contains_all("全部包含在...之中", "contains_all", ArrayType.ID) {
+        @Override
+        protected Object convertValue(Object val) {
+            return val;
+        }
+    },
+    contains_any("任意包含在...之中", "contains_any", ArrayType.ID) {
+        @Override
+        protected Object convertValue(Object val) {
+            return val;
+        }
+    },
+    not_contains("不包含在...之中", "not_contains", ArrayType.ID) {
         @Override
         protected Object convertValue(Object val) {
             return val;
@@ -50,9 +68,21 @@ public enum FixedTermTypeSupport implements TermTypeSupport {
     },
 
     like("包含字符", "str_like", StringType.ID),
-    nlike("不包含字符", "not str_like", StringType.ID),
+    nlike("不包含字符", "str_nlike", StringType.ID),
 
-    ;
+    // gt(math.sub(column,now()),?)
+    time_gt_now("距离当前时间大于...秒", "time_gt_now", DateTimeType.ID) {
+        @Override
+        protected void appendFunction(String column, PrepareSqlFragments fragments) {
+            fragments.addSql("gt(math.divi(math.sub(now(),", column, "),1000),");
+        }
+    },
+    time_lt_now("距离当前时间小于...秒", "time_lt_now", DateTimeType.ID){
+        @Override
+        protected void appendFunction(String column, PrepareSqlFragments fragments) {
+            fragments.addSql("lt(math.divi(math.sub(now(),", column, "),1000),");
+        }
+    };
 
     private final String text;
     private final Set<String> supportTypes;
@@ -80,17 +110,23 @@ public enum FixedTermTypeSupport implements TermTypeSupport {
         return val;
     }
 
+    protected void appendFunction(String column, PrepareSqlFragments fragments) {
+        fragments.addSql(function + "(", column, ",");
+    }
+
     @Override
-    public final SqlFragments createSql(String column, Object value) {
+    public SqlFragments createSql(String column, Object value) {
         PrepareSqlFragments fragments = PrepareSqlFragments.of();
-        fragments.addSql(function + "(", column, ",");
+        appendFunction(column, fragments);
+        value = convertValue(value);
+
         if (value instanceof NativeSql) {
             fragments
                 .addSql(((NativeSql) value).getSql())
                 .addParameter(((NativeSql) value).getParameters());
         } else {
             fragments.addSql("?")
-                     .addParameter(convertValue(value));
+                     .addParameter(value);
         }
         fragments.addSql(")");
         return fragments;

+ 10 - 9
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java

@@ -9,10 +9,10 @@ import org.hswebframework.ezorm.core.param.Term;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.types.EnumType;
 import org.jetlinks.community.PropertyMetadataConstants;
 import org.jetlinks.community.PropertyMetric;
 import org.jetlinks.community.rule.engine.scene.DeviceOperation;
-import org.jetlinks.core.metadata.types.EnumType;
 import org.springframework.util.StringUtils;
 
 import java.util.*;
@@ -45,30 +45,31 @@ public class TermColumn {
     @Schema(description = "支持的条件类型")
     private List<TermType> termTypes;
 
-    @Schema(description = "可选内容")
-    private List<PropertyMetric> options;
-
     @Schema(description = "支持的指标")
     private List<PropertyMetric> metrics;
 
+    @Schema(description = "可选内容")
+    private List<PropertyMetric> options;
+
     @Schema(description = "子列,在类型为object时有值")
     private List<TermColumn> children;
 
 
     public TermColumn copyColumn(Predicate<String> childrenPredicate) {
-        TermColumn copy = FastBeanCopier.copy(this,new TermColumn());
+        TermColumn copy = FastBeanCopier.copy(this, new TermColumn());
 
         if (CollectionUtils.isNotEmpty(children)) {
             copy.setChildren(
-                    children.stream()
-                            .filter(child -> childrenPredicate.test(child.getColumn()))
-                            .map(child -> child.copyColumn(childrenPredicate))
-                            .collect(Collectors.toList())
+                children.stream()
+                        .filter(child -> childrenPredicate.test(child.getColumn()))
+                        .map(child -> child.copyColumn(childrenPredicate))
+                        .collect(Collectors.toList())
             );
         }
 
         return copy;
     }
+
     public boolean hasColumn(Collection<String> columns) {
         for (String column : columns) {
             if (hasColumn(column)) {

+ 2 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java

@@ -12,7 +12,8 @@ public class TermTypes {
     private static final Map<String, TermTypeSupport> supports = new LinkedHashMap<>();
 
     static {
-        for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) {
+        for (FixedTermTypeSupport value : FixedTermTypeSupport
+                .values()) {
             register(value);
         }
     }

+ 17 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/AlarmRuleBindService.java

@@ -0,0 +1,17 @@
+package org.jetlinks.community.rule.engine.service;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity;
+import org.springframework.stereotype.Component;
+
+/**
+ * 告警规则绑定.
+ *
+ * @author zhangji 2022/11/23
+ */
+@Component
+@AllArgsConstructor
+public class AlarmRuleBindService extends GenericReactiveCrudService<AlarmRuleBindEntity, String> {
+
+}

+ 0 - 1
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java

@@ -18,7 +18,6 @@ import java.time.Duration;
  * @author bestfeng
  */
 @AllArgsConstructor
-@Service
 public class ElasticSearchAlarmHistoryService implements AlarmHistoryService {
 
     public final static String ALARM_HISTORY_INDEX="alarm_history";

+ 58 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/AlarmBindRuleTerm.java

@@ -0,0 +1,58 @@
+package org.jetlinks.community.rule.engine.service.terms;
+
+import org.hswebframework.ezorm.core.param.Term;
+import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 根据告警配置查询规则.
+ *
+ * 例如:查询告警ID为alarm-id绑定的场景联动
+ * <pre>
+ *     {
+ *             "column":"id",
+ *             "termType":"alarm-bind-rule",
+ *             "value":"alarm-id"
+ *     }
+ * </pre>
+ *
+ * @author zhangji 2022/11/23
+ */
+@Component
+public class AlarmBindRuleTerm extends AbstractTermFragmentBuilder {
+
+    public AlarmBindRuleTerm() {
+        super("alarm-bind-rule", "告警绑定的规则");
+    }
+
+    @Override
+    public SqlFragments createFragments(String columnFullName,
+                                        RDBColumnMetadata column,
+                                        Term term) {
+
+        PrepareSqlFragments sqlFragments = PrepareSqlFragments.of();
+        if (term.getOptions().contains("not")) {
+            sqlFragments.addSql("not");
+        }
+        sqlFragments
+            .addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.rule_id =", columnFullName);
+
+        List<Object> alarmId = convertList(column, term);
+        sqlFragments
+            .addSql(
+                "and _bind.alarm_id in (",
+                alarmId.stream().map(r -> "?").collect(Collectors.joining(",")),
+                ")")
+            .addParameter(alarmId);
+
+        sqlFragments.addSql(")");
+
+        return sqlFragments;
+    }
+}

+ 58 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/RuleBindAlarmTerm.java

@@ -0,0 +1,58 @@
+package org.jetlinks.community.rule.engine.service.terms;
+
+import org.hswebframework.ezorm.core.param.Term;
+import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
+import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * 根据规则查询告警配置.
+ *
+ * 例如:查询场景联动ID为rule-id绑定的告警
+ * <pre>
+ *     {
+ *             "column":"id",
+ *             "termType":"rule-bind-alarm",
+ *             "value":"rule-id"
+ *     }
+ * </pre>
+ *
+ * @author zhangji 2022/11/23
+ */
+@Component
+public class RuleBindAlarmTerm extends AbstractTermFragmentBuilder {
+
+    public RuleBindAlarmTerm() {
+        super("rule-bind-alarm", "规则绑定的告警");
+    }
+
+    @Override
+    public SqlFragments createFragments(String columnFullName,
+                                        RDBColumnMetadata column,
+                                        Term term) {
+
+        PrepareSqlFragments sqlFragments = PrepareSqlFragments.of();
+        if (term.getOptions().contains("not")) {
+            sqlFragments.addSql("not");
+        }
+        sqlFragments
+            .addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.alarm_id =", columnFullName);
+
+        List<Object> ruleId = convertList(column, term);
+        sqlFragments
+            .addSql(
+                "and _bind.rule_id in (",
+                ruleId.stream().map(r -> "?").collect(Collectors.joining(",")),
+                ")")
+            .addParameter(ruleId);
+
+        sqlFragments.addSql(")");
+
+        return sqlFragments;
+    }
+}

+ 52 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRuleBindController.java

@@ -0,0 +1,52 @@
+package org.jetlinks.community.rule.engine.web;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.authorization.annotation.Authorize;
+import org.hswebframework.web.authorization.annotation.DeleteAction;
+import org.hswebframework.web.authorization.annotation.Resource;
+import org.hswebframework.web.crud.service.ReactiveCrudService;
+import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
+import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity;
+import org.jetlinks.community.rule.engine.service.AlarmRuleBindService;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+/**
+ * 告警规则绑定.
+ *
+ * @author zhangji 2022/11/23
+ */
+@RestController
+@RequestMapping("/alarm/rule/bind")
+@Resource(id = "alarm-config", name = "告警配置")
+@Authorize
+@Tag(name = "告警规则绑定")
+@AllArgsConstructor
+public class AlarmRuleBindController implements ReactiveServiceCrudController<AlarmRuleBindEntity, String> {
+
+    private final AlarmRuleBindService service;
+
+    @Override
+    public ReactiveCrudService<AlarmRuleBindEntity, String> getService() {
+        return service;
+    }
+
+    @PostMapping("/{alarmId}/_delete")
+    @DeleteAction
+    @Operation(summary = "批量删除告警规则绑定")
+    public Mono<Integer> deleteAlarmBind(@PathVariable @Parameter(description = "告警配置ID") String alarmId,
+                                         @RequestBody @Parameter(description = "场景联动ID") Mono<List<String>> ruleId) {
+        return ruleId
+            .flatMap(idList -> service
+                .createDelete()
+                .where(AlarmRuleBindEntity::getAlarmId, alarmId)
+                .in(AlarmRuleBindEntity::getRuleId, idList)
+                .execute());
+    }
+
+}

+ 5 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java

@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+
 @RestController
 @RequestMapping("/scene")
 @Tag(name = "场景管理")
@@ -51,8 +52,8 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
     @PutMapping("/{id}")
     @Operation(summary = "更新场景")
     @SaveAction
-    public Mono<Void> updateScene(@PathVariable String id,
-                                  @RequestBody Mono<SceneRule> sceneRuleMono) {
+    public Mono<Void> update(@PathVariable String id,
+                             @RequestBody Mono<SceneRule> sceneRuleMono) {
         return sceneRuleMono
             .flatMap(sceneRule -> service.updateScene(id, sceneRule))
             .then();
@@ -111,6 +112,7 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
     @QueryAction
     public Flux<Variable> parseVariables(@RequestBody Mono<SceneRule> ruleMono,
                                          @RequestParam(required = false) Integer branch,
+                                         @RequestParam(required = false) Integer branchGroup,
                                          @RequestParam(required = false) Integer action) {
         Mono<SceneRule> cache = ruleMono.cache();
         return Mono
@@ -125,6 +127,7 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
                                                     .map(column -> column.copyColumn(terms::containsKey))
                                                     .collect(Collectors.toList()),
                                                 branch,
+                                                branchGroup,
                                                 action,
                                                 deviceRegistry);
                 })