Jelajahi Sumber

新增异步任务模块,提供异步任务功能。支持多线程事务,等待实现

zhouhao 7 tahun lalu
induk
melakukan
cd458f7009

+ 15 - 0
hsweb-concurrent/hsweb-concurrent-async-job/pom.xml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-concurrent</artifactId>
+        <groupId>org.hswebframework.web</groupId>
+        <version>3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-concurrent-async-job</artifactId>
+
+
+</project>

+ 10 - 0
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/AsyncJobService.java

@@ -0,0 +1,10 @@
+package org.hswebframework.web.async;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface AsyncJobService {
+    BatchAsyncJobContainer batch();
+}

+ 16 - 0
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/BatchAsyncJobContainer.java

@@ -0,0 +1,16 @@
+package org.hswebframework.web.async;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface BatchAsyncJobContainer {
+
+    <V> BatchAsyncJobContainer submit(Callable<V> callable);
+
+    List<Object> getResult() throws Exception;
+}

+ 82 - 0
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/TranslationBatchAsyncJobContainer.java

@@ -0,0 +1,82 @@
+package org.hswebframework.web.async;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * @author zhouhao
+ */
+public class TranslationBatchAsyncJobContainer implements BatchAsyncJobContainer {
+
+    private ExecutorService executorService;
+
+    private List<Exception> exceptions = new ArrayList<>();
+
+    private Supplier<TranslationSupportJobWrapper> supportJobSupplierBuilder;
+
+    private CountDownLatch downLatch = new CountDownLatch(1);
+
+    private AtomicInteger failCounter = new AtomicInteger();
+
+    private AtomicInteger overCounter = new AtomicInteger();
+
+    private List<Future> futures = new ArrayList<>();
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    @Override
+    public <V> BatchAsyncJobContainer submit(Callable<V> callable) {
+        TranslationSupportJob<V> translationJob = supportJobSupplierBuilder
+                .get()
+                .wrapper(callable);
+        Callable<V> proxy = () -> {
+            try {
+                if (failCounter.get() > 0) {
+                    return null;
+                }
+                V val = translationJob.call();
+                overCounter.incrementAndGet();
+                downLatch.await();
+                if (failCounter.get() > 0) {
+                    translationJob.rollBack();
+                    return null;
+                }
+                return val;
+            } catch (Exception e) {
+                exceptions.add(e);
+                failCounter.incrementAndGet();
+                overCounter.incrementAndGet();
+            }
+            return null;
+        };
+        futures.add(executorService.submit(proxy));
+        return this;
+    }
+
+    @Override
+    public List<Object> getResult() throws Exception {
+        while (overCounter.get() != futures.size()) {
+            Thread.sleep(10);
+        }
+        downLatch.countDown();
+        if (!exceptions.isEmpty()) {
+            throw new RuntimeException();
+        }
+
+        return futures.stream().map(this::getValue).collect(Collectors.toList());
+    }
+
+    private Object getValue(Future future) {
+        try {
+            return future.get();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

+ 14 - 0
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/TranslationSupportJob.java

@@ -0,0 +1,14 @@
+package org.hswebframework.web.async;
+
+import java.util.concurrent.Callable;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface TranslationSupportJob<V> extends Callable<V> {
+
+    void rollBack();
+
+}

+ 12 - 0
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/TranslationSupportJobWrapper.java

@@ -0,0 +1,12 @@
+package org.hswebframework.web.async;
+
+import java.util.concurrent.Callable;
+
+/**
+ * TODO 完成注释
+ *
+ * @author zhouhao
+ */
+public interface TranslationSupportJobWrapper {
+    <V> TranslationSupportJob<V> wrapper(Callable<V> callable);
+}

+ 1 - 0
hsweb-concurrent/pom.xml

@@ -32,6 +32,7 @@
         <module>hsweb-concurrent-cache</module>
         <module>hsweb-concurrent-lock</module>
         <module>hsweb-concurrent-counter</module>
+        <module>hsweb-concurrent-async-job</module>
     </modules>