|
@@ -82,19 +82,37 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
|
|
|
|
|
|
@Override
|
|
|
public Mono<Void> put(Object key, Publisher<E> data) {
|
|
|
- return Flux
|
|
|
- .from(data)
|
|
|
- .collectList()
|
|
|
- .flatMap(r -> {
|
|
|
- return operations.opsForHash()
|
|
|
- .put(redisKey, key, r)
|
|
|
- .then(localCache.put(key, data))
|
|
|
- .then(operations.convertAndSend(topicName, key));
|
|
|
- })
|
|
|
- .then()
|
|
|
- .onErrorResume(err -> this.handleError(err));
|
|
|
+ if (data instanceof Mono) {
|
|
|
+ return ((Mono<?>) data)
|
|
|
+ .flatMap(r -> {
|
|
|
+ return operations.opsForHash()
|
|
|
+ .put(redisKey, key, r)
|
|
|
+ .then(localCache.put(key, data))
|
|
|
+ .then(operations.convertAndSend(topicName, key));
|
|
|
+
|
|
|
+ })
|
|
|
+ .then()
|
|
|
+ .onErrorResume(err -> this.handleError(err))
|
|
|
+ ;
|
|
|
+ }
|
|
|
+ if (data instanceof Flux) {
|
|
|
+ return ((Flux<?>) data)
|
|
|
+ .collectList()
|
|
|
+ .flatMap(r -> {
|
|
|
+ return operations.opsForHash()
|
|
|
+ .put(redisKey, key, r)
|
|
|
+ .then(localCache.put(key, data))
|
|
|
+ .then(operations.convertAndSend(topicName, key));
|
|
|
+
|
|
|
+ })
|
|
|
+ .then()
|
|
|
+ .onErrorResume(err -> this.handleError(err))
|
|
|
+ ;
|
|
|
+ }
|
|
|
+ return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
public Mono<Void> evictAll(Iterable<?> key) {
|
|
|
return operations.opsForHash()
|