Browse Source

增加ReactiveCacheManager

zhou-hao 5 years ago
parent
commit
a6c1db727d
26 changed files with 979 additions and 20 deletions
  1. 80 0
      hsweb-concurrent/hsweb-concurrent-cache/pom.xml
  2. 39 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCache.java
  3. 6 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCacheManager.java
  4. 11 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCacheResolver.java
  5. 24 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/configuration/ReactiveCacheManagerConfiguration.java
  6. 169 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/configuration/ReactiveCacheProperties.java
  7. 19 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/AbstractReactiveCacheManager.java
  8. 67 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/CaffeineReactiveCache.java
  9. 20 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/CaffeineReactiveCacheManager.java
  10. 67 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/GuavaReactiveCache.java
  11. 19 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/GuavaReactiveCacheManager.java
  12. 10 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/NullValue.java
  13. 28 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisLocalReactiveCacheManager.java
  14. 110 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java
  15. 43 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/UnSupportedReactiveCache.java
  16. 3 0
      hsweb-concurrent/hsweb-concurrent-cache/src/main/resources/META-INF/spring.factories
  17. 68 0
      hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/CaffeineReactiveCacheManagerTest.java
  18. 68 0
      hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/GuavaReactiveCacheManagerTest.java
  19. 70 0
      hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/RedisReactiveCacheManagerTest.java
  20. 7 0
      hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/TestApplication.java
  21. 5 0
      hsweb-concurrent/hsweb-concurrent-cache/src/test/resources/application-redis.yml
  22. 19 0
      hsweb-concurrent/pom.xml
  23. 6 0
      hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/pom.xml
  24. 14 10
      hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/src/main/java/org/hswebframework/web/system/authorization/defaults/service/DefaultReactiveAuthenticationInitializeService.java
  25. 6 10
      hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/src/main/java/org/hswebframework/web/system/authorization/defaults/service/DefaultReactiveAuthenticationManager.java
  26. 1 0
      pom.xml

+ 80 - 0
hsweb-concurrent/hsweb-concurrent-cache/pom.xml

@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-concurrent</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-concurrent-cache</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-aspects</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-autoconfigure</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.data</groupId>
+            <artifactId>spring-data-redis</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine -->
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.8.0</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>28.0-jre</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>io.projectreactor.addons</groupId>
+            <artifactId>reactor-extra</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

+ 39 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCache.java

@@ -0,0 +1,39 @@
+package org.hswebframework.web.cache;
+
+import org.reactivestreams.Publisher;
+import reactor.cache.CacheFlux;
+import reactor.cache.CacheMono;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Function;
+
+public interface ReactiveCache<E> {
+
+    Flux<E> getFlux(Object key);
+
+    Mono<E> getMono(Object key);
+
+    Mono<Void> put(Object key, Publisher<E> data);
+
+    Mono<Void> evict(Object key);
+
+    Mono<Void> clear();
+
+    default CacheFlux.FluxCacheBuilderMapMiss<E> flux(Object key) {
+        return otherSupplier ->
+                Flux.defer(() ->
+                        getFlux(key)
+                                .switchIfEmpty(otherSupplier.get()
+                                        .collectList()
+                                        .flatMapMany(values -> put(key, Flux.fromIterable(values))
+                                                .thenMany(Flux.fromIterable(values)))));
+    }
+
+    default CacheMono.MonoCacheBuilderMapMiss<E> mono(Object key) {
+        return otherSupplier ->
+                Mono.defer(() -> getMono(key)
+                        .switchIfEmpty(otherSupplier.get()
+                                .flatMap(value -> put(key, Mono.just(value)).thenReturn(value))));
+    }
+}

+ 6 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCacheManager.java

@@ -0,0 +1,6 @@
+package org.hswebframework.web.cache;
+
+public interface ReactiveCacheManager {
+
+    <E> ReactiveCache<E> getCache(String name);
+}

+ 11 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/ReactiveCacheResolver.java

