Browse Source

重构事件总线

zhou-hao 4 years ago
parent
commit
6a5d23e9cb
46 changed files with 517 additions and 659 deletions
  1. 2 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/MessageGateway.java
  2. 2 1
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/annotation/Subscribe.java
  3. 2 2
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java
  4. 2 2
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/MessageListener.java
  5. 40 21
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java
  6. 55 0
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java
  7. 0 73
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageConnection.java
  8. 0 73
      jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageConnector.java
  9. 7 7
      jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/event/handler/SystemLoggerEventHandler.java
  10. 2 2
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java
  11. 2 2
      jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttRuleDataCodec.java
  12. 5 6
      jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java
  13. 6 5
      jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierEventDispatcher.java
  14. 0 18
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java
  15. 3 3
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/entity/RuleEngineExecuteEventInfo.java
  16. 2 2
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/event/handler/RuleLogHandler.java
  17. 15 16
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java
  18. 11 7
      jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/messaging/RuleEngineSubscriptionProvider.java
  19. 20 20
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java
  20. 17 12
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java
  21. 17 17
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java
  22. 25 22
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java
  23. 37 33
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java
  24. 26 22
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java
  25. 14 15
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java
  26. 5 11
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java
  27. 21 20
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java
  28. 13 13
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java
  29. 2 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java
  30. 2 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceBatchOperationSubscriptionProvider.java
  31. 27 67
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java
  32. 3 3
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java
  33. 2 2
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendSubscriptionProvider.java
  34. 12 11
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSubscriptionProvider.java
  35. 0 17
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java
  36. 2 6
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java
  37. 29 35
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java
  38. 2 21
      jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java
  39. 2 2
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttClientDebugSubscriptionProvider.java
  40. 2 2
      jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.java
  41. 11 7
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java
  42. 16 14
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java
  43. 2 1
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProvider.java
  44. 13 12
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java
  45. 23 19
      jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java
  46. 16 10
      jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

+ 2 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/MessageGateway.java

@@ -13,8 +13,9 @@ import java.util.stream.Stream;
  *
  * @author zhouhao
  * @see 1.0
- * @see MessageConnector
+ * @see org.jetlinks.core.event.EventBus
  */
