Forráskód Böngészése

增加规则日志

zhou-hao 5 éve
szülő
commit
760ab2af15

+ 6 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java

@@ -3,9 +3,11 @@ package org.jetlinks.community.gateway;
 import com.alibaba.fastjson.JSON;
 import io.netty.buffer.ByteBufUtil;
 import org.jetlinks.core.message.codec.EncodedMessage;
+import org.jetlinks.supports.utils.MqttTopicUtils;
 
 import javax.annotation.Nonnull;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 public interface TopicMessage {
 
@@ -29,6 +31,10 @@ public interface TopicMessage {
     @Nonnull
     EncodedMessage getMessage();
 
+    default Map<String, String> getTopicVars(String pattern) {
+        return MqttTopicUtils.getPathVariables(pattern, getTopic());
+    }
+
     default Object convertMessage() {
         if (getMessage() instanceof EncodableMessage) {
             return ((EncodableMessage) getMessage()).getNativePayload();

+ 17 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/entity/RuleEngineExecuteEventInfo.java

@@ -1,7 +1,12 @@
 package org.jetlinks.community.rule.engine.entity;
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.Getter;
 import lombok.Setter;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.jetlinks.community.gateway.TopicMessage;
+
+import java.util.Map;
 
 /**
  * @author bsetfeng
@@ -21,4 +26,16 @@ public class RuleEngineExecuteEventInfo {
     private String nodeId;
 
     private String ruleData;
+
+    private String contextId;
+
+    public static RuleEngineExecuteEventInfo of(TopicMessage message) {
+        Map<String, String> vars = message.getTopicVars("/rule-engine/{instanceId}/{nodeId}/event/{event}");
+        RuleEngineExecuteEventInfo info = FastBeanCopier.copy(vars, new RuleEngineExecuteEventInfo());
+        JSONObject json = message.getMessage().payloadAsJson();
+        info.id=json.getString("id");
+        info.contextId=json.getString("contextId");
+        info.setRuleData(json.toJSONString());
+        return info;
+    }
 }

+ 32 - 0
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/event/handler/RuleLogHandler.java

@@ -0,0 +1,32 @@
+package org.jetlinks.community.rule.engine.event.handler;
+
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.elastic.search.service.ElasticSearchService;
+import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
+import org.jetlinks.rule.engine.defaults.LogEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+@Component
+@Slf4j
+@Order(3)
+public class RuleLogHandler {
+
+    @Autowired
+    private ElasticSearchService elasticSearchService;
+
+    @Subscribe("/rule-engine/*/*/event/*")
+    public Mono<Void> handleEvent(TopicMessage event) {
+
+        return elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, RuleEngineExecuteEventInfo.of(event));
+    }
+
+    @Subscribe("/rule-engine/*/*/logger/*")
+    public Mono<Void> handleLog(LogEvent event) {
+        return elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_LOG, event);
+    }
+}