zhou-hao 4 лет назад
Родитель
Сommit
fc558a400c

+ 4 - 3
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityCreatedEvent.java

@@ -2,6 +2,7 @@ package org.hswebframework.web.crud.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 
 import java.io.Serializable;
 import java.util.List;
@@ -12,9 +13,9 @@ import java.util.List;
  */
 @AllArgsConstructor
 @Getter
-public class EntityCreatedEvent<E> implements Serializable {
+public class EntityCreatedEvent<E> extends DefaultAsyncEvent implements Serializable {
 
-    private List<E> entity;
+    private final List<E> entity;
 
-    private Class<E> entityType;
+    private final Class<E> entityType;
 }

+ 4 - 3
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityDeletedEvent.java

@@ -2,6 +2,7 @@ package org.hswebframework.web.crud.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -13,12 +14,12 @@ import java.util.List;
  */
 @AllArgsConstructor
 @Getter
-public class EntityDeletedEvent<E> implements Serializable {
+public class EntityDeletedEvent<E> extends DefaultAsyncEvent implements Serializable {
 
     private static final long serialVersionUID = -7158901204884303777L;
 
-    private List<E> entity;
+    private final List<E> entity;
 
-    private Class<E> entityType;
+    private final Class<E> entityType;
 
 }

+ 55 - 15
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventListener.java

@@ -6,19 +6,23 @@ import org.hswebframework.ezorm.rdb.events.*;
 import org.hswebframework.ezorm.rdb.mapping.*;
 import org.hswebframework.ezorm.rdb.mapping.events.MappingContextKeys;
 import org.hswebframework.ezorm.rdb.mapping.events.MappingEventTypes;
+import org.hswebframework.ezorm.rdb.mapping.events.ReactiveResultHolder;
 import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
 import org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata;
 import org.hswebframework.web.api.crud.entity.Entity;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.hswebframework.web.crud.annotation.EnableEntityEvent;
+import org.hswebframework.web.event.AsyncEvent;
 import org.hswebframework.web.event.GenericsPayloadApplicationEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 
@@ -51,7 +55,7 @@ public class EntityEventListener implements EventListener {
             return;
         }
 
-        if (type == MappingEventTypes.insert_after) {
+        if (type == MappingEventTypes.insert_before) {
             boolean single = context.get(MappingContextKeys.type).map("single"::equals).orElse(false);
             if (single) {
                 handleSingleOperation(mapping.getEntityType(), context, EntityCreatedEvent::new);
@@ -59,7 +63,7 @@ public class EntityEventListener implements EventListener {
                 handleBatchOperation(mapping.getEntityType(), context, EntityCreatedEvent::new);
             }
         }
-        if (type == MappingEventTypes.save_after) {
+        if (type == MappingEventTypes.save_before) {
             boolean single = context.get(MappingContextKeys.type).map("single"::equals).orElse(false);
             if (single) {
                 handleSingleOperation(mapping.getEntityType(), context, EntitySavedEvent::new);
@@ -76,13 +80,13 @@ public class EntityEventListener implements EventListener {
         }
     }
 
-    protected void sendUpdateEvent(List<?> olds, EventContext context) {
+    protected Mono<Void> sendUpdateEvent(List<?> olds, EventContext context) {
         List<Object> newValues = new ArrayList<>(olds.size());
         EntityColumnMapping mapping = context.get(MappingContextKeys.columnMapping).orElseThrow(UnsupportedOperationException::new);
         TableOrViewMetadata table = context.get(ContextKeys.table).orElseThrow(UnsupportedOperationException::new);
         RDBColumnMetadata idColumn = table.getColumns().stream().filter(RDBColumnMetadata::isPrimaryKey).findFirst().orElse(null);
         if (idColumn == null) {
-            return;
+            return Mono.empty();
         }
         for (Object old : olds) {
             Object newValue = context.get(MappingContextKeys.instance)
@@ -101,15 +105,19 @@ public class EntityEventListener implements EventListener {
             }
             newValues.add(newValue);
         }
-        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, new EntityModifyEvent(olds, newValues, mapping.getEntityType()), mapping.getEntityType()));
+        EntityModifyEvent event = new EntityModifyEvent(olds, newValues, mapping.getEntityType());
+        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, mapping.getEntityType()));
+        return event.getAsync();
     }
 
-    protected void sendDeleteEvent(List<?> olds, EventContext context) {
+    protected Mono<Void> sendDeleteEvent(List<?> olds, EventContext context) {
 
         EntityColumnMapping mapping = context.get(MappingContextKeys.columnMapping).orElseThrow(UnsupportedOperationException::new);
         TableOrViewMetadata table = context.get(ContextKeys.table).orElseThrow(UnsupportedOperationException::new);
-        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, new EntityDeletedEvent(olds, mapping.getEntityType()), mapping.getEntityType()));
 
+        EntityDeletedEvent deletedEvent = new EntityDeletedEvent(olds, mapping.getEntityType());
+        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, deletedEvent, mapping.getEntityType()));
+        return deletedEvent.getAsync();
     }
 
     protected void handleReactiveUpdateBefore(DSLUpdate<?, ?> update, EventContext context) {
@@ -119,11 +127,13 @@ public class EntityEventListener implements EventListener {
                     .ifPresent(holder -> {
                         AtomicReference<List<?>> updated = new AtomicReference<>();
                         holder.after(v -> {
-                            return Mono.fromRunnable(() -> {
+                            return Mono.defer(() -> {
                                 List<?> _tmp = updated.getAndSet(null);
+
                                 if (CollectionUtils.isNotEmpty(_tmp)) {
-                                    sendUpdateEvent(_tmp, context);
+                                    return sendUpdateEvent(_tmp, context);
                                 }
+                                return Mono.empty();
                             });
                         });
                         holder.before(
@@ -159,11 +169,12 @@ public class EntityEventListener implements EventListener {
                                 .ifPresent(holder -> {
                                     AtomicReference<List<?>> deleted = new AtomicReference<>();
                                     holder.after(v -> {
-                                        return Mono.fromRunnable(() -> {
+                                        return Mono.defer(() -> {
                                             List<?> _tmp = deleted.getAndSet(null);
                                             if (CollectionUtils.isNotEmpty(_tmp)) {
-                                                sendDeleteEvent(_tmp, context);
+                                                return sendDeleteEvent(_tmp, context);
                                             }
+                                            return Mono.empty();
                                         });
                                     });
                                     holder.before(
@@ -183,22 +194,51 @@ public class EntityEventListener implements EventListener {
 
     }
 
-    protected void handleBatchOperation(Class clazz, EventContext context, BiFunction<List<?>, Class, Object> mapper) {
+    protected void handleBatchOperation(Class clazz, EventContext context, BiFunction<List<?>, Class, AsyncEvent> mapper) {
 
         context.get(MappingContextKeys.instance)
                 .filter(List.class::isInstance)
                 .map(List.class::cast)
                 .ifPresent(lst -> {
-                    eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, mapper.apply(lst, clazz), clazz));
+                    AsyncEvent event = mapper.apply(lst, clazz);
+                    Object repo = context.get(MappingContextKeys.repository).orElse(null);
+                    if (repo instanceof ReactiveRepository) {
+                        Optional<ReactiveResultHolder> resultHolder = context.get(MappingContextKeys.reactiveResultHolder);
+                        if (resultHolder.isPresent()) {
+                            resultHolder
+                                    .get()
+                                    .after(v -> {
+                                        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
+                                        return event.getAsync();
+                                    });
+                            return;
+                        }
+                    }
+                    eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
+
                 });
     }
 
-    protected void handleSingleOperation(Class clazz, EventContext context, BiFunction<List<?>, Class, Object> mapper) {
+    protected void handleSingleOperation(Class clazz, EventContext context, BiFunction<List<?>, Class, AsyncEvent> mapper) {
         context.get(MappingContextKeys.instance)
                 .filter(Entity.class::isInstance)
                 .map(Entity.class::cast)
                 .ifPresent(entity -> {
-                    eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, mapper.apply(Collections.singletonList(entity), clazz), clazz));
+                    AsyncEvent event = mapper.apply(Collections.singletonList(entity), clazz);
+                    Object repo = context.get(MappingContextKeys.repository).orElse(null);
+                    if (repo instanceof ReactiveRepository) {
+                        Optional<ReactiveResultHolder> resultHolder = context.get(MappingContextKeys.reactiveResultHolder);
+                        if (resultHolder.isPresent()) {
+                            resultHolder
+                                    .get()
+                                    .after(v -> {
+                                        eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
+                                        return event.getAsync();
+                                    });
+                            return;
+                        }
+                    }
+                    eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
                 });
     }
 }

+ 5 - 4
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityModifyEvent.java

@@ -2,6 +2,7 @@ package org.hswebframework.web.crud.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 
 import java.io.Serializable;
 import java.util.List;
@@ -12,14 +13,14 @@ import java.util.List;
  */
 @AllArgsConstructor
 @Getter
-public class EntityModifyEvent<E> implements Serializable{
+public class EntityModifyEvent<E> extends DefaultAsyncEvent implements Serializable{
 
     private static final long serialVersionUID = -7158901204884303777L;
 
-    private List<E> before;
+    private final List<E> before;
 
-    private List<E> after;
+    private final List<E> after;
 
-    private Class<E> entityType;
+    private final Class<E> entityType;
 
 }

+ 4 - 3
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntitySavedEvent.java

@@ -2,6 +2,7 @@ package org.hswebframework.web.crud.events;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import org.hswebframework.web.event.DefaultAsyncEvent;
 
 import java.io.Serializable;
 import java.util.List;
@@ -12,9 +13,9 @@ import java.util.List;
  */
 @AllArgsConstructor
 @Getter
-public class EntitySavedEvent<E> implements Serializable {
+public class EntitySavedEvent<E> extends DefaultAsyncEvent implements Serializable {
 
-    private List<E> entity;
+    private final List<E> entity;
 
-    private Class<E> entityType;
+    private final Class<E> entityType;
 }

+ 17 - 8
hsweb-commons/hsweb-commons-crud/src/test/java/org/hswebframework/web/crud/events/TestEntityListener.java

@@ -3,6 +3,7 @@ package org.hswebframework.web.crud.events;
 import org.hswebframework.web.crud.entity.EventTestEntity;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
 
 import javax.annotation.PostConstruct;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -19,25 +20,33 @@ public class TestEntityListener {
 
     @EventListener
     public void handleCreated(EntityCreatedEvent<EventTestEntity> event) {
-        System.out.println(event);
-        created.addAndGet(event.getEntity().size());
+        event.async(Mono.fromRunnable(() -> {
+            System.out.println(event);
+            created.addAndGet(event.getEntity().size());
+        }));
     }
 
     @EventListener
     public void handleCreated(EntityDeletedEvent<EventTestEntity> event) {
-        System.out.println(event);
-        deleted.addAndGet(event.getEntity().size());
+        event.async(Mono.fromRunnable(() -> {
+            System.out.println(event);
+            deleted.addAndGet(event.getEntity().size());
+        }));
     }
 
     @EventListener
     public void handleModify(EntityModifyEvent<EventTestEntity> event) {
-        System.out.println(event);
-        modified.addAndGet(event.getAfter().size());
+        event.async(Mono.fromRunnable(() -> {
+            System.out.println(event);
+            modified.addAndGet(event.getAfter().size());
+        }));
     }
 
     @EventListener
     public void handleSave(EntitySavedEvent<EventTestEntity> event) {
-        System.out.println(event);
-        saved.addAndGet(event.getEntity().size());
+        event.async(Mono.fromRunnable(() -> {
+            System.out.println(event);
+            saved.addAndGet(event.getEntity().size());
+        }));
     }
 }

+ 2 - 0
hsweb-core/src/main/java/org/hswebframework/web/event/AsyncEvent.java

@@ -12,6 +12,8 @@ import reactor.core.publisher.Mono;
  */
 public interface AsyncEvent {
 
+    Mono<Void> getAsync();
+
     /**
      * 注册一个异步任务
      *

+ 2 - 0
hsweb-core/src/main/java/org/hswebframework/web/event/DefaultAsyncEvent.java

@@ -1,11 +1,13 @@
 package org.hswebframework.web.event;
 
+import lombok.Getter;
 import org.reactivestreams.Publisher;
 import org.springframework.context.ApplicationEventPublisher;
 import reactor.core.publisher.Mono;
 
 public class DefaultAsyncEvent implements AsyncEvent {
 
+    @Getter
     private Mono<Void> async = Mono.empty();
 
     public synchronized void async(Publisher<?> publisher) {