Browse Source

Merge pull request #200 from wujun8/master

老周 2 years ago
parent
commit
080f52336a

+ 4 - 4
hsweb-core/src/main/java/org/hswebframework/web/event/DefaultAsyncEvent.java

@@ -15,23 +15,23 @@ public class DefaultAsyncEvent implements AsyncEvent {
 
     public synchronized void async(Publisher<?> publisher) {
         hasListener = true;
-        this.async = async.then(Mono.from(publisher).then());
+        this.async = async.then(Mono.fromDirect(publisher).then());
     }
 
     @Override
     public synchronized void first(Publisher<?> publisher) {
         hasListener = true;
-        this.first = Mono.from(publisher).then(first);
+        this.first = Mono.fromDirect(publisher).then(first);
     }
 
     @Override
     public void transformFirst(Function<Mono<?>, Publisher<?>> mapper) {
-        this.first = Mono.from(mapper.apply(this.first));
+        this.first = Mono.fromDirect(mapper.apply(this.first));
     }
 
     @Override
     public void transform(Function<Mono<?>, Publisher<?>> mapper) {
-        this.async = Mono.from(mapper.apply(this.async));
+        this.async = Mono.fromDirect(mapper.apply(this.async));
     }
 
     @Override

+ 27 - 0
hsweb-core/src/test/java/org/hswebframework/web/event/EventTest.java

@@ -0,0 +1,27 @@
+package org.hswebframework.web.event;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+public class EventTest {
+
+    @Test
+    public void testMonoFrom() {
+        Flux<String> source =  Flux.just("1", "2", "3").doOnNext(s -> log.info("get {}", s));
+
+        Mono.from(source).subscribe();
+
+        Mono.fromDirect(source).subscribe();
+    }
+
+    @Test
+    public void testAsync() {
+        Flux<String> source =  Flux.just("1", "2", "3").doOnNext(s -> log.info("get {}", s));
+        AsyncEvent event = new DefaultAsyncEvent();
+        event.async(source);
+        event.getAsync().subscribe();
+    }
+}