Browse Source

修复访问日志导致无法监听流onCancel事件

zhouhao 3 years ago
parent
commit
16da624c1a

+ 30 - 0
hsweb-core/src/main/java/org/hswebframework/web/utils/FluxCache.java

@@ -0,0 +1,30 @@
+package org.hswebframework.web.utils;
+
+import org.reactivestreams.Publisher;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Function;
+
+public class FluxCache {
+
+
+    public static <T> Flux<T> cache(Flux<T> source, Function<Flux<T>, Publisher<?>> handler) {
+        Disposable[] ref = new Disposable[1];
+        Flux<T> cache = source
+                .doFinally((s) -> ref[0] = null)
+                .replay()
+                .autoConnect(1, dis -> ref[0] = dis);
+        return Mono
+                .from(handler.apply(cache))
+                .thenMany(cache)
+                .doFinally((s) -> {
+                    if (ref[0] != null) {
+                        ref[0].dispose();
+                    }
+                });
+
+    }
+
+}

+ 37 - 33
hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java

@@ -1,5 +1,6 @@
 package org.hswebframework.web.logging.aop;
 
+import lombok.SneakyThrows;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.hswebframework.web.aop.MethodInterceptorHolder;
 import org.hswebframework.web.authorization.Authentication;
@@ -11,6 +12,7 @@ import org.hswebframework.web.logging.AccessLoggerListener;
 import org.hswebframework.web.logging.LoggerDefine;
 import org.hswebframework.web.logging.events.AccessLoggerAfterEvent;
 import org.hswebframework.web.logging.events.AccessLoggerBeforeEvent;
+import org.hswebframework.web.utils.FluxCache;
 import org.hswebframework.web.utils.ReactiveWebUtils;
 import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -22,13 +24,16 @@ import org.springframework.util.ConcurrentReferenceHashMap;
 import org.springframework.web.server.ServerWebExchange;
 import org.springframework.web.server.WebFilter;
 import org.springframework.web.server.WebFilterChain;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.context.Context;
 
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * 使用AOP记录访问日志,并触发{@link AccessLoggerListener#onLogger(AccessLoggerInfo)}
@@ -76,41 +81,40 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
     }
 
     protected Flux<?> wrapFluxResponse(Flux<?> flux, AccessLoggerInfo loggerInfo) {
+        return Flux.deferWithContext(ctx -> this
+                           .currentRequestInfo()
+                           .doOnNext(loggerInfo::putAccessInfo)
+                           .then(beforeRequest(loggerInfo))
+                           .thenMany(flux)
+                           .doOnError(loggerInfo::setException)
+                           .doFinally(signal -> completeRequest(loggerInfo, ctx)))
+                   .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
+    }
 
-        Flux<?> cache = flux.share();
-        return this
-                .currentRequestInfo()
-                .doOnNext(loggerInfo::putAccessInfo)
-                .then(Mono.defer(() -> {
-                    AccessLoggerBeforeEvent event = new AccessLoggerBeforeEvent(loggerInfo);
-                    event.first(cache);
-                    return Authentication
-                            .currentReactive()
-                            .flatMap(auth -> {
-                                loggerInfo.putContext("userId", auth.getUser().getId());
-                                loggerInfo.putContext("username", auth.getUser().getUsername());
-                                loggerInfo.putContext("userName", auth.getUser().getName());
-                                return ReactiveLogger
-                                        .mdc("userId", auth.getUser().getId(),
-                                             "username", auth.getUser().getUsername(),
-                                             "userName", auth.getUser().getName())
-                                        .thenReturn(auth);
-                            })
-                            .then(event.publish(eventPublisher));
-                }))
-                .then(Mono.defer(() -> {
-                    loggerInfo.setResponseTime(System.currentTimeMillis());
-                    return new AccessLoggerAfterEvent(loggerInfo).publish(eventPublisher);
-                }))
-                .thenMany(cache)
-                .onErrorResume(err -> {
-                    loggerInfo.setException(err);
-                    loggerInfo.setResponseTime(System.currentTimeMillis());
-                    return new AccessLoggerAfterEvent(loggerInfo)
-                            .publish(eventPublisher)
-                            .then(Mono.error(err));
+    private Mono<Void> beforeRequest(AccessLoggerInfo loggerInfo) {
+
+        AccessLoggerBeforeEvent event = new AccessLoggerBeforeEvent(loggerInfo);
+        return Authentication
+                .currentReactive()
+                .flatMap(auth -> {
+                    loggerInfo.putContext("userId", auth.getUser().getId());
+                    loggerInfo.putContext("username", auth.getUser().getUsername());
+                    loggerInfo.putContext("userName", auth.getUser().getName());
+                    return ReactiveLogger
+                            .mdc("userId", auth.getUser().getId(),
+                                 "username", auth.getUser().getUsername(),
+                                 "userName", auth.getUser().getName())
+                            .thenReturn(auth);
                 })
-                .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
+                .then(event.publish(eventPublisher));
+    }
+
+    private void completeRequest(AccessLoggerInfo loggerInfo, Context ctx) {
+        loggerInfo.setResponseTime(System.currentTimeMillis());
+        new AccessLoggerAfterEvent(loggerInfo)
+                .publish(eventPublisher)
+                .subscriberContext(ctx)
+                .subscribe();
     }
 
     protected Mono<?> wrapMonoResponse(Mono<?> mono, AccessLoggerInfo loggerInfo) {