Browse Source

优化异步任务

zhou-hao 7 years ago
parent
commit
13a6ceb7b3

+ 22 - 9
hsweb-concurrent/hsweb-concurrent-async-job/src/main/java/org/hswebframework/web/async/TransactionBatchAsyncJobContainer.java

@@ -33,7 +33,9 @@ public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer
 
     private List<Future> futures = new ArrayList<>();
 
-    private int transactionJobNumber = 0;
+    private AtomicInteger transactionJobNumber = new AtomicInteger(0);
+
+    private volatile boolean shutdown = false;
 
     public void setExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
@@ -41,24 +43,34 @@ public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer
 
     @Override
     public <V> BatchAsyncJobContainer submit(Callable<V> callable, boolean enableTransaction) {
+        if (shutdown) {
+            logger.warn("TransactionBatchAsyncJobContainer is shutdown, fail job number :{}", failCounter.get());
+            return this;
+        }
         if (!enableTransaction) {
             if (logger.isDebugEnabled()) {
-                logger.debug("submit not transaction support job {}", transactionJobNumber);
+                logger.debug("submit not transaction support job");
             }
-            futures.add(executorService.submit(callable));
+            futures.add(executorService.submit(() -> {
+                if (shutdown) {
+                    return null;
+                }
+                return callable.call();
+            }));
             return this;
         }
-        transactionJobNumber++;
+
+        int tmpJobFlag = transactionJobNumber.incrementAndGet();
+
         if (logger.isDebugEnabled()) {
             logger.debug("submit transaction support job {}", transactionJobNumber);
         }
-        int tmpJobFlag = transactionJobNumber;
 
         TransactionSupportJob<V> translationJob = translationSupportJobWrapper.wrapper(callable);
         Callable<V> proxy = () -> {
             V value = null;
             try {
-                if (failCounter.get() > 0) {
+                if (failCounter.get() > 0 || shutdown) {
                     return null;
                 }
                 value = translationJob.call();
@@ -82,8 +94,9 @@ public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer
             } catch (Exception e) {
                 exceptions.add(e);
                 failCounter.incrementAndGet();
-                transactionJobOverCounter.incrementAndGet();
                 logger.warn("transaction support job {} fail.", tmpJobFlag, e);
+            }finally {
+                transactionJobOverCounter.incrementAndGet();
             }
             return value;
         };
@@ -93,7 +106,7 @@ public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer
 
     @Override
     public List<Object> getResult() throws Exception {
-        while (transactionJobOverCounter.get() != transactionJobNumber) {
+        while (transactionJobOverCounter.get() != transactionJobNumber.get() && failCounter.get() == 0) {
             Thread.sleep(50);
         }
         countDownLatch.countDown();
@@ -116,7 +129,7 @@ public class TransactionBatchAsyncJobContainer implements BatchAsyncJobContainer
 
     @Override
     public BatchAsyncJobContainer cancel() {
-        failCounter.incrementAndGet();
+        shutdown = true;
         return this;
     }
 }

+ 29 - 4
hsweb-concurrent/hsweb-concurrent-async-job/src/test/java/org/hswebframework/web/async/TransactionSupportAsyncJobServiceTest.java

@@ -50,17 +50,42 @@ public class TransactionSupportAsyncJobServiceTest extends SimpleWebApplicationT
 
         try {
             BatchAsyncJobContainer jobContainer = asyncJobService.batch();
-
+            jobContainer.submit(() -> {
+                Thread.sleep(50);
+                throw new RuntimeException();
+            }, true);
             for (int i = 0; i < 100; i++) {
                 jobContainer.submit(() -> sqlExecutor.insert("insert into test values('test')", null), true);
             }
+
+            System.out.println(jobContainer.getResult().size());
+        } catch (Exception ignore) {
+            ignore.printStackTrace();
+        }
+        Assert.assertTrue(sqlExecutor.list("select * from test").isEmpty());
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+
+        sqlExecutor.exec("create table test(id varchar(32))");
+
+        try {
+            BatchAsyncJobContainer jobContainer = asyncJobService.batch();
             jobContainer.submit(() -> {
-                Thread.sleep(200);
+                Thread.sleep(10);
+                jobContainer.cancel();
                 throw new RuntimeException();
-            }, true);
+            }, false);
+            for (int i = 0; i < 100; i++) {
+                jobContainer.submit(() -> sqlExecutor.insert("insert into test values('test')", null), false);
+            }
+
             System.out.println(jobContainer.getResult().size());
+
         } catch (Exception ignore) {
+            ignore.printStackTrace();
         }
-        Assert.assertTrue(sqlExecutor.list("select * from test").isEmpty());
+        Assert.assertTrue(sqlExecutor.list("select * from test").size()>0);
     }
 }