Browse Source

异步事件增加非响应式支持

zhou-hao 4 years ago
parent
commit
e55d8cf17e

+ 21 - 8
hsweb-commons/hsweb-commons-crud/src/main/java/org/hswebframework/web/crud/events/EntityEventListener.java

@@ -2,6 +2,7 @@ package org.hswebframework.web.crud.events;
 
 
 import org.apache.commons.collections.CollectionUtils;
+import org.hswebframework.ezorm.core.param.QueryParam;
 import org.hswebframework.ezorm.rdb.events.*;
 import org.hswebframework.ezorm.rdb.mapping.*;
 import org.hswebframework.ezorm.rdb.mapping.events.MappingContextKeys;
@@ -16,7 +17,6 @@ import org.hswebframework.web.event.AsyncEvent;
 import org.hswebframework.web.event.GenericsPayloadApplicationEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.ArrayList;
@@ -120,7 +120,7 @@ public class EntityEventListener implements EventListener {
         return deletedEvent.getAsync();
     }
 
-    protected void handleReactiveUpdateBefore(DSLUpdate<?, ?> update, EventContext context) {
+    protected void handleUpdateBefore(DSLUpdate<?, ?> update, EventContext context) {
         Object repo = context.get(MappingContextKeys.repository).orElse(null);
         if (repo instanceof ReactiveRepository) {
             context.get(MappingContextKeys.reactiveResultHolder)
@@ -145,17 +145,20 @@ public class EntityEventListener implements EventListener {
                                         .then()
                         );
                     });
+        }else if (repo instanceof SyncRepository) {
+            QueryParam param = update.toQueryParam();
+            SyncRepository<?, ?> syncRepository = ((SyncRepository<?, ?>) repo);
+            List<?> list = syncRepository.createQuery()
+                    .setParam(param)
+                    .fetch();
+            sendUpdateEvent(list,context).block();
         }
     }
 
     protected void handleUpdateBefore(EventContext context) {
         context.<DSLUpdate<?, ?>>get(ContextKeys.source())
                 .ifPresent(dslUpdate -> {
-                    if (context.get(MappingContextKeys.reactive).orElse(false)) {
-                        handleReactiveUpdateBefore(dslUpdate, context);
-                    } else {
-                        // TODO: 2019-11-09
-                    }
+                    handleUpdateBefore(dslUpdate, context);
                 });
 
     }
@@ -186,6 +189,13 @@ public class EntityEventListener implements EventListener {
                                                     .then()
                                     );
                                 });
+                    } else if (repo instanceof SyncRepository) {
+                        QueryParam param = dslUpdate.toQueryParam();
+                        SyncRepository<?, ?> syncRepository = ((SyncRepository<?, ?>) repo);
+                        List<?> list = syncRepository.createQuery()
+                                .setParam(param)
+                                .fetch();
+                        sendDeleteEvent(list,context).block();
                     }
                 });
     }
@@ -215,7 +225,8 @@ public class EntityEventListener implements EventListener {
                         }
                     }
                     eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
-
+                    //block非响应式的支持
+                    event.getAsync().block();
                 });
     }
 
@@ -239,6 +250,8 @@ public class EntityEventListener implements EventListener {
                         }
                     }
                     eventPublisher.publishEvent(new GenericsPayloadApplicationEvent<>(this, event, clazz));
+                    //block非响应式的支持
+                    event.getAsync().block();
                 });
     }
 }