Browse Source

优化规则

zhou-hao 5 years ago
parent
commit
1b76d5e4bd

+ 5 - 1
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -66,7 +66,11 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
                 dataStream = context.getInput()
                     .accept()
                     .map(RuleDataHelper::toContextMap)
-                    .as(reactorQL::start)
+                    .flatMap(v -> reactorQL.start(Flux.just(v))
+                        .onErrorResume(err -> {
+                            context.getLogger().error(err.getMessage(),err);
+                            return context.onError(err, null).then(Mono.empty());
+                        }))
                 ;
             } else {
                 dataStream = reactorQL