Browse Source

增加EntityEventHelper

zhou-hao 3 years ago
parent
commit
10888a311d

+ 60 - 0
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventHelper.java

@@ -0,0 +1,60 @@
+package org.hswebframework.web.crud.events;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
+
+/**
+ * 实体事件帮助器
+ *
+ * @author zhouhao
+ * @since 4.0.12
+ */
+public class EntityEventHelper {
+
+    private static final String doEventContextKey = EntityEventHelper.class.getName() + "_doEvent";
+
+    /**
+     * 判断当前是否设置了事件
+     *
+     * @param defaultIfEmpty 如果未设置时的默认值
+     * @return 是否设置了事件
+     */
+    public static Mono<Boolean> isDoFireEvent(boolean defaultIfEmpty) {
+        return Mono
+                .subscriberContext()
+                .flatMap(ctx -> Mono.justOrEmpty(ctx.<Boolean>getOrEmpty(doEventContextKey)))
+                .defaultIfEmpty(defaultIfEmpty);
+    }
+
+    /**
+     * 设置Mono不触发实体类事件
+     *
+     * <pre>
+     *     save(...)
+     *     .as(EntityEventHelper::setDoNotFireEvent)
+     * </pre>
+     *
+     * @param stream 流
+     * @param <T>    泛型
+     * @return 流
+     */
+    public static <T> Mono<T> setDoNotFireEvent(Mono<T> stream) {
+        return stream.subscriberContext(Context.of(doEventContextKey, false));
+    }
+
+    /**
+     * 设置Flux不触发实体类事件
+     * <pre>
+     *     fetch()
+     *     .as(EntityEventHelper::setDoNotFireEvent)
+     * </pre>
+     *
+     * @param stream 流
+     * @param <T>    泛型
+     * @return 流
+     */
+    public static <T> Flux<T> setDoNotFireEvent(Flux<T> stream) {
+        return stream.subscriberContext(Context.of(doEventContextKey, false));
+    }
+}

+ 85 - 58
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventListener.java

@@ -17,12 +17,9 @@ import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
 import org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata;
 import org.hswebframework.web.api.crud.entity.Entity;
 import org.hswebframework.web.bean.FastBeanCopier;
-import org.hswebframework.web.crud.annotation.EnableEntityEvent;
 import org.hswebframework.web.event.AsyncEvent;
 import org.hswebframework.web.event.GenericsPayloadApplicationEvent;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
-import org.springframework.scheduling.annotation.Async;
 import reactor.core.publisher.Mono;
 import reactor.function.Function3;
 import reactor.util.function.Tuple2;
@@ -31,6 +28,7 @@ import reactor.util.function.Tuples;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 
 @SuppressWarnings("all")
 @AllArgsConstructor
@@ -161,7 +159,8 @@ public class EntityEventListener implements EventListener {
                                                 BeanUtilsBean
                                                         .getInstance()
                                                         .setProperty(data, stringObjectEntry.getKey(), null);
-                                            }catch (Throwable ignore){}
+                                            } catch (Throwable ignore) {
+                                            }
                                         }
                                     }
                                     return data;
