Browse Source

重构规则引擎

zhou-hao 5 years ago
parent
commit
c26cd5615a
40 changed files with 1177 additions and 1360 deletions
  1. 48 37
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/MessageGatewayRuleNode.java
  2. 1 14
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/MessageGatewayRuleNodeConfig.java
  3. 39 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttClientTaskConfiguration.java
  4. 131 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttClientTaskExecutorProvider.java
  5. 111 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttRuleDataCodec.java
  6. 24 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttTopics.java
  7. 13 0
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/TopicVariables.java
  8. 0 90
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/node/MqttClientNode.java
  9. 15 1
      jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/PubSubType.java
  10. 25 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/executor/TcpClientTaskConfiguration.java
  11. 109 0
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/executor/TcpClientTaskExecutorProvider.java
  12. 1 6
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpMessageCodec.java
  13. 0 70
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java
  14. 0 38
      jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java
  15. 0 48
      jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/NotifierRuleNode.java
  16. 66 0
      jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/NotifierTaskExecutorProvider.java
  17. 1 14
      jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/RuleNotifierProperties.java
  18. 5 0
      jetlinks-components/rule-engine-component/pom.xml
  19. 42 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/EventTopicMessage.java
  20. 54 48
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java
  21. 0 64
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/event/handler/RuleLogHandler.java
  22. 21 19
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/DataMappingWorkerNode.java
  23. 53 29
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/DeviceMessageSendNode.java
  24. 125 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java
  25. 22 14
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ScriptWorkerNode.java
  26. 19 24
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/SqlExecutorWorkerNode.java
  27. 125 0
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java
  28. 0 134
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java
  29. 0 129
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/TimerWorkerNode.java
  30. 81 56
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java
  31. 3 7
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/RuleInstanceEntity.java
  32. 0 4
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/Action.java
  33. 2 4
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java
  34. 2 2
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/SqlRuleModelParser.java
  35. 0 361
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleEngineDebugService.java
  36. 27 31
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleInstanceService.java
  37. 0 80
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleEngineDebugController.java
  38. 1 28
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleInstanceController.java
  39. 6 3
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleModelController.java
  40. 5 5
      pom.xml

+ 48 - 37
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/MessageGatewayRuleNode.java

@@ -1,19 +1,21 @@
 package org.jetlinks.community.gateway.rule;
 
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.exception.NotFoundException;
 import org.jetlinks.community.gateway.MessageGatewayManager;
 import org.jetlinks.community.network.PubSubType;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 
-import java.util.function.Function;
-
 @Component
-public class MessageGatewayRuleNode extends CommonExecutableRuleNodeFactoryStrategy<MessageGatewayRuleNodeConfig> {
+public class MessageGatewayRuleNode implements TaskExecutorProvider {
 
     private final MessageGatewayManager gatewayManager;
 
@@ -26,43 +28,52 @@ public class MessageGatewayRuleNode extends CommonExecutableRuleNodeFactoryStrat
     }
 
     @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, MessageGatewayRuleNodeConfig config) {
-        if (config.getType() == PubSubType.consumer) {
-            return Mono::just;
-        }
-        return ruleData -> gatewayManager
-            .getGateway(config.getGatewayId())
-            .switchIfEmpty(Mono.error(() -> new NotFoundException("消息网关[{" + config.getGatewayId() + "}]不存在")))
-            .flatMap(gateway -> config.convert(ruleData)
-                .flatMap(msg -> gateway.publish(msg, config.isShareCluster()))
-                .then())
-            .thenReturn(ruleData);
+    public String getExecutor() {
+        return "message-gateway";
     }
 
     @Override
-    protected void onStarted(ExecutionContext context, MessageGatewayRuleNodeConfig config) {
-        super.onStarted(context, config);
-        if (config.getType() == PubSubType.producer) {
-            return;
-        }
-        //订阅网关中的消息
-        context.onStop(gatewayManager
-            .getGateway(config.getGatewayId())
-            .switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("消息网关[{" + config.getGatewayId() + "}]不存在")))
-            .flatMapMany(gateway -> gateway.subscribe(config.createTopics()))
-            .map(config::convert)
-            .flatMap(data -> context.getOutput().write(Mono.just(RuleData.create(data))))
-            .onErrorContinue((err, obj) -> {
-                context.logger().error(err.getMessage(), err);
-            })
-            .subscribe()::dispose);
-
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new MessageGatewayPubSubExecutor(context));
     }
 
-    @Override
-    public String getSupportType() {
-        return "message-gateway";
-    }
+    class MessageGatewayPubSubExecutor extends FunctionTaskExecutor {
+        MessageGatewayRuleNodeConfig config;
+
+        public MessageGatewayPubSubExecutor(ExecutionContext context) {
+            super("消息网关订阅发布", context);
+            this.config = FastBeanCopier.copy(context.getJob().getConfiguration(), MessageGatewayRuleNodeConfig.class);
+            this.config.validate();
+        }
 
+        @Override
+        protected Publisher<RuleData> apply(RuleData input) {
+            return gatewayManager
+                .getGateway(config.getGatewayId())
+                .switchIfEmpty(Mono.error(() -> new NotFoundException("消息网关[{" + config.getGatewayId() + "}]不存在")))
+                .flatMap(gateway -> config.convert(input)
+                    .flatMap(msg -> gateway.publish(msg, config.isShareCluster()))
+                    .then())
+                .thenReturn(input);
+        }
 
+        @Override
+        protected Disposable doStart() {
+            if (config.getType() == PubSubType.producer) {
+                return super.doStart();
+            }
+
+            //订阅网关中的消息
+            return gatewayManager
+                .getGateway(config.getGatewayId())
+                .switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().error("消息网关[{" + config.getGatewayId() + "}]不存在")))
+                .flatMapMany(gateway -> gateway.subscribe(config.createTopics()))
+                .map(config::convert)
+                .flatMap(data -> context.getOutput().write(Mono.just(RuleData.create(data))))
+                .onErrorContinue((err, obj) -> {
+                    context.getLogger().error(err.getMessage(), err);
+                })
+                .subscribe();
+        }
+    }
 }

+ 1 - 14
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/MessageGatewayRuleNodeConfig.java

@@ -5,14 +5,12 @@ import lombok.Setter;
 import org.jetlinks.community.gateway.TopicMessage;
 import org.jetlinks.community.network.PubSubType;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
 import org.springframework.util.Assert;
 import reactor.core.publisher.Flux;
 
 @Getter
 @Setter
-public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
+public class MessageGatewayRuleNodeConfig {
 
     private String gatewayId;
 
@@ -35,7 +33,6 @@ public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
         return topics.split("[,;\n]");
     }
 
-    @Override
     public void validate() {
         Assert.hasText(gatewayId, "gatewayId can not be empty");
         Assert.hasText(topics, "topics can not be empty");
@@ -43,14 +40,4 @@ public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
 
     }
 
-    @Override
-    public NodeType getNodeType() {
-        return NodeType.MAP;
-    }
-
-
-    @Override
-    public void setNodeType(NodeType nodeType) {
-
-    }
 }

+ 39 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttClientTaskConfiguration.java

@@ -0,0 +1,39 @@
+package org.jetlinks.community.network.mqtt.executor;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.utils.ExpressionUtils;
+import org.jetlinks.community.network.PubSubType;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Getter
+@Setter
+public class MqttClientTaskConfiguration {
+
+    private String clientId;
+
+    private PayloadType payloadType = PayloadType.JSON;
+
+    private PubSubType[] clientType;
+
+    private List<String> topics;
+
+    private List<String> topicVariables;
+
+    public List<String> getTopics(Map<String, Object> vars) {
+        return topics.stream()
+                .map(topic -> ExpressionUtils.analytical(topic, vars, "spel")).collect(Collectors.toList());
+    }
+
+    public void validate() {
+        Assert.hasText(clientId, "clientId can not be empty");
+        Assert.notNull(clientType, "clientType can not be null");
+        Assert.notEmpty(topics, "topics can not be empty");
+
+    }
+}

+ 131 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttClientTaskExecutorProvider.java

@@ -0,0 +1,131 @@
+package org.jetlinks.community.network.mqtt.executor;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.dict.EnumDict;
+import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.PubSubType;
+import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.rule.engine.api.RuleConstants;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataCodecs;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@AllArgsConstructor
+@Component
+public class MqttClientTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final NetworkManager networkManager;
+
+    static {
+        MqttRuleDataCodec.load();
+    }
+
+    @Override
+    public String getExecutor() {
+        return "mqtt-client";
+    }
+
+    protected Flux<MqttMessage> convertMessage(RuleData message, MqttClientTaskConfiguration config) {
+
+        return RuleDataCodecs.getCodec(MqttMessage.class)
+            .map(codec ->
+                codec.decode(message,
+                    config.getPayloadType(),
+                    new MqttTopics(config.getTopics(RuleDataHelper.toContextMap(message))))
+                    .cast(MqttMessage.class))
+            .orElseThrow(() -> new UnsupportedOperationException("unsupported decode message:{}" + message));
+    }
+
+    protected Mono<RuleData> convertMessage(MqttMessage message, MqttClientTaskConfiguration config) {
+
+        return Mono.just(RuleDataCodecs.getCodec(MqttMessage.class)
+            .map(codec -> codec.encode(message, config.getPayloadType(), new TopicVariables(config.getTopicVariables())))
+            .map(RuleData::create)
+            .orElseGet(() -> RuleData.create(message)));
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new MqttClientTaskExecutor(context));
+    }
+
+    class MqttClientTaskExecutor extends AbstractTaskExecutor {
+
+        private MqttClientTaskConfiguration config;
+
+        public MqttClientTaskExecutor(ExecutionContext context) {
+            super(context);
+            reload();
+        }
+
+        @Override
+        public String getName() {
+            return "MQTT Client";
+        }
+
+        @Override
+        public void reload() {
+            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new MqttClientTaskConfiguration());
+            config.validate();
+            if (disposable != null) {
+                disposable.dispose();
+            }
+        }
+
+        @Override
+        public void validate() {
+            FastBeanCopier
+                .copy(context.getJob().getConfiguration(), new MqttClientTaskConfiguration())
+                .validate();
+        }
+
+        @Override
+        protected Disposable doStart() {
+            Disposable.Composite disposable = Disposables.composite();
+
+            if (EnumDict.in(PubSubType.producer, config.getClientType())) {
+                disposable.add(context.getInput()
+                    .accept()
+                    .filter((data) -> state == Task.State.running)
+                    .flatMap(data ->
+                        networkManager
+                            .<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
+                            .flatMapMany(client -> convertMessage(data, config)
+                                .flatMap(msg -> client
+                                    .publish(msg)
+                                    .doOnSuccess((v) -> context.getLogger().debug("推送MQTT[{}]消息:{}", client.getId(), msg))
+                                )
+                            ).onErrorContinue((err, e) -> context.onError(err, null).subscribe())
+                    )
+                    .subscribe()
+                );
+            }
+            if (EnumDict.in(PubSubType.consumer, config.getClientType())) {
+                disposable.add(networkManager
+                    .<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
+                    .flatMapMany(client -> client.subscribe(config.getTopics()))
+                    .filter((data) -> state == Task.State.running)
+                    .doOnNext(message -> context.getLogger().info("consume mqtt message:{}", message))
+                    .flatMap(message -> convertMessage(message, config))
+                    .flatMap(ruleData -> context.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData))
+                    .flatMap(ruleData -> context.fireEvent(RuleConstants.Event.result, ruleData).thenReturn(ruleData))
+                    .onErrorContinue((err, e) -> context.onError(err, null).subscribe())
+                    .subscribe());
+            }
+            return disposable;
+        }
+    }
+}

+ 111 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttRuleDataCodec.java

