|
@@ -27,7 +27,10 @@ import org.hswebframework.web.authorization.token.event.UserTokenRemovedEvent;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.ApplicationEvent;
|
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
|
|
|
+import javax.validation.constraints.NotNull;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
@@ -90,97 +93,90 @@ public class DefaultUserTokenManager implements UserTokenManager {
|
|
|
return userStorage.computeIfAbsent(userId, key -> new HashSet<>());
|
|
|
}
|
|
|
|
|
|
- private SimpleUserToken checkTimeout(SimpleUserToken detail) {
|
|
|
+ private Mono<UserToken> checkTimeout(UserToken detail) {
|
|
|
if (null == detail) {
|
|
|
- return null;
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
if (detail.getMaxInactiveInterval() <= 0) {
|
|
|
- return detail;
|
|
|
+ return Mono.just(detail);
|
|
|
}
|
|
|
if (System.currentTimeMillis() - detail.getLastRequestTime() > detail.getMaxInactiveInterval()) {
|
|
|
- changeTokenState(detail, TokenState.expired);
|
|
|
- return detail;
|
|
|
+ return changeTokenState(detail, TokenState.expired)
|
|
|
+ .thenReturn(detail);
|
|
|
}
|
|
|
- return detail;
|
|
|
+ return Mono.just(detail);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public SimpleUserToken getByToken(String token) {
|
|
|
+ public Mono<UserToken> getByToken(String token) {
|
|
|
if (token == null) {
|
|
|
- return null;
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
return checkTimeout(tokenStorage.get(token));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public List<UserToken> getByUserId(String userId) {
|
|
|
+ public Flux<UserToken> getByUserId(String userId) {
|
|
|
if (userId == null) {
|
|
|
- return new ArrayList<>();
|
|
|
+ return Flux.empty();
|
|
|
}
|
|
|
Set<String> tokens = getUserToken(userId);
|
|
|
if (tokens.isEmpty()) {
|
|
|
userStorage.remove(userId);
|
|
|
- return new ArrayList<>();
|
|
|
+ return Flux.empty();
|
|
|
}
|
|
|
- return tokens
|
|
|
+ return Flux.fromStream(tokens
|
|
|
.stream()
|
|
|
.map(tokenStorage::get)
|
|
|
- .filter(Objects::nonNull)
|
|
|
- .collect(Collectors.toList());
|
|
|
+ .filter(Objects::nonNull));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean userIsLoggedIn(String userId) {
|
|
|
+ public Mono<Boolean> userIsLoggedIn(String userId) {
|
|
|
if (userId == null) {
|
|
|
- return false;
|
|
|
+ return Mono.just(false);
|
|
|
}
|
|
|
- for (UserToken userToken : getByUserId(userId)) {
|
|
|
- if (userToken.isNormal()) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
+ return getByUserId(userId)
|
|
|
+ .any(UserToken::isNormal);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public boolean tokenIsLoggedIn(String token) {
|
|
|
+ public Mono<Boolean> tokenIsLoggedIn(String token) {
|
|
|
if (token == null) {
|
|
|
- return false;
|
|
|
+ return Mono.just(false);
|
|
|
}
|
|
|
- UserToken userToken = getByToken(token);
|
|
|
-
|
|
|
- return userToken != null && !userToken.isExpired();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public long totalUser() {
|
|
|
- return userStorage.size();
|
|
|
+ return getByToken(token)
|
|
|
+ .map(t -> !t.isExpired())
|
|
|
+ .defaultIfEmpty(false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public long totalToken() {
|
|
|
- return tokenStorage.size();
|
|
|
+ public Mono<Integer> totalUser() {
|
|
|
+ return Mono.just(userStorage.size());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void allLoggedUser(Consumer<UserToken> consumer) {
|
|
|
- tokenStorage.values().forEach(consumer);
|
|
|
+ public Mono<Integer> totalToken() {
|
|
|
+ return Mono.just(tokenStorage.size());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public List<UserToken> allLoggedUser() {
|
|
|
- return new ArrayList<>(tokenStorage.values());
|
|
|
+ public Flux<UserToken> allLoggedUser() {
|
|
|
+ return Flux.fromIterable(tokenStorage.values());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void signOutByUserId(String userId) {
|
|
|
+ public Mono<Void> signOutByUserId(String userId) {
|
|
|
if (null == userId) {
|
|
|
- return;
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
- Set<String> tokens = getUserToken(userId);
|
|
|
- tokens.forEach(token -> signOutByToken(token, false));
|
|
|
- tokens.clear();
|
|
|
- userStorage.remove(userId);
|
|
|
+ return Mono.defer(() -> {
|
|
|
+ Set<String> tokens = getUserToken(userId);
|
|
|
+ tokens.forEach(token -> signOutByToken(token, false));
|
|
|
+ tokens.clear();
|
|
|
+ userStorage.remove(userId);
|
|
|
+ return Mono.empty();
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private void signOutByToken(String token, boolean removeUserToken) {
|
|
@@ -204,8 +200,8 @@ public class DefaultUserTokenManager implements UserTokenManager {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void signOutByToken(String token) {
|
|
|
- signOutByToken(token, true);
|
|
|
+ public Mono<Void> signOutByToken(String token) {
|
|
|
+ return Mono.fromRunnable(() -> signOutByToken(token, true));
|
|
|
}
|
|
|
|
|
|
protected void publishEvent(ApplicationEvent event) {
|
|
@@ -214,78 +210,92 @@ public class DefaultUserTokenManager implements UserTokenManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void changeTokenState(SimpleUserToken userToken, TokenState state) {
|
|
|
+ public Mono<Void> changeTokenState(UserToken userToken, TokenState state) {
|
|
|
if (null != userToken) {
|
|
|
- SimpleUserToken copy = userToken.copy();
|
|
|
+ SimpleUserToken token = ((SimpleUserToken) userToken);
|
|
|
+ SimpleUserToken copy = token.copy();
|
|
|
|
|
|
- userToken.setState(state);
|
|
|
+ token.setState(state);
|
|
|
syncToken(userToken);
|
|
|
|
|
|
publishEvent(new UserTokenChangedEvent(copy, userToken));
|
|
|
}
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void changeTokenState(String token, TokenState state) {
|
|
|
- changeTokenState(getByToken(token), state);
|
|
|
+ public Mono<Void> changeTokenState(String token, TokenState state) {
|
|
|
+ return getByToken(token)
|
|
|
+ .flatMap(t -> changeTokenState(t, state));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void changeUserState(String user, TokenState state) {
|
|
|
- getByUserId(user).forEach(token -> changeTokenState(token.getToken(), state));
|
|
|
+ public Mono<Void> changeUserState(String user, TokenState state) {
|
|
|
+ return Mono.from(getByUserId(user)
|
|
|
+ .flatMap(token -> changeTokenState(token.getToken(), state)));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public UserToken signIn(String token, String type, String userId, long maxInactiveInterval) {
|
|
|
- SimpleUserToken detail = new SimpleUserToken(userId, token);
|
|
|
- detail.setType(type);
|
|
|
- detail.setMaxInactiveInterval(maxInactiveInterval);
|
|
|
- AllopatricLoginMode mode = allopatricLoginModes.getOrDefault(type, allopatricLoginMode);
|
|
|
- if (mode == AllopatricLoginMode.deny) {
|
|
|
- boolean hasAnotherToken = getByUserId(userId)
|
|
|
- .stream()
|
|
|
- .filter(userToken -> type.equals(userToken.getType()))
|
|
|
- .map(SimpleUserToken.class::cast)
|
|
|
- .peek(this::checkTimeout)
|
|
|
- .anyMatch(UserToken::isNormal);
|
|
|
- if (hasAnotherToken) {
|
|
|
- throw new AccessDenyException("该用户已在其他地方登陆");
|
|
|
- }
|
|
|
- } else if (mode == AllopatricLoginMode.offlineOther) {
|
|
|
- //将在其他地方登录的用户设置为离线
|
|
|
- List<UserToken> oldToken = getByUserId(userId);
|
|
|
- for (UserToken userToken : oldToken) {
|
|
|
- //相同的tokenType才让其下线
|
|
|
- if (type.equals(userToken.getType())) {
|
|
|
- changeTokenState(userToken.getToken(), TokenState.offline);
|
|
|
- }
|
|
|
+ public Mono<UserToken> signIn(String token, String type, String userId, long maxInactiveInterval) {
|
|
|
+
|
|
|
+ return Mono.defer(() -> {
|
|
|
+ SimpleUserToken detail = new SimpleUserToken(userId, token);
|
|
|
+ detail.setType(type);
|
|
|
+ detail.setMaxInactiveInterval(maxInactiveInterval);
|
|
|
+ detail.setState(TokenState.normal);
|
|
|
+ Runnable doSign = () -> {
|
|
|
+ tokenStorage.put(token, detail);
|
|
|
+
|
|
|
+ getUserToken(userId).add(token);
|
|
|
+
|
|
|
+ publishEvent(new UserTokenCreatedEvent(detail));
|
|
|
+ };
|
|
|
+ AllopatricLoginMode mode = allopatricLoginModes.getOrDefault(type, allopatricLoginMode);
|
|
|
+ if (mode == AllopatricLoginMode.deny) {
|
|
|
+ return getByUserId(userId)
|
|
|
+ .filter(userToken -> type.equals(userToken.getType()))
|
|
|
+ .flatMap(this::checkTimeout)
|
|
|
+ .filterWhen(t -> {
|
|
|
+ if (t.isNormal()) {
|
|
|
+ return Mono.error(new AccessDenyException("该用户已在其他地方登陆"));
|
|
|
+ }
|
|
|
+ return Mono.empty();
|
|
|
+ })
|
|
|
+ .then(Mono.just(detail))
|
|
|
+ .doOnNext(__ -> doSign.run());
|
|
|
+ } else if (mode == AllopatricLoginMode.offlineOther) {
|
|
|
+ return getByUserId(userId)
|
|
|
+ .filter(userToken -> type.equals(userToken.getType()))
|
|
|
+ .flatMap(userToken -> changeTokenState(userToken, TokenState.offline))
|
|
|
+ .then(Mono.just(detail))
|
|
|
+ .doOnNext(__ -> doSign.run());
|
|
|
}
|
|
|
- }
|
|
|
- detail.setState(TokenState.normal);
|
|
|
- tokenStorage.put(token, detail);
|
|
|
+ doSign.run();
|
|
|
+ return Mono.just(detail);
|
|
|
+ });
|
|
|
|
|
|
- getUserToken(userId).add(token);
|
|
|
-
|
|
|
- publishEvent(new UserTokenCreatedEvent(detail));
|
|
|
- return detail;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void touch(String token) {
|
|
|
+ public Mono<Void> touch(String token) {
|
|
|
SimpleUserToken userToken = tokenStorage.get(token);
|
|
|
if (null != userToken) {
|
|
|
userToken.touch();
|
|
|
syncToken(userToken);
|
|
|
}
|
|
|
+ return Mono.empty();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void checkExpiredToken() {
|
|
|
- for (SimpleUserToken token : tokenStorage.values()) {
|
|
|
- if (token != null && checkTimeout(token).isExpired()) {
|
|
|
- signOutByToken(token.getToken());
|
|
|
- }
|
|
|
- }
|
|
|
+ public Mono<Void> checkExpiredToken() {
|
|
|
+
|
|
|
+ return Flux
|
|
|
+ .fromIterable(tokenStorage.values())
|
|
|
+ .doOnNext(this::checkTimeout)
|
|
|
+ .filter(UserToken::isExpired)
|
|
|
+ .map(UserToken::getToken)
|
|
|
+ .flatMap(this::signOutByToken)
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
/**
|