|
@@ -18,6 +18,7 @@ import java.util.List;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.function.Consumer;
|
|
import java.util.function.Consumer;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@@ -46,7 +47,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
|
|
|
|
private TransactionTemplate transactionTemplate;
|
|
private TransactionTemplate transactionTemplate;
|
|
|
|
|
|
- private boolean commit = false;
|
|
|
|
|
|
+ private volatile boolean commit = false;
|
|
|
|
|
|
private volatile boolean running = false;
|
|
private volatile boolean running = false;
|
|
|
|
|
|
@@ -95,10 +96,16 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void rollback() {
|
|
public void rollback() {
|
|
- shutdown = true;
|
|
|
|
|
|
+ tryRollback();
|
|
waitToClose();
|
|
waitToClose();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void tryRollback() {
|
|
|
|
+ running = false;
|
|
|
|
+ shutdown = true;
|
|
|
|
+ commit = false;
|
|
|
|
+ }
|
|
|
|
+
|
|
public void setSqlExecutor(SqlExecutor sqlExecutor) {
|
|
public void setSqlExecutor(SqlExecutor sqlExecutor) {
|
|
this.sqlExecutor = sqlExecutor;
|
|
this.sqlExecutor = sqlExecutor;
|
|
}
|
|
}
|
|
@@ -124,13 +131,17 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
}
|
|
}
|
|
while (!shutdown) {
|
|
while (!shutdown) {
|
|
logger.debug("wait sql execute request {}", transactionId);
|
|
logger.debug("wait sql execute request {}", transactionId);
|
|
- waitToReady.await();//等待有新的sql进来
|
|
|
|
|
|
+ if (transactionTemplate.getTimeout() > 0) {
|
|
|
|
+ waitToReady.await(transactionTemplate.getTimeout(), TimeUnit.MILLISECONDS);//等待有新的sql进来
|
|
|
|
+ } else {
|
|
|
|
+ waitToReady.await();
|
|
|
|
+ }
|
|
waitToReady.reset();//重置,下一次循环继续等待
|
|
waitToReady.reset();//重置,下一次循环继续等待
|
|
//执行sql
|
|
//执行sql
|
|
doExecute();
|
|
doExecute();
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- rollback();//回滚
|
|
|
|
|
|
+ tryRollback();//回滚
|
|
logger.error("execute sql error {}", transactionId, e);
|
|
logger.error("execute sql error {}", transactionId, e);
|
|
} finally {
|
|
} finally {
|
|
try {
|
|
try {
|
|
@@ -175,8 +186,8 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
//通过回调返回执行结果
|
|
//通过回调返回执行结果
|
|
execution.callback.accept(requests);
|
|
execution.callback.accept(requests);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- rollback();
|
|
|
|
execution.onError.accept(e);
|
|
execution.onError.accept(e);
|
|
|
|
+ return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
running = false;
|
|
running = false;
|
|
@@ -194,7 +205,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
//异常信息
|
|
//异常信息
|
|
Exception[] exceptions = new Exception[1];
|
|
Exception[] exceptions = new Exception[1];
|
|
Execution execution = new Execution();
|
|
Execution execution = new Execution();
|
|
- execution.datasourceId=DataSourceHolder.switcher().currentDataSourceId();
|
|
|
|
|
|
+ execution.datasourceId = DataSourceHolder.switcher().currentDataSourceId();
|
|
|
|
|
|
execution.request = request;
|
|
execution.request = request;
|
|
execution.callback = sqlExecuteResults -> {
|
|
execution.callback = sqlExecuteResults -> {
|
|
@@ -217,6 +228,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
//判断是否有异常
|
|
//判断是否有异常
|
|
Exception exception;
|
|
Exception exception;
|
|
if ((exception = exceptions[0]) != null) {
|
|
if ((exception = exceptions[0]) != null) {
|
|
|
|
+ rollback();
|
|
throw exception;
|
|
throw exception;
|
|
}
|
|
}
|
|
return results;
|
|
return results;
|