Browse Source

优化消息订阅

zhou-hao 5 năm trước cách đây
mục cha
commit
4cf91380fd

+ 3 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/MessageGateway.java

@@ -74,6 +74,9 @@ public interface MessageGateway {
         return subscribe(Stream.of(topics).map(Subscription::new).collect(Collectors.toList()), false);
     }
 
+    Flux<TopicMessage> subscribe(Collection<Subscription> subscription, String id, boolean shareCluster);
+
+
     /**
      * 注册一个消息连接器,用于进行真实的消息收发
      *

+ 10 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/Subscription.java

@@ -2,6 +2,10 @@ package org.jetlinks.community.gateway;
 
 import lombok.*;
 
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 /**
  * 订阅信息.支持通配符**(匹配多层目录)和*(匹配单层目录).
  *
@@ -23,4 +27,10 @@ public class Subscription {
         this.topic = topic;
     }
 
+    public static Collection<Subscription> asList(String... sub) {
+        return Stream.of(sub)
+            .map(Subscription::new)
+            .collect(Collectors.toList());
+    }
+
 }

+ 2 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/annotation/Subscribe.java

@@ -23,6 +23,8 @@ public @interface Subscribe {
     @AliasFor("topics")
     String[] value() default {};
 
+    String id() default "";
+
     boolean shareCluster() default false;
 
 }

+ 6 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageConnector.java

@@ -12,6 +12,7 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.ClassUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ReflectionUtils;
+import org.springframework.util.StringUtils;
 import reactor.core.publisher.EmitterProcessor;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
@@ -55,9 +56,12 @@ public class SpringMessageConnector implements MessageConnector, BeanPostProcess
             if (CollectionUtils.isEmpty(subscribes)) {
                 return;
             }
+            String id = subscribes.getString("id");
+            if (!StringUtils.hasText(id)) {
+                id = type.getSimpleName().concat(".").concat(method.getName());
+            }
             SpringMessageConnection connection = new SpringMessageConnection(
-                type.getSimpleName().concat(".").concat(method.getName())
-                , Stream.of(subscribes.getStringArray("value")).map(Subscription::new).collect(Collectors.toList())
+                id, Stream.of(subscribes.getStringArray("value")).map(Subscription::new).collect(Collectors.toList())
                 , new ProxyMessageListener(bean, method),
                 subscribes.getBoolean("shareCluster")
             );

+ 6 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultMessageGateway.java

@@ -54,8 +54,13 @@ public class DefaultMessageGateway implements MessageGateway {
 
     @Override
     public Flux<TopicMessage> subscribe(Collection<Subscription> subscriptions, boolean shareCluster) {
+        return subscribe(subscriptions, "local:".concat(IDGenerator.SNOW_FLAKE_STRING.generate()), shareCluster);
+    }
+
+    @Override
+    public Flux<TopicMessage> subscribe(Collection<Subscription> subscriptions, String id, boolean shareCluster) {
         return Flux.defer(() -> {
-            LocalMessageConnection networkConnection = localGatewayConnector.addConnection("local:" + IDGenerator.SNOW_FLAKE_STRING.generate(), shareCluster);
+            LocalMessageConnection networkConnection = localGatewayConnector.addConnection(id, shareCluster);
             return networkConnection
                 .onLocalMessage()
                 .doOnSubscribe(sub -> subscriptions.forEach(networkConnection::addSubscription))

+ 15 - 10
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java

@@ -8,34 +8,39 @@ import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.jetlinks.core.message.DeviceMessage;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import java.time.Duration;
 
 @Component
 public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider {
 
+    MeterRegistry registry;
+
     public DeviceMessageMeasurementProvider(MessageGateway messageGateway,
                                             MeterRegistryManager registryManager,
                                             TimeSeriesManager timeSeriesManager) {
         super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.message);
-        addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager));
-
-
-        MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
+        registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
             "target", "msgType", "productId");
 
-        //订阅设备消息,用于统计设备消息量
-        messageGateway.subscribe("/device/*/message/**")
-            .map(this::convertTags)
-            .subscribe(tags -> registry
-                .counter("message-count", tags)
-                .increment());
+        addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager));
+
+    }
 
+    @Subscribe("/device/*/message/**")
+    public Mono<Void> incrementMessage(TopicMessage message) {
+        return Mono.fromRunnable(() -> {
+            registry
+                .counter("message-count", convertTags(message))
+                .increment();
+        });
     }
 
     static final String[] empty = new String[0];

+ 36 - 28
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java

@@ -10,9 +10,11 @@ import org.jetlinks.community.device.service.LocalDeviceInstanceService;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,6 +24,19 @@ import java.util.function.Function;
 @Component
 public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
 
+    private MeterRegistry registry;
+
+    Map<String, LongAdder> productCounts = new ConcurrentHashMap<>();
+
+    Function<String, LongAdder> counterAdder = productId ->
+        productCounts.computeIfAbsent(productId, __id -> {
+            LongAdder adder = new LongAdder();
+            Gauge.builder("online-count", adder, LongAdder::sum)
+                .tag("productId", __id)
+                .register(registry);
+            return adder;
+        });
+
     public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager,
                                            LocalDeviceInstanceService instanceService,
                                            TimeSeriesManager timeSeriesManager,
@@ -32,37 +47,30 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
 
         addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
 
-        MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
+        registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
             "target", "msgType", "productId");
-        Map<String, LongAdder> productCounts = new ConcurrentHashMap<>();
+    }
 
-        Function<String, LongAdder> counterAdder = productId ->
-            productCounts.computeIfAbsent(productId, __id -> {
-                LongAdder adder = new LongAdder();
-                Gauge.builder("online-count", adder, LongAdder::sum)
-                    .tag("productId", __id)
-                    .register(registry);
-                return adder;
-            });
+    @Subscribe("/device/*/online")
+    public Mono<Void> incrementOnline(TopicMessage msg){
+        return Mono.fromRunnable(()->{
+            String productId = parseProductId(msg);
+            counterAdder.apply(productId).increment();
+            registry
+                .counter("online", "productId", productId)
+                .increment();
+        });
+    }
 