@@ -0,0 +1,111 @@
+package org.jetlinks.community.network.mqtt.executor;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.collections4.CollectionUtils;
+import org.jetlinks.core.message.codec.MessagePayloadType;
+import org.jetlinks.core.message.codec.MqttMessage;
+import org.jetlinks.core.message.codec.SimpleMqttMessage;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataCodec;
+import org.jetlinks.rule.engine.api.RuleDataCodecs;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.supports.utils.MqttTopicUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class MqttRuleDataCodec implements RuleDataCodec<MqttMessage> {
+
+    static {
+
+        MqttRuleDataCodec codec = new MqttRuleDataCodec();
+//        EncodedMessageCodec.register(DefaultTransport.MQTT, codec);
+//        EncodedMessageCodec.register(DefaultTransport.MQTT_TLS, codec);
+        RuleDataCodecs.register(MqttMessage.class, codec);
+
+    }
+
+    static void load() {
+
+    }
+
+    @Override
+    public Object encode(MqttMessage message, Feature... features) {
+        Map<String, Object> payload = new HashMap<>();
+        payload.put("topic", message.getTopic());
+        payload.put("will", message.isWill());
+        payload.put("qos", message.getQosLevel());
+        payload.put("dup", message.isDup());
+        payload.put("retain", message.isRetain());
+        PayloadType payloadType = Feature.find(PayloadType.class, features).orElse(PayloadType.JSON);
+        Feature.find(TopicVariables.class, features)
+            .map(TopicVariables::getVariables)
+            .filter(CollectionUtils::isNotEmpty)
+            .flatMap(list -> list.stream()
+                .map(str -> MqttTopicUtils.getPathVariables(str, message.getTopic()))
+                .reduce((m1, m2) -> {
+                    m1.putAll(m2);
+                    return m1;
+                }))
+            .ifPresent(vars -> payload.put("vars", vars));
+
+        payload.put("payloadType", payloadType.name());
+        payload.put("payload", payloadType.read(message.getPayload()));
+        payload.put("clientId", message.getClientId());
+
+
+        return payload;
+    }
+
+    @Override
+    public Flux<MqttMessage> decode(RuleData data, Feature... features) {
+        if (data.getData() instanceof MqttMessage) {
+            return Flux.just(((MqttMessage) data.getData()));
+        }
+        MqttTopics topics = Feature.find(MqttTopics.class, features).orElse(null);
+
+        return data
+            .dataToMap()
+            .filter(map -> map.containsKey("payload"))
+            .flatMap(map -> {
+                if (topics != null && !map.containsKey("topic")) {
+                    return Flux.fromIterable(topics.getTopics())
+                        .flatMap(topic -> {
+                            Map<String, Object> copy = new HashMap<>();
+                            copy.put("topic", topic);
+                            copy.putAll(map);
+                            return Mono.just(copy);
+                        })
+                        .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("topic not set")));
+                }
+                return Flux.just(map);
+            })
+            .map(map -> {
+                PayloadType payloadType = Feature.find(PayloadType.class, features)
+                    .orElseGet(() -> Optional.ofNullable(map.get("payloadType"))
+                        .map(String::valueOf)
+                        .map(PayloadType::valueOf)
+                        .orElse(PayloadType.JSON));
+                Object payload = map.get("payload");
+
+                ByteBuf byteBuf = payloadType.write(payload);
+
+                Integer qos = (Integer) map.get("qos");
+
+                return SimpleMqttMessage
+                    .builder()
+                    .clientId((String) map.get("clientId"))
+                    .topic((String) map.get("topic"))
+                    .dup(Boolean.TRUE.equals(map.get("dup")))
+                    .will(Boolean.TRUE.equals(map.get("will")))
+                    .retain(Boolean.TRUE.equals(map.get("retain")))
+                    .qosLevel(qos == null ? 0 : qos)
+                    .payloadType(MessagePayloadType.valueOf(payloadType.name()))
+                    .payload(byteBuf)
+                    .build();
+            });
+    }
+}

+ 24 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttTopics.java

@@ -0,0 +1,24 @@
+package org.jetlinks.community.network.mqtt.executor;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.jetlinks.rule.engine.api.RuleDataCodec;
+
+import java.util.List;
+
+@Getter
+@AllArgsConstructor
+public class MqttTopics implements RuleDataCodec.Feature {
+
+    private List<String> topics;
+
+    @Override
+    public String getId() {
+        return "mqtt-topic";
+    }
+
+    @Override
+    public String getName() {
+        return "MQTT Topics";
+    }
+}

+ 13 - 0
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/TopicVariables.java

@@ -0,0 +1,13 @@
+package org.jetlinks.community.network.mqtt.executor;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.jetlinks.rule.engine.api.RuleDataCodec;
+
+import java.util.List;
+
+@Getter
+@AllArgsConstructor
+public class TopicVariables implements RuleDataCodec.Feature {
+    List<String> variables;
+}

+ 0 - 90
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/node/MqttClientNode.java

@@ -1,90 +0,0 @@
-package org.jetlinks.community.network.mqtt.node;
-
-import lombok.AllArgsConstructor;
-import org.hswebframework.web.dict.EnumDict;
-import org.jetlinks.community.network.mqtt.client.MqttClient;
-import org.jetlinks.core.message.codec.MqttMessage;
-import org.jetlinks.community.network.DefaultNetworkType;
-import org.jetlinks.community.network.NetworkManager;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.RuleDataCodecs;
-import org.jetlinks.rule.engine.api.RuleDataHelper;
-import org.jetlinks.rule.engine.api.events.RuleEvent;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.mqtt.*;
-import org.reactivestreams.Publisher;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.function.Function;
-
-@AllArgsConstructor
-@Component
-public class MqttClientNode extends CommonExecutableRuleNodeFactoryStrategy<MqttClientConfiguration> {
-
-    private NetworkManager networkManager;
-
-    static {
-        try {
-            Class.forName("org.jetlinks.rule.engine.executor.node.mqtt.MqttRuleDataCodec");
-        } catch (ClassNotFoundException ignore) {
-
-        }
-    }
-
-    @Override
-    public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, MqttClientConfiguration config) {
-
-        if (!EnumDict.in(ClientType.producer, config.getClientType())) {
-            return Mono::just;
-        }
-        return ruleData -> networkManager
-                .<org.jetlinks.community.network.mqtt.client.MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
-                .flatMapMany(client -> this.convertMessage(ruleData, config).flatMap(client::publish))
-                .then(Mono.just(ruleData))
-                ;
-    }
-
-    protected Flux<MqttMessage> convertMessage(RuleData message, MqttClientConfiguration config) {
-
-        return RuleDataCodecs.getCodec(MqttMessage.class)
-                .map(codec ->
-                        codec.decode(message,
-                                config.getPayloadType(),
-                                new MqttTopics(config.getTopics(RuleDataHelper.toContextMap(message))))
-                                .cast(MqttMessage.class))
-                .orElseThrow(() -> new UnsupportedOperationException("unsupported decode message:{}" + message));
-    }
-
-    protected Mono<RuleData> convertMessage(MqttMessage message, MqttClientConfiguration config) {
-
-        return Mono.just(RuleDataCodecs.getCodec(MqttMessage.class)
-                .map(codec -> codec.encode(message, config.getPayloadType(), new TopicVariables(config.getTopicVariables())))
-                .map(RuleData::create)
-                .orElseGet(() -> RuleData.create(message)));
-    }
-
-
-    @Override
-    protected void onStarted(ExecutionContext context, MqttClientConfiguration config) {
-        if (!EnumDict.in(ClientType.consumer, config.getClientType())) {
-            return;
-        }
-        context.onStop(networkManager
-                .<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
-                .flatMapMany(client -> client.subscribe(config.getTopics()))
-                .doOnNext(message -> context.logger().info("consume mqtt message:{}", message))
-                .flatMap(message -> convertMessage(message, config))
-                .flatMap(ruleData -> context.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData))
-                .flatMap(ruleData -> context.fireEvent(RuleEvent.NODE_EXECUTE_RESULT, ruleData).thenReturn(ruleData))
-                .onErrorContinue((err, e) -> context.onError(RuleData.create("consume mqtt message error"), err).subscribe())
-                .subscribe()::dispose);
-    }
-
-    @Override
-    public String getSupportType() {
-        return "mqtt-client";
-    }
-}

+ 15 - 1
jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/PubSubType.java

@@ -1,7 +1,21 @@
 package org.jetlinks.community.network;
 
-public enum PubSubType {
+import lombok.Getter;
+import org.hswebframework.web.dict.EnumDict;
+
+@Getter
+public enum PubSubType implements EnumDict<String> {
 
     producer,
     consumer;
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getText() {
+        return name();
+    }
 }

+ 25 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/executor/TcpClientTaskConfiguration.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.network.tcp.executor;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.community.network.PubSubType;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.springframework.util.Assert;
+
+@Getter
+@Setter
+public class TcpClientTaskConfiguration {
+
+    private String clientId;
+
+    private PubSubType type;
+
+    private PayloadType payloadType;
+
+    public void validate() {
+        Assert.hasText(clientId, "clientId can not be empty!");
+        Assert.notNull(type, "type can not be null!");
+        Assert.notNull(payloadType, "type can not be null!");
+
+    }
+}

+ 109 - 0
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/executor/TcpClientTaskExecutorProvider.java

@@ -0,0 +1,109 @@
+package org.jetlinks.community.network.tcp.executor;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.network.DefaultNetworkType;
+import org.jetlinks.community.network.NetworkManager;
+import org.jetlinks.community.network.PubSubType;
+import org.jetlinks.community.network.tcp.TcpMessage;
+import org.jetlinks.community.network.tcp.client.TcpClient;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataCodecs;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@AllArgsConstructor
+@Component
+public class TcpClientTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final NetworkManager clientManager;
+
+    static {
+        TcpMessageCodec.register();
+    }
+
+    @Override
+    public String getExecutor() {
+        return "tcp-client";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new TcpTaskExecutor(context));
+    }
+
+    class TcpTaskExecutor extends AbstractTaskExecutor {
+
+        private TcpClientTaskConfiguration config;
+
+        public TcpTaskExecutor(ExecutionContext context) {
+            super(context);
+            reload();
+        }
+
+        @Override
+        public String getName() {
+            return "Tcp Client";
+        }
+
+        @Override
+        public void reload() {
+            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new TcpClientTaskConfiguration());
+            config.validate();
+        }
+
+        @Override
+        public void validate() {
+            FastBeanCopier
+                .copy(context.getJob().getConfiguration(), new TcpClientTaskConfiguration())
+                .validate();
+        }
+
+        @Override
+        protected Disposable doStart() {
+            Disposable.Composite disposable = Disposables.composite();
+
+            if (config.getType() == PubSubType.producer) {
+                disposable.add(context
+                    .getInput()
+                    .accept()
+                    .flatMap(data ->
+                        clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, config.getClientId())
+                            .flatMapMany(client -> RuleDataCodecs
+                                .getCodec(TcpMessage.class)
+                                .map(codec -> codec.decode(data, config.getPayloadType())
+                                    .cast(TcpMessage.class)
+                                    .switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().warn("can not decode rule data to tcp message:{}", data))))
+                                .orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData()))))
+                                .flatMap(client::send)
+                                .onErrorContinue((err, r) -> {
+                                    context.onError(err, data).subscribe();
+                                })
+                                .then()
+                            )).subscribe()
+                )
+                ;
+            }
+            if (config.getType() == PubSubType.consumer) {
+                disposable.add(clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, config.getClientId())
+                    .switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().error("tcp client {} not found", config.getClientId())))
+                    .flatMapMany(TcpClient::subscribe)
+                    .doOnNext(msg -> context.getLogger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload())))
+                    .map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
+                        .map(codec -> codec.encode(r, config.getPayloadType()))
+                        .orElse(r.getPayload()))
+                    .flatMap(out -> context.getOutput().write(Mono.just(RuleData.create(out))))
+                    .onErrorContinue((err, obj) -> context.getLogger().error("consume tcp message error", err))
+                    .subscribe());
+            }
+            return disposable;
+        }
+    }
+}

+ 1 - 6
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpMessageCodec.java

