Parcourir la source

Subscribe注解可以使用表达式来引用配置值,如: ${a.b.c}

zhou-hao il y a 3 ans
Parent
commit
7a13499887

+ 52 - 2
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/annotation/Subscribe.java

@@ -6,10 +6,19 @@ import org.springframework.core.annotation.AliasFor;
 import java.lang.annotation.*;
 
 /**
- * 订阅来自消息网关的消息
+ * 从事件总线{@link org.jetlinks.core.event.EventBus}中订阅消息并执行注解的方法,
+ * 事件总线的输出数据可以作为方法参数,如果类型不一致会自动转换。
+ * 也可以通过方法参数直接获取事件总线的原始数据:{@link org.jetlinks.core.event.TopicPayload}
  *
+ * <pre>
+ * &#64;Subscribe("/device/&#42;/&#42;/message")
+ * public Mono&lt;Void&gt; handleEvent(DeviceMessage msg){
+ *      return doSomeThing(msg);
+ * }
+ * </pre>
  * @author zhouhao
- * @see org.jetlinks.community.gateway.MessageSubscriber
+ * @see org.jetlinks.core.event.EventBus
+ * @see org.jetlinks.community.gateway.spring.SpringMessageBroker
  * @since 1.0
  */
 @Target({ElementType.METHOD})
@@ -18,14 +27,55 @@ import java.lang.annotation.*;
 @Documented
 public @interface Subscribe {
 
+    /**
+     * 要订阅的topic,topic是树结构,
+     * 和{@link org.springframework.util.AntPathMatcher}类似,支持通配符: **表示多层目录,*表示单层目录.
+     * <ul>
+     *     <li>
+     *        /device/p1/d1/online
+     *     </li>
+     *     <li>
+     *       /device/p1/d1,d2/online
+     *     </li>
+     *     <li>
+     *        /device/p1/&#42;/online
+     *     </li>
+     *     <li>
+     *       /device/&#42;&#42;
+     *    </li>
+     * </ul>
+     * <p>
+     * 支持使用表达式
+     * <pre>
+     * /device/${sub.product-id}/**
+     * </pre>
+     *
+     * @return topics
+     * @see Subscribe#value()
+     * @see org.jetlinks.core.event.EventBus#subscribe(Subscription)
+     */
     @AliasFor("value")
     String[] topics() default {};
 
+    /**
+     * @return topics
+     * @see Subscribe#topics()
+     */
     @AliasFor("topics")
     String[] value() default {};
 
+    /**
+     * 指定订阅者ID,默认为方法名
+     *
+     * @return 订阅者ID
+     */
     String id() default "";
 
+    /**
+     * 订阅特性,默认只订阅本地进程内部的消息
+     *
+     * @return 订阅特性
+     */
     Subscription.Feature[] features() default Subscription.Feature.local;
 
 }

+ 48 - 13
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java

@@ -3,18 +3,26 @@ package org.jetlinks.community.gateway.spring;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.logger.ReactiveLogger;
+import org.hswebframework.web.utils.TemplateParser;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
+import org.jetlinks.core.utils.TopicUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.core.annotation.AnnotatedElementUtils;
 import org.springframework.core.annotation.AnnotationAttributes;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ClassUtils;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ReflectionUtils;
 import org.springframework.util.StringUtils;
+import reactor.core.publisher.Signal;
+
+import java.util.Arrays;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 @Component
 @Slf4j
@@ -23,6 +31,8 @@ public class SpringMessageBroker implements BeanPostProcessor {
 
     private final EventBus eventBus;
 
+    private final Environment environment;
+
     @Override
     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
         Class<?> type = ClassUtils.getUserClass(bean);
@@ -35,24 +45,35 @@ public class SpringMessageBroker implements BeanPostProcessor {
             if (!StringUtils.hasText(id)) {
                 id = type.getSimpleName().concat(".").concat(method.getName());
             }
-            Subscription subscription = Subscription.of(
-                "spring:" + id,
-                subscribes.getStringArray("value"),
-                (Subscription.Feature[]) subscribes.get("features"));
+
+            Subscription subscription = Subscription
+                .builder()
+                .subscriberId("spring:" + id)
+                .topics(Arrays.stream(subscribes.getStringArray("value"))
+                              .map(this::convertTopic)
+                              .flatMap(topic -> TopicUtils.expand(topic).stream())
+                              .collect(Collectors.toList())
+                )
+                .features((Subscription.Feature[]) subscribes.get("features"))
+                .build();
 
             ProxyMessageListener listener = new ProxyMessageListener(bean, method);
 
+            Consumer<Signal<Void>> logError = ReactiveLogger
+                .onError(error -> log.error("handle[{}] event message error", listener, error));
+
             eventBus
                 .subscribe(subscription)
-                .doOnNext(msg ->
-                    listener
-                        .onMessage(msg)
-                        .doOnEach(ReactiveLogger.onError(error -> {
-                            log.error(error.getMessage(), error);
-                        }))
-                        .subscribe()
-                )
-                .onErrorContinue((err, v) -> log.error(err.getMessage(), err))
+                .doOnNext(msg -> {
+                    try {
+                        listener
+                            .onMessage(msg)
+                            .doOnEach(logError)
+                            .subscribe();
+                    } catch (Throwable e) {
+                        log.error("handle[{}] event message error", listener, e);
+                    }
+                })
                 .subscribe();
 
         });
@@ -60,4 +81,18 @@ public class SpringMessageBroker implements BeanPostProcessor {
         return bean;
     }
 
+    protected String convertTopic(String topic) {
+        if (!topic.contains("${")) {
+            return topic;
+        }
+        return TemplateParser.parse(topic, template -> {
+            String[] arr = template.split(":", 2);
+            String property = environment.getProperty(arr[0], arr.length > 1 ? arr[1] : "");
+            if (StringUtils.isEmpty(property)) {
+                throw new IllegalArgumentException("Parse topic [" + template + "] error, can not get property : " + arr[0]);
+            }
+            return property;
+        });
+    }
+
 }