Browse Source

优化缓存错误处理,redis出错时忽略缓存

zhou-hao 5 years ago
parent
commit
07e0fc41dc

+ 45 - 19
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java

@@ -1,5 +1,6 @@
 package org.hswebframework.web.cache.supports;
 
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.cache.ReactiveCache;
 import org.reactivestreams.Publisher;
 import org.springframework.data.redis.connection.ReactiveSubscription;
@@ -14,13 +15,14 @@ import java.util.function.Function;
 import java.util.stream.StreamSupport;
 
 @SuppressWarnings("all")
+@Slf4j
 public class RedisReactiveCache<E> implements ReactiveCache<E> {
 
     private ReactiveRedisOperations<Object, Object> operations;
 
     private String redisKey;
 
-    private ReactiveCache localCache;
+    private ReactiveCache<E> localCache;
 
     private String topicName;
 
@@ -45,16 +47,26 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
     public Flux<E> getFlux(Object key) {
         return localCache
                 .getFlux(key)
-                .switchIfEmpty(operations
-                        .opsForHash()
-                        .get(redisKey, key)
-                        .flatMapIterable(r -> {
-                            if (r instanceof Iterable) {
-                                return ((Iterable) r);
-                            }
-                            return Collections.singletonList(r);
-                        })
-                        .map(Function.identity()));
+                .switchIfEmpty(Flux.<E>defer(() -> {
+                    return operations
+                            .opsForHash()
+                            .get(redisKey, key)
+                            .flatMapIterable(r -> {
+                                if (r instanceof Iterable) {
+                                    return ((Iterable) r);
+                                }
+                                return Collections.singletonList(r);
+                            })
+                            .map(r -> (E) r);
+                }))
+                .onErrorResume(err -> this.<E>handleError((Throwable) err));
+
+    }
+
+    protected <T> Mono<T> handleError(Throwable error) {
+        return Mono.fromRunnable(() -> {
+            log.error(error.getMessage(), error);
+        });
     }
 
     @Override
@@ -62,24 +74,29 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
         return localCache.getMono(key)
                 .switchIfEmpty(operations.opsForHash()
                         .get(redisKey, key)
+                        .map(v -> (E) v)
                         .flatMap(r -> localCache.put(key, Mono.just(r))
-                                .thenReturn(r)));
+                                .thenReturn(r)))
+                .onErrorResume(err -> this.handleError(err));
     }
 
     @Override
     public Mono<Void> put(Object key, Publisher<E> data) {
         if (data instanceof Mono) {
-            return ((Mono) data)
+            return ((Mono<?>) data)
                     .flatMap(r -> {
                         return operations.opsForHash()
                                 .put(redisKey, key, r)
                                 .then(localCache.put(key, data))
                                 .then(operations.convertAndSend(topicName, key));
 
-                    }).then();
+                    })
+                    .then()
+                    .onErrorResume(err -> this.handleError(err))
+                    ;
         }
         if (data instanceof Flux) {
-            return ((Flux) data)
+            return ((Flux<?>) data)
                     .collectList()
                     .flatMap(r -> {
                         return operations.opsForHash()
@@ -87,7 +104,10 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
                                 .then(localCache.put(key, data))
                                 .then(operations.convertAndSend(topicName, key));
 
-                    }).then();
+                    })
+                    .then()
+                    .onErrorResume(err -> this.handleError(err))
+                    ;
         }
         return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
     }
@@ -97,8 +117,11 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
         return operations.opsForHash()
                 .remove(redisKey, StreamSupport.stream(key.spliterator(), false).toArray())
                 .then(localCache.evictAll(key))
-                .flatMap(nil -> Flux.fromIterable(key).flatMap(k -> operations.convertAndSend(topicName, key)))
-                .then();
+                .flatMap(nil -> Flux.fromIterable(key)
+                        .flatMap(k -> operations.convertAndSend(topicName, key))
+                        .then()
+                )
+                .onErrorResume(err -> this.handleError(err));
     }
 
     @Override
@@ -112,7 +135,8 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
         return operations.opsForHash()
                 .multiGet(redisKey, Arrays.asList(keys))
                 .flatMapIterable(Function.identity())
-                .map(r -> (E) r);
+                .map(r -> (E) r)
+                .onErrorResume(err -> this.handleError(err));
     }
 
 
@@ -123,6 +147,7 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
                 .remove(redisKey, key)
                 .then(localCache.evict(key))
                 .then(operations.convertAndSend(topicName, key))
+                .onErrorResume(err -> this.handleError(err))
                 .then();
     }
 
@@ -133,6 +158,7 @@ public class RedisReactiveCache<E> implements ReactiveCache<E> {
                 .delete(redisKey)
                 .then(localCache.clear())
                 .then(operations.convertAndSend(topicName, "___all"))
+                .onErrorResume(err -> this.handleError(err))
                 .then();
     }
 }