Jelajahi Sumber

优化消息释放

zhou-hao 4 tahun lalu
induk
melakukan
a8d4703c7a

+ 23 - 13
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java

@@ -79,11 +79,11 @@ class ProxyMessageListener implements MessageListener {
         if (Payload.class.isAssignableFrom(paramType)) {
         if (Payload.class.isAssignableFrom(paramType)) {
             return message;
             return message;
         }
         }
-        if (paramType.equals(TopicMessage.class)) {
-            log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method);
-            return TopicMessageWrap.wrap(message);
-        }
         try {
         try {
+            if (paramType.equals(TopicMessage.class)) {
+                log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method);
+                return TopicMessageWrap.wrap(message);
+            }
             Payload payload = message.getPayload();
             Payload payload = message.getPayload();
             Object decodedPayload;
             Object decodedPayload;
             if (payload instanceof NativePayload) {
             if (payload instanceof NativePayload) {
@@ -105,17 +105,27 @@ class ProxyMessageListener implements MessageListener {
 
 
     @Override
     @Override
     public Mono<Void> onMessage(TopicPayload message) {
     public Mono<Void> onMessage(TopicPayload message) {
-        boolean paramVoid = paramType == Void.class;
         try {
         try {
-            Object val = proxy.apply(target, paramVoid ? null : convert(message));
-            if (val instanceof Publisher) {
-                return Mono.from((Publisher<?>) val).then();
-            }
-            return Mono.empty();
-        } finally {
-            if (paramVoid) {
-                message.release();
+            boolean paramVoid = paramType == Void.class;
+            try {
+                Object val = proxy.apply(target, paramVoid ? null : convert(message));
+                if (val instanceof Publisher) {
+                    return Mono.from((Publisher<?>) val).then();
+                }
+                return Mono.empty();
+            } finally {
+                if (paramVoid) {
+                    message.release();
+                }
             }
             }
+        } catch (Throwable e) {
+            log.error("invoke event listener [{}] error", toString(), e);
         }
         }
+        return Mono.empty();
+    }
+
+    @Override
+    public String toString() {
+        return ClassUtils.getUserClass(target).getSimpleName() + "." + method.getName();
     }
     }
 }
 }

+ 3 - 3
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -82,9 +82,9 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
                                         .concat(context.getJob().getNodeId()),
                                         .concat(context.getJob().getNodeId()),
                                     table,
                                     table,
                                     Subscription.Feature.local
                                     Subscription.Feature.local
-                                    ),
-                                    JsonCodec.of(Map.class)
-                                );
+                                    )
+                                )
+                                .map(payload -> payload.bodyToJson(true));
                         }
                         }
                         return Flux.just(1);
                         return Flux.just(1);
                     });
                     });