@@ -0,0 +1,11 @@
+package org.hswebframework.web.cache;
+
+import org.springframework.cache.Cache;
+import org.springframework.cache.interceptor.CacheOperationInvocationContext;
+
+import java.util.Collection;
+
+public interface ReactiveCacheResolver {
+    Collection<? extends ReactiveCache> resolveCaches(CacheOperationInvocationContext<?> context);
+
+}

+ 24 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/configuration/ReactiveCacheManagerConfiguration.java

@@ -0,0 +1,24 @@
+package org.hswebframework.web.cache.configuration;
+
+import org.hswebframework.web.cache.ReactiveCacheManager;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnMissingBean(ReactiveCacheManager.class)
+@EnableConfigurationProperties(ReactiveCacheProperties.class)
+public class ReactiveCacheManagerConfiguration {
+
+
+    @Bean
+    public ReactiveCacheManager reactiveCacheManager(ReactiveCacheProperties properties, ApplicationContext context) {
+
+        return properties.createCacheManager(context);
+
+    }
+
+
+}

+ 169 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/configuration/ReactiveCacheProperties.java

@@ -0,0 +1,169 @@
+package org.hswebframework.web.cache.configuration;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.cache.CacheBuilder;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.cache.ReactiveCache;
+import org.hswebframework.web.cache.ReactiveCacheManager;
+import org.hswebframework.web.cache.supports.CaffeineReactiveCacheManager;
+import org.hswebframework.web.cache.supports.GuavaReactiveCacheManager;
+import org.hswebframework.web.cache.supports.RedisLocalReactiveCacheManager;
+import org.hswebframework.web.cache.supports.UnSupportedReactiveCache;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.core.ResolvableType;
+import org.springframework.data.redis.core.ReactiveRedisOperations;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.StringUtils;
+
+import java.time.Duration;
+
+@ConfigurationProperties(prefix = "hsweb.cache")
+@Getter
+@Setter
+public class ReactiveCacheProperties {
+
+
+    private Type type = Type.none;
+
+    private GuavaProperties guava = new GuavaProperties();
+
+    private CaffeineProperties caffeine = new CaffeineProperties();
+
+    private RedisProperties redis = new RedisProperties();
+
+
+    public boolean anyProviderPresent() {
+        return ClassUtils.isPresent("com.google.common.cache.Cache", this.getClass().getClassLoader())
+                || ClassUtils.isPresent("com.github.benmanes.caffeine.cache.Cache", this.getClass().getClassLoader())
+                || ClassUtils.isPresent("org.springframework.data.redis.core.ReactiveRedisOperations", this.getClass().getClassLoader());
+    }
+
+
+    private ReactiveCacheManager createUnsupported() {
+        return new ReactiveCacheManager() {
+            @Override
+            public <E> ReactiveCache<E> getCache(String name) {
+                return UnSupportedReactiveCache.getInstance();
+            }
+        };
+    }
+
+    @SuppressWarnings("all")
+    public ReactiveCacheManager createCacheManager(ApplicationContext context) {
+        if (!anyProviderPresent()) {
+            return new ReactiveCacheManager() {
+                @Override
+                public <E> ReactiveCache<E> getCache(String name) {
+                    return UnSupportedReactiveCache.getInstance();
+                }
+            };
+        }
+
+        if (type == Type.redis) {
+            ReactiveRedisOperations<Object, Object> operations;
+            if (StringUtils.hasText(redis.getBeanName())) {
+                operations = context.getBean(redis.getBeanName(), ReactiveRedisOperations.class);
+            } else {
+                operations = (ReactiveRedisOperations) context.getBeanProvider(ResolvableType.forClassWithGenerics(ReactiveRedisOperations.class, Object.class, Object.class)).getIfAvailable();
+            }
+            return new RedisLocalReactiveCacheManager(operations, createCacheManager(type));
+        }
+
+        return createCacheManager(type);
+    }
+
+    private ReactiveCacheManager createCacheManager(Type type) {
+        switch (type) {
+            case guava:
+                return getGuava().createCacheManager();
+            case caffeine:
+                return getCaffeine().createCacheManager();
+
+        }
+        return createUnsupported();
+    }
+
+
+    @Getter
+    @Setter
+    public static class RedisProperties {
+        private String beanName;
+
+        private Type localCacheType = Type.caffeine;
+
+    }
+
+    @Getter
+    @Setter
+    public static class GuavaProperties {
+        long maximumSize = 1024;
+        int initialCapacity = 64;
+        Duration expireAfterWrite = Duration.ofHours(6);
+        Duration expireAfterAccess = Duration.ofHours(1);
+        Strength keyStrength = Strength.SOFT;
+        Strength valueStrength = Strength.SOFT;
+
+        ReactiveCacheManager createCacheManager() {
+            return new GuavaReactiveCacheManager(createBuilder());
+        }
+
+        CacheBuilder<Object, Object> createBuilder() {
+            CacheBuilder builder = CacheBuilder.newBuilder()
+                    .expireAfterAccess(expireAfterAccess)
+                    .expireAfterWrite(expireAfterWrite)
+                    .maximumSize(maximumSize);
+            if (valueStrength == Strength.SOFT) {
+                builder.softValues();
+            } else {
+                builder.weakValues();
+            }
+            if (keyStrength == Strength.WEAK) {
+                builder.weakKeys();
+            }
+            return builder;
+        }
+    }
+
+    @Getter
+    @Setter
+    public static class CaffeineProperties {
+        long maximumSize = 1024;
+        int initialCapacity = 64;
+        Duration expireAfterWrite = Duration.ofHours(6);
+        Duration expireAfterAccess = Duration.ofHours(1);
+        Strength keyStrength = Strength.SOFT;
+        Strength valueStrength = Strength.SOFT;
+
+        ReactiveCacheManager createCacheManager() {
+            return new CaffeineReactiveCacheManager(createBuilder());
+        }
+
+        Caffeine<Object, Object> createBuilder() {
+            Caffeine builder = Caffeine.newBuilder()
+                    .expireAfterAccess(expireAfterAccess)
+                    .expireAfterWrite(expireAfterWrite)
+                    .maximumSize(maximumSize);
+            if (valueStrength == Strength.SOFT) {
+                builder.softValues();
+            } else {
+                builder.weakValues();
+            }
+            if (keyStrength == Strength.WEAK) {
+                builder.weakKeys();
+            }
+            return builder;
+        }
+    }
+
+    enum Strength {WEAK, SOFT}
+
+    public enum Type {
+        redis,
+        caffeine,
+        guava,
+        none,
+    }
+
+}

