|
@@ -17,11 +17,16 @@ import org.springframework.data.redis.core.*;
|
|
|
import org.springframework.data.redis.serializer.RedisSerializationContext;
|
|
|
import org.springframework.data.redis.serializer.RedisSerializer;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.FluxSink;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
import java.time.Duration;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.function.Supplier;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
public class RedisUserTokenManager implements UserTokenManager {
|
|
@@ -32,21 +37,41 @@ public class RedisUserTokenManager implements UserTokenManager {
|
|
|
|
|
|
private final ReactiveSetOperations<Object, Object> userTokenMapping;
|
|
|
|
|
|
+ @Setter
|
|
|
+ private Map<String, SimpleUserToken> localCache = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private FluxSink<UserToken> touchSink;
|
|
|
+
|
|
|
public RedisUserTokenManager(ReactiveRedisOperations<Object, Object> operations) {
|
|
|
this.operations = operations;
|
|
|
this.userTokenStore = operations.opsForHash();
|
|
|
this.userTokenMapping = operations.opsForSet();
|
|
|
+ this.operations
|
|
|
+ .listenToChannel("_user_token_removed")
|
|
|
+ .subscribe(msg -> localCache.remove(String.valueOf(msg.getMessage())));
|
|
|
+
|
|
|
+ Flux.<UserToken>create(sink -> this.touchSink = sink)
|
|
|
+ .buffer(Flux.interval(Duration.ofSeconds(10)), HashSet::new)
|
|
|
+ .flatMap(list -> Flux
|
|
|
+ .fromIterable(list)
|
|
|
+ .flatMap(token -> operations
|
|
|
+ .expire(getTokenRedisKey(token.getToken()), Duration.ofMillis(token.getMaxInactiveInterval()))
|
|
|
+ .then())
|
|
|
+ .onErrorResume(err -> Mono.empty()))
|
|
|
+ .subscribe();
|
|
|
+
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("all")
|
|
|
public RedisUserTokenManager(ReactiveRedisConnectionFactory connectionFactory) {
|
|
|
this(new ReactiveRedisTemplate<>(connectionFactory,
|
|
|
- RedisSerializationContext.newSerializationContext()
|
|
|
- .key((RedisSerializer) RedisSerializer.string())
|
|
|
- .value(RedisSerializer.java())
|
|
|
- .hashKey(RedisSerializer.string())
|
|
|
- .hashValue(RedisSerializer.java())
|
|
|
- .build()
|
|
|
+ RedisSerializationContext
|
|
|
+ .newSerializationContext()
|
|
|
+ .key((RedisSerializer) RedisSerializer.string())
|
|
|
+ .value(RedisSerializer.java())
|
|
|
+ .hashKey(RedisSerializer.string())
|
|
|
+ .hashValue(RedisSerializer.java())
|
|
|
+ .build()
|
|
|
));
|
|
|
}
|
|
|
|
|
@@ -72,11 +97,17 @@ public class RedisUserTokenManager implements UserTokenManager {
|
|
|
|
|
|
@Override
|
|
|
public Mono<UserToken> getByToken(String token) {
|
|
|
+ SimpleUserToken inCache = localCache.get(token);
|
|
|
+ if (inCache != null && inCache.isNormal()) {
|
|
|
+ return Mono.just(inCache);
|
|
|
+ }
|
|
|
return userTokenStore
|
|
|
.entries(getTokenRedisKey(token))
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
|
|
|
.filter(map -> !map.isEmpty())
|
|
|
- .map(SimpleUserToken::of);
|
|
|
+ .map(SimpleUserToken::of)
|
|
|
+ .doOnNext(userToken -> localCache.put(userToken.getToken(), userToken))
|
|
|
+ .cast(UserToken.class);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -185,7 +216,7 @@ public class RedisUserTokenManager implements UserTokenManager {
|
|
|
public Mono<UserToken> signIn(String token, String type, String userId, long maxInactiveInterval) {
|
|
|
return Mono
|
|
|
.defer(() -> {
|
|
|
- Mono<UserToken> doSign = Mono.defer(() -> {
|
|
|
+ Mono<SimpleUserToken> doSign = Mono.defer(() -> {
|
|
|
Map<String, Object> map = new HashMap<>();
|
|
|
map.put("token", token);
|
|
|
map.put("type", type);
|
|
@@ -237,12 +268,17 @@ public class RedisUserTokenManager implements UserTokenManager {
|
|
|
|
|
|
@Override
|
|
|
public Mono<Void> touch(String token) {
|
|
|
+ SimpleUserToken inCache = localCache.get(token);
|
|
|
+ if (inCache != null && inCache.isNormal()) {
|
|
|
+ inCache.setLastRequestTime(System.currentTimeMillis());
|
|
|
+ //异步touch
|
|
|
+ touchSink.next(inCache);
|
|
|
+ return Mono.empty();
|
|
|
+ }
|
|
|
return getByToken(token)
|
|
|
.flatMap(userToken -> {
|
|
|
if (userToken.getMaxInactiveInterval() > 0) {
|
|
|
- return operations
|
|
|
- .expire(getTokenRedisKey(token), Duration.ofMillis(userToken.getMaxInactiveInterval()))
|
|
|
- .then();
|
|
|
+ touchSink.next(userToken);
|
|
|
}
|
|
|
return Mono.empty();
|
|
|
});
|
|
@@ -268,26 +304,37 @@ public class RedisUserTokenManager implements UserTokenManager {
|
|
|
.then();
|
|
|
}
|
|
|
|
|
|
+ private Mono<Void> notifyTokenRemoved(String token) {
|
|
|
+ return operations.convertAndSend("_user_token_removed", token).then();
|
|
|
+ }
|
|
|
+
|
|
|
private Mono<Void> onTokenRemoved(UserToken token) {
|
|
|
+ localCache.remove(token.getToken());
|
|
|
+
|
|
|
if (eventPublisher == null) {
|
|
|
- return Mono.empty();
|
|
|
+ return notifyTokenRemoved(token.getToken());
|
|
|
}
|
|
|
- return Mono.fromRunnable(() -> eventPublisher.publishEvent(new UserTokenRemovedEvent(token)));
|
|
|
+ return Mono.fromRunnable(() -> eventPublisher.publishEvent(new UserTokenRemovedEvent(token)))
|
|
|
+ .then(notifyTokenRemoved(token.getToken()));
|
|
|
}
|
|
|
|
|
|
- private Mono<Void> onTokenChanged(UserToken old, UserToken newToken) {
|
|
|
+ private Mono<Void> onTokenChanged(UserToken old, SimpleUserToken newToken) {
|
|
|
+ localCache.put(newToken.getToken(), newToken);
|
|
|
if (eventPublisher == null) {
|
|
|
- return Mono.empty();
|
|
|
+ return notifyTokenRemoved(newToken.getToken());
|
|
|
}
|
|
|
return Mono.fromRunnable(() -> eventPublisher.publishEvent(new UserTokenChangedEvent(old, newToken)));
|
|
|
}
|
|
|
|
|
|
- private Mono<UserToken> onUserTokenCreated(UserToken token) {
|
|
|
+ private Mono<UserToken> onUserTokenCreated(SimpleUserToken token) {
|
|
|
+ localCache.put(token.getToken(), token);
|
|
|
if (eventPublisher == null) {
|
|
|
- return Mono.just(token);
|
|
|
+ return notifyTokenRemoved(token.getToken())
|
|
|
+ .thenReturn(token);
|
|
|
}
|
|
|
return Mono
|
|
|
.fromRunnable(() -> eventPublisher.publishEvent(new UserTokenCreatedEvent(token)))
|
|
|
+ .then(notifyTokenRemoved(token.getToken()))
|
|
|
.thenReturn(token);
|
|
|
}
|
|
|
|