Jelajahi Sumber

优化TopicMessage释放

zhou-hao 4 tahun lalu
induk
melakukan
b0e383005e

+ 1 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java

@@ -9,6 +9,7 @@ import javax.annotation.Nonnull;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
+@Deprecated
 public interface TopicMessage {
 
     /**

+ 24 - 16
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java

@@ -83,31 +83,39 @@ class ProxyMessageListener implements MessageListener {
             log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method);
             return TopicMessageWrap.wrap(message);
         }
-        Payload payload = message.getPayload();
-        Object decodedPayload;
-        if (payload instanceof NativePayload) {
-            decodedPayload = ((NativePayload<?>) payload).getNativeObject();
-        } else {
-            if (decoder == null) {
-                decoder = Codecs.lookup(resolvableType);
+        try {
+            Payload payload = message.getPayload();
+            Object decodedPayload;
+            if (payload instanceof NativePayload) {
+                decodedPayload = ((NativePayload<?>) payload).getNativeObject();
+            } else {
+                if (decoder == null) {
+                    decoder = Codecs.lookup(resolvableType);
+                }
+                decodedPayload = decoder.decode(message);
             }
-            decodedPayload = decoder.decode(message);
-        }
-        if (paramType.isInstance(decodedPayload)) {
-            return decodedPayload;
+            if (paramType.isInstance(decodedPayload)) {
+                return decodedPayload;
+            }
+            return FastBeanCopier.DEFAULT_CONVERT.convert(decodedPayload, paramType, resolvableType.resolveGenerics());
+        } finally {
+            message.release();
         }
-
-        return FastBeanCopier.DEFAULT_CONVERT.convert(decodedPayload, paramType, resolvableType.resolveGenerics());
     }
 
     @Override
     public Mono<Void> onMessage(TopicPayload message) {
-        return Mono.defer(() -> {
-            Object val = proxy.apply(target, paramType == Void.class ? null : convert(message));
+        boolean paramVoid = paramType == Void.class;
+        try {
+            Object val = proxy.apply(target, paramVoid ? null : convert(message));
             if (val instanceof Publisher) {
                 return Mono.from((Publisher<?>) val).then();
             }
             return Mono.empty();
-        });
+        } finally {
+            if (paramVoid) {
+                message.release();
+            }
+        }
     }
 }

+ 2 - 0
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -4,6 +4,7 @@ import org.jetlinks.core.device.DeviceProductOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.event.TopicPayload;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DefaultConfigMetadata;
@@ -82,6 +83,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
             //通过订阅消息来统计实时数据量
             return eventBus
                 .subscribe(Subscription.of("real-time-device-message", "/device/**", Subscription.Feature.local, Subscription.Feature.broker))
+                .doOnNext(TopicPayload::release)
                 .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1)))
                 .flatMap(Flux::count)
                 .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis()));

+ 1 - 2
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java

@@ -1,6 +1,5 @@
 package org.jetlinks.community.notify.manager.message;
 
-import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
@@ -40,6 +39,6 @@ public class NotificationsPublishProvider implements SubscriptionProvider {
                 "/notifications/user/" + request.getAuthentication().getUser().getId() + "/*/*",
                 Subscription.Feature.local, Subscription.Feature.broker
             ))
-            .map(msg -> Message.success(request.getId(), msg.getTopic(), JSON.parseObject(msg.bodyToString())));
+            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.bodyToJson(true)));
     }
 }

+ 1 - 1
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java

@@ -59,7 +59,7 @@ public class DeviceAlarmProvider implements SubscriberProvider {
                 Subscription.Feature.local
             ))
             .map(msg -> {
-                JSONObject json = msg.bodyToJson();
+                JSONObject json = msg.bodyToJson(true);
 
                 return Notify.of(
                     String.format("设备[%s]发生告警:[%s]!", json.getString("deviceName"), json.getString("alarmName")),