+ 19 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/AbstractReactiveCacheManager.java

@@ -0,0 +1,19 @@
+package org.hswebframework.web.cache.supports;
+
+import org.hswebframework.web.cache.ReactiveCache;
+import org.hswebframework.web.cache.ReactiveCacheManager;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AbstractReactiveCacheManager implements ReactiveCacheManager {
+    private Map<String, ReactiveCache> caches = new ConcurrentHashMap<>();
+
+    @Override
+    @SuppressWarnings("all")
+    public <E> ReactiveCache<E> getCache(String name) {
+        return caches.computeIfAbsent(name, this::createCache);
+    }
+
+    protected abstract <E> ReactiveCache<E> createCache(String name);
+}

+ 67 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/CaffeineReactiveCache.java

@@ -0,0 +1,67 @@
+package org.hswebframework.web.cache.supports;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.cache.ReactiveCache;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@SuppressWarnings("all")
+@AllArgsConstructor
+public class CaffeineReactiveCache<E> implements ReactiveCache<E> {
+
+    private Cache<Object, Object> cache;
+
+    @Override
+    public Flux<E> getFlux(Object key) {
+        return (Flux)Flux.defer(() -> {
+            Object v = cache.getIfPresent(key);
+            if (v == null) {
+                return Flux.empty();
+            }
+            if (v instanceof Iterable) {
+                return Flux.fromIterable(((Iterable) v));
+            }
+            return Flux.just(v);
+        });
+    }
+
+    @Override
+    public Mono<E> getMono(Object key) {
+        return Mono.defer(() -> {
+            Object v = cache.getIfPresent(key);
+            if (v == null) {
+                return Mono.empty();
+            }
+            return (Mono) Mono.just(v);
+        });
+    }
+
+    @Override
+    public Mono<Void> put(Object key, Publisher<E> data) {
+        return Mono.defer(() -> {
+            if (data instanceof Flux) {
+                return ((Flux<E>) data).collectList()
+                        .doOnNext(v -> cache.put(key, v))
+                        .then();
+            }
+            if (data instanceof Mono) {
+                return ((Mono<E>) data)
+                        .doOnNext(v -> cache.put(key, v))
+                        .then();
+            }
+            return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
+        });
+    }
+
+    @Override
+    public Mono<Void> evict(Object key) {
+        return Mono.fromRunnable(() -> cache.invalidate(key));
+    }
+
+    @Override
+    public Mono<Void> clear() {
+        return Mono.fromRunnable(() -> cache.cleanUp());
+    }
+}