+@Deprecated
 public interface MessageGateway {
 
     /**

+ 2 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/annotation/Subscribe.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.gateway.annotation;
 
+import org.jetlinks.core.event.Subscription;
 import org.springframework.core.annotation.AliasFor;
 
 import java.lang.annotation.*;
@@ -25,6 +26,6 @@ public @interface Subscribe {
 
     String id() default "";
 
-    boolean shareCluster() default false;
+    Subscription.Feature[] features() default Subscription.Feature.local;
 
 }

+ 2 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/dashboard/DashBoardSubscriptionProvider.java

@@ -5,7 +5,7 @@ import org.jetlinks.community.dashboard.MeasurementParameter;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
-import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.jetlinks.core.utils.TopicUtils;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 
@@ -39,7 +39,7 @@ public class DashBoardSubscriptionProvider implements SubscriptionProvider {
     public Flux<Message> subscribe(SubscribeRequest request) {
         return Flux.defer(() -> {
             try {
-                Map<String, String> variables = MqttTopicUtils.getPathVariables(
+                Map<String, String> variables = TopicUtils.getPathVariables(
                     "/dashboard/{dashboard}/{object}/{measurement}/{dimension}", request.getTopic());
                 return dashboardManager.getDashboard(variables.get("dashboard"))
                     .flatMap(dashboard -> dashboard.getObject(variables.get("object")))

+ 2 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/MessageListener.java

@@ -1,10 +1,10 @@
 package org.jetlinks.community.gateway.spring;
 
-import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.core.event.TopicPayload;
 import reactor.core.publisher.Mono;
 
 public interface MessageListener {
 
-   Mono<Void> onMessage(TopicMessage message);
+    Mono<Void> onMessage(TopicPayload message);
 
 }

+ 40 - 21
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java

@@ -1,28 +1,40 @@
 package org.jetlinks.community.gateway.spring;
 
-import io.netty.buffer.ByteBuf;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.proxy.Proxy;
-import org.jetlinks.core.message.codec.EncodedMessage;
-import org.jetlinks.community.gateway.EncodableMessage;
 import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.community.gateway.TopicMessageWrap;
+import org.jetlinks.core.NativePayload;
+import org.jetlinks.core.Payload;
+import org.jetlinks.core.codec.Codecs;
+import org.jetlinks.core.codec.Decoder;
+import org.jetlinks.core.event.TopicPayload;
 import org.reactivestreams.Publisher;
+import org.springframework.core.ResolvableType;
 import org.springframework.util.ClassUtils;
 import reactor.core.publisher.Mono;
 
 import java.lang.reflect.Method;
 import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 
+@Slf4j
 class ProxyMessageListener implements MessageListener {
     private final Class<?> paramType;
     private final Object target;
+    private final ResolvableType resolvableType;
 
-    BiFunction<Object, Object, Object> proxy;
+    private final Method method;
+    private final BiFunction<Object, Object, Object> proxy;
+
+    private final AtomicReference<Decoder<?>> decoder = new AtomicReference<>();
 
     @SuppressWarnings("all")
     ProxyMessageListener(Object target, Method method) {
         this.target = target;
+        this.method = method;
         Class<?>[] parameterTypes = method.getParameterTypes();
         if (parameterTypes.length > 1) {
             throw new UnsupportedOperationException("unsupported method [" + method + "] parameter");
@@ -55,38 +67,45 @@ class ProxyMessageListener implements MessageListener {
         }
 
         code.add("}");
-
+        this.resolvableType = ResolvableType.forMethodParameter(method, 0, targetType);
         this.proxy = Proxy.create(BiFunction.class)
             .addMethod(code.toString())
             .newInstance();
+
     }
 
-    Object convert(TopicMessage message) {
-        if (paramType.isAssignableFrom(TopicMessage.class)) {
+    Object convert(TopicPayload message) {
+
+        if (Payload.class.isAssignableFrom(paramType)) {
             return message;
         }
-        if (paramType.isAssignableFrom(EncodedMessage.class)) {
-            return message.getMessage();
+        if (paramType.equals(TopicMessage.class)) {
+            log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method);
+            return TopicMessageWrap.wrap(message);
         }
-        if (paramType.isAssignableFrom(ByteBuf.class)) {
-            return message.getMessage().getPayload();
-        }
-
-        Object payload = message.convertMessage();
-        if (paramType.isInstance(payload)) {
-            return payload;
+        Payload payload = message.getPayload();
+        Object decodedPayload;
+        if (payload instanceof NativePayload) {
+            decodedPayload = ((NativePayload<?>) payload).getNativeObject();
+        } else {
+            decodedPayload = decoder.getAndUpdate((old) -> {
+                if (old == null) {
+                    return Codecs.lookup(resolvableType);
+                }
+                return old;
+            }).decode(message);
         }
-        if (payload instanceof byte[]) {
-            return payload;
+        if (paramType.isInstance(decodedPayload)) {
+            return decodedPayload;
         }
-        return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{});
 
+        return FastBeanCopier.DEFAULT_CONVERT.convert(decodedPayload, paramType, resolvableType.resolveGenerics());
     }
 
     @Override
-    public Mono<Void> onMessage(TopicMessage message) {
+    public Mono<Void> onMessage(TopicPayload message) {
         return Mono.defer(() -> {
-            Object val = proxy.apply(target, convert(message));
+            Object val = proxy.apply(target, paramType == Void.class ? null : convert(message));
             if (val instanceof Publisher) {
                 return Mono.from((Publisher<?>) val).then();
             }

+ 55 - 0
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java

@@ -0,0 +1,55 @@
+package org.jetlinks.community.gateway.spring;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.core.annotation.AnnotatedElementUtils;
+import org.springframework.core.annotation.AnnotationAttributes;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.StringUtils;
+
+@Component
+@Slf4j
+@AllArgsConstructor
+public class SpringMessageBroker implements BeanPostProcessor {
+
+    private final EventBus eventBus;
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        Class<?> type = ClassUtils.getUserClass(bean);
+        ReflectionUtils.doWithMethods(type, method -> {
+            AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
+            if (CollectionUtils.isEmpty(subscribes)) {
+                return;
+            }
+            String id = subscribes.getString("id");
+            if (!StringUtils.hasText(id)) {
+                id = type.getSimpleName().concat(".").concat(method.getName());
+            }
+            Subscription subscription = Subscription.of(
+                "spring:" + id,
+                subscribes.getStringArray("value"),
+                (Subscription.Feature[]) subscribes.get("features"));
+
+            ProxyMessageListener listener = new ProxyMessageListener(bean, method);
+
+            eventBus
+                .subscribe(subscription)
+                .flatMap(listener::onMessage)
+                .onErrorContinue((err, v) -> log.error(err.getMessage(), err))
+                .subscribe();
+
+        });
+
+        return bean;
+    }
+
+}

+ 0 - 73
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageConnection.java

@@ -1,73 +0,0 @@
-package org.jetlinks.community.gateway.spring;
-
-import lombok.Getter;
-import org.jetlinks.community.gateway.MessageConnection;
-import org.jetlinks.community.gateway.MessageSubscriber;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.gateway.TopicMessage;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import javax.annotation.Nonnull;
-import java.util.List;
-
-public class SpringMessageConnection implements MessageConnection,
-    MessageSubscriber {
-
-    @Getter
-    private String id;
-
-    private MessageListener listener;
-
-    private List<Subscription> subscription;
-
-    private boolean shareCluster;
-
-    public SpringMessageConnection(String id,
-                                   List<Subscription> subscription,
-                                   MessageListener listener,
-                                   boolean shareCluster) {
-        this.listener = listener;
-        this.id = id;
-        this.subscription = subscription;
-        this.shareCluster = shareCluster;
-    }
-
-    @Override
-    public void onDisconnect(Runnable disconnectListener) {
-
-    }
-
-    @Override
-    public void disconnect() {
-
-    }
-
-    @Override
-    public boolean isAlive() {
-        return true;
-    }
-
-    @Nonnull
-    @Override
-    public Mono<Void> publish(@Nonnull TopicMessage message) {
-        return listener.onMessage(message);
-    }
-
-    @Nonnull
-    @Override
-    public Flux<Subscription> onSubscribe() {
-        return Flux.fromIterable(subscription);
-    }
-
-    @Nonnull
-    @Override
-    public Flux<Subscription> onUnSubscribe() {
-        return Flux.empty();
-    }
-
-    @Override
-    public boolean isShareCluster() {
-        return shareCluster;
-    }
-}

+ 0 - 73
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageConnector.java

@@ -1,73 +0,0 @@
-package org.jetlinks.community.gateway.spring;
-
-import org.jetlinks.community.gateway.MessageConnection;
-import org.jetlinks.community.gateway.MessageConnector;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.gateway.annotation.Subscribe;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.core.annotation.AnnotatedElementUtils;
-import org.springframework.core.annotation.AnnotationAttributes;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ClassUtils;
-import org.springframework.util.CollectionUtils;
-import org.springframework.util.ReflectionUtils;
-import org.springframework.util.StringUtils;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-@Component
-public class SpringMessageConnector implements MessageConnector, BeanPostProcessor {
-
-    private EmitterProcessor<MessageConnection> processor = EmitterProcessor.create(false);
-
-    FluxSink<MessageConnection> connectionSink = processor.sink();
-
-    @Nonnull
-    @Override
-    public String getId() {
-        return "spring";
-    }
-
-    @Nullable
-    @Override
-    public String getName() {
-        return "Spring Message Connector";
-    }
-
-    @Nonnull
-    @Override
-    public Flux<MessageConnection> onConnection() {
-        return processor.map(Function.identity());
-    }
-
-    @Override
-    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
-        Class<?> type = ClassUtils.getUserClass(bean);
-        ReflectionUtils.doWithMethods(type, method -> {
-            AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
-            if (CollectionUtils.isEmpty(subscribes)) {
-                return;
-            }
-            String id = subscribes.getString("id");
-            if (!StringUtils.hasText(id)) {
-                id = type.getSimpleName().concat(".").concat(method.getName());
-            }
-            SpringMessageConnection connection = new SpringMessageConnection(
-                id, Stream.of(subscribes.getStringArray("value")).map(Subscription::new).collect(Collectors.toList())
-                , new ProxyMessageListener(bean, method),
-                subscribes.getBoolean("shareCluster")
-            );
-            connectionSink.next(connection);
-        });
-
-        return bean;
-    }
-}

+ 7 - 7
jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/event/handler/SystemLoggerEventHandler.java

@@ -1,7 +1,7 @@
 package org.jetlinks.community.logging.event.handler;
 
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.metadata.types.DateTimeType;
 import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
@@ -23,15 +23,15 @@ import reactor.core.publisher.Mono;
 @Order(5)
 public class SystemLoggerEventHandler {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     private final ElasticSearchService elasticSearchService;
 
     public SystemLoggerEventHandler(ElasticSearchService elasticSearchService,
                                     ElasticSearchIndexManager indexManager,
-                                    MessageGateway messageGateway) {
+                                    EventBus eventBus) {
         this.elasticSearchService = elasticSearchService;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
 
         indexManager.putIndex(
             new DefaultElasticSearchIndexMetadata(LoggerIndexProvider.SYSTEM.getIndex())
@@ -53,10 +53,10 @@ public class SystemLoggerEventHandler {
 
     @EventListener
     public void acceptAccessLoggerInfo(SerializableSystemLog info) {
-        elasticSearchService.commit(LoggerIndexProvider.SYSTEM, Mono.just(info))
+        eventBus
+            .publish("/logging/system/" + info.getName().replace(".", "/") + "/" + (info.getLevel().toLowerCase()), info)
             .subscribe();
-        messageGateway
-            .publish("/logging/system/" + info.getName().replace(".", "/") + "/" + (info.getLevel().toLowerCase()), info, true)
+        elasticSearchService.commit(LoggerIndexProvider.SYSTEM, Mono.just(info))
             .subscribe();
     }
 

+ 2 - 2
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java

@@ -8,7 +8,7 @@ import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
 import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkType;
-import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.jetlinks.core.utils.TopicUtils;
 import reactor.core.publisher.*;
 
 import java.io.IOException;
@@ -101,7 +101,7 @@ public class VertxMqttClient implements MqttClient {
             return messageProcessor
                 .filter(msg -> topics
                     .stream()
-                    .anyMatch(topic -> MqttTopicUtils.match(topic, msg.getTopic())));
+                    .anyMatch(topic -> TopicUtils.match(topic, msg.getTopic())));
         }).doOnCancel(() -> {
             if (!canceled.getAndSet(true)) {
                 for (String topic : topics) {

+ 2 - 2
jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/executor/MqttRuleDataCodec.java

@@ -5,11 +5,11 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.jetlinks.core.message.codec.MessagePayloadType;
 import org.jetlinks.core.message.codec.MqttMessage;
 import org.jetlinks.core.message.codec.SimpleMqttMessage;
+import org.jetlinks.core.utils.TopicUtils;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.RuleDataCodec;
 import org.jetlinks.rule.engine.api.RuleDataCodecs;
 import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.supports.utils.MqttTopicUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -45,7 +45,7 @@ public class MqttRuleDataCodec implements RuleDataCodec<MqttMessage> {
             .map(TopicVariables::getVariables)
             .filter(CollectionUtils::isNotEmpty)
             .flatMap(list -> list.stream()
-                .map(str -> MqttTopicUtils.getPathVariables(str, message.getTopic()))
+                .map(str -> TopicUtils.getPathVariables(str, message.getTopic()))
                 .reduce((m1, m2) -> {
                     m1.putAll(m2);
                     return m1;

+ 5 - 6
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java

@@ -1,7 +1,7 @@
 package org.jetlinks.community.notify;
 
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.core.event.EventBus;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.stereotype.Component;
@@ -22,11 +22,11 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso
 
     private NotifyConfigManager configManager;
 
-    private MessageGateway messageGateway;
+    private EventBus eventBus;
 
-    public DefaultNotifierManager(NotifyConfigManager manager, MessageGateway messageGateway) {
+    public DefaultNotifierManager(NotifyConfigManager manager, EventBus eventBus) {
         this.configManager = manager;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
     }
 
     protected Mono<NotifierProperties> getProperties(NotifyType notifyType,
@@ -35,7 +35,6 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso
     }
 
     public Mono<Void> reload(String id) {
-        // TODO: 2019/12/20 集群支持
         return Mono.justOrEmpty(notifiers.remove(id))
             .flatMap(Notifier::close);
     }
@@ -47,7 +46,7 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso
             .switchIfEmpty(Mono.error(new UnsupportedOperationException("不支持的服务商:" + properties.getProvider())))
             .flatMap(notifierProvider -> notifierProvider.createNotifier(properties))
             //转成代理,把通知事件发送到消息网关中.
-            .map(notifier -> new NotifierEventDispatcher<>(messageGateway, notifier))
+            .map(notifier -> new NotifierEventDispatcher<>(eventBus, notifier))
             .flatMap(notifier -> Mono.justOrEmpty(notifiers.put(properties.getId(), notifier))
                 .flatMap(Notifier::close)//如果存在旧的通知器则关掉之
                 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))//忽略异常

+ 6 - 5
jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierEventDispatcher.java

@@ -1,24 +1,25 @@
 package org.jetlinks.community.notify;
 
-import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.notify.event.NotifierEvent;
 import org.jetlinks.community.notify.template.Template;
+import org.jetlinks.core.event.EventBus;
 import reactor.core.publisher.Mono;
 
 public class NotifierEventDispatcher<T extends Template> extends NotifierProxy<T> {
 
-    private final MessageGateway gateway;
+    private final EventBus eventBus;
 
-    public NotifierEventDispatcher(MessageGateway gateway, Notifier<T> target) {
+    public NotifierEventDispatcher(EventBus eventBus, Notifier<T> target) {
         super(target);
-        this.gateway = gateway;
+        this.eventBus = eventBus;
     }
 
     @Override
     protected Mono<Void> onEvent(NotifierEvent event) {
         // /notify/{notifierId}/success
 
-        return gateway
+
+        return eventBus
             .publish(String.join("/", "/notify", event.getNotifierId(), event.isSuccess() ? "success" : "error"), event.toSerializable())
             .then();
     }

+ 0 - 18
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineConfiguration.java

@@ -1,10 +1,7 @@
 package org.jetlinks.community.rule.engine.configuration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.TopicMessageWrap;
 import org.jetlinks.core.event.EventBus;
-import org.jetlinks.core.event.Subscription;
 import org.jetlinks.rule.engine.api.RuleEngine;
 import org.jetlinks.rule.engine.api.scheduler.Scheduler;
 import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
@@ -21,7 +18,6 @@ import org.jetlinks.rule.engine.defaults.LocalWorker;
 import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
 import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
 import org.jetlinks.rule.engine.model.antv.AntVG6RuleModelParserStrategy;
-import org.jetlinks.supports.event.BrokerEventBus;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.context.annotation.Bean;
@@ -46,20 +42,6 @@ public class RuleEngineConfiguration {
         return new AntVG6RuleModelParserStrategy();
     }
 
-    @Bean
-    public EventBus eventBus(MessageGateway messageGateway) {
-
-        BrokerEventBus local = new BrokerEventBus();
-
-        //转发到消息网关
-        local.subscribe(Subscription.of("msg.gateway",new String[]{"/**"}, Subscription.Feature.local))
-            .flatMap(subscribePayload -> messageGateway.publish(TopicMessageWrap.wrap(subscribePayload)).then())
-            .onErrorContinue((err, obj) -> log.error(err.getMessage(), obj))
-            .subscribe();
-
-        return local;
-    }
-
     @Bean
     public Scheduler localScheduler(Worker worker) {
         LocalScheduler scheduler = new LocalScheduler("local");

+ 3 - 3
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/entity/RuleEngineExecuteEventInfo.java

@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import lombok.Getter;
 import lombok.Setter;
 import org.hswebframework.web.bean.FastBeanCopier;
-import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.core.event.TopicPayload;
 
 import java.util.Map;
 
@@ -29,10 +29,10 @@ public class RuleEngineExecuteEventInfo {
 
     private String contextId;
 
-    public static RuleEngineExecuteEventInfo of(TopicMessage message) {
+    public static RuleEngineExecuteEventInfo of(TopicPayload message) {
         Map<String, String> vars = message.getTopicVars("/rule-engine/{instanceId}/{nodeId}/event/{event}");
         RuleEngineExecuteEventInfo info = FastBeanCopier.copy(vars, new RuleEngineExecuteEventInfo());
-        JSONObject json = message.getMessage().payloadAsJson();
+        JSONObject json = message.bodyToJson();
         info.id=json.getString("id");
         info.contextId=json.getString("contextId");
         info.setRuleData(json.toJSONString());

+ 2 - 2
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/event/handler/RuleLogHandler.java

@@ -2,9 +2,9 @@ package org.jetlinks.community.rule.engine.event.handler;
 
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.community.elastic.search.service.ElasticSearchService;
-import org.jetlinks.community.gateway.TopicMessage;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
+import org.jetlinks.core.event.TopicPayload;
 import org.jetlinks.rule.engine.defaults.LogEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.annotation.Order;
@@ -20,7 +20,7 @@ public class RuleLogHandler {
     private ElasticSearchService elasticSearchService;
 
     @Subscribe("/rule-engine/*/*/event/*")
-    public Mono<Void> handleEvent(TopicMessage event) {
+    public Mono<Void> handleEvent(TopicPayload event) {
 
         return elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, RuleEngineExecuteEventInfo.of(event));
     }

+ 15 - 16
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java

@@ -1,15 +1,13 @@
 package org.jetlinks.community.rule.engine.executor;
 
-import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.gateway.TopicMessage;
+import org.jetlinks.core.codec.defaults.JsonCodec;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.reactor.ql.ReactorQL;
 import org.jetlinks.rule.engine.api.RuleConstants;
 import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.RuleDataHelper;
-import org.jetlinks.rule.engine.api.model.RuleNodeModel;
 import org.jetlinks.rule.engine.api.task.ExecutionContext;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
@@ -17,12 +15,9 @@ import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import reactor.core.Disposable;
-import reactor.core.Disposables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -30,7 +25,7 @@ import java.util.Optional;
 @AllArgsConstructor
 public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     @Override
     public String getExecutor() {
@@ -58,7 +53,6 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
 
         @Override
         protected Disposable doStart() {
-            Disposable.Composite composite = Disposables.composite();
             Flux<Map<String, Object>> dataStream;
             //有上游节点
             if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
@@ -80,12 +74,17 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
                         }
                         if (table.startsWith("/")) {
                             //转换为消息
-                            return messageGateway
-                                .subscribe(
-                                    Collections.singleton(new Subscription(table)),
-                                    "rule-engine:".concat(context.getInstanceId()),
-                                    false)
-                                .map(TopicMessage::convertMessage);
+                            return eventBus
+                                .subscribe(org.jetlinks.core.event.Subscription.of(
+                                    "rule-engine:"
+                                        .concat(context.getInstanceId())
+                                        .concat(":")
+                                        .concat(context.getJob().getNodeId()),
+                                    table,
+                                    Subscription.Feature.local
+                                    ),
+                                    JsonCodec.of(Map.class)
+                                );
                         }
                         return Flux.just(1);
                     });

+ 11 - 7
jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/messaging/RuleEngineSubscriptionProvider.java

@@ -1,10 +1,10 @@
 package org.jetlinks.community.rule.engine.messaging;
 
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 
@@ -12,10 +12,10 @@ import reactor.core.publisher.Flux;
 @Component
 public class RuleEngineSubscriptionProvider implements SubscriptionProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
-    public RuleEngineSubscriptionProvider(MessageGateway messageGateway) {
-        this.messageGateway = messageGateway;
+    public RuleEngineSubscriptionProvider(EventBus eventBus) {
+        this.eventBus = eventBus;
     }
 
     @Override
@@ -35,8 +35,12 @@ public class RuleEngineSubscriptionProvider implements SubscriptionProvider {
 
     @Override
     public Flux<Message> subscribe(SubscribeRequest request) {
+        String subscriber=request.getId();
 
-        return messageGateway.subscribe(Subscription.asList(request.getTopic()),messageGateway.nextSubscriberId( "rule:sub:" + request.getId()), true)
-            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.convertMessage()));
+        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(subscriber,request.getTopic(), org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker);
+
+        return eventBus
+            .subscribe(subscription)
+            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.decode()));
     }
 }

+ 20 - 20
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java

@@ -1,43 +1,43 @@
 package org.jetlinks.community.device.measurements;
 
-import org.jetlinks.core.device.DeviceProductOperator;
-import org.jetlinks.core.metadata.DeviceMetadata;
 import org.jetlinks.community.dashboard.DashboardObject;
 import org.jetlinks.community.dashboard.Measurement;
 import org.jetlinks.community.dashboard.ObjectDefinition;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.device.DeviceProductOperator;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.metadata.DeviceMetadata;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class DeviceDashboardObject implements DashboardObject {
-    private String id;
+    private final String id;
 
-    private String name;
+    private final String name;
 
-    private DeviceProductOperator productOperator;
+    private final DeviceProductOperator productOperator;
 
-    private MessageGateway messageGateway;
+    private final EventBus eventBus;
 
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
 
     private DeviceDashboardObject(String id, String name,
                                   DeviceProductOperator productOperator,
-                                  MessageGateway messageGateway,
+                                  EventBus eventBus,
                                   TimeSeriesManager timeSeriesManager) {
         this.id = id;
         this.name = name;
         this.productOperator = productOperator;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.timeSeriesManager = timeSeriesManager;
     }
 
     public static DeviceDashboardObject of(String id, String name,
                                            DeviceProductOperator productOperator,
-                                           MessageGateway messageGateway,
+                                           EventBus eventBus,
                                            TimeSeriesManager timeSeriesManager) {
-        return new DeviceDashboardObject(id, name, productOperator, messageGateway, timeSeriesManager);
+        return new DeviceDashboardObject(id, name, productOperator, eventBus, timeSeriesManager);
     }
 
     @Override
@@ -61,17 +61,17 @@ public class DeviceDashboardObject implements DashboardObject {
 
             productOperator.getMetadata()
                 .flatMapIterable(DeviceMetadata::getEvents)
-                .map(event -> new DeviceEventMeasurement(productOperator.getId(), messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))),
+                .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))),
 
             productOperator.getMetadata()
-                .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),messageGateway, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))),
+                .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))),
 
             productOperator.getMetadata()
-                .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), messageGateway, metadata, timeSeriesManager)),
+                .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager)),
 
             productOperator.getMetadata()
                 .flatMapIterable(DeviceMetadata::getProperties)
-                .map(event -> new DevicePropertyMeasurement(productOperator.getId(),messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id))))
+                .map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id))))
         );
     }
 
@@ -79,18 +79,18 @@ public class DeviceDashboardObject implements DashboardObject {
     public Mono<Measurement> getMeasurement(String id) {
         if ("properties".equals(id)) {
             return productOperator.getMetadata()
-                .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),messageGateway, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id))));
+                .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id))));
         }
         if ("events".equals(id)) {
             return productOperator.getMetadata()
-                .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), messageGateway, metadata, timeSeriesManager));
+                .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager));
         }
         return productOperator.getMetadata()
             .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
-            .<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(),messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(this.id, event.getId()))))
+            .<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(this.id, event.getId()))))
             //事件没获取到则尝试获取属性
             .switchIfEmpty(productOperator.getMetadata()
                 .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
-                .map(event -> new DevicePropertyMeasurement(productOperator.getId(),messageGateway, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id)))));
+                .map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id)))));
     }
 }

+ 17 - 12
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java

@@ -1,12 +1,11 @@
 package org.jetlinks.community.device.measurements;
 
-import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.community.dashboard.DashboardObject;
 import org.jetlinks.community.device.entity.DeviceProductEntity;
 import org.jetlinks.community.device.service.LocalDeviceProductService;
-import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -16,17 +15,23 @@ import javax.annotation.PostConstruct;
 @Component
 public class DeviceDynamicDashboard implements DeviceDashboard {
 
-    @Autowired
-    private LocalDeviceProductService productService;
+    private final LocalDeviceProductService productService;
 
-    @Autowired
-    private DeviceRegistry registry;
+    private final DeviceRegistry registry;
 
-    @Autowired
-    private MessageGateway messageGateway;
+    private final EventBus eventBus;
 
-    @Autowired
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
+
+    public DeviceDynamicDashboard(LocalDeviceProductService productService,
+                                  DeviceRegistry registry,
+                                  EventBus eventBus,
+                                  TimeSeriesManager timeSeriesManager) {
+        this.productService = productService;
+        this.registry = registry;
+        this.eventBus = eventBus;
+        this.timeSeriesManager = timeSeriesManager;
+    }
 
     @PostConstruct
     public void init() {
@@ -48,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard {
 
     protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) {
         return registry.getProduct(product.getId())
-            .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, messageGateway, timeSeriesManager));
+            .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, timeSeriesManager));
     }
 }

+ 17 - 17
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java

@@ -1,6 +1,12 @@
 package org.jetlinks.community.device.measurements;
 
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.timeseries.TimeSeriesService;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.event.EventMessage;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DataType;
@@ -8,34 +14,26 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.EventMetadata;
 import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.StringType;
-import org.jetlinks.community.dashboard.*;
-import org.jetlinks.community.dashboard.supports.StaticMeasurement;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.timeseries.TimeSeriesService;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.Collections;
-
 class DeviceEventMeasurement extends StaticMeasurement {
 
     public EventMetadata eventMetadata;
 
-    public MessageGateway messageGateway;
+    public EventBus eventBus;
 
-    private TimeSeriesService eventTsService;
+    private final TimeSeriesService eventTsService;
 
-    private String productId;
+    private final String productId;
 
     public DeviceEventMeasurement(String productId,
-                                  MessageGateway messageGateway,
+                                  EventBus eventBus,
                                   EventMetadata eventMetadata,
                                   TimeSeriesService eventTsService) {
         super(MetadataMeasurementDefinition.of(eventMetadata));
-        this.productId=productId;
-        this.messageGateway = messageGateway;
+        this.productId = productId;
+        this.eventBus = eventBus;
         this.eventMetadata = eventMetadata;
         this.eventTsService = eventTsService;
         addDimension(new RealTimeDeviceEventDimension());
@@ -57,9 +55,11 @@ class DeviceEventMeasurement extends StaticMeasurement {
 
 
     Flux<MeasurementValue> fromRealTime(String deviceId) {
-        return messageGateway
-            .subscribe(Collections.singletonList(new Subscription("/device/"+productId+"/" + deviceId + "/message/event/" + eventMetadata.getId())), true)
-            .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
+        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription
+            .of("deviceEventMeasurement", "/device/" + productId + "/" + deviceId + "/message/event/" + eventMetadata.getId(), Subscription.Feature.local);
+
+        return eventBus
+            .subscribe(subscription, DeviceMessage.class)
             .cast(EventMessage.class)
             .map(msg -> SimpleMeasurementValue.of(msg.getData(), msg.getTimestamp()));
     }

+ 25 - 22
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java

@@ -1,17 +1,17 @@
 package org.jetlinks.community.device.measurements;
 
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
-import org.jetlinks.core.message.event.EventMessage;
-import org.jetlinks.core.metadata.*;
-import org.jetlinks.core.metadata.types.ObjectType;
-import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.community.dashboard.*;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.message.event.EventMessage;
+import org.jetlinks.core.metadata.*;
+import org.jetlinks.core.metadata.types.ObjectType;
+import org.jetlinks.core.metadata.types.StringType;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -21,22 +21,21 @@ import java.util.concurrent.atomic.AtomicLong;
 
 class DeviceEventsMeasurement extends StaticMeasurement {
 
+    private final EventBus eventBus;
 
-    private MessageGateway messageGateway;
-
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
 
-    private DeviceMetadata metadata;
+    private final DeviceMetadata metadata;
 
-    private String productId;
+    private final String productId;
 
     public DeviceEventsMeasurement(String productId,
-                                   MessageGateway messageGateway,
+                                   EventBus eventBus,
                                    DeviceMetadata deviceMetadata,
                                    TimeSeriesManager timeSeriesManager) {
         super(MeasurementDefinition.of("events", "事件记录"));
         this.productId = productId;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.timeSeriesManager = timeSeriesManager;
         this.metadata = deviceMetadata;
         addDimension(new RealTimeDevicePropertyDimension());
@@ -62,14 +61,18 @@ class DeviceEventsMeasurement extends StaticMeasurement {
     }
 
     Flux<MeasurementValue> fromRealTime(String deviceId) {
-        return messageGateway
-            .subscribe(Subscription.asList("/device/"+productId+"/" + deviceId + "/message/event/*")
-                , "realtime-device-events-measurement:" + Math.abs(num.incrementAndGet())
-                , true)
-            .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
-            .filter(EventMessage.class::isInstance)
-            .cast(EventMessage.class)
-            .map(kv -> SimpleMeasurementValue.of(createValue(kv.getEvent(), kv.getData()), System.currentTimeMillis()));
+        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
+            "realtime-device-events-measurement",
+            "/device/" + productId + "/" + deviceId + "/message/event/*",
+            org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker
+        );
+        return
+            eventBus
+                .subscribe(subscription, DeviceMessage.class)
+                .filter(EventMessage.class::isInstance)
+                .cast(EventMessage.class)
+                .map(kv -> SimpleMeasurementValue.of(createValue(kv.getEvent(), kv.getData()), System.currentTimeMillis()))
+            ;
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()

+ 37 - 33
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java

@@ -1,28 +1,27 @@
 package org.jetlinks.community.device.measurements;
 
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.timeseries.TimeSeriesService;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.ReportPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.*;
 import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
-import org.jetlinks.community.dashboard.*;
-import org.jetlinks.community.dashboard.supports.StaticMeasurement;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.timeseries.TimeSeriesService;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 class DevicePropertiesMeasurement extends StaticMeasurement {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     private final TimeSeriesService timeSeriesService;
 
@@ -31,12 +30,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
     private final String productId;
 
     public DevicePropertiesMeasurement(String productId,
-                                       MessageGateway messageGateway,
+                                       EventBus eventBus,
                                        DeviceMetadata deviceMetadata,
                                        TimeSeriesService timeSeriesService) {
         super(MeasurementDefinition.of("properties", "属性记录"));
         this.productId = productId;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.timeSeriesService = timeSeriesService;
         this.metadata = deviceMetadata;
         addDimension(new RealTimeDevicePropertyDimension());
@@ -44,8 +43,6 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
 
     }
 
-    static AtomicLong num = new AtomicLong();
-
     Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
         return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getProperties())
             .flatMap(propertyMetadata -> QueryParamEntity.newQuery()
@@ -79,31 +76,38 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
     }
 
     Flux<MeasurementValue> fromRealTime(String deviceId) {
-        return messageGateway
-            .subscribe(Subscription.asList(
+
+        org.jetlinks.core.event.Subscription subscription= org.jetlinks.core.event.Subscription.of(
+            "realtime-device-properties-measurement",
+            new String[]{
                 "/device/" + productId + "/" + deviceId + "/message/property/report",
-                "/device/" + productId + "/" + deviceId + "/message/property/*/reply")
-                , "realtime-device-properties-measurement:" + Math.abs(num.incrementAndGet())
-                , true)
-            .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
-            .flatMap(msg -> {
-                if (msg instanceof ReportPropertyMessage) {
-                    return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
-                }
-                if (msg instanceof ReadPropertyMessageReply) {
-                    return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties());
-                }
-                if (msg instanceof WritePropertyMessageReply) {
-                    return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties());
-                }
-                return Mono.empty();
-            })
-            .flatMap(map -> Flux.fromIterable(map.entrySet()))
-            .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()));
+                "/device/" + productId + "/" + deviceId + "/message/property/*/reply"
+            },
+            org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker
+        );
+
+        return
+            eventBus
+                .subscribe(subscription, DeviceMessage.class)
+                .flatMap(msg -> {
+                    if (msg instanceof ReportPropertyMessage) {
+                        return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
+                    }
+                    if (msg instanceof ReadPropertyMessageReply) {
+                        return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties());
+                    }
+                    if (msg instanceof WritePropertyMessageReply) {
+                        return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties());
+                    }
+                    return Mono.empty();
+                })
+                .flatMap(map -> Flux.fromIterable(map.entrySet()))
+                .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
+            ;
     }
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
-        .add("deviceId", "设备",  "指定设备", new StringType().expand("selector", "device-selector"));
+        .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
 
     /**
      * 历史设备事件

+ 26 - 22
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java

@@ -2,6 +2,14 @@ package org.jetlinks.community.device.measurements;
 
 import org.hswebframework.utils.time.DateFormatter;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.jetlinks.community.Interval;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.timeseries.TimeSeriesService;
+import org.jetlinks.community.timeseries.query.Aggregation;
+import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.ReportPropertyMessage;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
@@ -10,15 +18,6 @@ import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.NumberType;
 import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
-import org.jetlinks.community.Interval;
-import org.jetlinks.community.dashboard.*;
-import org.jetlinks.community.dashboard.supports.StaticMeasurement;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.timeseries.TimeSeriesService;
-import org.jetlinks.community.timeseries.query.Aggregation;
-import org.jetlinks.community.timeseries.query.AggregationQueryParam;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -27,26 +26,24 @@ import java.time.ZoneId;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 class DevicePropertyMeasurement extends StaticMeasurement {
 
     private final PropertyMetadata metadata;
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     private final TimeSeriesService timeSeriesService;
 
     private final String productId;
 
     public DevicePropertyMeasurement(String productId,
-                                     MessageGateway messageGateway,
+                                     EventBus eventBus,
                                      PropertyMetadata metadata,
                                      TimeSeriesService timeSeriesService) {
         super(MetadataMeasurementDefinition.of(metadata));
         this.productId = productId;
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.metadata = metadata;
         this.timeSeriesService = timeSeriesService;
         addDimension(new RealTimeDevicePropertyDimension());
@@ -77,13 +74,17 @@ class DevicePropertyMeasurement extends StaticMeasurement {
     }
 
     Flux<MeasurementValue> fromRealTime(String deviceId) {
-        return messageGateway
-            .subscribe(Stream.of(
-                "/device/" + productId + "/" + deviceId + "/message/property/report"
-                , "/device/" + productId + "/" + deviceId + "/message/property/*/reply")
-                .map(Subscription::new)
-                .collect(Collectors.toList()), true)
-            .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
+        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
+            "realtime-device-property-measurement",
+            new String[]{
+                "/device/" + productId + "/" + deviceId + "/message/property/report",
+                "/device/" + productId + "/" + deviceId + "/message/property/*/reply"
+            },
+            org.jetlinks.core.event.Subscription.Feature.local, org.jetlinks.core.event.Subscription.Feature.broker
+        );
+
+        return eventBus
+            .subscribe(subscription, DeviceMessage.class)
             .flatMap(msg -> {
                 if (msg instanceof ReportPropertyMessage) {
                     return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
@@ -102,7 +103,10 @@ class DevicePropertyMeasurement extends StaticMeasurement {
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
         .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
-        .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
+        .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10))
+        .add("from", "时间从", "", StringType.GLOBAL)
+        .add("to", "时间至", "", StringType.GLOBAL);
+    ;
 
     static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
         .add("deviceId", "设备ID", "", StringType.GLOBAL)

+ 14 - 15
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java

@@ -1,42 +1,41 @@
 package org.jetlinks.community.device.measurements.message;
 
-import org.jetlinks.core.metadata.ConfigMetadata;
-import org.jetlinks.core.metadata.DataType;
-import org.jetlinks.core.metadata.DefaultConfigMetadata;
-import org.jetlinks.core.metadata.types.DateTimeType;
-import org.jetlinks.core.metadata.types.IntType;
-import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.community.Interval;
 import org.jetlinks.community.dashboard.*;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.metadata.ConfigMetadata;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.DefaultConfigMetadata;
+import org.jetlinks.core.metadata.types.DateTimeType;
+import org.jetlinks.core.metadata.types.IntType;
+import org.jetlinks.core.metadata.types.StringType;
 import reactor.core.publisher.Flux;
 
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Collections;
 import java.util.Date;
 
 class DeviceMessageMeasurement extends StaticMeasurement {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     private final TimeSeriesManager timeSeriesManager;
 
     static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
 
-    public DeviceMessageMeasurement(MessageGateway messageGateway, TimeSeriesManager timeSeriesManager) {
+    public DeviceMessageMeasurement(EventBus eventBus,
+                                    TimeSeriesManager timeSeriesManager) {
         super(definition);
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.timeSeriesManager = timeSeriesManager;
         addDimension(new RealTimeMessageDimension());
         addDimension(new AggMessageDimension());
-
     }
 
     static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata()
@@ -67,8 +66,8 @@ class DeviceMessageMeasurement extends StaticMeasurement {
         @Override
         public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
             //通过订阅消息来统计实时数据量
-            return messageGateway
-                .subscribe(Collections.singleton(new Subscription("/device/**")),true)
+            return eventBus
+                .subscribe(org.jetlinks.core.event.Subscription.of("real-time-device-message", "/device/**", org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker))
                 .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1)))
                 .flatMap(Flux::count)
                 .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis()));