-        //上线
-        messageGateway.subscribe("/device/*/online")
-            .map(this::parseProductId)
-            .subscribe(productId -> {
-                counterAdder.apply(productId).increment();
-                registry
-                    .counter("online", "productId", productId)
-                    .increment();
-            });
-        //下线
-        messageGateway.subscribe("/device/*/offline")
-            .map(this::parseProductId)
-            .subscribe(productId -> {
-                counterAdder.apply(productId).decrement();
-                registry
-                    .counter("offline", "productId", productId)
-                    .increment();
-            });
+    @Subscribe("/device/*/offline")
+    public Mono<Void> incrementOffline(TopicMessage msg){
+        return Mono.fromRunnable(()->{
+            String productId = parseProductId(msg);
+            counterAdder.apply(productId).increment();
+            registry
+                .counter("offline", "productId", productId)
+                .increment();
+        });
     }
 
     private String parseProductId(TopicMessage msg) {

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -19,6 +19,7 @@ import org.hswebframework.web.exception.NotFoundException;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.message.DeviceMessageUtils;
+import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
@@ -326,13 +327,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
 
         //订阅设备上下线
         FluxUtils.bufferRate(messageGateway
-            .subscribe("/device/*/online", "/device/*/offline")
+            .subscribe(Subscription.asList("/device/*/online", "/device/*/offline"), "device-state-synchronizer", false)
             .flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
                 .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
             .flatMap(list -> syncStateBatch(Flux.just(list), false).count())
             .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
             .subscribe((i) -> log.info("同步设备状态成功:{}", i));
-
     }
 
     public Mono<DeviceInfo> getDeviceInfoById(String id) {