|
@@ -23,7 +23,7 @@ import java.util.function.Consumer;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * TODO 完成注释
|
|
|
|
|
|
+ * 默认的事务执行器
|
|
*
|
|
*
|
|
* @author zhouhao
|
|
* @author zhouhao
|
|
*/
|
|
*/
|
|
@@ -51,6 +51,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
|
|
|
|
private volatile boolean running = false;
|
|
private volatile boolean running = false;
|
|
|
|
|
|
|
|
+ /* 线程循环开始等待sql进入的时候执行一次,sql进入的时候执行一次,然后唤醒线程开始执行sql */
|
|
private CyclicBarrier waitToReady = new CyclicBarrier(2);
|
|
private CyclicBarrier waitToReady = new CyclicBarrier(2);
|
|
|
|
|
|
private CountDownLatch waitClose = new CountDownLatch(1);
|
|
private CountDownLatch waitClose = new CountDownLatch(1);
|
|
@@ -83,8 +84,10 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
try {
|
|
try {
|
|
logger.debug("wait transaction {} close", transactionId);
|
|
logger.debug("wait transaction {} close", transactionId);
|
|
if (!running) {
|
|
if (!running) {
|
|
|
|
+ //先唤醒执行,继续执行任务
|
|
waitToReady.await();
|
|
waitToReady.await();
|
|
}
|
|
}
|
|
|
|
+ //等待执行结束
|
|
waitClose.await();
|
|
waitClose.await();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
@@ -136,18 +139,20 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
if (datasourceId != null) {
|
|
if (datasourceId != null) {
|
|
DataSourceHolder.switcher().use(datasourceId);
|
|
DataSourceHolder.switcher().use(datasourceId);
|
|
}
|
|
}
|
|
|
|
+ //开启事务
|
|
transactionStatus = transactionTemplate.getTransactionManager().getTransaction(transactionTemplate);
|
|
transactionStatus = transactionTemplate.getTransactionManager().getTransaction(transactionTemplate);
|
|
if (sqlRequestExecutor == null) {
|
|
if (sqlRequestExecutor == null) {
|
|
buildDefaultSqlRequestExecutor();
|
|
buildDefaultSqlRequestExecutor();
|
|
}
|
|
}
|
|
while (!shutdown) {
|
|
while (!shutdown) {
|
|
logger.debug("wait sql execute request {}", transactionId);
|
|
logger.debug("wait sql execute request {}", transactionId);
|
|
- waitToReady.await();
|
|
|
|
- waitToReady.reset();
|
|
|
|
|
|
+ waitToReady.await();//等待有新的sql进来
|
|
|
|
+ waitToReady.reset();//重置,下一次循环继续等待
|
|
|
|
+ //执行sql
|
|
doExecute();
|
|
doExecute();
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- rollback();
|
|
|
|
|
|
+ rollback();//回滚
|
|
logger.error("execute sql error {}", transactionId, e);
|
|
logger.error("execute sql error {}", transactionId, e);
|
|
} finally {
|
|
} finally {
|
|
try {
|
|
try {
|
|
@@ -158,6 +163,7 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
logger.debug("Roll Back transaction {}", transactionId);
|
|
logger.debug("Roll Back transaction {}", transactionId);
|
|
transactionTemplate.getTransactionManager().rollback(transactionStatus);
|
|
transactionTemplate.getTransactionManager().rollback(transactionStatus);
|
|
}
|
|
}
|
|
|
|
+ //结束事务
|
|
waitClose.countDown();
|
|
waitClose.countDown();
|
|
} finally {
|
|
} finally {
|
|
DataSourceHolder.switcher().reset();
|
|
DataSourceHolder.switcher().reset();
|
|
@@ -171,15 +177,18 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
running = true;
|
|
running = true;
|
|
logger.debug("start execute sql {}", transactionId);
|
|
logger.debug("start execute sql {}", transactionId);
|
|
try {
|
|
try {
|
|
- List<SqlExecuteResult> requests = execution.request.getSql().stream()
|
|
|
|
|
|
+ List<SqlExecuteResult> requests = execution.request.getSql()
|
|
|
|
+ .stream()
|
|
.map(sqlInfo -> {
|
|
.map(sqlInfo -> {
|
|
try {
|
|
try {
|
|
|
|
+ //执行sql
|
|
return sqlRequestExecutor.apply(sqlExecutor, sqlInfo);
|
|
return sqlRequestExecutor.apply(sqlExecutor, sqlInfo);
|
|
} catch (SQLException e) {
|
|
} catch (SQLException e) {
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
})
|
|
})
|
|
.collect(Collectors.toList());
|
|
.collect(Collectors.toList());
|
|
|
|
+ //通过回调返回执行结果
|
|
execution.callback.accept(requests);
|
|
execution.callback.accept(requests);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
rollback();
|
|
rollback();
|
|
@@ -194,9 +203,11 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
if (shutdown) {
|
|
if (shutdown) {
|
|
throw new UnsupportedOperationException("transaction is close");
|
|
throw new UnsupportedOperationException("transaction is close");
|
|
}
|
|
}
|
|
|
|
+ //执行倒计时,执行sql是异步的,通过此方式等待sql执行完毕
|
|
CountDownLatch countDownLatch = new CountDownLatch(1);
|
|
CountDownLatch countDownLatch = new CountDownLatch(1);
|
|
List<SqlExecuteResult> results = new ArrayList<>();
|
|
List<SqlExecuteResult> results = new ArrayList<>();
|
|
|
|
|
|
|
|
+ //异常信息
|
|
Exception[] exceptions = new Exception[1];
|
|
Exception[] exceptions = new Exception[1];
|
|
|
|
|
|
Execution execution = new Execution();
|
|
Execution execution = new Execution();
|
|
@@ -213,9 +224,12 @@ public class DefaultLocalTransactionExecutor implements TransactionExecutor {
|
|
logger.debug("submit sql execute job {}", transactionId);
|
|
logger.debug("submit sql execute job {}", transactionId);
|
|
executionQueue.add(execution);
|
|
executionQueue.add(execution);
|
|
try {
|
|
try {
|
|
|
|
+ //当前没有在执行sql,说明现在正在等待新的sql进入,唤醒之
|
|
if (!running)
|
|
if (!running)
|
|
waitToReady.await();
|
|
waitToReady.await();
|
|
|
|
+ //等待sql执行完毕
|
|
countDownLatch.await();
|
|
countDownLatch.await();
|
|
|
|
+ //判断是否有异常
|
|
Exception exception;
|
|
Exception exception;
|
|
if ((exception = exceptions[0]) != null) {
|
|
if ((exception = exceptions[0]) != null) {
|
|
if (exception instanceof RuntimeException) {
|
|
if (exception instanceof RuntimeException) {
|