浏览代码

日志事件增加响应式支持

zhouhao 3 年之前
父节点
当前提交
33f83a24de

+ 21 - 16
hsweb-logging/hsweb-access-logging-aop/src/main/java/org/hswebframework/web/logging/aop/ReactiveAopAccessLoggerSupport.java

@@ -9,6 +9,7 @@ import org.hswebframework.web.logging.AccessLoggerInfo;
 import org.hswebframework.web.logging.AccessLoggerListener;
 import org.hswebframework.web.logging.LoggerDefine;
 import org.hswebframework.web.logging.events.AccessLoggerAfterEvent;
+import org.hswebframework.web.logging.events.AccessLoggerBeforeEvent;
 import org.hswebframework.web.utils.ReactiveWebUtils;
 import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -67,37 +68,41 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
                 .handle((context, sink) -> {
                     if (context.hasKey(RequestInfo.class)) {
                         RequestInfo info = context.get(RequestInfo.class);
-                        ReactiveLogger.log(context, info::setContext);
+                        ReactiveLogger.log(context, ctx -> info.setContext(new HashMap<>(ctx)));
                         sink.next(info);
                     }
                 });
     }
 
     protected Flux<?> wrapFluxResponse(Flux<?> flux, AccessLoggerInfo loggerInfo) {
-        return this
+
+        Flux<?> cache = this
                 .currentRequestInfo()
                 .doOnNext(loggerInfo::putAccessInfo)
+                .then(Mono.defer(() -> new AccessLoggerBeforeEvent(loggerInfo).publish(eventPublisher)))
                 .thenMany(flux)
-                .doOnError(loggerInfo::setException)
-                .doFinally(f -> {
+                .cache();
+
+        return cache
+                .flatMap(ignore -> Mono.empty())
+                .then(Mono.defer(() -> {
                     loggerInfo.setResponseTime(System.currentTimeMillis());
-                    eventPublisher.publishEvent(new AccessLoggerAfterEvent(loggerInfo));
+                    return new AccessLoggerAfterEvent(loggerInfo).publish(eventPublisher);
+                }))
+                .thenMany(cache)
+                .onErrorResume(err -> {
+                    loggerInfo.setException(err);
+                    loggerInfo.setResponseTime(System.currentTimeMillis());
+                    return new AccessLoggerAfterEvent(loggerInfo)
+                            .publish(eventPublisher)
+                            .then(Mono.error(err));
                 })
                 .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
     }
 
     protected Mono<?> wrapMonoResponse(Mono<?> mono, AccessLoggerInfo loggerInfo) {
-        return this
-                .currentRequestInfo()
-                .doOnNext(loggerInfo::putAccessInfo)
-                .then(mono)
-                .doOnError(loggerInfo::setException)
-                .doOnSuccess(loggerInfo::setResponse)
-                .doFinally(f -> {
-                    loggerInfo.setResponseTime(System.currentTimeMillis());
-                    eventPublisher.publishEvent(new AccessLoggerAfterEvent(loggerInfo));
-                })
-                .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
+        return wrapFluxResponse(mono.flux(), loggerInfo)
+                .singleOrEmpty();
     }
 
     private LoggerDefine createDefine(MethodInterceptorHolder holder) {

+ 8 - 0
hsweb-logging/hsweb-access-logging-api/pom.xml

@@ -13,5 +13,13 @@
     <artifactId>hsweb-access-logging-api</artifactId>
 
     <description>访问日志API模块</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-core</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
 
 </project>

+ 2 - 1
hsweb-logging/hsweb-access-logging-api/src/main/java/org/hswebframework/web/logging/events/AccessLoggerAfterEvent.java

@@ -2,11 +2,12 @@ package org.hswebframework.web.logging.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 import org.hswebframework.web.logging.AccessLoggerInfo;
 
 @AllArgsConstructor
 @Getter
-public class AccessLoggerAfterEvent {
+public class AccessLoggerAfterEvent extends DefaultAsyncEvent {
 
     private AccessLoggerInfo logger;
 }

+ 2 - 1
hsweb-logging/hsweb-access-logging-api/src/main/java/org/hswebframework/web/logging/events/AccessLoggerBeforeEvent.java

@@ -2,11 +2,12 @@ package org.hswebframework.web.logging.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 import org.hswebframework.web.logging.AccessLoggerInfo;
 
 @AllArgsConstructor
 @Getter
-public class AccessLoggerBeforeEvent {
+public class AccessLoggerBeforeEvent extends DefaultAsyncEvent {
 
     private AccessLoggerInfo logger;
 }