@@ -214,7 +213,7 @@ public class EntityEventListener implements EventListener {
                                      EntityEventPhase.before,
                                      EntityEventPhase.after)) {
                            holder.before(
-                                   ((ReactiveRepository<Object, ?>) repo)
+                                   this.doAsyncEvent(() -> ((ReactiveRepository<Object, ?>) repo)
                                            .createQuery()
                                            .setParam(update.toQueryParam())
                                            .fetch()
@@ -227,13 +226,13 @@ public class EntityEventListener implements EventListener {
                                                                       entityType,
                                                                       EntityPrepareModifyEvent::new);
 
-                                           })
-                                           .then()
+                                           }).then()
+                                   )
                            );
                        }
                        //before
                        if (isEnabled(entityType, EntityEventType.modify, EntityEventPhase.before)) {
-                           holder.invoke(Mono.defer(() -> {
+                           holder.invoke(this.doAsyncEvent(() -> {
                                Tuple2<List<Object>, List<Object>> _tmp = updated.get();
                                if (_tmp != null) {
                                    return sendUpdateEvent(_tmp.getT1(),
@@ -247,19 +246,20 @@ public class EntityEventListener implements EventListener {
 
                        //after
                        if (isEnabled(entityType, EntityEventType.modify, EntityEventPhase.after)) {
-                           holder.after(v -> {
-                               return Mono
-                                       .defer(() -> {
-                                           Tuple2<List<Object>, List<Object>> _tmp = updated.getAndSet(null);
-                                           if (_tmp != null) {
-                                               return sendUpdateEvent(_tmp.getT1(),
-                                                                      _tmp.getT2(),
-                                                                      entityType,
-                                                                      EntityModifyEvent::new);
-                                           }
-                                           return Mono.empty();
-                                       });
-                           });
+                           holder.after(v -> this
+                                   .doAsyncEvent(() -> {
+                                       return Mono
+                                               .defer(() -> {
+                                                   Tuple2<List<Object>, List<Object>> _tmp = updated.getAndSet(null);
+                                                   if (_tmp != null) {
+                                                       return sendUpdateEvent(_tmp.getT1(),
+                                                                              _tmp.getT2(),
+                                                                              entityType,
+                                                                              EntityModifyEvent::new);
+                                                   }
+                                                   return Mono.empty();
+                                               });
+                                   }));
                        }
 
                    });
@@ -299,30 +299,30 @@ public class EntityEventListener implements EventListener {
                               .ifPresent(holder -> {
                                   AtomicReference<List<Object>> deleted = new AtomicReference<>();
                                   if (isEnabled(entityType, EntityEventType.delete, EntityEventPhase.before, EntityEventPhase.after)) {
-                                      holder.before(((ReactiveRepository<Object, ?>) repo)
-                                                            .createQuery()
-                                                            .setParam(dslUpdate.toQueryParam())
-                                                            .fetch()
-                                                            .collectList()
-                                                            .filter(CollectionUtils::isNotEmpty)
-                                                            .flatMap(list -> {
-                                                                deleted.set(list);
-                                                                return this
-                                                                        .sendDeleteEvent(list, (Class) mapping.getEntityType(), EntityBeforeDeleteEvent::new);
-                                                            })
+                                      holder.before(
+                                              this.doAsyncEvent(() -> ((ReactiveRepository<Object, ?>) repo)
+                                                      .createQuery()
+                                                      .setParam(dslUpdate.toQueryParam())
+                                                      .fetch()
+                                                      .collectList()
+                                                      .filter(CollectionUtils::isNotEmpty)
+                                                      .flatMap(list -> {
+                                                          deleted.set(list);
+                                                          return this
+                                                                  .sendDeleteEvent(list, (Class) mapping.getEntityType(), EntityBeforeDeleteEvent::new);
+                                                      })
+                                              )
                                       );
                                   }
                                   if (isEnabled(entityType, EntityEventType.delete, EntityEventPhase.after)) {
-                                      holder.after(v -> {
-                                          return Mono
-                                                  .defer(() -> {
-                                                      List<Object> _tmp = deleted.getAndSet(null);
-                                                      if (CollectionUtils.isNotEmpty(_tmp)) {
-                                                          return sendDeleteEvent(_tmp, (Class) mapping.getEntityType(), EntityDeletedEvent::new);
-                                                      }
-                                                      return Mono.empty();
-                                                  });
-                                      });
+                                      holder.after(v -> this
+                                              .doAsyncEvent(() -> {
+                                                  List<Object> _tmp = deleted.getAndSet(null);
+                                                  if (CollectionUtils.isNotEmpty(_tmp)) {
+                                                      return sendDeleteEvent(_tmp, (Class) mapping.getEntityType(), EntityDeletedEvent::new);
+                                                  }
+                                                  return Mono.empty();
+                                              }));
                                   }
 
                               });
@@ -362,19 +362,28 @@ public class EntityEventListener implements EventListener {
                        if (resultHolder.isPresent()) {
                            ReactiveResultHolder holder = resultHolder.get();
                            if (null != prepareEvent && isEnabled(clazz, entityEventType, EntityEventPhase.prepare)) {
-                               eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, prepareEvent, clazz));
-                               holder.before(prepareEvent.getAsync());
+                               holder.before(
+                                       this.doAsyncEvent(() -> {
+                                           eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, prepareEvent, clazz));
+                                           return prepareEvent.getAsync();
+                                       })
+                               );
                            }
+
                            if (null != beforeEvent && isEnabled(clazz, entityEventType, EntityEventPhase.before)) {
-                               holder.invoke(Mono.defer(() -> {
-                                   eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, beforeEvent, clazz));
-                                   return beforeEvent.getAsync();
-                               }));
+                               holder.invoke(
+                                       this.doAsyncEvent(() -> {
+                                           eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, beforeEvent, clazz));
+                                           return beforeEvent.getAsync();
+                                       })
+                               );
                            }
                            if (null != afterEvent && isEnabled(clazz, entityEventType, EntityEventPhase.after)) {
                                holder.after(v -> {
-                                   eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, afterEvent, clazz));
-                                   return afterEvent.getAsync();
+                                   return this.doAsyncEvent(() -> {
+                                       eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, afterEvent, clazz));
+                                       return afterEvent.getAsync();
+                                   });
                                });
                            }
                            return;
