Bläddra i källkod

基本功能实现

zhouhao 7 år sedan
förälder
incheckning
2ab390f059
19 ändrade filer med 472 tillägg och 49 borttagningar
  1. 24 0
      hsweb-eventbus/hsweb-eventbus-api/src/main/java/org/hswebframework/web/eventbus/EventListenerDefine.java
  2. 1 4
      hsweb-eventbus/hsweb-eventbus-api/src/main/java/org/hswebframework/web/eventbus/EventSubscriber.java
  3. 5 0
      hsweb-eventbus/hsweb-eventbus-default/pom.xml
  4. 6 19
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/DefaultEventBus.java
  5. 48 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AbstractEventContainer.java
  6. 31 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AsyncEventTaskSupplier.java
  7. 33 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AsyncTranscationEventTaskSupplier.java
  8. 72 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/BackGroundEventTaskSupplier.java
  9. 26 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/DefaultEventExecutor.java
  10. 9 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventExecuteTask.java
  11. 16 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventExecuteTaskSupplier.java
  12. 2 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerContainer.java
  13. 0 25
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerDefine.java
  14. 2 1
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerExecutor.java
  15. 21 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/SyncEventListenerExecutor.java
  16. 32 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/ReflectListener.java
  17. 52 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/SpringEventBus.java
  18. 40 0
      hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/SpringEventContainer.java
  19. 52 0
      hsweb-eventbus/hsweb-eventbus-default/src/test/java/org/hswebframework/web/eventbus/spring/SpringEventBusTest.java

+ 24 - 0
hsweb-eventbus/hsweb-eventbus-api/src/main/java/org/hswebframework/web/eventbus/EventListenerDefine.java

@@ -0,0 +1,24 @@
+package org.hswebframework.web.eventbus;
+
+import lombok.*;
+import org.hswebframework.web.eventbus.annotation.EventMode;
+
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class EventListenerDefine {
+    private EventListener listener;
+
+    private EventMode eventMode = EventMode.SYNC;
+
+    private boolean transaction = true;
+
+    private int priority = 0;
+}

+ 1 - 4
hsweb-eventbus/hsweb-eventbus-api/src/main/java/org/hswebframework/web/eventbus/EventSubscriber.java

@@ -5,8 +5,5 @@ package org.hswebframework.web.eventbus;
  * @since 1.0
  */
 public interface EventSubscriber {
-    <E> void subscribe(EventListener<E> listener);
-
-    <E> void subscribe(Class<E> eventType, EventListener<? extends E> listener);
-
+    <E> void subscribe(Class<E> eventType, EventListenerDefine listener);
 }

+ 5 - 0
hsweb-eventbus/hsweb-eventbus-default/pom.xml

@@ -21,6 +21,11 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.hswebframework.web</groupId>
+            <artifactId>hsweb-concurrent-async-job</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.hswebframework</groupId>
             <artifactId>hsweb-utils</artifactId>

+ 6 - 19
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/DefaultEventBus.java

@@ -1,9 +1,7 @@
 package org.hswebframework.web.eventbus;
 
 import lombok.extern.slf4j.Slf4j;
-import org.hswebframework.utils.ClassUtils;
 import org.hswebframework.web.eventbus.executor.EventListenerContainer;
-import org.hswebframework.web.eventbus.executor.EventListenerDefine;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -13,7 +11,7 @@ import java.util.concurrent.ConcurrentMap;
  * @since 1.0
  */
 @Slf4j
