Jelajahi Sumber

优化消息订阅

zhou-hao 4 tahun lalu
induk
melakukan
097f199db4

+ 9 - 1
jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java

@@ -2,6 +2,7 @@ package org.jetlinks.community.gateway.spring;
 
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.logger.ReactiveLogger;
 import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.core.event.EventBus;
 import org.jetlinks.core.event.Subscription;
@@ -43,7 +44,14 @@ public class SpringMessageBroker implements BeanPostProcessor {
 
             eventBus
                 .subscribe(subscription)
-                .flatMap(listener::onMessage)
+                .doOnNext(msg ->
+                    listener
+                        .onMessage(msg)
+                        .doOnEach(ReactiveLogger.onError(error -> {
+                            log.error(error.getMessage(), error);
+                        }))
+                        .subscribe()
+                )
                 .onErrorContinue((err, v) -> log.error(err.getMessage(), err))
                 .subscribe();