+ 20 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/CaffeineReactiveCacheManager.java

@@ -0,0 +1,20 @@
+package org.hswebframework.web.cache.supports;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.cache.ReactiveCache;
+
+import java.time.Duration;
+
+@AllArgsConstructor
+public class CaffeineReactiveCacheManager extends AbstractReactiveCacheManager {
+
+    private Caffeine<Object, Object> builder;
+
+
+    @Override
+    protected <E> ReactiveCache<E> createCache(String name) {
+        return new CaffeineReactiveCache<>(builder.build());
+    }
+
+}

+ 67 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/GuavaReactiveCache.java

@@ -0,0 +1,67 @@
+package org.hswebframework.web.cache.supports;
+
+import com.google.common.cache.Cache;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.cache.ReactiveCache;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@SuppressWarnings("all")
+@AllArgsConstructor
+public class GuavaReactiveCache<E> implements ReactiveCache<E> {
+
+    private Cache<Object, Object> cache;
+
+    @Override
+    public Flux<E> getFlux(Object key) {
+        return (Flux)Flux.defer(() -> {
+            Object v = cache.getIfPresent(key);
+            if (v == null) {
+                return Flux.empty();
+            }
+            if (v instanceof Iterable) {
+                return Flux.fromIterable(((Iterable) v));
+            }
+            return Flux.just(v);
+        });
+    }
+
+    @Override
+    public Mono<E> getMono(Object key) {
+        return (Mono)Mono.defer(() -> {
+            Object v = cache.getIfPresent(key);
+            if (v == null) {
+                return Mono.empty();
+            }
+            return (Mono) Mono.just(v);
+        });
+    }
+
+    @Override
+    public Mono<Void> put(Object key, Publisher<E> data) {
+        return Mono.defer(() -> {
+            if (data instanceof Flux) {
+                return ((Flux<E>) data).collectList()
+                        .doOnNext(v -> cache.put(key, v))
+                        .then();
+            }
+            if (data instanceof Mono) {
+                return ((Mono<E>) data)
+                        .doOnNext(v -> cache.put(key, v))
+                        .then();
+            }
+            return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
+        });
+    }
+
+    @Override
+    public Mono<Void> evict(Object key) {
+        return Mono.fromRunnable(() -> cache.invalidate(key));
+    }
+
+    @Override
+    public Mono<Void> clear() {
+        return Mono.fromRunnable(() -> cache.cleanUp());
+    }
+}

+ 19 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/GuavaReactiveCacheManager.java

@@ -0,0 +1,19 @@
+package org.hswebframework.web.cache.supports;
+
+import com.google.common.cache.CacheBuilder;
+import lombok.AllArgsConstructor;
+import org.hswebframework.web.cache.ReactiveCache;
+
+import java.time.Duration;
+
+@AllArgsConstructor
+public class GuavaReactiveCacheManager extends AbstractReactiveCacheManager {
+
+    private CacheBuilder<Object, Object> builder;
+
+    @Override
+    protected <E> ReactiveCache<E> createCache(String name) {
+        return new GuavaReactiveCache<>(builder.build());
+    }
+
+}

+ 10 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/NullValue.java

@@ -0,0 +1,10 @@
+package org.hswebframework.web.cache.supports;
+
+import java.io.Serializable;
+
+public class NullValue implements Serializable {
+    private static final long serialVersionUID = -1;
+
+    public static final NullValue INSTANCE = new NullValue();
+
+}