-public abstract class DefaultEventBus implements EventBus, EventSubscriber {
+public abstract class AbstractEventBus implements EventBus, EventSubscriber {
     private ConcurrentMap<String, EventListenerContainer> listenerStorage = new ConcurrentHashMap<>();
 
     @Override
@@ -22,7 +20,7 @@ public abstract class DefaultEventBus implements EventBus, EventSubscriber {
         if (null != container) {
             container.doExecute(event);
         } else {
-            log.warn("event:{},not support!", event);
+            log.warn("没有监听器处理此事件:{}", event);
         }
     }
 
@@ -35,23 +33,12 @@ public abstract class DefaultEventBus implements EventBus, EventSubscriber {
     }
 
     @Override
-    public <E> void subscribe(EventListener<E> listener) {
-        Class<E> type = (Class<E>) ClassUtils.getGenericType(listener.getClass());
-        if (type == Object.class) {
-            throw new UnsupportedOperationException("event type [Object] not support!");
-        }
-        subscribe(type, listener);
-    }
-
-    @Override
-    public <E> void subscribe(Class<E> eventType, EventListener<? extends E> listener) {
+    public <E> void subscribe(Class<E> eventType, EventListenerDefine define) {
         EventListenerContainer container = listenerStorage
-                .computeIfAbsent(getKey(eventType), type -> createEventListenerContainer(listener));
-        container.addListener(createListenerDefine(listener));
+                .computeIfAbsent(getKey(eventType), type -> createEventListenerContainer());
+        container.addListener(define);
     }
 
-    protected abstract EventListenerContainer createEventListenerContainer(EventListener listener);
-
-    protected abstract EventListenerDefine createListenerDefine(EventListener listener);
+    protected abstract EventListenerContainer createEventListenerContainer();
 
 }

+ 48 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AbstractEventContainer.java

@@ -0,0 +1,48 @@
+package org.hswebframework.web.eventbus.executor;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author zhouhao
+ * @since 1.0
+ */
+@Slf4j
+public abstract class AbstractEventContainer implements EventListenerContainer {
+
+    protected final List<EventListenerDefine> defines = new ArrayList<>();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    protected abstract EventListenerExecutor newExecutor();
+
+    protected abstract void executeAfter(EventListenerExecutor executor);
+
+    @Override
+    public void doExecute(Object event) {
+        lock.readLock().lock();
+        try {
+            EventListenerExecutor executor = newExecutor();
+            defines.forEach(define -> executor.doExecute(define, event));
+            executeAfter(executor);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public void addListener(EventListenerDefine listener) {
+        lock.writeLock().lock();
+        try {
+            defines.add(listener);
+            defines.sort(Comparator.comparingInt(EventListenerDefine::getPriority));
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+}

+ 31 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AsyncEventTaskSupplier.java

@@ -0,0 +1,31 @@
+package org.hswebframework.web.eventbus.executor;
+
+import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+import org.hswebframework.web.eventbus.annotation.EventMode;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class AsyncEventTaskSupplier implements EventExecuteTaskSupplier {
+
+    private static final ExecutorService executorService;
+
+    static {
+        executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+    }
+
+    @Override
+    public boolean isSupport(EventListenerDefine define) {
+        return define.getEventMode() == EventMode.ASYNC && !define.isTransaction();
+    }
+
+    @Override
+    public EventExecuteTask get(EventListener listener, Object event) {
+        return () -> executorService.execute(() -> listener.onEvent(event));
+    }
+}

+ 33 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/AsyncTranscationEventTaskSupplier.java

@@ -0,0 +1,33 @@
+package org.hswebframework.web.eventbus.executor;
+
+import org.hswebframework.web.async.BatchAsyncJobContainer;
+import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+import org.hswebframework.web.eventbus.annotation.EventMode;
+
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class AsyncTranscationEventTaskSupplier implements EventExecuteTaskSupplier {
+
+    public AsyncTranscationEventTaskSupplier(BatchAsyncJobContainer container) {
+        this.container = container;
+    }
+
+    private BatchAsyncJobContainer container;
+
+    @Override
+    public boolean isSupport(EventListenerDefine define) {
+        return define.getEventMode() == EventMode.ASYNC && define.isTransaction();
+    }
+
+    @Override
+    public EventExecuteTask get(EventListener listener, Object event) {
+        return () -> container.submit(() -> {
+            listener.onEvent(event);
+            return true;
+        }, true);
+    }
+}

+ 72 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/BackGroundEventTaskSupplier.java

@@ -0,0 +1,72 @@
+package org.hswebframework.web.eventbus.executor;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+import org.hswebframework.web.eventbus.annotation.EventMode;
+
+import java.util.Queue;
+import java.util.concurrent.*;
+
+/**
+ * @author zhouhao
+ * @since 1.0
+ */
+@Slf4j
+public class BackGroundEventTaskSupplier implements EventExecuteTaskSupplier {
+    private static final Queue<Runnable> queue = new ConcurrentLinkedDeque<>();
+
+    private static volatile boolean running   = true;
+    private static volatile boolean executing = false;
+
+    private static CyclicBarrier barrier = new CyclicBarrier(2);
+
+    static {
+        Thread thread = new Thread(() -> {
+            while (running) {
+                Runnable job = queue.poll();
+                if (job != null) {
+                    executing = true;
+                    try {
+                        job.run();
+                    } catch (Exception e) {
+                        log.error("执行事件执行失败", e);
+                    }
+                } else {
+                    executing = false;
+                    try {
+                        barrier.await(10, TimeUnit.SECONDS);
+                        barrier.reset();
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                    }
+                }
+            }
+        });
+        thread.setDaemon(false);
+        thread.setName("BackGroundEventExecutor");
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> running = false));
+        thread.start();
+    }
+
+
+    @Override
+    public boolean isSupport(EventListenerDefine define) {
+        return define.getEventMode() == EventMode.BACKGROUND;
+    }
+
+
+    @Override
+    public EventExecuteTask get(EventListener listener, Object event) {
+        return () -> {
+            queue.add(() -> listener.onEvent(event));
+            try {
+                if (!executing) {
+                    barrier.await();
+                }
+            } catch (Exception e) {
+              //  e.printStackTrace();
+            }
+        };
+    }
+}

+ 26 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/DefaultEventExecutor.java

@@ -0,0 +1,26 @@
+package org.hswebframework.web.eventbus.executor;
+
+import lombok.Setter;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class DefaultEventExecutor implements EventListenerExecutor {
+    @Setter
+    private List<EventExecuteTaskSupplier> suppliers = new ArrayList<>();
+
+    @Override
+    public void doExecute(EventListenerDefine define, Object event) {
+        suppliers.stream().filter(supplier -> supplier.isSupport(define))
+                .findFirst()
+                .orElseThrow(() -> new UnsupportedOperationException("不支持的listener定义:" + define))
+                .get(define.getListener(), event)
+                .run();
+    }
+
+}

+ 9 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventExecuteTask.java

@@ -0,0 +1,9 @@
+package org.hswebframework.web.eventbus.executor;
+
+/**
+ * @author zhouhao
+ * @since 1.0
+ */
+public interface EventExecuteTask {
+    void run();
+}

+ 16 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventExecuteTaskSupplier.java

@@ -0,0 +1,16 @@
+package org.hswebframework.web.eventbus.executor;
+
+import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+
+/**
+ * @author zhouhao
+ * @since 1.0
+ */
+public interface EventExecuteTaskSupplier {
+    boolean isSupport(EventListenerDefine define);
+
+    EventExecuteTask get(EventListener listener, Object event);
+
+   default void close(){}
+}

+ 2 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerContainer.java

@@ -1,6 +1,8 @@
 package org.hswebframework.web.eventbus.executor;
 
 
+import org.hswebframework.web.eventbus.EventListenerDefine;
+
 /**
  * @author zhouhao
  * @since 3.0

+ 0 - 25
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerDefine.java

@@ -1,25 +0,0 @@
-package org.hswebframework.web.eventbus.executor;
-
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Setter;
-import org.hswebframework.web.eventbus.annotation.EventMode;
-
-import java.util.EventListener;
-
-/**
- * @author zhouhao
- * @since 3.0
- */
-@Getter
-@Setter
-@Builder
-public class EventListenerDefine {
-    private EventListener listener;
-
-    private EventMode eventMode;
-
-    private boolean transaction;
-
-    private int priority;
-}

+ 2 - 1
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/EventListenerExecutor.java

@@ -1,11 +1,12 @@
 package org.hswebframework.web.eventbus.executor;
 
 import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
 
 /**
  * @author zhouhao
  * @since 3.0
  */
 public interface EventListenerExecutor {
-    <E> void doExecute(EventListener<E> listener, E event);
+    void doExecute(EventListenerDefine define, Object event);
 }

+ 21 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/executor/SyncEventListenerExecutor.java

@@ -0,0 +1,21 @@
+package org.hswebframework.web.eventbus.executor;
+
+import org.hswebframework.web.eventbus.EventListener;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+import org.hswebframework.web.eventbus.annotation.EventMode;
+
+/**
+ * @author zhouhao
+ * @since 1.0
+ */
+public class SyncEventListenerExecutor implements EventExecuteTaskSupplier {
+    @Override
+    public boolean isSupport(EventListenerDefine define) {
+        return define.getEventMode() == EventMode.SYNC;
+    }
+
+    @Override
+    public EventExecuteTask get(EventListener listener, Object event) {
+        return () -> listener.onEvent(event);
+    }
+}

+ 32 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/ReflectListener.java

@@ -0,0 +1,32 @@
+package org.hswebframework.web.eventbus.spring;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.web.eventbus.EventListener;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+@Slf4j
+public class ReflectListener<E> implements EventListener<E> {
+
+    private Object target;
+    private Method method;
+
+    public ReflectListener(Object target, Method method) {
+        this.target = target;
+        this.method = method;
+    }
+
+    @Override
+    public void onEvent(E event) {
+        try {
+            method.invoke(target, event);
+        } catch (Exception e) {
+            log.error("反射执行事件失败. target={},method={},event={}", target, method, event, e);
+        }
+    }
+}

+ 52 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/SpringEventBus.java

@@ -0,0 +1,52 @@
+package org.hswebframework.web.eventbus.spring;
+
+import org.hswebframework.web.eventbus.AbstractEventBus;
+import org.hswebframework.web.eventbus.EventListenerDefine;
+import org.hswebframework.web.eventbus.annotation.Subscribe;
+import org.hswebframework.web.eventbus.executor.EventListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+import java.lang.reflect.Method;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class SpringEventBus extends AbstractEventBus implements BeanPostProcessor {
+
+    @Override
+    protected EventListenerContainer createEventListenerContainer() {
+        return new SpringEventContainer();
+    }
+
+    @Override
+    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+        return bean;
+    }
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        return processListener(bean);
+    }
+
+    protected Object processListener(Object object) {
+        Method[] methods = object.getClass().getDeclaredMethods();
+        for (Method method : methods) {
+            Subscribe subscribe = method.getAnnotation(Subscribe.class);
+            if (subscribe != null) {
+                createListener(subscribe, object, method);
+            }
+        }
+        return object;
+    }
+
+    protected void createListener(Subscribe type, Object target, Method method) {
+        EventListenerDefine define = EventListenerDefine.builder().eventMode(type.mode())
+                .transaction(type.transaction())
+                .listener(new ReflectListener(target, method))
+                .build();
+
+        subscribe(method.getParameterTypes()[0], define);
+    }
+}

+ 40 - 0
hsweb-eventbus/hsweb-eventbus-default/src/main/java/org/hswebframework/web/eventbus/spring/SpringEventContainer.java

@@ -0,0 +1,40 @@
+package org.hswebframework.web.eventbus.spring;
+
+import org.hswebframework.web.eventbus.annotation.EventMode;
+import org.hswebframework.web.eventbus.executor.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class SpringEventContainer extends AbstractEventContainer {
+    List<EventExecuteTaskSupplier> suppliers = Arrays.asList(
+            new BackGroundEventTaskSupplier(), new AsyncEventTaskSupplier(),new SyncEventListenerExecutor());
+
+
+    protected boolean hasAsyncTx() {
+        return defines.stream()
+                .anyMatch(define -> define.getEventMode() == EventMode.ASYNC && define.isTransaction());
+    }
+
+    @Override
+    protected EventListenerExecutor newExecutor() {
+        DefaultEventExecutor eventExecutor = new DefaultEventExecutor();
+        List<EventExecuteTaskSupplier> suppliers = new ArrayList<>(this.suppliers);
+        if (hasAsyncTx()) {
+            // TODO: 18-3-24
+        }
+        eventExecutor.setSuppliers(suppliers);
+        return eventExecutor;
+    }
+
+    @Override
+    protected void executeAfter(EventListenerExecutor executor) {
+
+    }
+
+}

+ 52 - 0
hsweb-eventbus/hsweb-eventbus-default/src/test/java/org/hswebframework/web/eventbus/spring/SpringEventBusTest.java

@@ -0,0 +1,52 @@
+package org.hswebframework.web.eventbus.spring;
+
+import org.hswebframework.web.eventbus.annotation.EventMode;
+import org.hswebframework.web.eventbus.annotation.Subscribe;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author zhouhao
+ * @since 3.0
+ */
+public class SpringEventBusTest {
+    SpringEventBus eventBus = new SpringEventBus();
+
+    AtomicInteger counter = new AtomicInteger();
+
+    @Test
+    public void test() throws InterruptedException {
+        System.out.println(Thread.currentThread().getName());
+        eventBus.postProcessAfterInitialization(new Test2(), "test");
+        eventBus.publish(eventBus);
+        Thread.sleep(1000);
+        Assert.assertEquals(counter.get(), 3);
+    }
+
+    public class Test2 {
+        @Subscribe(mode = EventMode.SYNC)
+        public void test1(SpringEventBus eventBus) {
+            System.out.println(Thread.currentThread().getName());
+            System.out.println(eventBus);
+            counter.addAndGet(1);
+        }
+
+        @Subscribe(mode = EventMode.ASYNC, transaction = false)
+        public void test2(SpringEventBus eventBus) {
+            System.out.println(Thread.currentThread().getName());
+            System.out.println(eventBus);
+            counter.addAndGet(1);
+        }
+
+        @Subscribe(mode = EventMode.BACKGROUND, transaction = false)
+        public void test3(SpringEventBus eventBus) {
+            System.out.println(Thread.currentThread().getName());
+            System.out.println(eventBus);
+            counter.addAndGet(1);
+        }
+    }
+}