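1 AT Mode Example
1.1 Code Example
AT mode is simple to use. This section quotes only the code that matters for the implementation and uses it to analyze how AT mode works. The examples are taken from the seata-samples project on GitHub.
1.1.1 TM Configuration
<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
    <constructor-arg value="dubbo-demo-app"/>
    <constructor-arg value="my_test_tx_group"/>
</bean>
GlobalTransactionScanner scans for @GlobalTransactional automatically and starts a distributed transaction for every annotated method.
Usage: add the @GlobalTransactional annotation wherever a global transaction should start, as shown below: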
@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);
}
1.1.2 RM Configuration
<bean id="storageDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
    <constructor-arg ref="storageDataSource" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="storageDataSourceProxy" />
</bean>
The local method needs no special handling to take part in the global distributed transaction:
public void deduct(String commodityCode, int count) {
    jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?",
        new Object[] {count, commodityCode});
}
Before the RM starts, create the undo_log table in its local database (it is used to roll back on failure). An example statement:
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
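1.1.3 Starting seata-server
seata-server plays the TC role in the Seata distributed transaction solution: it is the transaction coordinator and manages the life cycle of global transactions.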
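2 AT Mode Overview
2.1 Distributed Transaction Flow
Based on the example, the execution of a Seata distributed transaction breaks down into the overall flow shown in the figure:
- The first step opens the global transaction (the begin phase). It is initiated by the TM (microservice A).
- The TM (microservice A) then runs its normal business logic and calls microservice B and microservice C. While they execute, B and C register branches with the TC (the register branch phase).
- Once the business logic in the TM (microservice A) has finished, the TM tells the TC to commit or roll back the global transaction.
- On receiving that request, the TC iterates over all branches and contacts each RM (microservice B, microservice C) to commit or roll back its branch transaction (the Global Commit/Rollback phase).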
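The four steps above can be traced with a toy, in-memory coordinator. This is only a sketch: ToyCoordinator, its methods, and the "tc:N" xid format are hypothetical names made up for illustration and are not Seata's API, and the order in which branches are notified is simplified.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-in for the TC; not Seata's real API.
class ToyCoordinator {
    private final AtomicLong ids = new AtomicLong();
    private final Map<String, List<String>> branches = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // Step 1: the TM opens a global transaction and receives an xid.
    String begin() {
        String xid = "tc:" + ids.incrementAndGet();
        branches.put(xid, new ArrayList<>());
        log.add("begin " + xid);
        return xid;
    }

    // Step 2: each RM registers a branch while it runs its local transaction.
    void registerBranch(String xid, String resource) {
        branches.get(xid).add(resource);
        log.add("register " + resource);
    }

    // Steps 3 and 4: the TM reports the outcome and the TC walks every branch.
    void end(String xid, boolean commit) {
        for (String resource : branches.remove(xid)) {
            log.add((commit ? "commit " : "rollback ") + resource);
        }
    }
}
```

Calling begin(), then registerBranch for "storage" and "order", then end(xid, false) records one rollback entry per registered branch, which is the Global Rollback path described in the last step.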
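3 How It Works
3.1 A Few Questions
- In the sample code the RMs (microservice B, microservice C) do nothing special at the business-code level, yet their local transactions take part in the global transaction. How does Seata achieve this?
- A local transaction in an RM commits as soon as it finishes. When the global transaction rolls back, how does the RM undo data that has already been committed?
- Suppose several global transactions (T1, T2) need to modify the same record at the same time (the same row in RM-B, for example two people ordering the same product simultaneously). How is the shared data kept correct?
3.2 Analysis
3.2.1 Sequence Diagram
The sequence of an AT request is shown in the diagram. The rest of this section walks through the distributed transaction solution following that sequence.
3.2.2 Begin Phase
3.2.2.1 TM asks the TC to open a global transaction
When execution reaches a method annotated with @GlobalTransactional, GlobalTransactionalInterceptor#invoke is triggered. It calls TransactionalTemplate#execute, which opens the transaction, runs the business logic, and commits or rolls back. In detail:
- First, the propagation setting of the current method determines how the transaction is opened, for example NOT_SUPPORTED, REQUIRES_NEW, SUPPORTS, REQUIRED, NEVER, or MANDATORY.
- Next, the TM registers with the TC and opens the distributed transaction. This goes through DefaultGlobalTransaction#begin, which delegates to the transaction manager's begin method; on success the xid is returned. The transaction manager code is: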
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
- Finally, the xid is bound to the context through RootContext.bind(xid).
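To make the binding step concrete, here is a minimal sketch of a thread-bound xid holder. XidContext is a hypothetical class written for this article, and the use of a plain ThreadLocal is an assumption about how such a context can be stored; it is not a copy of Seata's RootContext.

```java
// Hypothetical stand-in for a thread-bound transaction context.
class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    // Called after the TC has returned the xid for a new global transaction.
    static void bind(String xid) {
        XID.set(xid);
    }

    static String getXid() {
        return XID.get();
    }

    // Removes the xid from the current thread and returns the previous value.
    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    // Code further down the call chain can use this to decide whether to join.
    static boolean inGlobalTransaction() {
        return XID.get() != null;
    }
}
```

With the xid attached to the thread, any code that runs later on the same thread can discover the global transaction without the business method passing the xid around explicitly.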
3.2.2.2 How the TC handles a begin request
Every transaction-related request received by the TC is handed to AbstractTCInboundHandler#handle:
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}
The core steps of beginning a global transaction in the TC are:
- Create a GlobalSession. This step generates the xid (example: 172.20.10.2:8091:2612206521027831332).
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
this.transactionId = UUIDGenerator.generateUUID();
this.status = GlobalStatus.Begin;
this.applicationId = applicationId;
this.transactionServiceGroup = transactionServiceGroup;
this.transactionName = transactionName;
this.timeout = timeout;
this.xid = XID.generateXID(transactionId);
}
public static String generateXID(long tranId) {
return ipAddress + ":" + port + ":" + tranId;
}
- Add a SessionLifecycleListener.
- Insert a global transaction record into global_table.
- Publish a global transaction event (GlobalTransactionEvent).
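The xid layout used in the first step (ip, port, and transaction id joined by colons) can be reproduced and parsed with a few lines. XidSketch and transactionIdOf are hypothetical helpers for illustration only; just the ip:port:transactionId format comes from the code above.

```java
// Hypothetical helper; mirrors the ip:port:transactionId layout shown above.
class XidSketch {
    static String generate(String ip, int port, long transactionId) {
        return ip + ":" + port + ":" + transactionId;
    }

    // The transaction id is everything after the last colon.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }
}
```

Because the address of the TC instance is part of the xid, a participant that only knows the xid can still tell which TC instance created the transaction.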
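Sample global transaction record (global_table) in the TC. In this export transaction_id appears rounded, and gmt_create/gmt_modified appear as spreadsheet serial dates rather than datetimes: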
[
{
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"status":1,
"application_id":"dubbo-demo-app",
"transaction_service_group":"my_test_tx_group",
"transaction_name":"dubbo-demo-tx",
"timeout":300000,
"begin_time":1618230212964,
"application_data":"NULL",
"gmt_create":44298.8496759259,
"gmt_modified":44298.8496759259
}
]
3.2.3 TM Runs the Business Logic
3.2.3.1 Business code in the TM
After GlobalTransactionalInterceptor completes the begin phase, the business code in the TM runs. In the example that is:
storageService.deduct(commodityCode, orderCount);
orderService.create(userId, commodityCode, orderCount);
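This business logic calls the interfaces of other microservices (microservice B, microservice C) over RPC. While an RM executes its part of the business logic, it enters the Register Branch phase.
3.2.4 Register Branch Phase
3.2.4.1 RM registers a branch transaction with the TC
The RM's microservice wraps its original data source in io.seata.rm.datasource.DataSourceProxy, so when the RM writes to the database the call is delegated to PreparedStatementProxy, as shown below: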
@Override
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}
@Override
public ResultSet executeQuery() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
}
@Override
public int executeUpdate() throws SQLException {
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
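As the code shows, every database operation passes through proxies and executors. This is how Seata augments database access: registering the branch transaction with the TC before the database transaction commits, inserting the undo log, executing the original database operation, deleting the undo log, and so on, as shown in the figure below:
The process is as follows:
- Create the SQL executor: parse the SQL and pick the matching executor for insert, update, delete, and so on: InsertExecutor, UpdateExecutor, DeleteExecutor, SelectForUpdateExecutor, or PlainExecutor.
- Generate the SQL: besides the business SQL that writes to the database, this includes the undo log used for rollback, as shown below: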
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
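- When the transaction is committed through connectionProxy.commit(), the Register Branch flow starts. The commit runs under the LockRetryPolicy and consists of these steps:
  - The RM registers the branch transaction with the TC. This step talks to the TC and takes the global locks on the affected resources.
  - The local commit runs. This step persists the business SQL and the undo log to the database atomically.
  - If any of the above fails, the operation is retried according to the retry policy.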
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
private void register() throws TransactionException {
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), null, context.buildLockKeys());
context.setBranchId(branchId);
}
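- If the steps above succeed, the branch transaction has been registered and the local transaction has been committed.
AT mode relies on DataSourceProxy to intercept SQL execution and check whether it runs inside a global transaction. If it does, Seata creates the undo log and starts the Register Branch flow in addition to executing the business SQL. That is why business code needs no changes to take part in a global transaction.
Although the local transaction commits as soon as it finishes, the undo log created along the way allows the data to be restored when the global transaction has to roll back.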
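The idea of restoring committed data from a before image can be sketched with an in-memory table. UndoSketch and its fields are hypothetical and stand in for a real table and the undo_log row. The check against the after image before restoring is an assumption about why an after image is recorded at all; the article itself only states that the before image is used to restore the record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory table and undo record; not Seata's classes.
class UndoSketch {
    final Map<Integer, Integer> storage = new HashMap<>();      // id -> count
    final Map<Integer, Integer> beforeImage = new HashMap<>();  // row state before the update
    final Map<Integer, Integer> afterImage = new HashMap<>();   // row state after the update

    // Mirrors the order beforeImage -> execute -> afterImage -> prepareUndoLog.
    void deduct(int id, int amount) {
        beforeImage.put(id, storage.get(id));
        storage.put(id, storage.get(id) - amount);
        afterImage.put(id, storage.get(id));
    }

    // Restores the before image, but only if the row still matches the after image.
    // (Assumption: a mismatch means someone else changed the row in the meantime.)
    boolean rollback(int id) {
        if (!afterImage.get(id).equals(storage.get(id))) {
            return false;
        }
        storage.put(id, beforeImage.get(id));
        return true;
    }
}
```

For a row with id 13 and count 100, deduct(13, 2) leaves 98 in the table and records 100 and 98 as the two images, matching the sample undo log in this section; rollback(13) then puts 100 back.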
Sample undo log data:
{
"@class":"io.seata.rm.datasource.undo.BranchUndoLog",
"xid":"172.20.10.2:8091:2612206521027831332",
"branchId":2612206521027831365,
"sqlUndoLogs":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType":"UPDATE",
"tableName":"storage_tbl",
"beforeImage":{
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"storage_tbl",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PRIMARY_KEY",
"type":4,
"value":13
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"count",
"keyType":"NULL",
"type":4,
"value":100
}
]
]
}
]
]
},
"afterImage":{
"@class":"io.seata.rm.datasource.sql.struct.TableRecords",
"tableName":"storage_tbl",
"rows":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Row",
"fields":[
"java.util.ArrayList",
[
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"id",
"keyType":"PRIMARY_KEY",
"type":4,
"value":13
},
{
"@class":"io.seata.rm.datasource.sql.struct.Field",
"name":"count",
"keyType":"NULL",
"type":4,
"value":98
}
]
]
}
]
]
}
}
]
]
}
3.2.4.2 TC handles the RM's Register Branch request
When the TC receives a Register Branch request, it hands it to AbstractTCInboundHandler, as shown below:
public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
BranchRegisterResponse response = new BranchRegisterResponse();
exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
@Override
public void execute(BranchRegisterRequest request, BranchRegisterResponse response)
throws TransactionException {
try {
doBranchRegister(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String
.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}
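The request is ultimately handled by AbstractCore#branchRegister, whose main steps are:
- Look up the global transaction session (GlobalSession) by xid, then perform the necessary checks such as status validation.
- Create the branch session (BranchSession); later branch-level operations are handled by the BranchSession.
- Try to lock: if the resource is not locked by another global transaction, the lock is taken; otherwise locking fails. With DB storage, locking means inserting one or more rows into lock_table; redis- and file-based locking work similarly.
- Register the branch: create the branch record. With DB storage this means inserting a branch registration row into branch_table.
- Return the branch id (branchId).
The code is shown below: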
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
branchSessionLock(globalSession, branchSession);
try {
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
throw new BranchTransactionException(FailedToAddBranch, String
.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
branchSession.getBranchId()), ex);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
}
return branchSession.getBranchId();
});
}
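Branch registration locks the affected resources of the RM, so during business execution only one global transaction at a time can modify a shared record, which keeps the global data consistent.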
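A minimal sketch of this kind of lock table is shown below. LockTableSketch and its methods are hypothetical; only the row key layout (resource id, table name, and primary key joined by "^^^") is taken from the lock_table sample in this section. Letting the same xid re-acquire its own lock is an assumption of the sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory lock table; row keys follow the resourceId^^^table^^^pk layout.
class LockTableSketch {
    private final Map<String, String> owners = new ConcurrentHashMap<>(); // rowKey -> xid

    static String rowKey(String resourceId, String table, Object pk) {
        return resourceId + "^^^" + table + "^^^" + pk;
    }

    // Succeeds if the row is free or already held by the same global transaction.
    boolean acquire(String rowKey, String xid) {
        String owner = owners.putIfAbsent(rowKey, xid);
        return owner == null || owner.equals(xid);
    }

    // Called once the global transaction has finished.
    void releaseAll(String xid) {
        owners.values().removeIf(xid::equals);
    }
}
```

If T1 holds the lock on a storage_tbl row, an acquire for the same row by T2 returns false until T1 releases its locks, which is the behaviour that answers the third question in section 3.1.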
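Sample branch transaction records (branch_table) in the TC. The branch_id and transaction_id values appear rounded in this export, which is why they look identical: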
[
{
"branch_id":2612209823397710000,
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"resource_group_id":"NULL",
"resource_id":"jdbc:mysql://localhost:3306/seata_storage",
"branch_type":"AT",
"status":0,
"client_id":"dubbo-demo-storage-service:127.0.0.1:50878",
"application_data":"NULL",
"gmt_create":44298.8496889931,
"gmt_modified":44298.8496889931
},
{
"branch_id":2612209823397710000,
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"resource_group_id":"NULL",
"resource_id":"jdbc:mysql://localhost:3306/seata_account",
"branch_type":"AT",
"status":0,
"client_id":"dubbo-demo-account-service:127.0.0.1:50860",
"application_data":"NULL",
"gmt_create":44298.8496970023,
"gmt_modified":44298.8496970023
},
{
"branch_id":2612209823397710000,
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"resource_group_id":"NULL",
"resource_id":"jdbc:mysql://localhost:3306/seata_order",
"branch_type":"AT",
"status":0,
"client_id":"dubbo-demo-order-service:127.0.0.1:50968",
"application_data":"NULL",
"gmt_create":44298.8497165625,
"gmt_modified":44298.8497165625
}
]
Sample resource lock records (lock_table) in the TC:
[
{
"row_key":"jdbc:mysql://localhost:3306/seata_account^^^account_tbl^^^15",
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"branch_id":2612209823397710000,
"resource_id":"jdbc:mysql://localhost:3306/seata_account",
"table_name":"account_tbl",
"pk":15,
"gmt_create":44298.8496875,
"gmt_modified":44298.8496875
},
{
"row_key":"jdbc:mysql://localhost:3306/seata_order^^^order_tbl^^^32",
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"branch_id":2612209823397710000,
"resource_id":"jdbc:mysql://localhost:3306/seata_order",
"table_name":"order_tbl",
"pk":32,
"gmt_create":44298.8497106481,
"gmt_modified":44298.8497106481
},
{
"row_key":"jdbc:mysql://localhost:3306/seata_storage^^^storage_tbl^^^21",
"xid":"10.0.106.94:8091:2612209823397711922",
"transaction_id":2612209823397710000,
"branch_id":2612209823397710000,
"resource_id":"jdbc:mysql://localhost:3306/seata_storage",
"table_name":"storage_tbl",
"pk":21,
"gmt_create":44298.8496875,
"gmt_modified":44298.8496875
}
]
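3.2.5 Global Commit/Rollback Phase
When the business logic in the TM method completes normally, the Global Commit phase begins; if an exception is thrown along the way, the Global Rollback phase begins instead.
The Global Commit/Rollback phase consists of:
- The TM tells the TC to commit or roll back the global transaction. This is a synchronous call; if it succeeds, the Global Commit/Rollback is considered complete.
- On receiving the request, the TC iterates over all branches registered under the transaction and contacts the client of each branch to perform the Branch Commit/Rollback.
- The RM receives the Branch Commit/Rollback request and commits or rolls back its branch.
3.2.5.1 TM tells the TC to commit or roll back
The TM-side logic is simple: it calls its transaction manager to commit or roll back, retrying on failure. The default retry count is 5 for both.
The commit/rollback logic in DefaultGlobalTransaction is shown below.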
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
public void rollback() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
// Participant has no responsibility of rollback
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
try {
while (retry > 0) {
try {
status = transactionManager.rollback(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
retry--;
if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}
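The retry countdown shared by commit() and rollback() above can be isolated into a small helper. RetrySketch and callWithRetry are hypothetical names, and the sketch uses unchecked exceptions to stay short; it follows the same shape as the loops above (try, decrement, give up when the counter reaches zero) rather than reproducing Seata's code.

```java
import java.util.function.Supplier;

// Hypothetical helper that isolates the retry countdown used by commit() and rollback().
class RetrySketch {
    static <T> T callWithRetry(Supplier<T> call, int configuredAttempts) {
        // Fall back to 5 attempts when the configured value is not positive.
        int retry = configuredAttempts <= 0 ? 5 : configuredAttempts;
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    throw new IllegalStateException("Failed after retries", ex);
                }
            }
        }
    }
}
```

A call that fails twice and succeeds on the third attempt returns normally; a call that keeps failing is attempted exactly as many times as configured before the last error is wrapped and rethrown.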
commit/rollback in DefaultTransactionManager is just as simple: it makes a synchronous call to the TC's commit/rollback, as shown below:
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
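3.2.5.2 TC performs the Global Commit/Rollback
The TC handles the Global Commit/Rollback request from the TM in AbstractTCInboundHandler#handle; the request is ultimately processed by DefaultCore.
The TC-side flow consists of these stages:
- Look up the global transaction session (GlobalSession) by xid.
- Decide whether the global transaction is committed synchronously or asynchronously.
- Fetch all branch transactions.
- Contact the RM of each branch to perform the Branch Commit/Rollback.
- Once every branch has completed its Branch Commit/Rollback, remove the global transaction session from the TC and clear its data from global_table.
The commit code follows (rollback is largely the same and is omitted here):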
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
// start committing event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), null, globalSession.getStatus()));
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
return CONTINUE;
}
try {
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
LOGGER.error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
return CONTINUE;
} else {
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
// Return if the result is not null
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
}
//If success and there is no branch, end the global transaction.
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);
// committed event
eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
3.2.5.3 RM performs the Branch Commit/Rollback
When the RM receives a Branch Commit/Rollback request, it hands it to AbstractRMHandler, as shown below:
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
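Branch Commit/Rollback is straightforward: delete the undo log, or use it to restore the data. The steps are:
- Branch Commit:
  - The commit request is added to an asynchronous queue (AsyncWorker#commitQueue).
  - A scheduled task in the RM processes the queue once per second.
  - Processing a Branch Commit mainly means deleting the undo logs in batches.
- Branch Rollback:
  The RM queries the undo log, deserializes it, rebuilds the row from the before image, and restores the original record. Note that Branch Rollback runs synchronously.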
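The asynchronous Branch Commit path can be sketched as a queue plus a periodic flush. AsyncCommitSketch is a hypothetical class; the set of branch ids stands in for rows of the undo_log table, and the scheduling itself is left out so the sketch stays deterministic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the asynchronous phase-two commit on the RM side.
class AsyncCommitSketch {
    private final BlockingQueue<Long> commitQueue = new LinkedBlockingQueue<>();
    final Set<Long> undoLogs = new HashSet<>(); // branch ids that still have an undo log

    // Branch Commit only enqueues the request and returns immediately.
    boolean branchCommit(long branchId) {
        return commitQueue.offer(branchId);
    }

    // A scheduler would call this periodically (once per second in the article).
    int flush() {
        List<Long> batch = new ArrayList<>();
        commitQueue.drainTo(batch);
        undoLogs.removeAll(batch); // batch delete of the undo logs
        return batch.size();
    }
}
```

In a real service, flush could be registered with a ScheduledExecutorService via scheduleAtFixedRate. Deferring the delete is safe for commits because the business data is already persisted; rollback has no such luxury, which fits the note that Branch Rollback runs synchronously.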
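3.3 Compensation
DefaultCoordinator runs several scheduled tasks for retrying failures and compensating. They ensure the global transaction still ends in the correct state if the flow above hits an exception or the system is shut down forcibly.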
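4 Best Practices
4.1 Performance
As the analysis shows, each database write in an RM requires parsing the SQL, querying the original record, and registering a branch with the TC. The registration is a remote call made inside the database transaction, so it inevitably lengthens the local transaction and lowers SQL execution efficiency.
In addition, the Branch Register phase creates global locks. If a single transaction locks many resources, the performance cost rises noticeably and so does the likelihood that the transaction fails.
4.2 Dirty Reads
Once an RM's local transaction finishes, its database transaction commits, while the global distributed transaction may still be in progress. If the global transaction later rolls back, the data already persisted to the database has to be rolled back too. Between the branch commit and the global rollback, other threads may therefore read dirty data.
The @GlobalLock annotation addresses this. It works out which records the SQL touches and then queries the TC's lock_table to see whether those resources are held by another global transaction. The code is shown below:
ConnectionProxy#doCommit: decide before committing whether the global lock must be checked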
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processLocalCommitWithGlobalLocks() throws SQLException {
checkLock(context.buildLockKeys());
try {
targetConnection.commit();
} catch (Throwable ex) {
throw new SQLException(ex);
}
context.reset();
}
public void checkLock(String lockKeys) throws SQLException {
if (StringUtils.isBlank(lockKeys)) {
return;
}
// Just check lock without requiring lock by now.
try {
boolean lockable = DefaultResourceManager.get().lockQuery(BranchType.AT,
getDataSourceProxy().getResourceId(), context.getXid(), lockKeys);
if (!lockable) {
throw new LockConflictException();
}
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, lockKeys);
}
}