@@ -1,12 +1,10 @@
-package org.jetlinks.community.network.tcp.node;
+package org.jetlinks.community.network.tcp.executor;
 
-import org.jetlinks.core.message.codec.DefaultTransport;
 import org.jetlinks.community.network.tcp.TcpMessage;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.RuleDataCodec;
 import org.jetlinks.rule.engine.api.RuleDataCodecs;
 import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.rule.engine.executor.node.device.EncodedMessageCodec;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -19,9 +17,6 @@ public class TcpMessageCodec implements RuleDataCodec<TcpMessage> {
 
     static {
         RuleDataCodecs.register(TcpMessage.class, instance);
-
-        EncodedMessageCodec.register(DefaultTransport.TCP, instance);
-        EncodedMessageCodec.register(DefaultTransport.TCP_TLS, instance);
     }
 
     static void register() {

+ 0 - 70
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java

@@ -1,70 +0,0 @@
-package org.jetlinks.community.network.tcp.node;
-
-import lombok.AllArgsConstructor;
-import org.jetlinks.community.network.DefaultNetworkType;
-import org.jetlinks.community.network.NetworkManager;
-import org.jetlinks.community.network.PubSubType;
-import org.jetlinks.community.network.tcp.TcpMessage;
-import org.jetlinks.community.network.tcp.client.TcpClient;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.RuleDataCodecs;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.reactivestreams.Publisher;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.function.Function;
-
-@AllArgsConstructor
-@Component
-public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategy<TcpClientNodeConfig> {
-
-    private NetworkManager clientManager;
-
-    static {
-        TcpMessageCodec.register();
-    }
-
-    @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, TcpClientNodeConfig config) {
-
-        if (config.getType() != PubSubType.producer) {
-            return Mono::just;
-        }
-        return data -> clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
-                .flatMapMany(client -> RuleDataCodecs
-                        .getCodec(TcpMessage.class)
-                        .map(codec -> codec.decode(data, config.getPayloadType())
-                                .cast(TcpMessage.class)
-                                .switchIfEmpty(Mono.fromRunnable(() -> context.logger().warn("can not decode rule data to tcp message:{}", data))))
-                        .orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData()))))
-                        .flatMap(client::send)
-                        .all(r-> r))
-                        ;
-    }
-
-    @Override
-    protected void onStarted(ExecutionContext context, TcpClientNodeConfig config) {
-        super.onStarted(context, config);
-        if (config.getType() == PubSubType.consumer) {
-            context.onStop( clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
-                    .switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("tcp client {} not found", config.getClientId())))
-                    .flatMapMany(TcpClient::subscribe)
-                    .doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload())))
-                    .map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
-                            .map(codec -> codec.encode(r, config.getPayloadType()))
-                            .orElse(r.getPayload()))
-                    .onErrorContinue((err, obj) -> {
-                        context.logger().error("consume tcp message error", err);
-                    })
-                    .subscribe(msg -> context.getOutput().write(Mono.just(RuleData.create(msg))).subscribe())::dispose);
-        }
-    }
-
-    @Override
-    public String getSupportType() {
-        return "tcp-client";
-    }
-}

+ 0 - 38
jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java

@@ -1,38 +0,0 @@
-package org.jetlinks.community.network.tcp.node;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.jetlinks.community.network.PubSubType;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
-import org.springframework.util.Assert;
-
-@Getter
-@Setter
-public class TcpClientNodeConfig implements RuleNodeConfig {
-
-    private String clientId;
-
-    private PubSubType type;
-
-    private PayloadType payloadType;
-
-    @Override
-    public NodeType getNodeType() {
-        return NodeType.MAP;
-    }
-
-    @Override
-    public void setNodeType(NodeType nodeType) {
-
-    }
-
-    @Override
-    public void validate() {
-        Assert.hasText(clientId, "clientId can not be empty!");
-        Assert.notNull(type, "type can not be null!");
-        Assert.notNull(payloadType, "payloadType can not be null!");
-
-    }
-}

+ 0 - 48
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/NotifierRuleNode.java

@@ -1,48 +0,0 @@
-package org.jetlinks.community.notify.rule;
-
-import lombok.AllArgsConstructor;
-import org.jetlinks.core.Values;
-import org.jetlinks.community.notify.NotifierManager;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.RuleDataHelper;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.reactivestreams.Publisher;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
-
-import java.util.function.Function;
-
-@Component
-@AllArgsConstructor
-public class NotifierRuleNode extends CommonExecutableRuleNodeFactoryStrategy<RuleNotifierProperties> {
-
-    private NotifierManager notifierManager;
-
-    @Override
-    public String getSupportType() {
-        return "notifier";
-    }
-
-    @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, RuleNotifierProperties config) {
-        return rule -> notifierManager
-                .getNotifier(config.getNotifyType(), config.getNotifierId())
-                .switchIfEmpty(Mono.fromRunnable(() -> {
-                    context.logger().warn("通知器[{}-{}]不存在", config.getNodeType(), config.getNotifierId());
-                }))
-                .flatMap(notifier -> notifier.send(config.getTemplateId(), Values.of(RuleDataHelper.toContextMap(rule))))
-                .doOnError(err -> {
-                    context.logger().error("发送[{}]通知[{}-{}]失败",
-                            config.getNotifyType().getName(),
-                            config.getNotifierId(),
-                            config.getTemplateId(), err);
-                })
-                .doOnSuccess(ignore -> {
-                    context.logger().info("发送[{}]通知[{}-{}]完成",
-                            config.getNotifyType().getName(),
-                            config.getNotifierId(),
-                            config.getTemplateId());
-                });
-    }
-}

+ 66 - 0
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/NotifierTaskExecutorProvider.java

@@ -0,0 +1,66 @@
+package org.jetlinks.community.notify.rule;
+
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.notify.NotifierManager;
+import org.jetlinks.core.Values;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Function;
+
+@Component
+@AllArgsConstructor
+public class NotifierTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final NotifierManager notifierManager;
+
+    @Override
+    public String getExecutor() {
+        return "notifier";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        RuleNotifierProperties properties = FastBeanCopier.copy(context.getJob().getConfiguration(), RuleNotifierProperties.class);
+        properties.validate();
+
+        Function<RuleData, Publisher<RuleData>> executor = createExecutor(context, properties);
+        return Mono.just(new FunctionTaskExecutor("消息通知", context) {
+            @Override
+            protected Publisher<RuleData> apply(RuleData input) {
+                return executor.apply(input);
+            }
+        });
+    }
+
+
+    public Function<RuleData, Publisher<RuleData>> createExecutor(ExecutionContext context, RuleNotifierProperties config) {
+        return rule -> notifierManager
+            .getNotifier(config.getNotifyType(), config.getNotifierId())
+            .switchIfEmpty(Mono.fromRunnable(() -> {
+                context.getLogger().warn("通知器[{}-{}]不存在", config.getNotifyType(), config.getNotifierId());
+            }))
+            .flatMap(notifier -> notifier.send(config.getTemplateId(), Values.of(RuleDataHelper.toContextMap(rule))))
+            .doOnError(err -> {
+                context.getLogger().error("发送[{}]通知[{}-{}]失败",
+                    config.getNotifyType().getName(),
+                    config.getNotifierId(),
+                    config.getTemplateId(), err);
+            })
+            .doOnSuccess(ignore -> {
+                context.getLogger().info("发送[{}]通知[{}-{}]完成",
+                    config.getNotifyType().getName(),
+                    config.getNotifierId(),
+                    config.getTemplateId());
+            }).then(Mono.empty());
+    }
+
+}

+ 1 - 14
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/rule/RuleNotifierProperties.java

@@ -3,13 +3,11 @@ package org.jetlinks.community.notify.rule;
 import lombok.Getter;
 import lombok.Setter;
 import org.jetlinks.community.notify.DefaultNotifyType;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
 import org.springframework.util.Assert;
 
 @Getter
 @Setter