@@ -415,19 +424,28 @@ public class EntityEventListener implements EventListener {
                        if (resultHolder.isPresent()) {
                            ReactiveResultHolder holder = resultHolder.get();
                            if (null != prepareEvent && isEnabled(clazz, entityEventType, EntityEventPhase.prepare)) {
-                               eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, prepareEvent, clazz));
-                               holder.before(prepareEvent.getAsync());
+                               holder.before(
+                                       this.doAsyncEvent(() -> {
+                                           eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, prepareEvent, clazz));
+                                           return prepareEvent.getAsync();
+                                       })
+                               );
                            }
+
                            if (null != beforeEvent && isEnabled(clazz, entityEventType, EntityEventPhase.before)) {
-                               holder.invoke(Mono.defer(() -> {
-                                   eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, beforeEvent, clazz));
-                                   return beforeEvent.getAsync();
-                               }));
+                               holder.invoke(
+                                       this.doAsyncEvent(() -> {
+                                           eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, beforeEvent, clazz));
+                                           return beforeEvent.getAsync();
+                                       })
+                               );
                            }
                            if (null != afterEvent && isEnabled(clazz, entityEventType, EntityEventPhase.after)) {
                                holder.after(v -> {
-                                   eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, afterEvent, clazz));
-                                   return afterEvent.getAsync();
+                                   return this.doAsyncEvent(() -> {
+                                       eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, afterEvent, clazz));
+                                       return afterEvent.getAsync();
+                                   });
                                });
                            }
                            return;
@@ -438,4 +456,13 @@ public class EntityEventListener implements EventListener {
                    afterEvent.getAsync().block();
                });
     }
+
+    protected Mono<Void> doAsyncEvent(Supplier<Mono<Void>> eventSupplier) {
+        return EntityEventHelper
+                .isDoFireEvent(true)
+                .filter(Boolean::booleanValue)
+                .flatMap(ignore -> {
+                    return eventSupplier.get();
+                });
+    }
 }

+ 36 - 24
hsweb-commons/hsweb-commons-crud/src/test/java/org/hswebframework/web/crud/events/EntityEventListenerTest.java

