|
@@ -1,15 +1,17 @@
|
|
|
package org.hswebframework.web.i18n;
|
|
|
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
import org.hswebframework.web.exception.I18nSupportException;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
+import org.reactivestreams.Subscription;
|
|
|
import org.springframework.context.MessageSource;
|
|
|
-import reactor.core.publisher.Flux;
|
|
|
-import reactor.core.publisher.Mono;
|
|
|
-import reactor.core.publisher.Signal;
|
|
|
-import reactor.core.publisher.SignalType;
|
|
|
+import reactor.core.CoreSubscriber;
|
|
|
+import reactor.core.publisher.*;
|
|
|
import reactor.util.context.Context;
|
|
|
|
|
|
+import javax.annotation.Nonnull;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import java.util.function.BiFunction;
|
|
|
import java.util.function.Consumer;
|
|
@@ -23,7 +25,6 @@ import java.util.function.Function;
|
|
|
* <li>{@link LocaleUtils#current()} </li>
|
|
|
* <li>{@link LocaleUtils#currentReactive()}</li>
|
|
|
* <li>{@link LocaleUtils#resolveMessageReactive(String, Object...)}</li>
|
|
|
- * <li>{@link LocaleUtils#doOnNext(BiConsumer)}</li>
|
|
|
* </ul>
|
|
|
*
|
|
|
* @author zhouhao
|
|
@@ -63,11 +64,12 @@ public final class LocaleUtils {
|
|
|
* @return 返回值
|
|
|
*/
|
|
|
public static <T, R> R doWith(T data, Locale locale, BiFunction<T, Locale, R> mapper) {
|
|
|
+ Locale old = CONTEXT_THREAD_LOCAL.get();
|
|
|
try {
|
|
|
CONTEXT_THREAD_LOCAL.set(locale);
|
|
|
return mapper.apply(data, locale);
|
|
|
} finally {
|
|
|
- CONTEXT_THREAD_LOCAL.remove();
|
|
|
+ CONTEXT_THREAD_LOCAL.set(old);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -78,11 +80,12 @@ public final class LocaleUtils {
|
|
|
* @param consumer 任务
|
|
|
*/
|
|
|
public static void doWith(Locale locale, Consumer<Locale> consumer) {
|
|
|
+ Locale old = CONTEXT_THREAD_LOCAL.get();
|
|
|
try {
|
|
|
CONTEXT_THREAD_LOCAL.set(locale);
|
|
|
consumer.accept(locale);
|
|
|
} finally {
|
|
|
- CONTEXT_THREAD_LOCAL.remove();
|
|
|
+ CONTEXT_THREAD_LOCAL.set(old);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -112,6 +115,23 @@ public final class LocaleUtils {
|
|
|
.subscriberContext()
|
|
|
.map(ctx -> ctx.getOrDefault(Locale.class, DEFAULT_LOCALE));
|
|
|
}
|
|
|
+ public static <T> Mono<T> doInReactive(Callable<T> call) {
|
|
|
+ return currentReactive()
|
|
|
+ .handle((locale, sink) -> {
|
|
|
+ Locale old = CONTEXT_THREAD_LOCAL.get();
|
|
|
+ try {
|
|
|
+ CONTEXT_THREAD_LOCAL.set(locale);
|
|
|
+ T data = call.call();
|
|
|
+ if (data != null) {
|
|
|
+ sink.next(data);
|
|
|
+ }
|
|
|
+ } catch (Throwable e) {
|
|
|
+ sink.error(e);
|
|
|
+ } finally {
|
|
|
+ CONTEXT_THREAD_LOCAL.set(old);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* 响应式方式解析出异常的区域消息,并进行结果转换.
|
|
@@ -450,4 +470,86 @@ public final class LocaleUtils {
|
|
|
return doOn(SignalType.ON_ERROR, (s, l) -> operation.accept(s.getThrowable(), l));
|
|
|
}
|
|
|
|
|
|
+ public static <T> Flux<T> transform(Flux<T> flux) {
|
|
|
+ return new LocaleFlux<>(flux);
|
|
|
+ }
|
|
|
+
|
|
|
+ public static <T> Mono<T> transform(Mono<T> mono) {
|
|
|
+ return new LocaleMono<>(mono);
|
|
|
+ }
|
|
|
+
|
|
|
+ @AllArgsConstructor
|
|
|
+ static class LocaleMono<T> extends Mono<T> {
|
|
|
+ private final Mono<T> source;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
|
|
|
+ doWith(actual,
|
|
|
+ actual.currentContext().getOrDefault(Locale.class, DEFAULT_LOCALE),
|
|
|
+ (a, l) -> {
|
|
|
+ source.subscribe(
|
|
|
+ new LocaleSwitchSubscriber<>(a)
|
|
|
+ );
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @AllArgsConstructor
|
|
|
+ static class LocaleFlux<T> extends Flux<T> {
|
|
|
+ private final Flux<T> source;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
|
|
|
+ source.subscribe(
|
|
|
+ new LocaleSwitchSubscriber<>(actual)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @AllArgsConstructor
|
|
|
+ static class LocaleSwitchSubscriber<T> extends BaseSubscriber<T> {
|
|
|
+ private final CoreSubscriber<T> actual;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Nonnull
|
|
|
+ public Context currentContext() {
|
|
|
+ return actual
|
|
|
+ .currentContext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void hookOnSubscribe(@Nonnull Subscription subscription) {
|
|
|
+ actual.onSubscribe(this);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Locale current() {
|
|
|
+ return currentContext()
|
|
|
+ .getOrDefault(Locale.class, DEFAULT_LOCALE);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void hookOnComplete() {
|
|
|
+ doWith(current(), (l) -> actual.onComplete());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void hookOnError(@Nonnull Throwable error) {
|
|
|
+
|
|
|
+ doWith(error, current(), (v, l) -> {
|
|
|
+ actual.onError(v);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void hookOnNext(@Nonnull T value) {
|
|
|
+
|
|
|
+ doWith(value, current(), (v, l) -> {
|
|
|
+ actual.onNext(v);
|
|
|
+ return null;
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|