|
@@ -16,6 +16,7 @@ import org.springframework.context.ApplicationEventPublisher;
|
|
|
import org.springframework.core.Ordered;
|
|
|
import org.springframework.http.server.reactive.ServerHttpRequest;
|
|
|
import org.springframework.util.ClassUtils;
|
|
|
+import org.springframework.util.ConcurrentReferenceHashMap;
|
|
|
import org.springframework.web.server.ServerWebExchange;
|
|
|
import org.springframework.web.server.WebFilter;
|
|
|
import org.springframework.web.server.WebFilterChain;
|
|
@@ -41,6 +42,11 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
|
|
|
@Autowired
|
|
|
private ApplicationEventPublisher eventPublisher;
|
|
|
|
|
|
+ private final Map<Method, LoggerDefine> defineCache = new ConcurrentReferenceHashMap<>();
|
|
|
+
|
|
|
+ private static final LoggerDefine UNSUPPORTED = new LoggerDefine();
|
|
|
+
|
|
|
+ @SuppressWarnings("all")
|
|
|
public ReactiveAopAccessLoggerSupport() {
|
|
|
setAdvice((MethodInterceptor) methodInvocation -> {
|
|
|
MethodInterceptorHolder methodInterceptorHolder = MethodInterceptorHolder.create(methodInvocation);
|
|
@@ -55,23 +61,34 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private Mono<RequestInfo> currentRequestInfo() {
|
|
|
+ return Mono
|
|
|
+ .subscriberContext()
|
|
|
+ .handle((context, sink) -> {
|
|
|
+ if (context.hasKey(RequestInfo.class)) {
|
|
|
+ RequestInfo info = context.get(RequestInfo.class);
|
|
|
+ ReactiveLogger.log(context, info::setContext);
|
|
|
+ sink.next(info);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
protected Flux<?> wrapFluxResponse(Flux<?> flux, AccessLoggerInfo loggerInfo) {
|
|
|
- return Mono.subscriberContext()
|
|
|
- .<RequestInfo>flatMap(ctx -> Mono.<RequestInfo>justOrEmpty(ctx.getOrEmpty(RequestInfo.class))
|
|
|
- .doOnNext(info -> ReactiveLogger.log(ctx, info::setContext)))
|
|
|
+ return this
|
|
|
+ .currentRequestInfo()
|
|
|
.doOnNext(loggerInfo::putAccessInfo)
|
|
|
.thenMany(flux)
|
|
|
.doOnError(loggerInfo::setException)
|
|
|
.doFinally(f -> {
|
|
|
loggerInfo.setResponseTime(System.currentTimeMillis());
|
|
|
eventPublisher.publishEvent(new AccessLoggerAfterEvent(loggerInfo));
|
|
|
- }).subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
|
|
|
+ })
|
|
|
+ .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
|
|
|
}
|
|
|
|
|
|
protected Mono<?> wrapMonoResponse(Mono<?> mono, AccessLoggerInfo loggerInfo) {
|
|
|
- return Mono.subscriberContext()
|
|
|
- .<RequestInfo>flatMap(ctx -> Mono.<RequestInfo>justOrEmpty(ctx.getOrEmpty(RequestInfo.class))
|
|
|
- .doOnNext(info -> ReactiveLogger.log(ctx, info::setContext)))
|
|
|
+ return this
|
|
|
+ .currentRequestInfo()
|
|
|
.doOnNext(loggerInfo::putAccessInfo)
|
|
|
.then(mono)
|
|
|
.doOnError(loggerInfo::setException)
|
|
@@ -79,20 +96,26 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
|
|
|
.doFinally(f -> {
|
|
|
loggerInfo.setResponseTime(System.currentTimeMillis());
|
|
|
eventPublisher.publishEvent(new AccessLoggerAfterEvent(loggerInfo));
|
|
|
- }).subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
|
|
|
+ })
|
|
|
+ .subscriberContext(ReactiveLogger.start("accessLogId", loggerInfo.getId()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private LoggerDefine createDefine(MethodInterceptorHolder holder) {
|
|
|
+ return loggerParsers
|
|
|
+ .stream()
|
|
|
+ .filter(parser -> parser.support(ClassUtils.getUserClass(holder.getTarget()), holder.getMethod()))
|
|
|
+ .findAny()
|
|
|
+ .map(parser -> parser.parse(holder))
|
|
|
+ .orElse(UNSUPPORTED);
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("all")
|
|
|
protected AccessLoggerInfo createLogger(MethodInterceptorHolder holder) {
|
|
|
AccessLoggerInfo info = new AccessLoggerInfo();
|
|
|
info.setId(IDGenerator.MD5.generate());
|
|
|
-
|
|
|
info.setRequestTime(System.currentTimeMillis());
|
|
|
- LoggerDefine define = loggerParsers.stream()
|
|
|
- .filter(parser -> parser.support(ClassUtils.getUserClass(holder.getTarget()), holder.getMethod()))
|
|
|
- .findAny()
|
|
|
- .map(parser -> parser.parse(holder))
|
|
|
- .orElse(null);
|
|
|
+
|
|
|
+ LoggerDefine define = defineCache.computeIfAbsent(holder.getMethod(), method -> createDefine(holder));
|
|
|
|
|
|
if (define != null) {
|
|
|
info.setAction(define.getAction());
|
|
@@ -113,14 +136,14 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
|
|
|
continue;
|
|
|
}
|
|
|
if (val instanceof Mono) {
|
|
|
- args[i] = ((Mono) val)
|
|
|
+ args[i] = ((Mono<?>) val)
|
|
|
.doOnNext(param -> {
|
|
|
value.put(name, param);
|
|
|
});
|
|
|
} else if (val instanceof Flux) {
|
|
|
List<Object> arr = new ArrayList<>();
|
|
|
value.put(name, arr);
|
|
|
- args[i] = ((Flux) val)
|
|
|
+ args[i] = ((Flux<?>) val)
|
|
|
.doOnNext(param -> {
|
|
|
arr.add(param);
|
|
|
});
|
|
@@ -143,7 +166,9 @@ public class ReactiveAopAccessLoggerSupport extends StaticMethodMatcherPointcutA
|
|
|
|
|
|
@Override
|
|
|
public boolean matches(Method method, Class<?> aClass) {
|
|
|
- return loggerParsers.stream().anyMatch(parser -> parser.support(aClass, method));
|
|
|
+ return loggerParsers
|
|
|
+ .stream()
|
|
|
+ .anyMatch(parser -> parser.support(aClass, method));
|
|
|
}
|
|
|
|
|
|
@Override
|