|
@@ -6,9 +6,12 @@ import org.hswebframework.ezorm.rdb.mapping.ReactiveDelete;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
|
|
|
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
|
|
|
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|
|
+import org.hswebframework.web.event.AsyncEvent;
|
|
|
import org.hswebframework.web.exception.BusinessException;
|
|
|
import org.hswebframework.web.system.authorization.api.entity.DimensionUserEntity;
|
|
|
import org.hswebframework.web.system.authorization.api.event.ClearUserAuthorizationCacheEvent;
|
|
|
+import org.hswebframework.web.system.authorization.api.event.DimensionBindEvent;
|
|
|
+import org.hswebframework.web.system.authorization.api.event.DimensionUnbindEvent;
|
|
|
import org.hswebframework.web.system.authorization.api.event.UserDeletedEvent;
|
|
|
import org.reactivestreams.Publisher;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -16,9 +19,11 @@ import org.springframework.context.ApplicationEventPublisher;
|
|
|
import org.springframework.context.event.EventListener;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
+import reactor.function.Function3;
|
|
|
|
|
|
import java.util.Collection;
|
|
|
-import java.util.stream.Collectors;
|
|
|
+import java.util.List;
|
|
|
+import java.util.function.Function;
|
|
|
|
|
|
@Slf4j
|
|
|
public class DefaultDimensionUserService extends GenericReactiveCrudService<DimensionUserEntity, String> {
|
|
@@ -37,10 +42,9 @@ public class DefaultDimensionUserService extends GenericReactiveCrudService<Dime
|
|
|
|
|
|
@Override
|
|
|
public Mono<SaveResult> save(Publisher<DimensionUserEntity> entityPublisher) {
|
|
|
- return Flux.from(entityPublisher)
|
|
|
- .doOnNext(DimensionUserEntity::generateId)
|
|
|
- .doOnNext(entity -> eventPublisher.publishEvent(ClearUserAuthorizationCacheEvent.of(entity.getUserId())))
|
|
|
- .as(super::save);
|
|
|
+ return this
|
|
|
+ .publishEvent(entityPublisher, DimensionBindEvent::new)
|
|
|
+ .as(super::save);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -52,62 +56,79 @@ public class DefaultDimensionUserService extends GenericReactiveCrudService<Dime
|
|
|
|
|
|
@Override
|
|
|
public Mono<Integer> insert(Publisher<DimensionUserEntity> entityPublisher) {
|
|
|
- return Flux.from(entityPublisher)
|
|
|
- .doOnNext(DimensionUserEntity::generateId)
|
|
|
- .doOnNext(entity -> eventPublisher.publishEvent(ClearUserAuthorizationCacheEvent.of(entity.getUserId())))
|
|
|
- .as(super::insert)
|
|
|
- .onErrorMap(DuplicateKeyException.class, (err) -> new BusinessException("重复的绑定请求"));
|
|
|
+ return this
|
|
|
+ .publishEvent(entityPublisher, DimensionBindEvent::new)
|
|
|
+ .as(super::insert)
|
|
|
+ .onErrorMap(DuplicateKeyException.class, (err) -> new BusinessException("重复的绑定请求"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Mono<Integer> insertBatch(Publisher<? extends Collection<DimensionUserEntity>> entityPublisher) {
|
|
|
- return Flux.from(entityPublisher)
|
|
|
- .doOnNext(entity -> eventPublisher
|
|
|
- .publishEvent(ClearUserAuthorizationCacheEvent
|
|
|
- .of(entity.stream()
|
|
|
- .map(DimensionUserEntity::getUserId)
|
|
|
- .collect(Collectors.toSet()))))
|
|
|
- .as(super::insertBatch);
|
|
|
+
|
|
|
+ Flux<? extends Collection<DimensionUserEntity>> cache = Flux.from(entityPublisher).cache();
|
|
|
+
|
|
|
+ return this
|
|
|
+ .publishEvent(cache.flatMapIterable(Function.identity()), DimensionBindEvent::new)
|
|
|
+ .then(super.insertBatch(cache));
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public Mono<Integer> deleteById(Publisher<String> idPublisher) {
|
|
|
- return findById(Flux.from(idPublisher))
|
|
|
- .doOnNext(entity -> eventPublisher.publishEvent(ClearUserAuthorizationCacheEvent.of(entity.getUserId())))
|
|
|
- .map(DimensionUserEntity::getId)
|
|
|
- .as(super::deleteById);
|
|
|
+ private Flux<DimensionUserEntity> publishEvent(Publisher<DimensionUserEntity> stream,
|
|
|
+ Function3<String, String, List<String>, AsyncEvent> event) {
|
|
|
+ Flux<DimensionUserEntity> cache = Flux.from(stream).doOnNext(DimensionUserEntity::generateId).cache();
|
|
|
+ return cache
|
|
|
+ .groupBy(DimensionUserEntity::getDimensionTypeId)
|
|
|
+ .flatMap(typeGroup -> {
|
|
|
+ String type = typeGroup.key();
|
|
|
+ return typeGroup
|
|
|
+ .groupBy(DimensionUserEntity::getDimensionId)
|
|
|
+ .flatMap(dimensionIdGroup -> {
|
|
|
+ String dimensionId = dimensionIdGroup.key();
|
|
|
+
|
|
|
+ return dimensionIdGroup
|
|
|
+ .map(DimensionUserEntity::getUserId)
|
|
|
+ .collectList()
|
|
|
+ .flatMap(userIdList -> {
|
|
|
+ eventPublisher.publishEvent(ClearUserAuthorizationCacheEvent.of(userIdList));
|
|
|
+ return event.apply(type, dimensionId, userIdList).publish(eventPublisher);
|
|
|
+ });
|
|
|
+ });
|
|
|
+ })
|
|
|
+ .thenMany(cache);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@SuppressWarnings("all")
|
|
|
public ReactiveUpdate<DimensionUserEntity> createUpdate() {
|
|
|
- return super.createUpdate()
|
|
|
- .onExecute((update, r) -> r
|
|
|
- .doOnSuccess(i -> {
|
|
|
- this.createQuery()
|
|
|
- .select(DimensionUserEntity::getUserId)
|
|
|
- .setParam(update.toQueryParam())
|
|
|
- .fetch()
|
|
|
- .map(DimensionUserEntity::getUserId)
|
|
|
- .collectList()
|
|
|
- .map(ClearUserAuthorizationCacheEvent::of)
|
|
|
- .subscribe();
|
|
|
- }));
|
|
|
+ return super
|
|
|
+ .createUpdate()
|
|
|
+ .onExecute((update, r) -> r
|
|
|
+ .flatMap(result -> this
|
|
|
+ .createQuery()
|
|
|
+ .select(DimensionUserEntity::getUserId)
|
|
|
+ .setParam(update.toQueryParam())
|
|
|
+ .fetch()
|
|
|
+ .map(DimensionUserEntity::getUserId)
|
|
|
+ .distinct()
|
|
|
+ .collectList()
|
|
|
+ .map(ClearUserAuthorizationCacheEvent::of)
|
|
|
+ .doOnNext(eventPublisher::publishEvent)
|
|
|
+ .thenReturn(result)
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@SuppressWarnings("all")
|
|
|
public ReactiveDelete createDelete() {
|
|
|
- return super.createDelete()
|
|
|
- .onExecute((delete, r) -> r.doOnSuccess(i -> {
|
|
|
- this.createQuery()
|
|
|
- .select(DimensionUserEntity::getUserId)
|
|
|
- .setParam(delete.toQueryParam())
|
|
|
- .fetch()
|
|
|
- .map(DimensionUserEntity::getUserId)
|
|
|
- .collectList()
|
|
|
- .map(ClearUserAuthorizationCacheEvent::of)
|
|
|
- .subscribe();
|
|
|
- }));
|
|
|
+ return super
|
|
|
+ .createDelete()
|
|
|
+ .onExecute((delete, r) -> this
|
|
|
+ .publishEvent(this.createQuery()
|
|
|
+ .select(DimensionUserEntity::getUserId)
|
|
|
+ .setParam(delete.toQueryParam())
|
|
|
+ .fetch(),
|
|
|
+ DimensionUnbindEvent::new
|
|
|
+ ).then(r)
|
|
|
+ );
|
|
|
}
|
|
|
}
|