+ 28 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisLocalReactiveCacheManager.java

@@ -0,0 +1,28 @@
+package org.hswebframework.web.cache.supports;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.cache.ReactiveCache;
+import org.hswebframework.web.cache.ReactiveCacheManager;
+import org.springframework.data.redis.core.ReactiveRedisOperations;
+
+public class RedisLocalReactiveCacheManager extends AbstractReactiveCacheManager {
+
+    private ReactiveRedisOperations<Object, Object> operations;
+
+    private ReactiveCacheManager localCacheManager;
+
+    public RedisLocalReactiveCacheManager(ReactiveRedisOperations<Object, Object> operations, ReactiveCacheManager localCacheManager) {
+        this.operations = operations;
+        this.localCacheManager = localCacheManager;
+    }
+
+    @Setter
+    @Getter
+    private String redisCachePrefix = "spring-cache:";
+
+    @Override
+    protected <E> ReactiveCache<E> createCache(String name) {
+        return new RedisReactiveCache<>(redisCachePrefix.concat(name), operations, localCacheManager.getCache(name));
+    }
+}

+ 110 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/RedisReactiveCache.java

@@ -0,0 +1,110 @@
+package org.hswebframework.web.cache.supports;
+
+import org.hswebframework.web.cache.ReactiveCache;
+import org.reactivestreams.Publisher;
+import org.springframework.data.redis.connection.ReactiveSubscription;
+import org.springframework.data.redis.core.ReactiveRedisOperations;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collections;
+import java.util.function.Function;
+
+@SuppressWarnings("all")
+public class RedisReactiveCache<E> implements ReactiveCache<E> {
+
+    private ReactiveRedisOperations<Object, Object> operations;
+
+    private String redisKey;
+
+    private ReactiveCache localCache;
+
+    private String topicName;
+
+    public RedisReactiveCache(String redisKey, ReactiveRedisOperations<Object, Object> operations, ReactiveCache<E> localCache) {
+        this.operations = operations;
+        this.localCache = localCache;
+        this.redisKey = redisKey;
+        operations.listenToChannel(topicName = ("_cache_changed:" + redisKey))
+                .map(ReactiveSubscription.Message::getMessage)
+                .cast(String.class)
+                .subscribe(s -> {
+                    if (s.equals("___all")) {
+                        localCache.clear().subscribe();
+                        return;
+                    }
+                    //清空本地缓存
+                    localCache.evict(s).subscribe();
+                });
+    }
+
+    @Override
+    public Flux<E> getFlux(Object key) {
+        return localCache
+                .getFlux(key)
+                .switchIfEmpty(operations
+                        .opsForHash()
+                        .get(redisKey, key)
+                        .flatMapIterable(r -> {
+                            if (r instanceof Iterable) {
+                                return ((Iterable) r);
+                            }
+                            return Collections.singletonList(r);
+                        })
+                        .map(Function.identity()));
+    }
+
+    @Override
+    public Mono<E> getMono(Object key) {
+        return localCache.getMono(key)
+                .switchIfEmpty(operations.opsForHash()
+                        .get(redisKey, key)
+                        .flatMap(r -> localCache.put(key, Mono.just(r))
+                                .thenReturn(r)));
+    }
+
+    @Override
+    public Mono<Void> put(Object key, Publisher<E> data) {
+        if (data instanceof Mono) {
+            return ((Mono) data)
+                    .flatMap(r -> {
+                        return operations.opsForHash()
+                                .put(redisKey, key, r)
+                                .then(localCache.put(key, data))
+                                .then(operations.convertAndSend(topicName, key));
+
+                    }) .then();
+        }
+        if (data instanceof Flux) {
+            return ((Flux) data)
+                    .collectList()
+                    .flatMap(r -> {
+                        return operations.opsForHash()
+                                .put(redisKey, key, r)
+                                .then(localCache.put(key, data))
+                                .then(operations.convertAndSend(topicName, key));
+
+                    }).then();
+        }
+        return Mono.error(new UnsupportedOperationException("unsupport publisher:" + data));
+    }
+
+    @Override
+    public Mono<Void> evict(Object key) {
+        return operations
+                .opsForHash()
+                .remove(redisKey, key)
+                .then(localCache.evict(key))
+                .then(operations.convertAndSend(topicName, key));
+    }
+
+    @Override
+    public Mono<Void> clear() {
+        return operations
+                .opsForHash()
+                .delete(redisKey)
+                .then(localCache.clear())
+                .then(operations.convertAndSend(topicName, "___all"))
+                .then();
+    }
+}

