Browse Source

优化异步事件

zhouhao 3 years ago
parent
commit
0da2441a69

+ 6 - 0
hsweb-core/src/main/java/org/hswebframework/web/event/AsyncEvent.java

@@ -4,6 +4,8 @@ import org.reactivestreams.Publisher;
 import org.springframework.context.ApplicationEventPublisher;
 import reactor.core.publisher.Mono;
 
+import java.util.function.Function;
+
 /**
  * 异步事件,使用响应式编程进行事件监听时,请使用此事件接口
  *
@@ -27,6 +29,10 @@ public interface AsyncEvent {
      */
     void first(Publisher<?> publisher);
 
+    void first(Function<Mono<?>,Publisher<?>> mapper);
+
+    void async(Function<Mono<?>,Publisher<?>> mapper);
+
     /**
      * 推送事件到 ApplicationEventPublisher
      *

+ 15 - 4
hsweb-core/src/main/java/org/hswebframework/web/event/DefaultAsyncEvent.java

@@ -1,14 +1,15 @@
 package org.hswebframework.web.event;
 
-import lombok.Getter;
 import org.reactivestreams.Publisher;
 import org.springframework.context.ApplicationEventPublisher;
 import reactor.core.publisher.Mono;
 
+import java.util.function.Function;
+
 public class DefaultAsyncEvent implements AsyncEvent {
 
-    private Mono<Void> async = Mono.empty();
-    private Mono<Void> first = Mono.empty();
+    private Mono<?> async = Mono.empty();
+    private Mono<?> first = Mono.empty();
 
     private boolean hasListener;
 
@@ -23,9 +24,19 @@ public class DefaultAsyncEvent implements AsyncEvent {
         this.first = Mono.from(publisher).then(first);
     }
 
+    @Override
+    public void first(Function<Mono<?>, Publisher<?>> mapper) {
+        this.first = Mono.from(mapper.apply(this.first));
+    }
+
+    @Override
+    public void async(Function<Mono<?>, Publisher<?>> mapper) {
+        this.async = Mono.from(mapper.apply(this.async));
+    }
+
     @Override
     public Mono<Void> getAsync() {
-        return this.first.then(this.async);
+        return this.first.then(this.async).then();
     }
 
     @Override

+ 7 - 7
hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java

@@ -76,15 +76,15 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
 
     protected Flux<?> wrapFluxResponse(Flux<?> flux, AccessLoggerInfo loggerInfo) {
 
-        Flux<?> cache = this
+        Flux<?> cache = flux.cache();
+        return this
                 .currentRequestInfo()
                 .doOnNext(loggerInfo::putAccessInfo)
-                .then(Mono.defer(() -> new AccessLoggerBeforeEvent(loggerInfo).publish(eventPublisher)))
-                .thenMany(flux)
-                .cache();
-
-        return cache
-                .flatMap(ignore -> Mono.empty())
+                .then(Mono.defer(() -> {
+                    AccessLoggerBeforeEvent event = new AccessLoggerBeforeEvent(loggerInfo);
+                    event.first(cache);
+                    return event.publish(eventPublisher);
+                }))
                 .then(Mono.defer(() -> {
                     loggerInfo.setResponseTime(System.currentTimeMillis());
                     return new AccessLoggerAfterEvent(loggerInfo).publish(eventPublisher);