+ 5 - 11
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java

@@ -4,39 +4,34 @@ import io.micrometer.core.instrument.MeterRegistry;
 import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
 import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
 import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.TopicMessage;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceMessage;
 import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.Duration;
-
 @Component
 public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider {
 
 
     MeterRegistry registry;
 
-    public DeviceMessageMeasurementProvider(MessageGateway messageGateway,
+    public DeviceMessageMeasurementProvider(EventBus eventBus,
                                             MeterRegistryManager registryManager,
                                             TimeSeriesManager timeSeriesManager) {
         super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.message);
         registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
             "target", "msgType", "productId");
 
-        addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager));
+        addMeasurement(new DeviceMessageMeasurement(eventBus, timeSeriesManager));
 
     }
 
     @Subscribe("/device/*/*/message/**")
-    public Mono<Void> incrementMessage(TopicMessage message) {
+    public Mono<Void> incrementMessage(DeviceMessage message) {
         return Mono.fromRunnable(() -> {
             registry
                 .counter("message-count", convertTags(message))
@@ -46,8 +41,7 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider
 
     static final String[] empty = new String[0];
 
-    private String[] convertTags(TopicMessage msg) {
-        DeviceMessage message = DeviceMessageUtils.convert(msg).orElse(null);
+    private String[] convertTags(DeviceMessage message) {
         if (message == null) {
             return empty;
         }

+ 21 - 20
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java

@@ -1,6 +1,13 @@
 package org.jetlinks.community.device.measurements.status;
 
 import org.jetlinks.community.Interval;
+import org.jetlinks.community.dashboard.*;
+import org.jetlinks.community.dashboard.supports.StaticMeasurement;
+import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.MessageType;
 import org.jetlinks.core.metadata.ConfigMetadata;
@@ -10,32 +17,22 @@ import org.jetlinks.core.metadata.types.DateTimeType;
 import org.jetlinks.core.metadata.types.EnumType;
 import org.jetlinks.core.metadata.types.IntType;
 import org.jetlinks.core.metadata.types.StringType;
-import org.jetlinks.community.dashboard.*;
-import org.jetlinks.community.dashboard.supports.StaticMeasurement;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
-import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
-import org.jetlinks.community.timeseries.TimeSeriesManager;
-import org.jetlinks.community.timeseries.query.AggregationQueryParam;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
 class DeviceStatusChangeMeasurement extends StaticMeasurement {
 
-    public MessageGateway messageGateway;
+    private final EventBus eventBus;
 
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
 
-    static MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
+    static final MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
 
     static ConfigMetadata configMetadata = new DefaultConfigMetadata()
         .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
@@ -44,9 +41,9 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
         .addElement(EnumType.Element.of(MessageType.OFFLINE.name().toLowerCase(), "离线"))
         .addElement(EnumType.Element.of(MessageType.ONLINE.name().toLowerCase(), "在线"));
 
-    public DeviceStatusChangeMeasurement(TimeSeriesManager timeSeriesManager, MessageGateway messageGateway) {
+    public DeviceStatusChangeMeasurement(TimeSeriesManager timeSeriesManager, EventBus eventBus) {
         super(definition);
-        this.messageGateway = messageGateway;
+        this.eventBus = eventBus;
         this.timeSeriesManager = timeSeriesManager;
         addDimension(new RealTimeDeviceStateDimension());
         addDimension(new CountDeviceStateDimension());
@@ -143,11 +140,15 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
             return Mono.justOrEmpty(parameter.getString("deviceId"))
                 .flatMapMany(deviceId -> {
                     //从消息网关订阅消息
-                    return messageGateway
-                        .subscribe(Arrays.asList(
-                            new Subscription("/device/*/" + deviceId + "/online"),
-                            new Subscription("/device/*/" + deviceId + "/offline")), true)
-                        .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
+                    return eventBus.subscribe(org.jetlinks.core.event.Subscription.of(
+                        "RealTimeDeviceStateDimension"
+                        , new String[]{
+                            "/device/*/" + deviceId + "/online",
+                            "/device/*/" + deviceId + "/offline"
+                        },
+                        org.jetlinks.core.event.Subscription.Feature.local,
+                        Subscription.Feature.broker
+                    ), DeviceMessage.class)
                         .map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp()))
                         ;
                 });

+ 13 - 13
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java

@@ -5,14 +5,13 @@ import io.micrometer.core.instrument.MeterRegistry;
 import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
 import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
 import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.device.service.LocalDeviceInstanceService;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.TopicMessage;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.micrometer.MeterRegistryManager;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.message.DeviceMessage;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
@@ -40,10 +39,10 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
     public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager,
                                            LocalDeviceInstanceService instanceService,
                                            TimeSeriesManager timeSeriesManager,
-                                           MessageGateway messageGateway) {
+                                           EventBus eventBus) {
         super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.status);
 
-        addMeasurement(new DeviceStatusChangeMeasurement(timeSeriesManager, messageGateway));
+        addMeasurement(new DeviceStatusChangeMeasurement(timeSeriesManager, eventBus));
 
         addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
 
@@ -52,8 +51,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
     }
 
     @Subscribe("/device/*/*/online")
-    public Mono<Void> incrementOnline(TopicMessage msg){
-        return Mono.fromRunnable(()->{
+    public Mono<Void> incrementOnline(DeviceMessage msg) {
+        return Mono.fromRunnable(() -> {
             String productId = parseProductId(msg);
             counterAdder.apply(productId).increment();
             registry
@@ -63,8 +62,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
     }
 
     @Subscribe("/device/*/*/offline")
-    public Mono<Void> incrementOffline(TopicMessage msg){
-        return Mono.fromRunnable(()->{
+    public Mono<Void> incrementOffline(DeviceMessage msg) {
+        return Mono.fromRunnable(() -> {
             String productId = parseProductId(msg);
             counterAdder.apply(productId).increment();
             registry
@@ -73,9 +72,10 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
         });
     }
 
-    private String parseProductId(TopicMessage msg) {
-        return DeviceMessageUtils.convert(msg)
-            .flatMap(deviceMessage -> deviceMessage.getHeader("productId"))
-            .map(String::valueOf).orElse("unknown");
+    private String parseProductId(DeviceMessage deviceMessage) {
+        return deviceMessage
+            .getHeader("productId")
+            .map(String::valueOf)
+            .orElse("unknown");
     }
 }

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java

@@ -27,9 +27,9 @@ import java.util.Date;
 class DeviceStatusRecordMeasurement
     extends StaticMeasurement {
 
-    public LocalDeviceInstanceService instanceService;
+    private final LocalDeviceInstanceService instanceService;
 
-    private TimeSeriesManager timeSeriesManager;
+    private final TimeSeriesManager timeSeriesManager;
 
     static MeasurementDefinition definition = MeasurementDefinition.of("record", "设备状态记录");
 

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceBatchOperationSubscriptionProvider.java

@@ -7,7 +7,7 @@ import org.jetlinks.community.device.entity.DeviceInstanceEntity;
 import org.jetlinks.community.device.service.LocalDeviceInstanceService;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
-import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.jetlinks.core.utils.TopicUtils;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
@@ -56,7 +56,7 @@ public class DeviceBatchOperationSubscriptionProvider implements SubscriptionPro
             .orElseGet(QueryParamEntity::new);
 
 
-        Map<String, String> var = MqttTopicUtils.getPathVariables("/device-batch/{type}", topic);
+        Map<String, String> var = TopicUtils.getPathVariables("/device-batch/{type}", topic);
         String type = var.get("type");
 
         switch (type) {

+ 27 - 67
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -3,20 +3,16 @@ package org.jetlinks.community.device.message;
 import lombok.extern.slf4j.Slf4j;
 import org.jetlinks.core.Values;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.event.EventMessage;
+import org.jetlinks.core.message.firmware.*;
 import org.jetlinks.core.message.function.FunctionInvokeMessage;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.*;
-import org.jetlinks.community.gateway.*;
-import reactor.core.publisher.EmitterProcessor;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
-import javax.annotation.Nonnull;
 import java.util.HashMap;
-import java.util.function.Function;
 
 /**
  * 将设备消息连接到消息网关
@@ -25,68 +21,28 @@ import java.util.function.Function;
  * @since 1.0
  */
 @Slf4j
-public class DeviceMessageConnector
-    implements MessageConnector,
-    MessageConnection,
-    MessagePublisher {
-
-    private final EmitterProcessor<TopicMessage> messageProcessor = EmitterProcessor.create(false);
-
-    private final FluxSink<TopicMessage> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
-
+public class DeviceMessageConnector{
     //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用.
-    private final String[] appendConfigHeader = {"orgId", "productId", "deviceName"};
+    private final String[] appendConfigHeader = {"productId", "deviceName"};
+
     //设备注册中心
     private final DeviceRegistry registry;
 
-//    private final DeviceGateway gateway;
+    private final EventBus eventBus;
 
-    public DeviceMessageConnector(DeviceRegistry registry) {
+    public DeviceMessageConnector(EventBus eventBus,
+                                  DeviceRegistry registry) {
         this.registry = registry;
-    }
-
-    @Nonnull
-    @Override
-    public String getId() {
-        return "device-message-connector";
-    }
-
-    @Override
-    public String getName() {
-        return "设备消息连接器";
-    }
-
-    @Override
-    public String getDescription() {
-        return "连接设备上报的消息到消息网关";
-    }
-
-    @Override
-    public void onDisconnect(Runnable disconnectListener) {
-
-    }
-
-    @Override
-    public void disconnect() {
-        messageProcessor.onComplete();
-    }
-
-    @Override
-    public boolean isAlive() {
-        return true;
+        this.eventBus = eventBus;
     }
 
     public Mono<Void> onMessage(Message message) {
         if (null == message) {
             return Mono.empty();
         }
-        if (!messageProcessor.hasDownstreams() && !messageProcessor.isCancelled()) {
-            return Mono.empty();
-        }
-
         return this.getTopic(message)
-            .map(topic -> TopicMessage.of(topic, message))
-            .doOnNext(sink::next)
+            .flatMap(topic ->eventBus.publish(topic,message).then())
+            .onErrorResume(error -> Mono.fromRunnable(() -> log.error(error.getMessage(), error)))
             .then();
     }
 
@@ -95,6 +51,7 @@ public class DeviceMessageConnector
             DeviceMessage deviceMessage = ((DeviceMessage) message);
             String deviceId = deviceMessage.getDeviceId();
             if (deviceId == null) {
+                log.warn("无法从消息中获取设备ID:{}", deviceMessage);
                 return Mono.empty();
             }
             return registry
@@ -105,6 +62,7 @@ public class DeviceMessageConnector
                 .flatMap(configs -> {
                     configs.getAllValues().forEach(deviceMessage::addHeader);
                     String productId = deviceMessage.getHeader("productId").map(String::valueOf).orElse("null");
+
                     String topic = String.join("",
                         "/device", "/", productId, "/", deviceId, createDeviceMessageTopic(message)
                     );
@@ -160,20 +118,22 @@ public class DeviceMessageConnector
             return "/register";
         } else if (message instanceof DeviceUnRegisterMessage) { //注销
             return "/unregister";
+        } else if (message instanceof RequestFirmwareMessage) { //拉取固件请求 since 1.3
+            return "/firmware/pull";
+        } else if (message instanceof RequestFirmwareMessageReply) { //拉取固件响应 since 1.3
+            return "/firmware/pull/reply";
+        } else if (message instanceof ReportFirmwareMessage) { //上报固件信息 since 1.3
+            return "/firmware/report";
+        } else if (message instanceof UpgradeFirmwareProgressMessage) { //上报固件更新进度 since 1.3
+            return "/firmware/progress";
+        } else if (message instanceof UpgradeFirmwareMessage) { //推送固件更新 since 1.3
+            return "/firmware/push";
+        } else if (message instanceof UpgradeFirmwareMessageReply) { //推送固件更新回复 since 1.3
+            return "/firmware/push/reply";
+        } else if (message instanceof DirectDeviceMessage) { //透传消息 since 1.4
+            return "/message/direct";
         } else {
             return "/message/unknown";
         }
     }
-
-    @Nonnull
-    @Override
-    public Flux<MessageConnection> onConnection() {
-        return Flux.just(this);
-    }
-
-    @Nonnull
-    @Override
-    public Flux<TopicMessage> onMessage() {
-        return messageProcessor.map(Function.identity());
-    }
 }

+ 3 - 3
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java

@@ -5,10 +5,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.ChildDeviceMessage;
 import org.jetlinks.core.message.DeviceMessage;
 import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
-import org.jetlinks.community.gateway.MessageGateway;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
@@ -23,7 +23,7 @@ import reactor.core.publisher.Mono;
 @AllArgsConstructor
 public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInterceptor {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     private final DeviceRegistry registry;
 
@@ -33,7 +33,7 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
             .doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId()))
             .flatMap(tp2 -> {
                 String topic = DeviceMessageConnector.createDeviceMessageTopic(message);
-                Mono<Void> publisher = messageGateway
+                Mono<Void> publisher = eventBus
                     .publish("/device/" + tp2.getT2().getId() + "/" + tp2.getT1().getDeviceId() + topic, message)
                     .then();
                 if (message instanceof ChildDeviceMessage) {

+ 2 - 2
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendSubscriptionProvider.java

@@ -13,7 +13,7 @@ import org.jetlinks.community.device.service.LocalDeviceInstanceService;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
-import org.jetlinks.supports.utils.MqttTopicUtils;
+import org.jetlinks.core.utils.TopicUtils;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -51,7 +51,7 @@ public class DeviceMessageSendSubscriptionProvider implements SubscriptionProvid
 
         String topic = request.getTopic();
 
-        Map<String, String> variables = MqttTopicUtils.getPathVariables("/device-message-sender/{productId}/{deviceId}", topic);
+        Map<String, String> variables = TopicUtils.getPathVariables("/device-message-sender/{productId}/{deviceId}", topic);
         String deviceId = variables.get("deviceId");
         String productId = variables.get("productId");
 

+ 12 - 11
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSubscriptionProvider.java

@@ -1,22 +1,19 @@
 package org.jetlinks.community.device.message;
 
 import lombok.AllArgsConstructor;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.Collections;
 
 @Component
 @AllArgsConstructor
 public class DeviceMessageSubscriptionProvider implements SubscriptionProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     @Override
     public String id() {
@@ -37,10 +34,14 @@ public class DeviceMessageSubscriptionProvider implements SubscriptionProvider {
 
     @Override
     public Flux<Message> subscribe(SubscribeRequest request) {
-        return messageGateway
-            .subscribe(Collections.singletonList(new Subscription(request.getTopic())), true)
-            .flatMap(topic -> Mono.justOrEmpty(DeviceMessageUtils.convert(topic)
-                .map(msg -> Message.success(request.getId(), topic.getTopic(), msg))
-            ));
+        return eventBus
+            .subscribe(
+                org.jetlinks.core.event.Subscription.of(
+                    "DeviceMessageSubscriptionProvider:" + request.getAuthentication().getUser().getId(),
+                    new String[]{request.getTopic()},
+                    org.jetlinks.core.event.Subscription.Feature.local,
+                    Subscription.Feature.broker
+                ))
+            .map(topicMessage -> Message.success(request.getId(), topicMessage.getTopic(), topicMessage.decode()));
     }
 }

+ 0 - 17
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java

@@ -1,17 +0,0 @@
-package org.jetlinks.community.device.message;
-
-import org.jetlinks.core.message.DeviceMessage;
-import org.jetlinks.core.message.MessageType;
-import org.jetlinks.community.gateway.EncodableMessage;
-import org.jetlinks.community.gateway.TopicMessage;
-
-import java.util.Map;
-import java.util.Optional;
-
-public class DeviceMessageUtils {
-
-    public static Optional<DeviceMessage> convert(TopicMessage message){
-        return org.jetlinks.community.gateway.DeviceMessageUtils.convert(message);
-    }
-
-}

+ 2 - 6
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java

@@ -8,9 +8,7 @@ import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
 import org.jetlinks.community.device.entity.DevicePropertiesEntity;
 import org.jetlinks.community.device.enums.DeviceLogType;
 import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
 import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.TopicMessage;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.timeseries.TimeSeriesData;
 import org.jetlinks.community.timeseries.TimeSeriesManager;
@@ -52,10 +50,8 @@ public class TimeSeriesMessageWriterConnector{
     }
 
     @Subscribe(topics = "/device/**",id = "device-message-ts-writer")
-    public Mono<Void> writeDeviceMessageToTs(TopicMessage message){
-        return Mono
-            .justOrEmpty(DeviceMessageUtils.convert(message))
-            .flatMap(this::doIndex);
+    public Mono<Void> writeDeviceMessageToTs(DeviceMessage message){
+        return this.doIndex(message);
     }
 
     private Mono<Void> doIndex(DeviceMessage message) {

+ 29 - 35
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java

@@ -1,76 +1,57 @@
 package org.jetlinks.community.device.service;
 
-import com.alibaba.excel.EasyExcel;
-import com.alibaba.excel.ExcelWriter;
-import com.alibaba.excel.write.metadata.WriteSheet;
-import com.alibaba.fastjson.JSON;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.hswebframework.ezorm.core.dsl.Query;
-import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.ezorm.core.param.TermType;
 import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
 import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
 import org.hswebframework.web.api.crud.entity.PagerResult;
 import org.hswebframework.web.api.crud.entity.QueryParamEntity;
-import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.exception.NotFoundException;
 import org.hswebframework.web.id.IDGenerator;
-import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.device.entity.*;
-import org.jetlinks.community.device.message.DeviceMessageUtils;
-import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.community.device.enums.DeviceState;
+import org.jetlinks.community.device.response.DeviceDeployResult;
+import org.jetlinks.community.device.response.DeviceDetail;
+import org.jetlinks.community.device.response.DeviceInfo;
+import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
 import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
 import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceConfigKey;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.exception.DeviceOperationException;
 import org.jetlinks.core.message.*;
 import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
 import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
-import org.jetlinks.core.metadata.*;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.core.utils.FluxUtils;
-import org.jetlinks.community.device.entity.excel.DeviceInstanceImportExportEntity;
-import org.jetlinks.community.device.enums.DeviceState;
-import org.jetlinks.community.device.response.*;
-import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
-import org.jetlinks.community.gateway.EncodableMessage;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.io.excel.ImportExportService;
-import org.jetlinks.community.timeseries.TimeSeriesManager;
-import org.jetlinks.supports.official.JetLinksDeviceMetadata;
 import org.reactivestreams.Publisher;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.DefaultDataBufferFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.server.reactive.ServerHttpResponse;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.StringUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple3;
 import reactor.util.function.Tuples;
 
 import javax.annotation.PostConstruct;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.*;
 import java.util.function.Function;
@@ -89,7 +70,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     private LocalDeviceProductService deviceProductService;
 
     @Autowired
-    private MessageGateway messageGateway;
+    private EventBus eventBus;
 
     @Autowired
     private TimeSeriesManager timeSeriesManager;
@@ -353,11 +334,24 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
     @PostConstruct
     public void init() {
 
-        //订阅设备上下线
-        FluxUtils.bufferRate(messageGateway
-            .subscribe(Subscription.asList("/device/*/*/online", "/device/*/*/offline"), "device-state-synchronizer", false)
-            .flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
-                .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
+        org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
+            "device-state-synchronizer",
+            new String[]{
+                "/device/*/*/online",
+                "/device/*/*/offline"
+            },
+            Subscription.Feature.local
+        );
+
+        //订阅设备上下线消息,同步数据库中的设备状态,
+        //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
+        //如果2条消息间隔大于0.8秒则不缓冲直接更新
+        //否则缓冲,数量超过500后批量更新
+        //无论缓冲区是否超过500条,都每2秒更新一次.
+        FluxUtils.bufferRate(eventBus
+                .subscribe(subscription,DeviceMessage.class)
+                .map(DeviceMessage::getDeviceId),
+            800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
             .publishOn(Schedulers.parallel())
             .concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size))
             .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))

+ 2 - 21
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceMessageController.java

@@ -6,6 +6,8 @@ import org.hswebframework.web.authorization.annotation.Authorize;
 import org.hswebframework.web.authorization.annotation.Resource;
 import org.hswebframework.web.exception.BusinessException;
 import org.hswebframework.web.id.IDGenerator;
+import org.jetlinks.community.device.entity.DevicePropertiesEntity;
+import org.jetlinks.community.utils.ErrorUtils;
 import org.jetlinks.core.device.DeviceOperator;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.enums.ErrorCode;
@@ -19,17 +21,11 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply;
 import org.jetlinks.core.message.property.WritePropertyMessageReply;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.types.StringType;
-import org.jetlinks.community.device.entity.DevicePropertiesEntity;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.TopicMessage;
-import org.jetlinks.community.utils.ErrorUtils;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -44,21 +40,6 @@ public class DeviceMessageController {
     @Autowired
     private DeviceRegistry registry;
 
-    @Autowired
-    public MessageGateway messageGateway;
-
-
-    //获取实时事件
-    @GetMapping(value = "/{deviceId}/event", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
-    public Flux<Object> getEvent(@PathVariable String deviceId) {
-        return messageGateway
-            .subscribe("/device/*/".concat(deviceId).concat("/message/event/**"))
-            .map(TopicMessage::getMessage)
-            .map(msg -> msg.getPayload().toString(StandardCharsets.UTF_8))
-            ;
-    }
-
-
     //获取设备属性
     @GetMapping("/{deviceId}/property/{property:.+}")
     @SneakyThrows

+ 2 - 2
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttClientDebugSubscriptionProvider.java

@@ -9,8 +9,8 @@ import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.manager.web.request.MqttMessageRequest;
 import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
 import org.jetlinks.community.network.mqtt.client.MqttClient;
+import org.jetlinks.core.utils.TopicUtils;
 import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.supports.utils.MqttTopicUtils;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 
@@ -47,7 +47,7 @@ public class MqttClientDebugSubscriptionProvider implements SubscriptionProvider
     @Override
     public Flux<Object> subscribe(SubscribeRequest request) {
         DebugAuthenticationHandler.handle(request);
-        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/client/{id}/{pubsub}/{type}", request.getTopic());
+        Map<String, String> vars = TopicUtils.getPathVariables("/network/mqtt/client/{id}/{pubsub}/{type}", request.getTopic());
 
         String clientId = vars.get("id");
         String pubsub = vars.get("pubsub");

+ 2 - 2
jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.java

@@ -10,8 +10,8 @@ import org.jetlinks.community.network.DefaultNetworkType;
 import org.jetlinks.community.network.NetworkManager;
 import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
 import org.jetlinks.community.network.mqtt.server.*;
+import org.jetlinks.core.utils.TopicUtils;
 import org.jetlinks.rule.engine.executor.PayloadType;
-import org.jetlinks.supports.utils.MqttTopicUtils;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 
@@ -49,7 +49,7 @@ public class MqttServerDebugSubscriptionProvider implements SubscriptionProvider
     public Flux<MqttClientMessage> subscribe(SubscribeRequest request) {
         DebugAuthenticationHandler.handle(request);
 
-        Map<String, String> vars = MqttTopicUtils.getPathVariables("/network/mqtt/server/{id}/_subscribe/{type}", request.getTopic());
+        Map<String, String> vars = TopicUtils.getPathVariables("/network/mqtt/server/{id}/_subscribe/{type}", request.getTopic());
 
         String clientId = vars.get("id");
         PayloadType type = PayloadType.valueOf(vars.get("type").toUpperCase());

+ 11 - 7
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java

@@ -1,10 +1,12 @@
 package org.jetlinks.community.notify.manager.message;
 
+import com.alibaba.fastjson.JSON;
 import lombok.AllArgsConstructor;
-import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.gateway.external.Message;
 import org.jetlinks.community.gateway.external.SubscribeRequest;
 import org.jetlinks.community.gateway.external.SubscriptionProvider;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 
@@ -12,7 +14,7 @@ import reactor.core.publisher.Flux;
 @AllArgsConstructor
 public class NotificationsPublishProvider implements SubscriptionProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
     @Override
     public String id() {
@@ -32,10 +34,12 @@ public class NotificationsPublishProvider implements SubscriptionProvider {
     @Override
     public Flux<Message> subscribe(SubscribeRequest request) {
 
-        return messageGateway
-            .subscribe(
-                "/notifications/user/" + request.getAuthentication().getUser().getId() + "/*/*"
-                , messageGateway.nextSubscriberId("notifications-publisher"))
-            .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.getMessage().payloadAsJson()));
+        return eventBus
+            .subscribe(Subscription.of(
+                "notifications-publisher",
+                "/notifications/user/" + request.getAuthentication().getUser().getId() + "/*/*",
+                Subscription.Feature.local, Subscription.Feature.broker
+            ))
+            .map(msg -> Message.success(request.getId(), msg.getTopic(), JSON.parseObject(msg.bodyToString())));
     }
 }

+ 16 - 14
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java

@@ -1,17 +1,18 @@
 package org.jetlinks.community.notify.manager.service;
 
 import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
 import org.hswebframework.web.crud.events.EntityCreatedEvent;
 import org.hswebframework.web.crud.events.EntityDeletedEvent;
 import org.hswebframework.web.crud.events.EntityModifyEvent;
 import org.hswebframework.web.crud.events.EntitySavedEvent;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
 import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.community.gateway.MessageGateway;
 import org.jetlinks.community.notify.manager.entity.Notification;
 import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
 import org.jetlinks.community.notify.manager.enums.SubscribeState;
 import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
+import org.jetlinks.core.event.EventBus;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
@@ -28,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public class NotifySubscriberService extends GenericReactiveCrudService<NotifySubscriberEntity, String> implements CommandLineRunner {
 
-    private final MessageGateway gateway;
+    private final EventBus eventBus;
 
     private final ClusterManager clusterManager;
 
@@ -36,10 +37,10 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
 
     private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();
 
-    public NotifySubscriberService(MessageGateway gateway,
+    public NotifySubscriberService(EventBus eventBus,
                                    ClusterManager clusterManager,
                                    List<SubscriberProvider> providers) {
-        this.gateway = gateway;
+        this.eventBus = eventBus;
         this.clusterManager = clusterManager;
         for (SubscriberProvider provider : providers) {
             this.providers.put(provider.getId(), provider);
@@ -102,16 +103,17 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
         String dispatch = template.createTopic();
 
         Disposable old = subscribers
-            .put(entity.getId(), Mono.justOrEmpty(getProvider(entity.getTopicProvider()))
-                .flatMap(provider -> provider.createSubscriber(entity.getTopicConfig()))
-                .flatMap(subscriber ->
-                    subscriber
-                        .subscribe()
-                        .map(template::copyWithMessage)
-                        .flatMap(notification -> gateway.publish(dispatch, notification))
-                        .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
-                        .then())
-                .subscribe()
+            .put(entity.getId(),
+                Mono.zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), Mono.justOrEmpty(getProvider(entity.getTopicProvider())))
+                    .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(),tp2.getT1(), entity.getTopicConfig()))
+                    .flatMap(subscriber ->
+                        subscriber
+                            .subscribe()
+                            .map(template::copyWithMessage)
+                            .flatMap(notification -> eventBus.publish(dispatch, notification))
+                            .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
+                            .then())
+                    .subscribe()
             );
         log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName());
 

+ 2 - 1
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProvider.java

@@ -1,5 +1,6 @@
 package org.jetlinks.community.notify.manager.subscriber;
 
+import org.hswebframework.web.authorization.Authentication;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import reactor.core.publisher.Mono;
 
@@ -10,7 +11,7 @@ public interface SubscriberProvider {
 
     String getName();
 
-    Mono<Subscriber> createSubscriber(Map<String, Object> config);
+    Mono<Subscriber> createSubscriber(String id, Authentication authentication, Map<String, Object> config);
 
     ConfigMetadata getConfigMetadata();
 }

+ 13 - 12
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java

@@ -1,12 +1,13 @@
 package org.jetlinks.community.notify.manager.subscriber.providers;
 
 import com.alibaba.fastjson.JSONObject;
+import org.hswebframework.web.authorization.Authentication;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.types.StringType;
 import org.jetlinks.community.ValueObject;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
 import org.jetlinks.community.notify.manager.subscriber.Notify;
 import org.jetlinks.community.notify.manager.subscriber.Subscriber;
 import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
@@ -19,10 +20,10 @@ import java.util.Map;
 @Component
 public class DeviceAlarmProvider implements SubscriberProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
 
-    public DeviceAlarmProvider(MessageGateway messageGateway) {
-        this.messageGateway = messageGateway;
+    public DeviceAlarmProvider(EventBus eventBus) {
+        this.eventBus = eventBus;
     }
 
     @Override
@@ -45,20 +46,20 @@ public class DeviceAlarmProvider implements SubscriberProvider {
     }
 
     @Override
-    public Mono<Subscriber> createSubscriber(Map<String, Object> config) {
+    public Mono<Subscriber> createSubscriber(String id, Authentication authentication, Map<String, Object> config) {
         ValueObject configs = ValueObject.of(config);
 
         String productId = configs.getString("productId").orElse("*");
         String deviceId = configs.getString("deviceId").orElse("*");
         String alarmId = configs.getString("alarmId").orElse("*");
 
-        Flux<Notify> flux = messageGateway
-            .subscribe(Subscription.asList(
-                String.format("/rule-engine/device/alarm/%s/%s/%s", productId, deviceId, alarmId)),
-                messageGateway.nextSubscriberId("device-alarm-notifications"),
-                false)
+        Flux<Notify> flux = eventBus
+            .subscribe(Subscription.of("device-alarm:" + id,
+                String.format("/rule-engine/device/alarm/%s/%s/%s", productId, deviceId, alarmId),
+                Subscription.Feature.local
+            ))
             .map(msg -> {
-                JSONObject json = msg.getMessage().payloadAsJson();
+                JSONObject json = msg.bodyToJson();
 
                 return Notify.of(
                     String.format("设备[%s]发生告警:[%s]!", json.getString("deviceName"), json.getString("alarmName")),

+ 23 - 19
jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java

@@ -5,10 +5,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.ValueObject;
-import org.jetlinks.community.gateway.DeviceMessageUtils;
-import org.jetlinks.community.gateway.MessageGateway;
-import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
 import org.jetlinks.core.message.DeviceMessage;
+import org.jetlinks.core.metadata.Jsonable;
 import org.jetlinks.reactor.ql.ReactorQL;
 import org.jetlinks.reactor.ql.ReactorQLContext;
 import org.jetlinks.reactor.ql.ReactorQLRecord;
@@ -19,26 +19,25 @@ import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
 import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
 import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
-import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
+import reactor.core.scheduler.Scheduler;
 import reactor.util.function.Tuples;
 
 import java.time.Duration;
 import java.util.*;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 @Slf4j
 @AllArgsConstructor
 @Component
 public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
 
-    private final MessageGateway messageGateway;
+    private final EventBus eventBus;
+
+    private final Scheduler scheduler;
 
     @Override
     public String getExecutor() {
@@ -74,7 +73,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
         @Override
         protected Disposable doStart() {
             rule.validate();
-            return doSubscribe(messageGateway)
+            return doSubscribe(eventBus)
                 .filter(ignore -> state == Task.State.running)
                 .flatMap(result -> {
                     RuleData data = RuleData.create(result);
@@ -95,7 +94,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             if (disposable != null) {
                 disposable.dispose();
             }
-            doStart();
+            disposable = doStart();
         }
 
         private DeviceAlarmRule createRule() {
@@ -164,7 +163,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
             return ReactorQL.builder().sql(sql).build();
         }
 
-        public Flux<Map<String, Object>> doSubscribe(MessageGateway gateway) {
+        public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
             Set<String> topics = new HashSet<>();
 
             List<Object> binds = new ArrayList<>();
@@ -174,13 +173,18 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 topics.add(topic);
                 binds.addAll(trigger.toFilterBinds());
             }
-            List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
+            org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
+                "device_alarm:" + rule.getId(),
+                topics.toArray(new String[0]),
+                Subscription.Feature.local
+            );
+//            List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
 
             ReactorQLContext context = ReactorQLContext
                 .ofDatasource(ignore ->
-                    gateway
-                        .subscribe(subscriptions, "device_alarm:" + rule.getId(), false)
-                        .flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.convert(msg).map(DeviceMessage::toJson)))
+                    eventBus
+                        .subscribe(subscription, DeviceMessage.class)
+                        .map(Jsonable::toJson)
                         .doOnNext(json -> {
                             if (StringUtils.hasText(rule.getDeviceName())) {
                                 json.putIfAbsent("deviceName", rule.getDeviceName());
@@ -210,10 +214,10 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                 resultFlux = resultFlux
                     .as(flux ->
                         StringUtils.hasText(rule.getDeviceId())
-                            ? flux.window(windowTime)//规则已经指定了固定的设备,直接开启时间窗口就行
+                            ? flux.window(windowTime, scheduler)//规则已经指定了固定的设备,直接开启时间窗口就行
                             : flux //规则配置在设备产品上,则按设备ID分组后再开窗口
                             .groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
-                            .flatMap(group -> group.window(windowTime),Integer.MAX_VALUE))
+                            .flatMap(group -> group.window(windowTime, scheduler), Integer.MAX_VALUE))
                     //处理每一组数据
                     .flatMap(group -> group
                         .index((index, data) -> Tuples.of(index + 1, data)) //给数据打上索引,索引号就是告警次数
@@ -250,11 +254,11 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
                     }
                     // 推送告警信息到消息网关中
                     // /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
-                    return gateway
+                    return eventBus
                         .publish(String.format(
                             "/rule-engine/device/alarm/%s/%s/%s",
                             rule.getProductId(), map.get("deviceId"), rule.getId()
-                        ), map, true)
+                        ), map)
                         .then(Mono.just(map));
                 });
         }

+ 16 - 10
jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java

@@ -18,6 +18,7 @@ import org.jetlinks.core.cluster.ClusterManager;
 import org.jetlinks.core.device.DeviceOperationBroker;
 import org.jetlinks.core.device.DeviceRegistry;
 import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
+import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.message.DeviceOfflineMessage;
 import org.jetlinks.core.message.DeviceOnlineMessage;
 import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
@@ -32,6 +33,7 @@ import org.jetlinks.community.gateway.supports.DefaultMessageGateway;
 import org.jetlinks.community.gateway.supports.LocalClientSessionManager;
 import org.jetlinks.supports.cluster.ClusterDeviceRegistry;
 import org.jetlinks.supports.cluster.redis.RedisClusterManager;
+import org.jetlinks.supports.event.BrokerEventBus;
 import org.jetlinks.supports.protocol.ServiceLoaderProtocolSupports;
 import org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager;
 import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
@@ -89,6 +91,11 @@ public class JetLinksConfiguration {
         return Vertx.vertx(vertxOptions);
     }
 
+    @Bean
+    public EventBus eventBus() {
+        return new BrokerEventBus();
+    }
+
     @Bean
     public StandaloneDeviceMessageBroker standaloneDeviceMessageBroker() {
         return new StandaloneDeviceMessageBroker();
@@ -101,14 +108,14 @@ public class JetLinksConfiguration {
 
     @Bean
     public ClusterDeviceRegistry clusterDeviceRegistry(ProtocolSupports supports,
-                                                ClusterManager manager,
-                                                DeviceOperationBroker handler) {
+                                                       ClusterManager manager,
+                                                       DeviceOperationBroker handler) {
         return new ClusterDeviceRegistry(supports, manager, handler, CacheBuilder.newBuilder().build());
     }
 
     @Bean
     @Primary
-    @ConditionalOnProperty(prefix = "jetlinks.device.registry",name = "auto-discover",havingValue = "enabled",matchIfMissing = true)
+    @ConditionalOnProperty(prefix = "jetlinks.device.registry", name = "auto-discover", havingValue = "enabled", matchIfMissing = true)
     public AutoDiscoverDeviceRegistry deviceRegistry(ClusterDeviceRegistry registry,
                                                      ReactiveRepository<DeviceInstanceEntity, String> instanceRepository,
                                                      ReactiveRepository<DeviceProductEntity, String> productRepository) {
@@ -117,11 +124,11 @@ public class JetLinksConfiguration {
 
 
     @Bean
-    public BeanPostProcessor interceptorRegister(ClusterDeviceRegistry registry){
+    public BeanPostProcessor interceptorRegister(ClusterDeviceRegistry registry) {
         return new BeanPostProcessor() {
             @Override
             public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
-                if(bean instanceof DeviceMessageSenderInterceptor){
+                if (bean instanceof DeviceMessageSenderInterceptor) {
                     registry.addInterceptor(((DeviceMessageSenderInterceptor) bean));
                 }
                 return bean;
@@ -141,8 +148,8 @@ public class JetLinksConfiguration {
     }
 
     @Bean
-    public DeviceMessageConnector deviceMessageConnector(DeviceRegistry registry) {
-        return new DeviceMessageConnector(registry);
+    public DeviceMessageConnector deviceMessageConnector(EventBus eventBus, DeviceRegistry registry) {
+        return new DeviceMessageConnector(eventBus, registry);
     }
 
     @Bean
@@ -159,7 +166,6 @@ public class JetLinksConfiguration {
         DefaultDecodedClientMessageHandler clientMessageHandler = new DefaultDecodedClientMessageHandler(handler, deviceSessionManager,
             EmitterProcessor.create(false)
         );
-        // TODO: 2019/12/31 应该统一由消息网关处理
         clientMessageHandler
             .subscribe()
             .parallel()
@@ -176,7 +182,7 @@ public class JetLinksConfiguration {
                                                                                DeviceRegistry registry,
                                                                                MessageHandler messageHandler,
                                                                                DecodedClientMessageHandler clientMessageHandler) {
-        return new DefaultSendToDeviceMessageHandler(properties.getServerId(), sessionManager, messageHandler, registry,clientMessageHandler);
+        return new DefaultSendToDeviceMessageHandler(properties.getServerId(), sessionManager, messageHandler, registry, clientMessageHandler);
     }
 
     @Bean
@@ -249,7 +255,7 @@ public class JetLinksConfiguration {
 
     @Bean
     @ConfigurationProperties(prefix = "hsweb.user-token")
-    public UserTokenManager userTokenManager(ReactiveRedisOperations<Object,Object> template) {
+    public UserTokenManager userTokenManager(ReactiveRedisOperations<Object, Object> template) {
         return new RedisUserTokenManager(template);
     }