+ 43 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/java/org/hswebframework/web/cache/supports/UnSupportedReactiveCache.java

@@ -0,0 +1,43 @@
+package org.hswebframework.web.cache.supports;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.hswebframework.web.cache.ReactiveCache;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class UnSupportedReactiveCache<E> implements ReactiveCache<E> {
+
+    private static final UnSupportedReactiveCache INSTANCE = new UnSupportedReactiveCache();
+
+    public static <E> ReactiveCache<E> getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Flux<E> getFlux(Object key) {
+        return Flux.empty();
+    }
+
+    @Override
+    public Mono<E> getMono(Object key) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> put(Object key, Publisher<E> data) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> evict(Object key) {
+        return Mono.empty();
+    }
+
+    @Override
+    public Mono<Void> clear() {
+        return Mono.empty();
+    }
+}

+ 3 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Auto Configure
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.hswebframework.web.cache.configuration.ReactiveCacheManagerConfiguration

+ 68 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/CaffeineReactiveCacheManagerTest.java

@@ -0,0 +1,68 @@
+package org.hswebframework.web.cache;
+
+import org.hswebframework.web.cache.supports.CaffeineReactiveCacheManager;
+import org.hswebframework.web.cache.supports.GuavaReactiveCacheManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+
+@SpringBootTest(classes = TestApplication.class,args = {
+        "--hsweb.cache.type=caffeine"
+})
+@RunWith(SpringRunner.class)
+public class CaffeineReactiveCacheManagerTest {
+
+    @Autowired
+    ReactiveCacheManager cacheManager;
+
+    @Test
+    public void test(){
+        Assert.assertNotNull(cacheManager);
+        Assert.assertTrue(cacheManager instanceof CaffeineReactiveCacheManager);
+
+        ReactiveCache<String> cache= cacheManager.getCache("test");
+        cache.clear()
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.flux("test-flux")
+                .onCacheMissResume(Flux.just("1","2","3"))
+                .as(StepVerifier::create)
+                .expectNext("1","2","3")
+                .verifyComplete();
+
+        cache.put("test-flux",Flux.just("3","2","1"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getFlux("test-flux")
+                .as(StepVerifier::create)
+                .expectNext("3","2","1")
+                .verifyComplete();
+
+
+        cache.mono("test-mono")
+                .onCacheMissResume(Mono.just("1"))
+                .as(StepVerifier::create)
+                .expectNext("1")
+                .verifyComplete();
+
+        cache.put("test-mono",Mono.just("2"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getMono("test-mono")
+                .as(StepVerifier::create)
+                .expectNext("2")
+                .verifyComplete();
+
+
+    }
+}

+ 68 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/GuavaReactiveCacheManagerTest.java

@@ -0,0 +1,68 @@
+package org.hswebframework.web.cache;
+
+import org.hswebframework.web.cache.supports.GuavaReactiveCacheManager;
+import org.hswebframework.web.cache.supports.RedisLocalReactiveCacheManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+
+@SpringBootTest(classes = TestApplication.class,args = {
+        "--hsweb.cache.type=guava"
+})
+@RunWith(SpringRunner.class)
+public class GuavaReactiveCacheManagerTest {
+
+    @Autowired
+    ReactiveCacheManager cacheManager;
+
+    @Test
+    public void test(){
+        Assert.assertNotNull(cacheManager);
+        Assert.assertTrue(cacheManager instanceof GuavaReactiveCacheManager);
+
+        ReactiveCache<String> cache= cacheManager.getCache("test");
+        cache.clear()
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.flux("test-flux")
+                .onCacheMissResume(Flux.just("1","2","3"))
+                .as(StepVerifier::create)
+                .expectNext("1","2","3")
+                .verifyComplete();
+
+        cache.put("test-flux",Flux.just("3","2","1"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getFlux("test-flux")
+                .as(StepVerifier::create)
+                .expectNext("3","2","1")
+                .verifyComplete();
+
+
+        cache.mono("test-mono")
+                .onCacheMissResume(Mono.just("1"))
+                .as(StepVerifier::create)
+                .expectNext("1")
+                .verifyComplete();
+
+        cache.put("test-mono",Mono.just("2"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getMono("test-mono")
+                .as(StepVerifier::create)
+                .expectNext("2")
+                .verifyComplete();
+
+
+    }
+}

+ 70 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/RedisReactiveCacheManagerTest.java

@@ -0,0 +1,70 @@
+package org.hswebframework.web.cache;
+
+import org.hswebframework.web.cache.supports.RedisLocalReactiveCacheManager;
+import org.hswebframework.web.cache.supports.RedisReactiveCache;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import static org.junit.Assert.*;
+
+
+@SpringBootTest(classes = TestApplication.class,args = {
+        "--hsweb.cache.type=redis"
+})
+@RunWith(SpringRunner.class)
+public class RedisReactiveCacheManagerTest {
+
+    @Autowired
+    ReactiveCacheManager cacheManager;
+
+    @Test
+    public void test(){
+        Assert.assertNotNull(cacheManager);
+        Assert.assertTrue(cacheManager instanceof RedisLocalReactiveCacheManager);
+
+        ReactiveCache<String> cache= cacheManager.getCache("test");
+        cache.clear()
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.flux("test-flux")
+                .onCacheMissResume(Flux.just("1","2","3"))
+                .as(StepVerifier::create)
+                .expectNext("1","2","3")
+                .verifyComplete();
+
+        cache.put("test-flux",Flux.just("3","2","1"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getFlux("test-flux")
+                .as(StepVerifier::create)
+                .expectNext("3","2","1")
+                .verifyComplete();
+
+
+        cache.mono("test-mono")
+                .onCacheMissResume(Mono.just("1"))
+                .as(StepVerifier::create)
+                .expectNext("1")
+                .verifyComplete();
+
+        cache.put("test-mono",Mono.just("2"))
+                .as(StepVerifier::create)
+                .verifyComplete();
+
+        cache.getMono("test-mono")
+                .as(StepVerifier::create)
+                .expectNext("2")
+                .verifyComplete();
+
+
+    }
+}

+ 7 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/test/java/org/hswebframework/web/cache/TestApplication.java

@@ -0,0 +1,7 @@
+package org.hswebframework.web.cache;
+
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class TestApplication {
+}

+ 5 - 0
hsweb-concurrent/hsweb-concurrent-cache/src/test/resources/application-redis.yml

@@ -0,0 +1,5 @@
+hsweb:
+  cache:
+    redis:
+      local-cache-type: none
+    type: redis

+ 19 - 0
hsweb-concurrent/pom.xml

@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-framework</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-concurrent</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>hsweb-concurrent-cache</module>
+    </modules>
+
+
+</project>

+ 6 - 0
hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/pom.xml

@@ -25,6 +25,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-concurrent-cache</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>

+ 14 - 10
hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/src/main/java/org/hswebframework/web/system/authorization/defaults/service/DefaultReactiveAuthenticationInitializeService.java

@@ -68,13 +68,14 @@ public class DefaultReactiveAuthenticationInitializeService
     protected Flux<AuthorizationSettingEntity> getSettings(List<Dimension> dimensions) {
         return Flux.fromIterable(dimensions)
                 .groupBy(d -> d.getType().getId(), (Function<Dimension, Object>) Dimension::getId)
-                .flatMap(group -> group.collectList()
-                        .flatMapMany(list -> settingRepository
-                                .createQuery()
-                                .where(AuthorizationSettingEntity::getState, 1)
-                                .and(AuthorizationSettingEntity::getDimensionType, group.key())
-                                .in(AuthorizationSettingEntity::getDimensionTarget, list)
-                                .fetch()));
+                .flatMap(group ->
+                        group.collectList()
+                                .flatMapMany(list -> settingRepository
+                                        .createQuery()
+                                        .where(AuthorizationSettingEntity::getState, 1)
+                                        .and(AuthorizationSettingEntity::getDimensionType, group.key())
+                                        .in(AuthorizationSettingEntity::getDimensionTarget, list)
+                                        .fetch()));
     }
 
     protected Mono<Authentication> initPermission(SimpleAuthentication authentication) {
@@ -82,9 +83,12 @@ public class DefaultReactiveAuthenticationInitializeService
                 .flatMap(provider -> provider.getDimensionByUserId(authentication.getUser().getId()))
                 .collectList()
                 .doOnNext(authentication::setDimensions)
-                .flatMap(allDimension -> Mono.zip(getAllPermission(), getSettings(allDimension)
-                                .collect(Collectors.groupingBy(AuthorizationSettingEntity::getPermission))
-                        , (_p, _s) -> handlePermission(authentication, allDimension, _p, _s)));
+                .flatMap(allDimension ->
+                        Mono.zip(
+                                getAllPermission()
+                                , getSettings(allDimension).collect(Collectors.groupingBy(AuthorizationSettingEntity::getPermission))
+                                , (_p, _s) -> handlePermission(authentication, allDimension, _p, _s)
+                        ));
 
     }
 

+ 6 - 10
hsweb-system/hsweb-system-authorization/hsweb-system-authorization-default/src/main/java/org/hswebframework/web/system/authorization/defaults/service/DefaultReactiveAuthenticationManager.java

@@ -3,6 +3,7 @@ package org.hswebframework.web.system.authorization.defaults.service;
 import org.hswebframework.web.authorization.*;
 import org.hswebframework.web.authorization.exception.AccessDenyException;
 import org.hswebframework.web.authorization.simple.PlainTextUsernamePasswordAuthenticationRequest;
+import org.hswebframework.web.cache.ReactiveCacheManager;
 import org.hswebframework.web.system.authorization.api.entity.UserEntity;
 import org.hswebframework.web.system.authorization.api.service.reactive.ReactiveUserService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -19,7 +20,7 @@ public class DefaultReactiveAuthenticationManager implements ReactiveAuthenticat
     private ReactiveAuthenticationInitializeService initializeService;
 
     @Autowired(required = false)
-    private CacheManager cacheManager;
+    private ReactiveCacheManager cacheManager;
 
     @Override
     public Mono<Authentication> authenticate(Mono<AuthenticationRequest> request) {
@@ -38,14 +39,9 @@ public class DefaultReactiveAuthenticationManager implements ReactiveAuthenticat
 
         return Mono.justOrEmpty(userId)
                 .flatMap(_id -> Mono.justOrEmpty(cacheManager)
-                        .map(cm -> cm.getCache("user-auth"))
-                        .flatMap(cache -> Mono.justOrEmpty(cache.get(userId))
-                                .switchIfEmpty(initializeService.initUserAuthorization(_id)
-                                        .doOnNext(autz -> cache.put(userId, autz))
-                                        .map(SimpleValueWrapper::new)))
-                        .flatMap(valueWrapper -> Mono.justOrEmpty(valueWrapper.get())))
-                .cast(Authentication.class)
-                .switchIfEmpty(initializeService.initUserAuthorization(userId))
-                .cache();
+                        .map(cm -> cacheManager.<Authentication>getCache("user-auth"))
+                        .flatMap(cache -> cache.mono(userId).onCacheMissResume(() -> initializeService.initUserAuthorization(userId)))
+                        .cast(Authentication.class)
+                        .switchIfEmpty(initializeService.initUserAuthorization(userId)));
     }
 }

+ 1 - 0
pom.xml

@@ -34,6 +34,7 @@
         <module>hsweb-commons</module>
         <module>hsweb-logging</module>
         <module>hsweb-tests</module>
+        <module>hsweb-concurrent</module>
     </modules>
 
     <packaging>pom</packaging>