-public class RuleNotifierProperties implements RuleNodeConfig {
+public class RuleNotifierProperties {
 
     private DefaultNotifyType notifyType;
 
@@ -17,17 +15,6 @@ public class RuleNotifierProperties implements RuleNodeConfig {
 
     private String templateId;
 
-    @Override
-    public NodeType getNodeType() {
-        return NodeType.PEEK;
-    }
-
-    @Override
-    public void setNodeType(NodeType nodeType) {
-
-    }
-
-    @Override
     public void validate() {
         Assert.notNull(notifyType,"notifyType can not be null");
         Assert.hasText(notifierId,"notifierId can not be empty");

+ 5 - 0
jetlinks-components/rule-engine-component/pom.xml

@@ -13,6 +13,11 @@
     <artifactId>rule-engine-component</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>com.cronutils</groupId>
+            <artifactId>cron-utils</artifactId>
+            <version>9.0.2</version>
+        </dependency>
         <dependency>
             <groupId>org.jetlinks</groupId>
             <artifactId>rule-engine-support</artifactId>

+ 42 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/EventTopicMessage.java

@@ -0,0 +1,42 @@
+package org.jetlinks.community.rule.engine.configuration;
+
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.community.gateway.EncodableMessage;
+import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.rule.engine.api.NativePayload;
+import org.jetlinks.rule.engine.api.SubscribePayload;
+
+import javax.annotation.Nonnull;
+
+@Getter
+@Setter
+public class EventTopicMessage implements TopicMessage, EncodableMessage {
+    private String topic;
+
+    private Object nativePayload;
+
+    private SubscribePayload payload;
+
+    public EventTopicMessage(SubscribePayload payload) {
+        this.topic = payload.getTopic();
+        this.nativePayload = ((NativePayload) payload.getPayload()).getNativeObject();
+        this.payload = payload;
+    }
+
+    @Nonnull
+    @Override
+    public ByteBuf getPayload() {
+        return payload.getBody();
+    }
+
+    @Nonnull
+    @Override
+    public EncodedMessage getMessage() {
+        return this;
+    }
+
+
+}

+ 54 - 48
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java

@@ -1,33 +1,36 @@
 package org.jetlinks.community.rule.engine.configuration;
 
-import org.jetlinks.community.rule.engine.nodes.TimerWorkerNode;
-import org.jetlinks.rule.engine.api.ConditionEvaluator;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.rule.engine.api.EventBus;
 import org.jetlinks.rule.engine.api.RuleEngine;
-import org.jetlinks.rule.engine.api.Slf4jLogger;
-import org.jetlinks.rule.engine.api.executor.ExecutableRuleNodeFactory;
-import org.jetlinks.rule.engine.cluster.logger.ClusterLogger;
+import org.jetlinks.rule.engine.api.rpc.RpcService;
+import org.jetlinks.rule.engine.api.rpc.RpcServiceFactory;
+import org.jetlinks.rule.engine.api.scheduler.Scheduler;
+import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.api.worker.Worker;
 import org.jetlinks.rule.engine.condition.ConditionEvaluatorStrategy;
 import org.jetlinks.rule.engine.condition.DefaultConditionEvaluator;
 import org.jetlinks.rule.engine.condition.supports.DefaultScriptEvaluator;
 import org.jetlinks.rule.engine.condition.supports.ScriptConditionEvaluatorStrategy;
 import org.jetlinks.rule.engine.condition.supports.ScriptEvaluator;
-import org.jetlinks.rule.engine.executor.DefaultExecutableRuleNodeFactory;
-import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.route.RouteEventNode;
+import org.jetlinks.rule.engine.defaults.DefaultRuleEngine;
+import org.jetlinks.rule.engine.defaults.LocalEventBus;
+import org.jetlinks.rule.engine.defaults.LocalScheduler;
+import org.jetlinks.rule.engine.defaults.LocalWorker;
+import org.jetlinks.rule.engine.defaults.rpc.DefaultRpcServiceFactory;
+import org.jetlinks.rule.engine.defaults.rpc.EventBusRcpService;
 import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
 import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
 import org.jetlinks.rule.engine.model.antv.AntVG6RuleModelParserStrategy;
-import org.jetlinks.rule.engine.standalone.StandaloneRuleEngine;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.boot.CommandLineRunner;
-import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.util.concurrent.ExecutorService;
-
 @Configuration
+@Slf4j
 public class RuleEngineConfiguration {
 
     @Bean
@@ -41,19 +44,45 @@ public class RuleEngineConfiguration {
     }
 
     @Bean
-    public DefaultExecutableRuleNodeFactory defaultExecutableRuleNodeFactory() {
-        return new DefaultExecutableRuleNodeFactory();
+    public AntVG6RuleModelParserStrategy antVG6RuleModelParserStrategy() {
+        return new AntVG6RuleModelParserStrategy();
     }
 
     @Bean
-    public AntVG6RuleModelParserStrategy antVG6RuleModelParserStrategy() {
-        return new AntVG6RuleModelParserStrategy();
+    public EventBus eventBus(MessageGateway messageGateway) {
+
+        LocalEventBus local = new LocalEventBus();
+
+        //转发到消息网关
+        local.subscribe("/**")
+            .flatMap(subscribePayload -> messageGateway.publish(new EventTopicMessage(subscribePayload)).then())
+            .onErrorContinue((err, obj) -> log.error(err.getMessage(), obj))
+            .subscribe();
+
+        return local;
+    }
+
+    @Bean
+    public RpcService rpcService(EventBus eventBus) {
+        return new EventBusRcpService(eventBus);
+    }
+
+    @Bean
+    public RpcServiceFactory rpcServiceFactory(RpcService rpcService) {
+        return new DefaultRpcServiceFactory(rpcService);
+    }
+
+    @Bean
+    public LocalScheduler clusterLocalScheduler(Worker worker) {
+        LocalScheduler scheduler = new LocalScheduler("local");
+        scheduler.addWorker(worker);
+        return scheduler;
     }
 
     @Bean
     public BeanPostProcessor autoRegisterStrategy(DefaultRuleModelParser defaultRuleModelParser,
                                                   DefaultConditionEvaluator defaultConditionEvaluator,
-                                                  DefaultExecutableRuleNodeFactory ruleNodeFactory) {
+                                                  LocalWorker worker) {
         return new BeanPostProcessor() {
             @Override
             public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
@@ -69,9 +98,10 @@ public class RuleEngineConfiguration {
                 if (bean instanceof ConditionEvaluatorStrategy) {
                     defaultConditionEvaluator.register(((ConditionEvaluatorStrategy) bean));
                 }
-                if (bean instanceof ExecutableRuleNodeFactoryStrategy) {
-                    ruleNodeFactory.registerStrategy(((ExecutableRuleNodeFactoryStrategy) bean));
+                if (bean instanceof TaskExecutorProvider) {
+                    worker.addExecutor(((TaskExecutorProvider) bean));
                 }
+
                 return bean;
             }
         };
@@ -88,37 +118,13 @@ public class RuleEngineConfiguration {
     }
 
     @Bean
-    public RuleEngine ruleEngine(ExecutableRuleNodeFactory ruleNodeFactory,
-                                 ConditionEvaluator conditionEvaluator,
-                                 ApplicationEventPublisher eventPublisher,
-                                 ExecutorService executorService) {
-        StandaloneRuleEngine ruleEngine = new StandaloneRuleEngine();
-        ruleEngine.setNodeFactory(ruleNodeFactory);
-        ruleEngine.setExecutor(executorService);
-        ruleEngine.setEvaluator(conditionEvaluator);
-        ruleEngine.setEventListener(eventPublisher::publishEvent);
-        ruleEngine.setLoggerSupplier((ctxId, model) -> {
-            ClusterLogger logger = new ClusterLogger();
-            logger.setParent(new Slf4jLogger("rule.engine.logger.".concat(model.getId()).concat(".").concat(model.getName())));
-            logger.setLogInfoConsumer(eventPublisher::publishEvent);
-            logger.setNodeId(model.getId());
-            logger.setInstanceId(ctxId);
-            return logger;
-        });
-        return ruleEngine;
-    }
-
-    /* 规则引擎节点 */
-
-    @Bean //定时调度
-    public TimerWorkerNode timerWorkerNode() {
-        return new TimerWorkerNode();
+    public LocalWorker localWorker(EventBus eventBus, ConditionEvaluator evaluator) {
+        return new LocalWorker("local", "local", eventBus, evaluator);
     }
 
     @Bean
-    public RouteEventNode routeEventNode() {
-        return new RouteEventNode();
+    public RuleEngine defaultRuleEngine(Scheduler scheduler) {
+        return new DefaultRuleEngine(scheduler);
     }
 
-
 }

+ 0 - 64
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/event/handler/RuleLogHandler.java

@@ -1,64 +0,0 @@
-package org.jetlinks.community.rule.engine.event.handler;
-
-import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.web.bean.FastBeanCopier;
-import org.jetlinks.community.elastic.search.service.ElasticSearchService;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
-import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
-import org.jetlinks.rule.engine.api.events.NodeExecuteEvent;
-import org.jetlinks.rule.engine.api.events.RuleEvent;
-import org.jetlinks.rule.engine.cluster.logger.LogInfo;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.event.EventListener;
-import org.springframework.core.annotation.Order;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@Order(3)
-public class RuleLogHandler {
-
-    @Autowired
-    private ElasticSearchService elasticSearchService;
-
-    @Autowired
-    private MessageGateway messageGateway;
-
-    @EventListener
-    public void handleRuleLog(LogInfo event) {
-        RuleEngineExecuteLogInfo logInfo = FastBeanCopier.copy(event, new RuleEngineExecuteLogInfo());
-        elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_LOG, logInfo)
-            .subscribe();
-        // /rule-engine/{instanceId}/{nodeId}/log
-        messageGateway
-            .publish(String.join("/",
-                "/rule-engine",
-                event.getInstanceId(),
-                event.getNodeId(),
-                "log"), logInfo, true)
-            .subscribe();
-    }
-
-    @EventListener
-    public void handleRuleExecuteEvent(NodeExecuteEvent event) {
-        //不记录BEFORE和RESULT事件
-        if (!RuleEvent.NODE_EXECUTE_BEFORE.equals(event.getEvent())
-            && !RuleEvent.NODE_EXECUTE_RESULT.equals(event.getEvent())) {
-            RuleEngineExecuteEventInfo eventInfo = FastBeanCopier.copy(event, new RuleEngineExecuteEventInfo());
-            elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, eventInfo)
-                .subscribe();
-        }
-        // /rule-engine/{instanceId}/{nodeId}/event/{eventType}
-        messageGateway
-            .publish(String.join("/",
-                "/rule-engine",
-                event.getInstanceId(),
-                event.getNodeId(),
-                "event",
-                event.getEvent().toLowerCase()
-            ), event, true)
-            .subscribe();
-    }
-
-}

+ 21 - 19
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/DataMappingWorkerNode.java

@@ -1,4 +1,4 @@
-package org.jetlinks.community.rule.engine.nodes;
+package org.jetlinks.community.rule.engine.executor;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -6,18 +6,15 @@ import lombok.SneakyThrows;
 import org.hswebframework.web.bean.Converter;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.utils.ExpressionUtils;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
-import org.reactivestreams.Publisher;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
 import java.math.BigDecimal;
 import java.util.*;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -25,31 +22,36 @@ import java.util.stream.Collectors;
  * @since 1.0.0
  */
 @Component
-public class DataMappingWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<DataMappingWorkerNode.Config> {
+public class DataMappingTaskExecutorProvider implements TaskExecutorProvider {
 
     public static Converter converter = FastBeanCopier.DEFAULT_CONVERT;
 
     @Override
-    public String getSupportType() {
+    public String getExecutor() {
         return "data-mapping";
     }
 
     @Override
-    public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, Config config) {
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
 
-        return ruleData -> Mono.just(config.mapping(convertObject(ruleData.getData())));
+        return Mono.just(new LambdaTaskExecutor("Mapping", context, () -> {
+
+            Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
+
+            return data -> Mono.just(data.newData(config.mapping(data.getData())));
+
+        }));
     }
 
+
     @Getter
     @Setter
-    public static class Config implements RuleNodeConfig {
+    public static class Config {
 
         private List<Mapping> mappings = new ArrayList<>();
 
         private boolean keepSourceData = false;
 
-        private NodeType nodeType;
-
         private Map<String, Object> toMap(Object source) {
             return FastBeanCopier.copy(source, HashMap::new);
         }
@@ -62,10 +64,10 @@ public class DataMappingWorkerNode extends CommonExecutableRuleNodeFactoryStrate
             if (data instanceof Collection) {
                 Collection<Object> source = ((Collection) data);
                 return source
-                        .stream()
-                        .map(this::toMap)
-                        .map(this::doMapping)
-                        .collect(Collectors.toList());
+                    .stream()
+                    .map(this::toMap)
+                    .map(this::doMapping)
+                    .collect(Collectors.toList());
             }
             return data;
         }

+ 53 - 29
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/DeviceMessageSendNode.java

@@ -1,22 +1,25 @@
-package org.jetlinks.community.rule.engine.nodes;
+package org.jetlinks.community.rule.engine.executor;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.id.IDGenerator;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.message.DeviceMessageReply;
 import org.jetlinks.core.message.Headers;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.message.RepayableDeviceMessage;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -24,18 +27,35 @@ import reactor.core.scheduler.Schedulers;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Function;
 
-@AllArgsConstructor
 @Component
-public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceMessageSendNode.Config> {
+@AllArgsConstructor
+public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvider {
 
     private final DeviceRegistry registry;
 
     @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Config config) {
+    public String getExecutor() {
+        return "device-message-sender";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new DeviceMessageSendTaskExecutor(context));
+    }
+
+    class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor {
+
+        private Config config;
 
-        return data -> {
+        public DeviceMessageSendTaskExecutor(ExecutionContext context) {
+            super("发送设备消息", context);
+            validate();
+            reload();
+        }
+
+        @Override
+        protected Publisher<RuleData> apply(RuleData input) {
             Flux<DeviceOperator> devices = StringUtils.hasText(config.getDeviceId())
                 ? registry.getDevice(config.getDeviceId()).flux()
                 : registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices);
@@ -44,18 +64,32 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
                 .filterWhen(DeviceOperator::isOnline)
                 .publishOn(Schedulers.parallel())
                 .flatMap(config::doSend)
-                .onErrorResume(error -> context.onError(data, error).then(Mono.empty()));
-        };
-    }
+                .onErrorResume(error -> context.onError(error, input).then(Mono.empty()))
+                .map(reply -> input.newData(reply.toJson()))
+                ;
+        }
+
+        @Override
+        public void validate() {
+            if (CollectionUtils.isEmpty(context.getJob().getConfiguration())) {
+                throw new IllegalArgumentException("配置不能为空");
+            }
+            Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
+            config.validate();
+        }
+
+        @Override
+        public void reload() {
+            config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
+        }
+
 
-    @Override
-    public String getSupportType() {
-        return "device-message-sender";
     }
 
+
     @Getter
     @Setter
-    public static class Config implements RuleNodeConfig {
+    public static class Config {
 
         //设备ID
         private String deviceId;
@@ -67,7 +101,8 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
 
         private boolean async;
 
-        public Publisher<?> doSend(DeviceOperator device) {
+        @SuppressWarnings("all")
+        public Publisher<DeviceMessageReply> doSend(DeviceOperator device) {
             Map<String, Object> message = new HashMap<>(this.message);
             message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
             message.put("deviceId", device.getDeviceId());
@@ -78,7 +113,6 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
                 .flatMapMany(msg -> device.messageSender().send(Mono.just(msg)));
         }
 
-        @Override
         public void validate() {
             if (StringUtils.isEmpty(deviceId) && StringUtils.isEmpty(productId)) {
                 throw new IllegalArgumentException("deviceId和productId不能同时为空");
@@ -86,15 +120,5 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
             MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式"));
         }
 
-        @Override
-        public NodeType getNodeType() {
-            return NodeType.MAP;
-        }
-
-        @Override
-        public void setNodeType(NodeType nodeType) {
-
-        }
     }
-
 }

+ 125 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -0,0 +1,125 @@
+package org.jetlinks.community.rule.engine.executor;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.reactor.ql.ReactorQL;
+import org.jetlinks.rule.engine.api.RuleConstants;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
+import org.jetlinks.rule.engine.api.model.RuleNodeModel;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+@Component
+@AllArgsConstructor
+public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final MessageGateway messageGateway;
+
+    @Override
+    public String getExecutor() {
+        return "reactor-ql";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new ReactorQLTaskExecutor(context));
+    }
+
+    class ReactorQLTaskExecutor extends AbstractTaskExecutor {
+
+        private ReactorQL reactorQL;
+
+        public ReactorQLTaskExecutor(ExecutionContext context) {
+            super(context);
+            reactorQL = createQl();
+        }
+
+        @Override
+        public String getName() {
+            return "ReactorQL";
+        }
+
+        @Override
+        protected Disposable doStart() {
+            Disposable.Composite composite = Disposables.composite();
+            Flux<Map<String, Object>> dataStream;
+            //有上游节点
+            if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
+
+                dataStream = context.getInput()
+                    .accept()
+                    .map(RuleDataHelper::toContextMap)
+                    .as(reactorQL::start)
+                ;
+            } else {
+                dataStream = reactorQL
+                    .start(table -> {
+                        if (table == null || table.equalsIgnoreCase("dual")) {
+                            return Flux.just(1);
+                        }
+                        if (table.startsWith("/")) {
+                            //转换为消息
+                            return messageGateway
+                                .subscribe(
+                                    Collections.singleton(new Subscription(table)),
+                                    "rule-engine:".concat(context.getInstanceId()),
+                                    false)
+                                .map(TopicMessage::convertMessage);
+                        }
+                        return Flux.just(1);
+                    });
+            }
+
+            return dataStream
+                .flatMap(result -> {
+                    RuleData data = context.newRuleData(result);
+                    //输出到下一节点
+                    return context.getOutput()
+                        .write(Mono.just(data))
+                        .then(context.fireEvent(RuleConstants.Event.result, data));
+                })
+                .onErrorResume(err -> context.onError(err, null))
+                .subscribe();
+        }
+
+        protected ReactorQL createQl() {
+            ReactorQL.Builder builder = Optional.ofNullable(context.getJob().getConfiguration())
+                .map(map -> map.get("sql"))
+                .map(String::valueOf)
+                .map(ReactorQL.builder()::sql)
+                .orElseThrow(() -> new IllegalArgumentException("配置sql错误"));
+            return builder.build();
+        }
+
+        @Override
+        public void reload() {
+            reactorQL = createQl();
+            if (this.disposable != null) {
+                this.disposable.dispose();
+            }
+            start();
+        }
+
+        @Override
+        public void validate() {
+            createQl();
+        }
+    }
+}

+ 22 - 14
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ScriptWorkerNode.java

@@ -1,4 +1,4 @@
-package org.jetlinks.community.rule.engine.nodes;
+package org.jetlinks.community.rule.engine.executor;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -7,11 +7,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.hswebframework.expands.script.engine.DynamicScriptEngine;
 import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.jetlinks.rule.engine.api.model.RuleNodeModel;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
@@ -24,14 +26,22 @@ import java.util.function.Function;
 
 @Component
 @Slf4j
-public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<ScriptWorkerNode.Config> {
+public class ScriptTaskExecutorProvider implements TaskExecutorProvider {
 
     @Override
-    public String getSupportType() {
+    public String getExecutor() {
         return "script";
     }
 
     @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new LambdaTaskExecutor("script", context, () -> {
+
+            return createExecutor(context, FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()));
+
+        }));
+    }
+
     @SneakyThrows
     public Function<RuleData, Publisher<?>> createExecutor(ExecutionContext context, Config config) {
 
@@ -54,16 +64,16 @@ public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Sc
         scriptContext.put("handler", handler);
         engine.execute(id, scriptContext).getIfSuccess();
 
-        return ruleData -> Flux.defer(()->{
+        return ruleData -> Flux.defer(() -> {
             if (handler.onMessage != null) {
                 Object result = handler.onMessage.apply(ruleData);
                 if (result == null || result.getClass().getName().equals("jdk.nashorn.internal.runtime.Undefined")) {
                     return Flux.empty();
                 }
-                if(result instanceof Publisher){
-                    return Flux.from(((Publisher) result));
+                if (result instanceof Publisher) {
+                    return Flux.from(((Publisher<?>) result));
                 }
-                if(result instanceof Map){
+                if (result instanceof Map) {
                     result = new HashMap<>((Map<?, ?>) result);
                 }
                 return Flux.just(result);
@@ -82,13 +92,11 @@ public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Sc
 
     @Getter
     @Setter
-    public static class Config implements RuleNodeConfig {
+    public static class Config {
 
         private String lang = "js";
 
         private String script;
 
-        private NodeType nodeType;
-
     }
 }

+ 19 - 24
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/SqlExecutorWorkerNode.java

@@ -1,4 +1,4 @@
-package org.jetlinks.community.rule.engine.nodes;
+package org.jetlinks.community.rule.engine.executor;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -6,12 +6,15 @@ import lombok.SneakyThrows;
 import org.hswebframework.ezorm.rdb.executor.SqlRequests;
 import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
 import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
+import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.utils.ExpressionUtils;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
 import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
 import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -19,31 +22,28 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
 @Component
-public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<SqlExecutorWorkerNode.Config> {
+public class SqlExecutorTaskExecutorProvider implements TaskExecutorProvider {
 
     @Autowired
     private ReactiveSqlExecutor sqlExecutor;
 
-    @Override
-    public String getSupportType() {
+    public String getExecutor() {
         return "sql";
     }
 
-    @Override
-    public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, Config config) {
+    public Function<RuleData, Publisher<?>> createExecutor(ExecutionContext context, Config config) {
 
         if (config.isQuery()) {
             return (data) -> Flux.defer(() -> {
                 String sql = config.getSql(data);
                 List<Flux<Map<String, Object>>> fluxes = new ArrayList<>();
                 data.acceptMap(map -> fluxes.add(sqlExecutor.select(Mono.just(SqlRequests.template(sql, map)), ResultWrappers.map())));
-                return Flux.concat(fluxes) ;
+                return Flux.concat(fluxes);
             });
         } else {
             return data -> Mono.defer(() -> {
@@ -57,10 +57,15 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
 
     }
 
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new LambdaTaskExecutor("SQL",context, () -> createExecutor(context, FastBeanCopier.copy(context.getJob().getConfiguration(),new Config()))));
+    }
+
 
     @Getter
     @Setter
-    public static class Config implements RuleNodeConfig {
+    public static class Config {
 
         private String dataSourceId;
 
@@ -75,7 +80,7 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
         public boolean isQuery() {
 
             return sql.trim().startsWith("SELECT") ||
-                    sql.trim().startsWith("select");
+                sql.trim().startsWith("select");
         }
 
         @SneakyThrows
@@ -83,20 +88,10 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
             if (!sql.contains("${")) {
                 return sql;
             }
-            Map<String, Object> map = new HashMap<>();
-            map.put("data", data.getData());
-            map.put("ruleData", data);
-            map.put("attr", data.getAttributes());
-            return ExpressionUtils.analytical(sql, map, "spel");
-        }
-
-        public void switchDataSource() {
 
+            return ExpressionUtils.analytical(sql, RuleDataHelper.toContextMap(data), "spel");
         }
 
-        public void resetDataSource() {
-
-        }
     }
 
 }

+ 125 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java

@@ -0,0 +1,125 @@
+package org.jetlinks.community.rule.engine.executor;
+
+import com.cronutils.model.Cron;
+import com.cronutils.model.CronType;
+import com.cronutils.model.definition.CronDefinitionBuilder;
+import com.cronutils.model.time.ExecutionTime;
+import com.cronutils.parser.CronParser;
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.ValueObject;
+import org.jetlinks.rule.engine.api.RuleConstants;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.function.Supplier;
+
+@Component
+@AllArgsConstructor
+public class TimerTaskExecutorProvider implements TaskExecutorProvider {
+
+    private final Scheduler scheduler;
+
+    @Override
+    public String getExecutor() {
+        return "timer";
+    }
+
+    @Override
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new TimerTaskExecutor(context));
+    }
+
+    class TimerTaskExecutor extends AbstractTaskExecutor {
+
+        Supplier<Duration> nextDelay;
+
+        public TimerTaskExecutor(ExecutionContext context) {
+            super(context);
+            nextDelay = createNextDelay();
+        }
+
+        @Override
+        public String getName() {
+            return "定时调度";
+        }
+
+        @Override
+        protected Disposable doStart() {
+            return execute();
+        }
+
+        private Disposable execute() {
+            Duration nextTime = nextDelay.get();
+            context.getLogger().debug("trigger timed task after {}", nextTime);
+            if (this.disposable != null) {
+                this.disposable.dispose();
+            }
+            return this.disposable =
+                Mono.delay(nextTime, scheduler)
+                    .flatMap(t -> context.getOutput().write(Mono.just(context.newRuleData(t))))
+                    .then(context.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis())).thenReturn(1))
+                    .subscribe(t -> execute());
+        }
+
+        @Override
+        public void reload() {
+            nextDelay = createNextDelay();
+            if (disposable != null) {
+                disposable.dispose();
+            }
+            doStart();
+        }
+
+        @Override
+        public void validate() {
+            createNextDelay();
+        }
+
+        private Supplier<Duration> createNextDelay() {
+            ValueObject config = ValueObject.of(context.getJob().getConfiguration());
+
+            CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
+            Cron cron = config.getString("cron")
+                .map(parser::parse)
+                .orElseThrow(() -> new IllegalArgumentException("cron配置不存在"));
+            ExecutionTime executionTime = ExecutionTime.forCron(cron);
+
+            return () -> executionTime.timeToNextExecution(ZonedDateTime.now()).orElse(Duration.ofSeconds(10));
+
+        }
+
+    }
+
+    public static Flux<ZonedDateTime> getLastExecuteTimes(String cronExpression, Date from, long times) {
+        return Flux.create(sink -> {
+            CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
+            Cron cron = parser.parse(cronExpression);
+            ExecutionTime executionTime = ExecutionTime.forCron(cron);
+            ZonedDateTime dateTime = ZonedDateTime.ofInstant(from.toInstant(), ZoneId.systemDefault());
+
+            for (long i = 0; i < times; i++) {
+                dateTime = executionTime.nextExecution(dateTime)
+                    .orElse(null);
+                if (dateTime != null) {
+                    sink.next(dateTime);
+                } else {
+                    break;
+                }
+            }
+            sink.complete();
+
+
+        });
+    }
+}

+ 0 - 134
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java

@@ -1,134 +0,0 @@
-package org.jetlinks.community.rule.engine.nodes;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.core.message.codec.MessagePayloadType;
-import org.jetlinks.community.gateway.EncodableMessage;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.reactor.ql.ReactorQL;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.RuleDataHelper;
-import org.jetlinks.rule.engine.api.events.RuleEvent;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
-import org.reactivestreams.Publisher;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.Collections;
-import java.util.function.Function;
-
-/**
- * <pre>
- *     {@code
- *
- *     select avg(this.temperature) avgVal, deviceId
- *     from "/device/+/message/property/#"
- *     group _window(10,1) --每10条滚动数据
- *     having avgVal > 10
- *
- *     }
- * </pre>
- */
-@Slf4j
-@AllArgsConstructor
-@Component
-public class ReactorSqlNode extends CommonExecutableRuleNodeFactoryStrategy<ReactorSqlNode.Config> {
-
-    private final MessageGateway messageGateway;
-
-    @Override
-    public String getSupportType() {
-        return "reactor-ql";
-    }
-
-    @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Config config) {
-        ReactorQL ql = config.getReactorQL();
-
-        return data -> ql.start(Flux.just(RuleDataHelper.toContextMap(data)));
-    }
-
-    @Override
-    protected void onStarted(ExecutionContext context, Config config) {
-        log.debug("start reactor ql : {}", config.getSql());
-        context.onStop(
-            config.getReactorQL()
-                .start(table -> {
-                    if (table == null || table.equalsIgnoreCase("dual")) {
-                        return Flux.just(1);
-                    }
-
-                    if (table.startsWith("/")) {
-                        return messageGateway
-                            .subscribe(
-                                Collections.singleton(new Subscription(table)),
-                                "rule-engine:".concat(context.getInstanceId()),
-                                false)
-                            .map(msg -> {
-                                //转换为消息
-                                if (msg.getMessage() instanceof EncodableMessage) {
-                                    return ((EncodableMessage) msg.getMessage()).getNativePayload();
-                                }
-                                MessagePayloadType payloadType = msg.getMessage().getPayloadType();
-                                if (payloadType == null) {
-                                    return msg.getMessage().getBytes();
-                                }
-                                return PayloadType.valueOf(payloadType.name()).read(msg.getMessage().getPayload());
-                            });
-                    }
-                    return Flux.just(1);
-                })
-                .flatMap(result -> {
-                    RuleData data = RuleData.create(result);
-                    //输出到下一节点
-                    return context.getOutput()
-                        .write(Mono.just(RuleData.create(result)))
-                        .then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data));
-                })
-                .onErrorResume(err -> context.onError(RuleData.create(""), err))
-                .subscribe()::dispose
-        );
-
-    }
-
-    public static class Config implements RuleNodeConfig {
-
-        @Getter
-        @Setter
-        private String sql;
-
-        private volatile ReactorQL reactorQL;
-
-        @Override
-        public NodeType getNodeType() {
-            return NodeType.MAP;
-        }
-
-        @Override
-        public void setNodeType(NodeType nodeType) {
-
-        }
-
-        public ReactorQL getReactorQL() {
-            if (reactorQL == null) {
-                reactorQL = ReactorQL.builder().sql(sql).build();
-            }
-            return reactorQL;
-        }
-
-        @Override
-        public void validate() {
-            //不报错就ok
-            getReactorQL();
-        }
-    }
-
-}

+ 0 - 129
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/TimerWorkerNode.java

@@ -1,129 +0,0 @@
-package org.jetlinks.community.rule.engine.nodes;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
-import org.reactivestreams.Publisher;
-import org.springframework.scheduling.support.CronSequenceGenerator;
-import reactor.core.publisher.Mono;
-
-import java.time.Duration;
-import java.util.Date;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-
-public class TimerWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<TimerWorkerNode.Configuration> {
-
-    private Map<String, TimerWorkerNode.TimerJob> jobs = new ConcurrentHashMap<>();
-
-    @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Configuration config) {
-        return Mono::just;
-    }
-
-    @Override
-    protected void onStarted(ExecutionContext context, Configuration config) {
-        super.onStarted(context, config);
-        String id = context.getInstanceId() + ":" + context.getNodeId();
-
-        context.onStop(() -> {
-            TimerJob job = jobs.remove(id);
-            if (null != job) {
-                job.cancel();
-            }
-        });
-        TimerJob job = jobs.computeIfAbsent(id, _id -> new TimerJob(config, context));
-
-        job.start();
-    }
-
-    @Override
-    public String getSupportType() {
-        return "timer";
-    }
-
-    @AllArgsConstructor
-    private static class TimerJob {
-        private String id;
-        private TimerWorkerNode.Configuration configuration;
-        private ExecutionContext context;
-        private volatile boolean running;
-
-        TimerJob(TimerWorkerNode.Configuration configuration,
-                 ExecutionContext context) {
-            this.configuration = configuration;
-            this.context = context;
-            this.id = context.getInstanceId() + ":" + context.getNodeId();
-        }
-
-
-        void start() {
-            running = true;
-            doStart();
-        }
-
-        void doStart() {
-            if (!running) {
-                return;
-            }
-            running = true;
-            Mono.delay(Duration.ofMillis(configuration.nextMillis()))
-                .subscribe(t -> execute(this::doStart));
-        }
-
-        void execute(Runnable runnable) {
-            if (!running) {
-                return;
-            }
-            context.logger().debug("execute timer:{}", id);
-            context.getOutput()
-                .write(Mono.just(RuleData.create(System.currentTimeMillis())))
-                .doOnError(err -> context.logger().error("fire timer error", err))
-                .doFinally(s -> runnable.run())
-                .subscribe();
-        }
-
-        void cancel() {
-            running = false;
-        }
-    }
-
-
-    public static class Configuration implements RuleNodeConfig {
-        @Getter
-        @Setter
-        private String cron;
-
-        private volatile CronSequenceGenerator generator;
-
-        @Override
-        public NodeType getNodeType() {
-            return NodeType.PEEK;
-        }
-
-        @Override
-        public void setNodeType(NodeType nodeType) {
-
-        }
-
-        public void init() {
-            generator = new CronSequenceGenerator(cron);
-        }
-
-        @Override
-        public void validate() {
-            init();
-        }
-
-        public long nextMillis() {
-            return Math.max(100, generator.next(new Date()).getTime() - System.currentTimeMillis());
-        }
-
-    }
-}

+ 81 - 56
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java

@@ -1,26 +1,28 @@
 package org.jetlinks.community.rule.engine.device;
 
 import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.jetlinks.core.message.DeviceMessage;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.ValueObject;
 import org.jetlinks.community.gateway.DeviceMessageUtils;
 import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.reactor.ql.ReactorQL;
 import org.jetlinks.reactor.ql.ReactorQLContext;
 import org.jetlinks.reactor.ql.ReactorQLRecord;
+import org.jetlinks.rule.engine.api.RuleConstants;
 import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.events.RuleEvent;
-import org.jetlinks.rule.engine.api.executor.ExecutionContext;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
-import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.jetlinks.rule.engine.api.task.ExecutionContext;
+import org.jetlinks.rule.engine.api.task.Task;
+import org.jetlinks.rule.engine.api.task.TaskExecutor;
+import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
+import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -31,45 +33,26 @@ import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-@Slf4j(topic = "system.rule.engine.device.alarm")
-@Component
+@Slf4j
 @AllArgsConstructor
-public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceAlarmRuleNode.Config> {
+@Component
+public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
 
     private final MessageGateway messageGateway;
 
     @Override
-    public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context,DeviceAlarmRuleNode.Config config) {
-
-        return Mono::just;
-    }
-
-    @Override
-    protected void onStarted(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
-        context.onStop(
-            config.doSubscribe(messageGateway)
-                .flatMap(result -> {
-                    RuleData data = RuleData.create(result);
-                    //输出到下一节点
-                    return context.getOutput()
-                        .write(Mono.just(data))
-                        .then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data));
-                })
-                .onErrorResume(err -> context.onError(RuleData.create(err.getMessage()), err))
-                .subscribe()::dispose
-        );
+    public String getExecutor() {
+        return "device_alarm";
     }
 
     @Override
-    public String getSupportType() {
-        return "device_alarm";
+    public Mono<TaskExecutor> createTask(ExecutionContext context) {
+        return Mono.just(new DeviceAlarmTaskExecutor(context));
     }
 
+    class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
 
-    @Getter
-    @Setter
-    public static class Config implements RuleNodeConfig {
-        static List<String> default_columns = Arrays.asList(
+        List<String> default_columns = Arrays.asList(
             "timestamp", "deviceId", "this.header.deviceName deviceName"
         );
 
@@ -77,19 +60,63 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
 
         private ReactorQL ql;
 
+        public DeviceAlarmTaskExecutor(ExecutionContext context) {
+            super(context);
+            rule = createRule();
+            ql = createQL(rule);
+        }
+
         @Override
-        public void validate() {
-            if (CollectionUtils.isEmpty(rule.getTriggers())) {
-                throw new IllegalArgumentException("预警条件不能为空");
+        public String getName() {
+            return "设备告警";
+        }
+
+        @Override
+        protected Disposable doStart() {
+            rule.validate();
+            return doSubscribe(messageGateway)
+                .filter(ignore -> state == Task.State.running)
+                .flatMap(result -> {
+                    RuleData data = RuleData.create(result);
+                    //输出到下一节点
+                    return context
+                        .getOutput()
+                        .write(Mono.just(data))
+                        .then(context.fireEvent(RuleConstants.Event.result, data));
+                })
+                .onErrorResume(err -> context.onError(err, null))
+                .subscribe();
+        }
+
+        @Override
+        public void reload() {
+            rule = createRule();
+            ql = createQL(rule);
+            if (disposable != null) {
+                disposable.dispose();
             }
+            doStart();
+        }
+
+        private DeviceAlarmRule createRule() {
+            DeviceAlarmRule rule = ValueObject.of(context.getJob().getConfiguration())
+                .get("rule")
+                .map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule())).orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
+            rule.validate();
+            return rule;
+        }
+
+        @Override
+        public void validate() {
+            DeviceAlarmRule rule = createRule();
             try {
-                ql = createQL();
+                createQL(rule);
             } catch (Exception e) {
                 throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
             }
         }
 
-        private ReactorQL createQL() {
+        private ReactorQL createQL(DeviceAlarmRule rule) {
             List<String> columns = new ArrayList<>(default_columns);
             List<String> wheres = new ArrayList<>();
 
@@ -119,13 +146,21 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
                         continue;
                     }
                     String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
-                    newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
+                    // 'message',func(),this[name]
+                    if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
+                        ||
+                        property.getProperty().contains("(") || property.getProperty().contains("[")) {
+                        newColumns.add(property.getProperty() + "\"" + alias + "\"");
+                    } else {
+                        newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
+                    }
                 }
                 if (newColumns.size() > 3) {
                     sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
                 }
             }
             log.debug("create device alarm sql : \n{}", sql);
+
             return ReactorQL.builder().sql(sql).build();
         }
 
@@ -161,7 +196,7 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
 
             binds.forEach(context::bind);
 
-            Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL() : ql)
+            Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
                 .start(context)
                 .map(ReactorQLRecord::asMap);
 
@@ -177,11 +212,11 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
                     shakeLimit.isAlarmFirst()
                         ?
                         group -> group
-                            .takeUntil(tp -> tp.getT1() >= thresholdNumber) //达到触发阈值
-                            .take(1) //取第一个
+                            .takeUntil(tp -> tp.getT1() >= thresholdNumber)
+                            .take(1)
                             .singleOrEmpty()
                         :
-                        group -> group.takeLast(1).singleOrEmpty();//取最后一个
+                        group -> group.takeLast(1).singleOrEmpty();
 
                 resultFlux = window
                     .flatMap(group -> group
@@ -227,15 +262,5 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
                         .then(Mono.just(map));
                 });
         }
-
-        @Override
-        public NodeType getNodeType() {
-            return NodeType.MAP;
-        }
-
-        @Override
-        public void setNodeType(NodeType nodeType) {
-
-        }
     }
 }

+ 3 - 7
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/RuleInstanceEntity.java

@@ -7,7 +7,6 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
 import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
-import org.jetlinks.rule.engine.api.Rule;
 import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
 import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.springframework.util.StringUtils;
@@ -63,14 +62,11 @@ public class RuleInstanceEntity extends GenericEntity<String> implements RecordC
     private String instanceDetailJson;
 
 
-    public Rule toRule(RuleEngineModelParser parser) {
+    public RuleModel toRule(RuleEngineModelParser parser) {
         RuleModel model = parser.parse(modelType, modelMeta);
         model.setId(StringUtils.hasText(modelId)?modelId:getId());
         model.setName(name);
-        Rule rule = new Rule();
-        rule.setModel(model);
-        rule.setVersion(modelVersion);
-        rule.setId(getId());
-        return rule;
+
+        return model;
     }
 }

+ 0 - 4
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/Action.java

@@ -2,9 +2,7 @@ package org.jetlinks.community.rule.engine.model;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
-import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -18,7 +16,6 @@ public class Action implements Serializable {
      * 执行器
      *
      * @see RuleNodeModel#getExecutor()
-     * @see ExecutableRuleNodeFactoryStrategy#getSupportType()
      */
     private String executor;
 
@@ -26,7 +23,6 @@ public class Action implements Serializable {
      * 执行器配置
      *
      * @see RuleNodeModel#getConfiguration()
-     * @see RuleNodeConfiguration
      */
     private Map<String, Object> configuration;
 }

+ 2 - 4
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java

@@ -3,11 +3,10 @@ package org.jetlinks.community.rule.engine.model;
 import com.alibaba.fastjson.JSON;
 import org.apache.commons.collections.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
 import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
-import org.jetlinks.community.rule.engine.nodes.DeviceMessageSendNode;
-import org.jetlinks.rule.engine.api.cluster.RunMode;
 import org.jetlinks.rule.engine.api.model.RuleLink;
 import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@@ -37,7 +36,6 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
         RuleModel model = new RuleModel();
         model.setId("device_alarm:".concat(rule.getId()));
         model.setName(rule.getName());
-        model.setRunMode(RunMode.CLUSTER);
 
         DeviceAlarmRule alarmRule = rule.getAlarmRule();
         alarmRule.validate();
@@ -59,7 +57,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
                 timer.setExecutor("timer");
                 timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
 
-                DeviceMessageSendNode.Config senderConfig = new DeviceMessageSendNode.Config();
+                DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
                 senderConfig.setAsync(true);
                 senderConfig.setDeviceId(alarmRule.getDeviceId());
                 senderConfig.setProductId(alarmRule.getProductId());

+ 2 - 2
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/SqlRuleModelParser.java

@@ -3,7 +3,7 @@ package org.jetlinks.community.rule.engine.model;
 import com.alibaba.fastjson.JSON;
 import org.jetlinks.community.rule.engine.enums.SqlRuleType;
 import org.jetlinks.community.rule.engine.ql.SqlRule;
-import org.jetlinks.rule.engine.api.events.RuleEvent;
+import org.jetlinks.rule.engine.api.RuleConstants;
 import org.jetlinks.rule.engine.api.model.RuleLink;
 import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@@ -59,7 +59,7 @@ public class SqlRuleModelParser implements RuleModelParserStrategy {
                 link.setId(action.getId().concat(":").concat(action.getId()));
                 link.setName("错误处理:" + index);
                 link.setSource(sqlNode);
-                link.setType(RuleEvent.NODE_EXECUTE_FAIL);
+                link.setType(RuleConstants.Event.error);
                 link.setTarget(action);
                 errorHandler.add(link);
                 model.getNodes().add(action);

+ 0 - 361
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleEngineDebugService.java

@@ -1,361 +0,0 @@
-package org.jetlinks.community.rule.engine.service;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
-import lombok.Getter;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.utils.StringUtils;
-import org.hswebframework.web.exception.NotFoundException;
-import org.hswebframework.web.id.IDGenerator;
-import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.cluster.ClusterQueue;
-import org.jetlinks.core.cluster.ClusterTopic;
-import org.jetlinks.rule.engine.api.*;
-import org.jetlinks.rule.engine.api.executor.*;
-import org.jetlinks.rule.engine.api.model.Condition;
-import org.jetlinks.rule.engine.api.model.NodeType;
-import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
-import org.jetlinks.rule.engine.cluster.logger.ClusterLogger;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import javax.annotation.PostConstruct;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-@Slf4j
-@Service
-public class RuleEngineDebugService {
-
-    @Autowired
-    private ExecutableRuleNodeFactory executableRuleNodeFactory;
-
-    @Autowired
-    private RuleEngineModelParser modelParser;
-
-    @Autowired
-    private ConditionEvaluator conditionEvaluator;
-
-    private Map<String, Session> sessionStore = new ConcurrentHashMap<>();
-
-
-    public Flux<DebugMessage> getDebugMessages(String sessionId) {
-        return getSession(sessionId)
-            .consumeOutPut();
-    }
-
-    private Session getSession(String id) {
-        return Optional.ofNullable(id)
-            .map(sessionStore::get)
-            .orElseThrow(() -> new NotFoundException("session不存在"));
-    }
-
-    public Mono<String> startSession() {
-        return Mono.fromSupplier(() -> {
-            String sessionId = IDGenerator.UUID.generate();
-            Session session = new Session(sessionId);
-            session.local = true;
-            sessionStore.put(sessionId, session);
-            return sessionId;
-        });
-    }
-
-
-    @SneakyThrows
-    public String startNode(String sessionId, RuleNodeConfiguration configuration) {
-        configuration.setNodeType(NodeType.MAP);
-        Session session = getSession(sessionId);
-
-        DebugExecutionContext context = session.createContext(configuration);
-
-        ExecutableRuleNode ruleNode = executableRuleNodeFactory.create(configuration);
-
-        ruleNode.start(context);
-
-        return context.id;
-    }
-
-    public void sendData(String sessionId, String contextId, RuleData ruleData) {
-        getSession(sessionId)
-            .getContext(contextId)
-            .execute(ruleData);
-    }
-
-
-    public Mono<Boolean> stopContext(String sessionId, String contextId) {
-
-        return Mono.fromRunnable(() -> getSession(sessionId).stopContext(contextId))
-            .thenReturn(true);
-    }
-
-    public Set<String> getAllContext(String sessionId) {
-        return getSession(sessionId)
-            .contexts
-            .keySet();
-    }
-
-    public Mono<Boolean> closeSession(String sessionId) {
-        return Mono.fromRunnable(() -> getSession(sessionId).close());
-    }
-
-    public Mono<Boolean> testCondition(String sessionId, Condition condition, Object data) {
-
-        Session session = getSession(sessionId);
-
-        try {
-            boolean success = conditionEvaluator.evaluate(condition, RuleData.create(data));
-            return session.writeMessage(DebugMessage.of("output", null, "测试条件:".concat(success ? "通过" : "未通过")));
-        } catch (Exception e) {
-            return session.writeMessage(DebugMessage.of("error", null, StringUtils.throwable2String(e)));
-        }
-    }
-
-    private class Session {
-        private String id;
-
-        private long lastOperationTime;
-
-        private Map<String, DebugExecutionContext> contexts = new ConcurrentHashMap<>();
-
-        private Map<String, RuleInstanceContext> instanceContext = new ConcurrentHashMap<>();
-
-        private Map<String, String> instanceContextMapping = new ConcurrentHashMap<>();
-
-        private EmitterProcessor<DebugMessage> messageQueue = EmitterProcessor.create(false);
-
-        @Getter
-        private boolean local = false;
-
-        private Session(String id) {
-            this.lastOperationTime = System.currentTimeMillis();
-            this.id = id;
-        }
-
-        private boolean isTimeout() {
-            return System.currentTimeMillis() - lastOperationTime > TimeUnit.MINUTES.toMillis(15);
-        }
-
-        private void checkContextTimeout() {
-            contexts.entrySet()
-                .stream()
-                .filter(e -> e.getValue().isTimeout())
-                .map(Map.Entry::getKey)
-                .map(contexts::remove)
-                .forEach(DebugExecutionContext::stop);
-        }
-
-        private void stopContext(String contextId) {
-            Optional.ofNullable(contexts.remove(contextId))
-                .ifPresent(ExecutionContext::stop);
-        }
-
-        private Logger createLogger(String contextId, String nodeId) {
-            ClusterLogger logger = new ClusterLogger();
-            logger.setParent(new Slf4jLogger("rule.engine.debug.".concat(nodeId)));
-            logger.setNodeId(nodeId);
-            logger.setInstanceId(contextId);
-            logger.setLogInfoConsumer(logInfo -> {
-
-                Map<String, Object> data = new HashMap<>();
-                data.put("level", logInfo.getLevel());
-                data.put("message", logInfo.getMessage());
-
-                writeMessage(DebugMessage.of("log", contextId, data))
-                    .subscribe();
-
-            });
-            return logger;
-        }
-
-        private DebugExecutionContext createContext(RuleNodeConfiguration configuration) {
-            lastOperationTime = System.currentTimeMillis();
-            String id = Optional.ofNullable(configuration.getId()).orElseGet(IDGenerator.MD5::generate);
-            DebugExecutionContext context = contexts.get(id);
-            if (context != null) {
-                context.stop();
-                contexts.remove(id);
-            }
-
-            context = new DebugExecutionContext(id, createLogger(id, configuration.getNodeId()), this);
-            context.local = true;
-            contexts.put(id, context);
-            return context;
-        }
-
-        private DebugExecutionContext getContext(String id) {
-            lastOperationTime = System.currentTimeMillis();
-
-            return contexts.computeIfAbsent(id, _id -> new DebugExecutionContext(id, new Slf4jLogger("rule.engine.debug.none"), this));
-        }
-
-        private void execute(RuleData ruleData) {
-            String instanceId = ruleData.getAttribute("instanceId").map(String::valueOf).orElse(null);
-
-            RuleInstanceContext context = instanceContext.get(instanceId);
-            if (context != null) {
-                doExecute(context, ruleData);
-            }
-
-        }
-
-        private void doExecute(RuleInstanceContext context, RuleData ruleData) {
-
-            context.execute(Mono.just(ruleData))
-                .doOnError((throwable) -> {
-                    writeMessage(DebugMessage.of("error", context.getId(), "执行规则失败:" + StringUtils.throwable2String(throwable)));
-                })
-                .subscribe(resp -> {
-                    writeMessage(DebugMessage.of("output", context.getId(), resp.getData()));
-                });
-
-        }
-
-        private void execute(ExecuteRuleRequest request) {
-
-            RuleInstanceContext context = instanceContext.get(request.getContextId());
-            if (context == null) {
-                return;
-            }
-            RuleData ruleData = RuleData.create(request.getData());
-            RuleDataHelper.markStartWith(ruleData, request.getStartWith());
-            RuleDataHelper.markSyncReturn(ruleData, request.getEndWith());
-            ruleData.setAttribute("debugSessionId", id);
-            ruleData.setAttribute("instanceId", request.getContextId());
-
-            doExecute(context, ruleData);
-        }
-
-        private Mono<Boolean> writeMessage(DebugMessage message) {
-            lastOperationTime = System.currentTimeMillis();
-            return Mono.fromRunnable(() -> messageQueue.onNext(message))
-                .thenReturn(true);
-        }
-
-
-        @SneakyThrows
-        public Flux<DebugMessage> consumeOutPut() {
-
-            return messageQueue
-                .map(Function.identity());
-        }
-
-        public void close() {
-            contexts.forEach((s, context) -> context.stop());
-            instanceContext.values().forEach(RuleInstanceContext::stop);
-            instanceContext.clear();
-            instanceContextMapping.clear();
-            messageQueue.onComplete();
-        }
-
-    }
-
-    private class DebugExecutionContext implements ExecutionContext {
-
-        private Session session;
-
-        private String id;
-
-        private EmitterProcessor<RuleData> inputQueue = EmitterProcessor.create(false);
-
-        private Logger logger;
-
-        private List<Runnable> stopListener = new CopyOnWriteArrayList<>();
-
-        private long lastOperationTime = System.currentTimeMillis();
-
-        @Getter
-        private boolean local = false;
-
-        public DebugExecutionContext(String id, Logger logger, Session session) {
-            this.session = session;
-            this.logger = logger;
-            this.id = id;
-        }
-
-        public boolean isTimeout() {
-            return System.currentTimeMillis() - lastOperationTime > TimeUnit.MINUTES.toMillis(15);
-        }
-
-        @Override
-        public String getInstanceId() {
-            return id;
-        }
-
-        @Override
-        public String getNodeId() {
-            return id;
-        }
-
-        @Override
-        public Logger logger() {
-            return logger;
-        }
-
-        public void execute(RuleData ruleData) {
-            lastOperationTime = System.currentTimeMillis();
-
-            ruleData.setAttribute("debug", true);
-            inputQueue.onNext(ruleData);
-        }
-
-        @Override
-        public Input getInput() {
-
-            return new Input() {
-                @Override
-                public Flux<RuleData> subscribe() {
-                    return inputQueue.map(Function.identity())
-                        .doOnNext(data -> {
-                            log.debug("handle input :{}", data);
-                        })
-                        .doFinally(s -> {
-                            log.debug("unsubscribe input:{}", id);
-                        });
-                }
-
-                @Override
-                public void close() {
-                    inputQueue.onComplete();
-                }
-            };
-        }
-
-        @Override
-        public Output getOutput() {
-            return (data) -> Flux.from(data)
-                .flatMap(d -> session.writeMessage(DebugMessage.of("output", id, JSON.toJSONString(d.getData(), SerializerFeature.PrettyFormat))))
-                .then(Mono.just(true));
-        }
-
-        @Override
-        public Mono<Void> fireEvent(String event, RuleData data) {
-            return Mono.empty();
-        }
-
-        @Override
-        public Mono<Void> onError(RuleData data, Throwable e) {
-            return session
-                .writeMessage(DebugMessage.of("error", id, StringUtils.throwable2String(e)))
-                .then();
-        }
-
-        @Override
-        public void stop() {
-            stopListener.forEach(Runnable::run);
-        }
-
-        @Override
-        public void onStop(Runnable runnable) {
-            stopListener.add(runnable);
-        }
-    }
-
-
-}

+ 27 - 31
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/RuleInstanceService.java

@@ -4,16 +4,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
-import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
-import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
-import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
 import org.jetlinks.community.elastic.search.service.ElasticSearchService;
-import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
+import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
+import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
 import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
-import org.jetlinks.rule.engine.api.Rule;
+import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
+import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
 import org.jetlinks.rule.engine.api.RuleEngine;
-import org.jetlinks.rule.engine.api.RuleInstanceContext;
 import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
+import org.jetlinks.rule.engine.api.model.RuleModel;
 import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
@@ -44,30 +43,29 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
 
     public Mono<Void> stop(String id) {
         return this.ruleEngine
-                .getInstance(id)
-                .flatMap(RuleInstanceContext::stop)
-                .switchIfEmpty(Mono.empty())
-                .then(createUpdate()
-                        .set(RuleInstanceEntity::getState, RuleInstanceState.stopped)
-                        .where(RuleInstanceEntity::getId,id)
-                        .execute())
-                .then();
+            .shutdown(id)
+            .then(createUpdate()
+                .set(RuleInstanceEntity::getState, RuleInstanceState.stopped)
+                .where(RuleInstanceEntity::getId, id)
+                .execute())
+            .then();
     }
 
-    public Mono<RuleInstanceContext> start(String id) {
+    public Mono<Void> start(String id) {
         return findById(Mono.just(id))
-                .flatMap(this::doStart);
+            .flatMapMany(instance -> this.ruleEngine.startRule(id, instance.toRule(modelParser)))
+            .then();
     }
 
-    private Mono<RuleInstanceContext> doStart(RuleInstanceEntity entity) {
+    private Mono<Void> doStart(RuleInstanceEntity entity) {
         return Mono.defer(() -> {
-            Rule rule = entity.toRule(modelParser);
-            return ruleEngine.startRule(rule)
-                    .flatMap(ctx -> createUpdate()
-                            .set(RuleInstanceEntity::getState, RuleInstanceState.started)
-                            .where(entity::getId)
-                            .execute()
-                            .thenReturn(ctx));
+            RuleModel model = entity.toRule(modelParser);
+            return ruleEngine
+                .startRule(entity.getId(), model)
+                .then(createUpdate()
+                    .set(RuleInstanceEntity::getState, RuleInstanceState.started)
+                    .where(entity::getId)
+                    .execute()).then();
         });
     }
 
@@ -81,12 +79,10 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
     @Override
     public void run(String... args) {
         createQuery()
-                .where()
-                .is(RuleInstanceEntity::getState, RuleInstanceState.started)
-                .fetch()
-                .flatMap(this::doStart)
-                .subscribe(context -> {
-                    log.debug("start rule {}", context.getId());
-                });
+            .where()
+            .is(RuleInstanceEntity::getState, RuleInstanceState.started)
+            .fetch()
+            .flatMap(this::doStart)
+            .subscribe();
     }
 }

+ 0 - 80
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleEngineDebugController.java

@@ -1,80 +0,0 @@
-package org.jetlinks.community.rule.engine.web;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import org.hswebframework.web.authorization.annotation.Authorize;
-import org.hswebframework.web.authorization.annotation.Resource;
-import org.jetlinks.community.rule.engine.service.DebugMessage;
-import org.jetlinks.community.rule.engine.service.RuleEngineDebugService;
-import org.jetlinks.rule.engine.api.RuleData;
-import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
-import org.jetlinks.rule.engine.api.model.Condition;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
-import org.springframework.web.bind.annotation.*;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-@RestController
-@RequestMapping("/rule-engine/debug")
-@Authorize
-@Resource(id="rule-model",name = "规则引擎-模型")
-public class RuleEngineDebugController {
-
-    @Autowired
-    private RuleEngineDebugService ruleDebugService;
-
-    @PostMapping
-    public Mono<String> startSession() {
-        return ruleDebugService.startSession();
-    }
-
-    @PostMapping("/{sessionId}")
-    public Mono<String> startNode(@PathVariable String sessionId, @RequestBody RuleNodeConfiguration configuration) {
-        return Mono.just(ruleDebugService.startNode(sessionId, configuration));
-    }
-
-    @GetMapping(value = "/{sessionId}/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
-    public Flux<DebugMessage> getSessionLogs(@PathVariable String sessionId) {
-        return ruleDebugService.getDebugMessages(sessionId);
-    }
-
-    @PostMapping("/{sessionId}/{contextId}")
-    public Mono<String> executeNode(@PathVariable String sessionId,
-                                    @PathVariable String contextId,
-                                    @RequestBody String data) {
-        Object object = data.trim();
-        if (data.startsWith("[") || data.startsWith("{")) {
-            object = JSON.parse(data);
-        }
-
-        RuleData ruleData = RuleData.create(object);
-
-        ruleDebugService.sendData(sessionId, contextId, ruleData);
-
-        return Mono.just(ruleData.getId());
-    }
-
-    @GetMapping("/{sessionId}/contexts")
-    public Flux<String> getSessionContexts(@PathVariable String sessionId) {
-        return Flux.fromIterable(ruleDebugService.getAllContext(sessionId));
-    }
-
-
-    @PostMapping("/{sessionId}/condition")
-    public Mono<Boolean> testCondition(@PathVariable String sessionId, @RequestBody JSONObject data) {
-        return ruleDebugService.testCondition(sessionId, data.getJSONObject("condition").toJavaObject(Condition.class), data.get("data"));
-    }
-
-    @DeleteMapping("/{sessionId}")
-    public Mono<Boolean> stopSession(@PathVariable String sessionId) {
-        return   ruleDebugService.closeSession(sessionId);
-    }
-
-    @DeleteMapping("/{sessionId}/{contextId}")
-    public Mono<Boolean> stopContext(@PathVariable String sessionId, @PathVariable String contextId) {
-
-        return  ruleDebugService.stopContext(sessionId, contextId);
-    }
-
-}

+ 1 - 28
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleInstanceController.java

@@ -7,18 +7,12 @@ import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.authorization.annotation.ResourceAction;
 import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
-import org.hswebframework.web.exception.NotFoundException;
-import org.hswebframework.web.id.IDGenerator;
-import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
 import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
+import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
 import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
 import org.jetlinks.community.rule.engine.service.RuleInstanceService;
-import org.jetlinks.rule.engine.api.DefaultRuleData;
-import org.jetlinks.rule.engine.api.RuleDataHelper;
-import org.jetlinks.rule.engine.api.RuleEngine;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 @RestController
@@ -29,8 +23,6 @@ public class RuleInstanceController implements ReactiveServiceCrudController<Rul
     @Autowired
     private RuleInstanceService instanceService;
 
-    @Autowired
-    private RuleEngine ruleEngine;
 
     @PostMapping("/{id}/_start")
     @ResourceAction(id = "start", name = "启动")
@@ -66,25 +58,6 @@ public class RuleInstanceController implements ReactiveServiceCrudController<Rul
     }
 
 
-    @PostMapping("/{id}/_execute/{startWith}/{endWith}")
-    @ResourceAction(id = "execute", name = "执行")
-    public Flux<Object> execute(@PathVariable String id,
-                                @PathVariable String startWith,
-                                @PathVariable String endWith,
-                                @RequestBody Flux<DefaultRuleData> payload) {
-        return ruleEngine
-            .getInstance(id)
-            .switchIfEmpty(Mono.error(NotFoundException::new))
-            .flatMapMany(context -> context
-                .execute(payload
-                    .map(ruleData -> {
-                        ruleData.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
-                        RuleDataHelper.markStartWith(ruleData, startWith);
-                        return RuleDataHelper.markSyncReturn(ruleData, endWith);
-                    })
-                ));
-    }
-
     @Override
     public ReactiveCrudService<RuleInstanceEntity, String> getService() {
         return instanceService;

+ 6 - 3
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/RuleModelController.java

@@ -7,12 +7,15 @@ import org.hswebframework.web.crud.service.ReactiveCrudService;
 import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
 import org.jetlinks.community.rule.engine.entity.RuleModelEntity;
 import org.jetlinks.community.rule.engine.service.RuleModelService;
-import org.jetlinks.rule.engine.api.executor.ExecutableRuleNodeFactory;
+import org.jetlinks.rule.engine.api.RuleEngine;
+import org.jetlinks.rule.engine.api.worker.Worker;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.function.Function;
+
 @RestController
 @RequestMapping("rule-engine/model")
 @Resource(id = "rule-model", name = "规则引擎-模型")
@@ -22,7 +25,7 @@ public class RuleModelController implements ReactiveServiceCrudController<RuleMo
     private RuleModelService ruleModelService;
 
     @Autowired
-    private ExecutableRuleNodeFactory factory;
+    private RuleEngine ruleEngine;
 
     @Override
     public ReactiveCrudService<RuleModelEntity, String> getService() {
@@ -39,6 +42,6 @@ public class RuleModelController implements ReactiveServiceCrudController<RuleMo
     @GetMapping("/executors")
     @QueryAction
     public Flux<String> getAllSupportExecutors() {
-        return Flux.fromIterable(factory.getAllSupportExecutor());
+        return ruleEngine.getWorkers().flatMap(Worker::getSupportExecutors).flatMapIterable(Function.identity());
     }
 }

+ 5 - 5
pom.xml

@@ -16,16 +16,16 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.build.locales>zh_CN</project.build.locales>
-        <spring.boot.version>2.2.6.RELEASE</spring.boot.version>
+        <spring.boot.version>2.2.8.RELEASE</spring.boot.version>
         <java.version>1.8</java.version>
         <project.build.jdk>${java.version}</project.build.jdk>
-        <hsweb.framework.version>4.0.3</hsweb.framework.version>
-        <easyorm.version>4.0.3</easyorm.version>
+        <hsweb.framework.version>4.0.4-SNAPSHOT</hsweb.framework.version>
+        <easyorm.version>4.0.4-SNAPSHOT</easyorm.version>
         <hsweb.expands.version>3.0.2</hsweb.expands.version>
-        <jetlinks.version>1.0.4-SNAPSHOT</jetlinks.version>
+        <jetlinks.version>1.1.0-SNAPSHOT</jetlinks.version>
         <r2dbc.version>Arabba-SR3</r2dbc.version>
         <vertx.version>3.8.5</vertx.version>
-        <netty.version>4.1.46.Final</netty.version>
+        <netty.version>4.1.50.Final</netty.version>
         <elasticsearch.version>6.8.6</elasticsearch.version>
         <reactor.excel.version>1.0-RC</reactor.excel.version>
         <reactor.ql.version>1.0.0</reactor.ql.version>