|
@@ -9,6 +9,7 @@ import lombok.AllArgsConstructor;
|
|
|
import org.jetlinks.community.ValueObject;
|
|
|
import org.jetlinks.rule.engine.api.RuleConstants;
|
|
|
import org.jetlinks.rule.engine.api.task.ExecutionContext;
|
|
|
+import org.jetlinks.rule.engine.api.task.Task;
|
|
|
import org.jetlinks.rule.engine.api.task.TaskExecutor;
|
|
|
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
|
|
|
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
|
|
@@ -69,7 +70,13 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
|
|
|
Mono.delay(nextTime, scheduler)
|
|
|
.flatMap(t -> context.getOutput().write(Mono.just(context.newRuleData(t))))
|
|
|
.then(context.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis())).thenReturn(1))
|
|
|
- .subscribe(t -> execute());
|
|
|
+ .onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
|
|
|
+ .doFinally(s -> {
|
|
|
+ if(getState()== Task.State.running){
|
|
|
+ execute();
|
|
|
+ }
|
|
|
+ })
|
|
|
+ .subscribe();
|
|
|
}
|
|
|
|
|
|
@Override
|