Ver Fonte

增加SQL规则

zhou-hao há 5 anos atrás
pai
commit
53edfb1567

+ 63 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/SqlRuleType.java

@@ -0,0 +1,63 @@
+package org.jetlinks.community.rule.engine.enums;
+
+import com.alibaba.fastjson.annotation.JSONType;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import org.hswebframework.web.dict.EnumDict;
+import org.jetlinks.community.rule.engine.ql.SqlRule;
+import org.springframework.scheduling.support.CronSequenceGenerator;
+import org.springframework.util.Assert;
+
+@Getter
+@AllArgsConstructor
+@JSONType(deserializer = EnumDict.EnumDictJSONDeserializer.class)
+public enum SqlRuleType implements EnumDict<String> {
+
+    timer("定时") {
+        @Override
+        public void validate(SqlRule rule) {
+            Assert.notNull(rule.getCron(), "cron表达式不能为空");
+            try {
+                new CronSequenceGenerator(rule.getCron());
+            } catch (Exception e) {
+                throw new IllegalArgumentException("cron表达式格式错误", e);
+            }
+        }
+    },
+    realTime("实时") {
+        @Override
+        public void validate(SqlRule rule) {
+            Assert.notNull(rule.getSql(), "sql能为空");
+            try {
+                PlainSelect select = ((PlainSelect) ((Select) CCJSqlParserUtil.parse(rule.getSql())).getSelectBody());
+                if (select.getGroupBy() != null && select.getGroupBy().getGroupByExpressions() != null) {
+                    for (Expression groupByExpression : select.getGroupBy().getGroupByExpressions()) {
+                        if (groupByExpression instanceof Function) {
+                            String name = ((Function) groupByExpression).getName();
+                            if ("interval".equalsIgnoreCase(name) || "_window".equalsIgnoreCase(name)) {
+                                return;
+                            }
+                        }
+                    }
+                    throw new IllegalArgumentException("实时数据处理必须指定分组函数interval或者_window");
+                }
+            } catch (Exception e) {
+                throw new IllegalArgumentException("sql格式错误", e);
+            }
+        }
+    };
+
+    private final String text;
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    public abstract void validate(SqlRule rule);
+}

+ 109 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/SqlRuleModelParser.java

@@ -0,0 +1,109 @@
+package org.jetlinks.community.rule.engine.model;
+
+import com.alibaba.fastjson.JSON;
+import org.jetlinks.community.rule.engine.enums.SqlRuleType;
+import org.jetlinks.community.rule.engine.ql.SqlRule;
+import org.jetlinks.rule.engine.api.events.RuleEvent;
+import org.jetlinks.rule.engine.api.model.RuleLink;
+import org.jetlinks.rule.engine.api.model.RuleModel;
+import org.jetlinks.rule.engine.api.model.RuleNodeModel;
+import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Component
+public class SqlRuleModelParser implements RuleModelParserStrategy {
+    @Override
+    public String getFormat() {
+        return "sql_rule";
+    }
+
+    @Override
+    public RuleModel parse(String modelDefineString) {
+
+        SqlRule sqlRule = JSON.parseObject(modelDefineString, SqlRule.class);
+
+        sqlRule.validate();
+
+        RuleModel model = new RuleModel();
+        model.setId(sqlRule.getId());
+        model.setName(sqlRule.getName());
+
+        RuleNodeModel sqlNode = new RuleNodeModel();
+        sqlNode.setId("sql");
+        sqlNode.setExecutor("reactor-ql");
+        sqlNode.setConfiguration(Collections.singletonMap("sql", sqlRule.getSql()));
+        sqlNode.setName("SQL");
+
+        model.getNodes().add(sqlNode);
+
+        //错误处理
+        List<RuleLink> errorHandler = new ArrayList<>();
+        if (!CollectionUtils.isEmpty(sqlRule.getWhenErrorThen())) {
+            int index = 0;
+            for (Action act : sqlRule.getWhenErrorThen()) {
+                RuleNodeModel action = new RuleNodeModel();
+                action.setId("error:action:" + index);
+                action.setName("错误处理:" + index);
+                action.setExecutor(act.getExecutor());
+                action.setConfiguration(act.getConfiguration());
+                RuleLink link = new RuleLink();
+                link.setId(action.getId().concat(":").concat(action.getId()));
+                link.setName("错误处理:" + index);
+                link.setSource(sqlNode);
+                link.setType(RuleEvent.NODE_EXECUTE_FAIL);
+                link.setTarget(action);
+                errorHandler.add(link);
+                model.getNodes().add(action);
+            }
+        }
+
+        sqlNode.getEvents().addAll(errorHandler);
+
+        //定时触发
+        if (sqlRule.getType() == SqlRuleType.timer) {
+            RuleNodeModel timerNode = new RuleNodeModel();
+            timerNode.setId("timer");
+            timerNode.setExecutor("timer");
+            timerNode.setConfiguration(Collections.singletonMap("cron", sqlRule.getCron()));
+            timerNode.setRuleId(model.getId());
+            RuleLink link = new RuleLink();
+            link.setId("sql:timer");
+            link.setName("定时触发SQL");
+            link.setSource(timerNode);
+            link.setTarget(sqlNode);
+            timerNode.getOutputs().add(link);
+            sqlNode.getInputs().add(link);
+            model.getNodes().add(timerNode);
+        }
+
+
+        if (!CollectionUtils.isEmpty(sqlRule.getActions())) {
+            int index = 0;
+            for (Action operation : sqlRule.getActions()) {
+                RuleNodeModel action = new RuleNodeModel();
+                action.setId("action:" + index);
+                action.setName("执行动作:" + index);
+                action.setExecutor(operation.getExecutor());
+                action.setConfiguration(operation.getConfiguration());
+                RuleLink link = new RuleLink();
+                link.setId(action.getId().concat(":").concat(action.getId()));
+                link.setName("执行动作:" + index);
+                link.setSource(sqlNode);
+                link.setTarget(action);
+                model.getNodes().add(action);
+                action.getInputs().add(link);
+                sqlNode.getOutputs().add(link);
+                model.getNodes().add(action);
+
+                action.getEvents().addAll(errorHandler);
+            }
+        }
+
+        return model;
+    }
+}

+ 43 - 0
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/ql/SqlRule.java

@@ -0,0 +1,43 @@
+package org.jetlinks.community.rule.engine.ql;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.community.rule.engine.enums.SqlRuleType;
+import org.jetlinks.community.rule.engine.model.Action;
+import org.springframework.util.Assert;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 使用SQL来处理数据
+ *
+ * @author zhouhao
+ * @since 1.1
+ */
+@Getter
+@Setter
+public class SqlRule implements Serializable {
+
+    private static final long serialVersionUID = -6849794470754667710L;
+
+    private String id;
+
+    private String name;
+
+    private SqlRuleType type;
+
+    private String cron;
+
+    private String sql;
+
+    private List<Action> actions;
+
+    private List<Action> whenErrorThen;
+
+    public void validate() {
+        Assert.notNull(type, "type不能为空");
+
+        type.validate(this);
+    }
+}