|
@@ -18,6 +18,7 @@ import org.jetlinks.community.rule.engine.commons.ShakeLimit;
|
|
import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
|
|
import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
|
|
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
|
|
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
|
|
import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
|
|
import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
|
|
|
|
+import org.jetlinks.rule.engine.api.RuleData;
|
|
import org.jetlinks.rule.engine.api.model.RuleLink;
|
|
import org.jetlinks.rule.engine.api.model.RuleLink;
|
|
import org.jetlinks.rule.engine.api.model.RuleModel;
|
|
import org.jetlinks.rule.engine.api.model.RuleModel;
|
|
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
|
|
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
|
|
@@ -27,19 +28,23 @@ import reactor.core.Disposables;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Sinks;
|
|
import reactor.core.publisher.Sinks;
|
|
|
|
+import reactor.function.Function3;
|
|
import reactor.util.concurrent.Queues;
|
|
import reactor.util.concurrent.Queues;
|
|
|
|
|
|
import javax.validation.constraints.NotBlank;
|
|
import javax.validation.constraints.NotBlank;
|
|
import javax.validation.constraints.NotNull;
|
|
import javax.validation.constraints.NotNull;
|
|
import java.io.Serializable;
|
|
import java.io.Serializable;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
-import java.util.function.BiFunction;
|
|
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
|
|
|
|
@Getter
|
|
@Getter
|
|
@Setter
|
|
@Setter
|
|
public class SceneRule implements Serializable {
|
|
public class SceneRule implements Serializable {
|
|
|
|
|
|
|
|
+ public static final String ACTION_KEY_BRANCH_INDEX = "_branchIndex";
|
|
|
|
+ public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex";
|
|
|
|
+ public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex";
|
|
|
|
+
|
|
@Schema(description = "告警ID")
|
|
@Schema(description = "告警ID")
|
|
@NotBlank(message = "error.scene_rule_id_cannot_be_blank")
|
|
@NotBlank(message = "error.scene_rule_id_cannot_be_blank")
|
|
private String id;
|
|
private String id;
|
|
@@ -52,6 +57,11 @@ public class SceneRule implements Serializable {
|
|
@NotNull(message = "error.scene_rule_trigger_cannot_be_null")
|
|
@NotNull(message = "error.scene_rule_trigger_cannot_be_null")
|
|
private Trigger trigger;
|
|
private Trigger trigger;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * @see org.jetlinks.community.rule.engine.scene.term.TermColumn
|
|
|
|
+ * @see org.jetlinks.community.rule.engine.scene.term.TermType
|
|
|
|
+ * @see org.jetlinks.community.rule.engine.scene.value.TermValue
|
|
|
|
+ */
|
|
@Schema(description = "触发条件")
|
|
@Schema(description = "触发条件")
|
|
private List<Term> terms;
|
|
private List<Term> terms;
|
|
|
|
|
|
@@ -64,6 +74,9 @@ public class SceneRule implements Serializable {
|
|
@Schema(description = "动作分支")
|
|
@Schema(description = "动作分支")
|
|
private List<SceneConditionAction> branches;
|
|
private List<SceneConditionAction> branches;
|
|
|
|
|
|
|
|
+ @Schema(description = "扩展配置")
|
|
|
|
+ private Map<String, Object> options;
|
|
|
|
+
|
|
@Schema(description = "说明")
|
|
@Schema(description = "说明")
|
|
private String description;
|
|
private String description;
|
|
|
|
|
|
@@ -141,6 +154,7 @@ public class SceneRule implements Serializable {
|
|
|
|
|
|
public Flux<Variable> createVariables(List<TermColumn> columns,
|
|
public Flux<Variable> createVariables(List<TermColumn> columns,
|
|
Integer branchIndex,
|
|
Integer branchIndex,
|
|
|
|
+ Integer branchGroupIndex,
|
|
Integer actionIndex,
|
|
Integer actionIndex,
|
|
DeviceRegistry registry) {
|
|
DeviceRegistry registry) {
|
|
Flux<Variable> variables = createSceneVariables(columns);
|
|
Flux<Variable> variables = createSceneVariables(columns);
|
|
@@ -149,19 +163,20 @@ public class SceneRule implements Serializable {
|
|
if (branchIndex == null && !parallel && actionIndex != null && CollectionUtils.isNotEmpty(actions)) {
|
|
if (branchIndex == null && !parallel && actionIndex != null && CollectionUtils.isNotEmpty(actions)) {
|
|
|
|
|
|
for (int i = 0; i < Math.min(actions.size(), actionIndex + 1); i++) {
|
|
for (int i = 0; i < Math.min(actions.size(), actionIndex + 1); i++) {
|
|
- variables = variables.concatWith(actions.get(i).createVariables(registry, i));
|
|
|
|
|
|
+ variables = variables.concatWith(actions.get(i).createVariables(registry, branchIndex, i));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//分支条件
|
|
//分支条件
|
|
- if (branchIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) {
|
|
|
|
|
|
+ if (branchIndex != null && branchGroupIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) {
|
|
SceneConditionAction branch = branches.get(branchIndex);
|
|
SceneConditionAction branch = branches.get(branchIndex);
|
|
|
|
+ SceneActions then = branch.getThen() != null && branch.getThen().size() > branchGroupIndex
|
|
|
|
+ ? branch.getThen().get(branchGroupIndex) : null;
|
|
List<SceneAction> actionList;
|
|
List<SceneAction> actionList;
|
|
- if (branch.getThen() != null && !branch.getThen().isParallel() &&
|
|
|
|
-
|
|
|
|
- CollectionUtils.isNotEmpty(actionList = branch.getThen().getActions())) {
|
|
|
|
|
|
+ if (then != null && !then.isParallel() &&
|
|
|
|
+ CollectionUtils.isNotEmpty(actionList = then.getActions())) {
|
|
|
|
|
|
for (int i = 0; i < Math.min(actionList.size(), actionIndex + 1); i++) {
|
|
for (int i = 0; i < Math.min(actionList.size(), actionIndex + 1); i++) {
|
|
- variables = variables.concatWith(actionList.get(i).createVariables(registry, i));
|
|
|
|
|
|
+ variables = variables.concatWith(actionList.get(i).createVariables(registry, branchIndex, i));
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -171,8 +186,12 @@ public class SceneRule implements Serializable {
|
|
.doOnNext(Variable::refactorPrefix);
|
|
.doOnNext(Variable::refactorPrefix);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private String createBranchActionId(int branchIndex, int groupId, int actionIndex) {
|
|
|
|
+ return "branch_" + branchIndex + "_group_" + groupId + "_action_" + actionIndex;
|
|
|
|
+ }
|
|
|
|
+
|
|
public Disposable createBranchHandler(Flux<Map<String, Object>> sourceData,
|
|
public Disposable createBranchHandler(Flux<Map<String, Object>> sourceData,
|
|
- BiFunction<String, Map<String, Object>, Mono<Void>> output) {
|
|
|
|
|
|
+ Function3<Integer, String, Map<String, Object>, Mono<Void>> output) {
|
|
if (CollectionUtils.isEmpty(branches)) {
|
|
if (CollectionUtils.isEmpty(branches)) {
|
|
return Disposables.disposed();
|
|
return Disposables.disposed();
|
|
}
|
|
}
|
|
@@ -186,65 +205,72 @@ public class SceneRule implements Serializable {
|
|
//执行条件
|
|
//执行条件
|
|
Function<Map<String, Object>, Mono<Boolean>> filter = createFilter(branch.getWhen());
|
|
Function<Map<String, Object>, Mono<Boolean>> filter = createFilter(branch.getWhen());
|
|
//满足条件后的输出操作
|
|
//满足条件后的输出操作
|
|
- Function<Map<String, Object>, Mono<Void>> out;
|
|
|
|
-
|
|
|
|
- SceneActions then = branch.getThen();
|
|
|
|
- //执行动作
|
|
|
|
- if (then != null && CollectionUtils.isNotEmpty(then.getActions())) {
|
|
|
|
-
|
|
|
|
- int size = then.getActions().size();
|
|
|
|
- //串行,只传递到第一个动作
|
|
|
|
- if (!then.isParallel() || size == 1) {
|
|
|
|
- String nodeId = "branch_" + _branchIndex + "_action_1";
|
|
|
|
- out = data -> output.apply(nodeId, data);
|
|
|
|
- } else {
|
|
|
|
- //多个并行执行动作
|
|
|
|
- String[] nodeIds = new String[size];
|
|
|
|
- for (int i = 0; i < nodeIds.length; i++) {
|
|
|
|
- nodeIds[0] = "branch_" + _branchIndex + "_action_" + (i + 1);
|
|
|
|
|
|
+ List<Function<Map<String, Object>, Mono<Void>>> outs = new ArrayList<>();
|
|
|
|
+
|
|
|
|
+ List<SceneActions> groups = branch.getThen();
|
|
|
|
+ int thenIndex = 0;
|
|
|
|
+ if (CollectionUtils.isNotEmpty(groups)) {
|
|
|
|
+ thenIndex++;
|
|
|
|
+
|
|
|
|
+ for (SceneActions then : groups) {
|
|
|
|
+ Function<Map<String, Object>, Mono<Void>> out;
|
|
|
|
+
|
|
|
|
+ int size = then.getActions().size();
|
|
|
|
+ //串行,只传递到第一个动作
|
|
|
|
+ if (!then.isParallel() || size == 1) {
|
|
|
|
+ String nodeId = createBranchActionId(_branchIndex, thenIndex, 1);
|
|
|
|
+ out = data -> output.apply(_branchIndex, nodeId, data);
|
|
|
|
+ } else {
|
|
|
|
+ //多个并行执行动作
|
|
|
|
+ String[] nodeIds = new String[size];
|
|
|
|
+ for (int i = 0; i < nodeIds.length; i++) {
|
|
|
|
+ nodeIds[0] = createBranchActionId(_branchIndex, thenIndex, 1 + (i + 1));
|
|
|
|
+ }
|
|
|
|
+ Flux<String> nodeIdFlux = Flux.fromArray(nodeIds);
|
|
|
|
+ //并行
|
|
|
|
+ out = data -> nodeIdFlux
|
|
|
|
+ .flatMap(nodeId -> output.apply(_branchIndex, nodeId, data))
|
|
|
|
+ .then();
|
|
}
|
|
}
|
|
- Flux<String> nodeIdFlux = Flux.fromArray(nodeIds);
|
|
|
|
- //并行
|
|
|
|
- out = data -> nodeIdFlux
|
|
|
|
- .flatMap(nodeId -> output.apply(nodeId, data))
|
|
|
|
- .then();
|
|
|
|
- }
|
|
|
|
- //防抖
|
|
|
|
- ShakeLimit shakeLimit = branch.getShakeLimit();
|
|
|
|
- if (shakeLimit != null && shakeLimit.isEnabled()) {
|
|
|
|
-
|
|
|
|
- Sinks.Many<Map<String, Object>> sinks = Sinks
|
|
|
|
- .many()
|
|
|
|
- .unicast()
|
|
|
|
- .onBackpressureBuffer(Queues.<Map<String, Object>>unboundedMultiproducer().get());
|
|
|
|
-
|
|
|
|
- //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略
|
|
|
|
- ShakeLimitGrouping<Map<String, Object>> grouping = createGrouping();
|
|
|
|
-
|
|
|
|
- Function<Map<String, Object>, Mono<Void>> handler = out;
|
|
|
|
-
|
|
|
|
- disposable.add(
|
|
|
|
- shakeLimit
|
|
|
|
- .transfer(sinks.asFlux(),
|
|
|
|
- (duration, stream) ->
|
|
|
|
- grouping
|
|
|
|
- .group(stream)//先按自定义分组再按事件窗口进行分组
|
|
|
|
- .flatMap(group -> group.window(duration), Integer.MAX_VALUE),
|
|
|
|
- (map, total) -> map.put("_total", total))
|
|
|
|
- .flatMap(handler)
|
|
|
|
- .subscribe()
|
|
|
|
- );
|
|
|
|
- //输出到sink进行防抖控制
|
|
|
|
- out = data -> {
|
|
|
|
- sinks.emitNext(data, Reactors.emitFailureHandler());
|
|
|
|
- return Mono.empty();
|
|
|
|
- };
|
|
|
|
|
|
+ //防抖
|
|
|
|
+ ShakeLimit shakeLimit = branch.getShakeLimit();
|
|
|
|
+ if (shakeLimit != null && shakeLimit.isEnabled()) {
|
|
|
|
+
|
|
|
|
+ Sinks.Many<Map<String, Object>> sinks = Sinks
|
|
|
|
+ .many()
|
|
|
|
+ .unicast()
|
|
|
|
+ .onBackpressureBuffer(Queues.<Map<String, Object>>unboundedMultiproducer().get());
|
|
|
|
+
|
|
|
|
+ //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略
|
|
|
|
+ ShakeLimitGrouping<Map<String, Object>> grouping = createGrouping();
|
|
|
|
+
|
|
|
|
+ Function<Map<String, Object>, Mono<Void>> handler = out;
|
|
|
|
+
|
|
|
|
+ disposable.add(
|
|
|
|
+ shakeLimit
|
|
|
|
+ .transfer(sinks.asFlux(),
|
|
|
|
+ (duration, stream) ->
|
|
|
|
+ grouping
|
|
|
|
+ .group(stream)//先按自定义分组再按事件窗口进行分组
|
|
|
|
+ .flatMap(group -> group.window(duration), Integer.MAX_VALUE),
|
|
|
|
+ (map, total) -> map.put("_total", total))
|
|
|
|
+ .flatMap(handler)
|
|
|
|
+ .subscribe()
|
|
|
|
+ );
|
|
|
|
+ //输出到sink进行防抖控制
|
|
|
|
+ out = data -> {
|
|
|
|
+ sinks.emitNext(data, Reactors.emitFailureHandler());
|
|
|
|
+ return Mono.empty();
|
|
|
|
+ };
|
|
|
|
+ }
|
|
|
|
+ outs.add(out);
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- out = ignore -> Mono.empty();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- Function<Map<String, Object>, Mono<Void>> fOut = out;
|
|
|
|
|
|
+
|
|
|
|
+ Flux<Function<Map<String, Object>, Mono<Void>>> outFlux = Flux.fromIterable(outs);
|
|
|
|
+
|
|
|
|
+ Function<Map<String, Object>, Mono<Void>> fOut = out -> outFlux.flatMap(fun -> fun.apply(out)).then();
|
|
|
|
|
|
|
|
|
|
Function<Map<String, Object>, Mono<Boolean>> handler =
|
|
Function<Map<String, Object>, Mono<Boolean>> handler =
|
|
@@ -363,40 +389,52 @@ public class SceneRule implements Serializable {
|
|
for (SceneConditionAction branch : branches) {
|
|
for (SceneConditionAction branch : branches) {
|
|
branchIndex++;
|
|
branchIndex++;
|
|
|
|
|
|
- SceneActions actions = branch.getThen();
|
|
|
|
- if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) {
|
|
|
|
- int actionIndex = 1;
|
|
|
|
- RuleNodeModel preNode = null;
|
|
|
|
- SceneAction preAction = null;
|
|
|
|
- for (SceneAction action : actions.getActions()) {
|
|
|
|
- RuleNodeModel actionNode = new RuleNodeModel();
|
|
|
|
- actionNode.setId("branch_" + branchIndex + "_action_" + actionIndex);
|
|
|
|
- actionNode.setName("条件_" + branchIndex + "_动作_" + actionIndex);
|
|
|
|
-
|
|
|
|
- action.applyNode(actionNode);
|
|
|
|
- //串行
|
|
|
|
- if (!actions.isParallel()) {
|
|
|
|
- //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用
|
|
|
|
- actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER, true);
|
|
|
|
- actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER_KEY, actionNode.getId());
|
|
|
|
-
|
|
|
|
- if (preNode != null) {
|
|
|
|
- //上一个节点->当前动作节点
|
|
|
|
- RuleLink link = model.link(preNode, actionNode);
|
|
|
|
- //设置上一个节点到此节点的输出条件
|
|
|
|
- if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
|
|
|
|
- link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
|
|
|
|
|
|
+ List<SceneActions> group = branch.getThen();
|
|
|
|
+
|
|
|
|
+ if (CollectionUtils.isNotEmpty(group)) {
|
|
|
|
+ int groupIndex = 0;
|
|
|
|
+ for (SceneActions actions : group) {
|
|
|
|
+ groupIndex++;
|
|
|
|
+ if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) {
|
|
|
|
+ int actionIndex = 1;
|
|
|
|
+ RuleNodeModel preNode = null;
|
|
|
|
+ SceneAction preAction = null;
|
|
|
|
+ for (SceneAction action : actions.getActions()) {
|
|
|
|
+ RuleNodeModel actionNode = new RuleNodeModel();
|
|
|
|
+ actionNode.setId(createBranchActionId(branchIndex, groupIndex, actionIndex));
|
|
|
|
+ actionNode.setName("条件" + branchIndex + "_分组" + groupIndex + "_动作" + actionIndex);
|
|
|
|
+
|
|
|
|
+ action.applyNode(actionNode);
|
|
|
|
+ //串行
|
|
|
|
+ if (!actions.isParallel()) {
|
|
|
|
+ //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用
|
|
|
|
+ actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER, true);
|
|
|
|
+ actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER_KEY, actionNode.getId());
|
|
|
|
+ actionNode.addConfiguration(ACTION_KEY_BRANCH_INDEX, branchIndex);
|
|
|
|
+ actionNode.addConfiguration(ACTION_KEY_GROUP_INDEX, groupIndex);
|
|
|
|
+ actionNode.addConfiguration(ACTION_KEY_ACTION_INDEX, actionIndex);
|
|
|
|
+
|
|
|
|
+ if (preNode != null) {
|
|
|
|
+ //上一个节点->当前动作节点
|
|
|
|
+ RuleLink link = model.link(preNode, actionNode);
|
|
|
|
+ //设置上一个节点到此节点的输出条件
|
|
|
|
+ if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
|
|
|
|
+ link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ preNode = actionNode;
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- preNode = actionNode;
|
|
|
|
|
|
+ model.getNodes().add(actionNode);
|
|
|
|
+ preAction = action;
|
|
|
|
+ actionIndex++;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
-
|
|
|
|
- model.getNodes().add(actionNode);
|
|
|
|
- preAction = action;
|
|
|
|
- actionIndex++;
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -406,7 +444,5 @@ public class SceneRule implements Serializable {
|
|
|
|
|
|
public void validate() {
|
|
public void validate() {
|
|
ValidatorUtils.tryValidate(this);
|
|
ValidatorUtils.tryValidate(this);
|
|
- trigger.validate();
|
|
|
|
-
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|