@@ -39,10 +39,10 @@ public class EntityEventListenerTest {
     @Test
     public void test() {
         Mono.just(EventTestEntity.of("test", 1))
-                .as(reactiveRepository::insert)
-                .as(StepVerifier::create)
-                .expectNext(1)
-                .verifyComplete();
+            .as(reactiveRepository::insert)
+            .as(StepVerifier::create)
+            .expectNext(1)
+            .verifyComplete();
         Assert.assertEquals(listener.created.getAndSet(0), 1);
 
 
@@ -51,7 +51,7 @@ public class EntityEventListenerTest {
     @Test
     public void testInsertBatch() {
         reactiveRepository.createQuery()
-                          .where(EventTestEntity::getId,"test")
+                          .where(EventTestEntity::getId, "test")
                           .fetch()
                           .then()
                           .as(StepVerifier::create)
@@ -61,15 +61,15 @@ public class EntityEventListenerTest {
 
 
         Flux.just(EventTestEntity.of("test2", 1), EventTestEntity.of("test3", 2))
-                .as(reactiveRepository::insert)
-                .as(StepVerifier::create)
-                .expectNext(2)
-                .verifyComplete();
+            .as(reactiveRepository::insert)
+            .as(StepVerifier::create)
+            .expectNext(2)
+            .verifyComplete();
         Assert.assertEquals(listener.created.getAndSet(0), 2);
         Assert.assertEquals(listener.beforeCreate.getAndSet(0), 2);
 
         reactiveRepository
-                .createUpdate().set("age",3).where().in("name","test2","test3").execute()
+                .createUpdate().set("age", 3).where().in("name", "test2", "test3").execute()
                 .as(StepVerifier::create)
                 .expectNext(2)
                 .verifyComplete();
@@ -77,39 +77,51 @@ public class EntityEventListenerTest {
         Assert.assertEquals(listener.modified.getAndSet(0), 2);
         Assert.assertEquals(listener.beforeModify.getAndSet(0), 2);
 
-        reactiveRepository.createDelete().where().in("name","test2","test3").execute()
-                .as(StepVerifier::create)
-                .expectNext(2)
-                .verifyComplete();
+        reactiveRepository.createDelete().where().in("name", "test2", "test3").execute()
+                          .as(StepVerifier::create)
+                          .expectNext(2)
+                          .verifyComplete();
 
         Assert.assertEquals(listener.deleted.getAndSet(0), 2);
         Assert.assertEquals(listener.beforeDelete.getAndSet(0), 2);
 
         reactiveRepository.save(EventTestEntity.of("test2", 1))
-                .then()
-                .as(StepVerifier::create)
-                .expectComplete()
-                .verify();
+                          .then()
+                          .as(StepVerifier::create)
+                          .expectComplete()
+                          .verify();
 
         Assert.assertEquals(listener.saved.getAndSet(0), 1);
         Assert.assertEquals(listener.beforeSave.getAndSet(0), 1);
 
 
-
     }
 
     @Test
     @Ignore
     public void testInsertError() {
         Flux.just(EventTestEntity.of("test2", 1), EventTestEntity.of("test3", 2))
-                .as(reactiveRepository::insert)
-                .flatMap(i -> Mono.error(new RuntimeException()))
-                .as(transactionalOperator::transactional)
-                .as(StepVerifier::create)
-                .verifyError();
+            .as(reactiveRepository::insert)
+            .flatMap(i -> Mono.error(new RuntimeException()))
+            .as(transactionalOperator::transactional)
+            .as(StepVerifier::create)
+            .verifyError();
 
         Assert.assertEquals(listener.created.getAndSet(0), 0);
     }
 
 
+    @Test
+    public void testDoNotFire() {
+        Mono.just(EventTestEntity.of("test", 1))
+            .as(reactiveRepository::insert)
+            .as(EntityEventHelper::setDoNotFireEvent)
+            .as(StepVerifier::create)
+            .expectNext(1)
+            .verifyComplete();
+        Assert.assertEquals(listener.created.getAndSet(0), 0);
+
+
+    }
+
 }