版本说明
<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>1.2.0</version>
复制代码
1、seata使用
seata AT模式有三种角色:
- TC 事务协调者 维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
- RM 事务参与者 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚
- TM 事务发起者 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议
1.1 事务协调者-seata server
搭建seata server
三张表:
t_global_transaction:全局事务表
t_branch_transaction:分支事务表
t_global_lock:全局锁表
复制代码
1.2 事务发起者和事务参与者-seata client
- 引入相关依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
复制代码
-
事务发起者和事务参与者在本地新建undo_log表
-
事务发起者和事务参与者配置seata-server的相关信息
-
事务发起者的方法上加
@GlobalTransactionl
注解
2、seata整合springboot
在seata-spring-boot-starter模块类路径META-INF/spring.factories文件中,导入了 SeataAutoConfiguration
配置类。
public class SeataAutoConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);
//保存applicationContext的一个单例对象
@Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
@ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
public SpringApplicationContextProvider springApplicationContextProvider() {
return new SpringApplicationContextProvider();
}
//失败处理器
@Bean(BEAN_NAME_FAILURE_HANDLER)
@ConditionalOnMissingBean(FailureHandler.class)
public FailureHandler failureHandler() {
return new DefaultFailureHandlerImpl();
}
//对加了@GlobalTransaction或@GlobalLock的方法所属的对象进行代理
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Automatically configure Seata");
}
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
//seat提供的代理DataSource的对象
@Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
}
}
复制代码
GlobalTransactionScanner
实现了InitializingBean接口的afterProperties方法,用来初始化RM和TM,RM、TM与RS之间的通信通过netty实现。
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean {
//生命周期初始化方法
@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
//初始化netty客户端 TM RM
initClient();
}
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
/**
* TM (Transaction Manager):全局事务管理器,控制全局事务边界,负责全局事务开启、全局提交、全局回滚。
* RM (Resource Manager):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
* TC (Transaction Coordinator):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
*/
//初始化TM
TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();
}
}
复制代码
同时,GlobalTransactionScanner还是一个BeanPostProcessor类型的bean,重写了AbstractAutoProxyCreator
的wrapIfNecessary
方法。AbstractAutoProxyCreator是spring实现aop的核心类。对spring aop原理感兴趣的同学见笔者的spring aop原理
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean {
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
//是否禁用分布式事务 默认为flase
if (disableGlobalTransaction) {
//如果禁用了全局事务 直接返回
return bean;
}
try {
synchronized (PROXYED_SET) {
//如果已经代理过了 返回
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//判断是否开启TCC模式 默认不开启
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
//TCC实现的拦截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
//拿到bean的class类型
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
/**
* 判断此类中是否有方法 标注了GlobalTransactional或GlobalLock注解
*/
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
//不存在 直接返回
return bean;
}
//如果存在
if (interceptor == null) {
/**
* 实例化全局事务拦截器 目标方法执行时实际上会调用GlobalTransactionalInterceptor.invoke()方法
* interceptor在getAdvicesAndAdvisorsForBean()方法中返回给父类
*/
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
//判断当前bean是否已经是spring代理过的bean
if (!AopUtils.isAopProxy(bean)) {
//如果还不是 走一轮 spring的代理逻辑 在父类中会把当前类的interceptor(GlobalTransactionalInterceptor)添加到拦截器链中(因为实现了父类的getAdvicesAndAdvisorsForBean方法)
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
/**
* 如果已经是spring的代理类了, 那么反射获取代理类中已经存在的拦截器集合
* 然后把interceptor(getAdvicesAndAdvisorsForBean()返回)添加到该集合
*/
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
//判断class类里是否有方法加了GlobalTransactional或GlobalLock注解
private boolean existsAnnotation(Class<?>[] classes) {
if (CollectionUtils.isNotEmpty(classes)) {
for (Class<?> clazz : classes) {
if (clazz == null) {
continue;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}
GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
if (lockAnno != null) {
return true;
}
}
}
}
return false;
}
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
throws BeansException {
return new Object[]{interceptor};
}
}
复制代码
3、目标方法执行
经过spring 的代理后,目标方法(加了@GlobalTransaction或@GlobalLock注解的方法)执行的时候就会被GlobalTransactionalInterceptor‘的
invoke`拦截。
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
: null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//拿到注解的元数据
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (!disable && globalTransactionalAnnotation != null) {
//处理GlobalTransactional
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (!disable && globalLockAnnotation != null) {
//处理GlobalLock
return handleGlobalLock(methodInvocation);
} else {
//直接调用目标方法
return methodInvocation.proceed();
}
}
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
//TransactionalTemplate#execute
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
//执行原始方法
return methodInvocation.proceed();
}
//自定义或者格式化生成事务的名称
public String name() {
//如果用户指定了名字 就使用用户指定的
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
//没有指定就生成一个
return formatMethod(methodInvocation.getMethod());
}
//解析注解 将GlobalTransaction注解信息包装成TransactionInfo对象
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.rollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
}
}
}
复制代码
GlobalTransactionalInterceptor#invoke针对各种可能的情况区分处理,这里我们比较关注加了@GlobalTransaction注解的处理逻辑(加了@GlobalLock注解的比较简单,放到后面说)
继续看TransactionalTemplate#execute
方法
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws Throwable {
// 1 get transactionInfo
//1、拿到TransactionInfo(包装注解信息的对象)
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 get or create a transaction
//1.1获取或创建事务 如果是第一次调用 此处返回DefaultGlobalTransaction
/**
* 确定全局事务的发起者和参与者
*/
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.2 Handle the Transaction propatation and the branchType
//1.2处理事务传播类型和分支类型
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
case REQUIRES_NEW:
suspendedResourcesHolder = tx.suspend(true);
break;
case SUPPORTS:
if (!existingTransaction()) {
return business.execute();
}
break;
case REQUIRED:
break;
case NEVER:
if (existingTransaction()) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
try {
// 2. begin transaction
/**
* 2 开始事务
* 向seata server申请zid
* 将xid绑定到当前线程
*/
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
/**
* 做你的事情 执行目标方法 最终会执行到PreparedStatementProxy.execute()
*/
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
/**
* 3 rollback全局事务
*/
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 4. everything is fine, commit.
/**
* 4提交事务
*/
commitTransaction(tx);
return rs;
} finally {
//5. clear
//回调钩子函数
triggerAfterCompletion();
cleanUp();
}
} finally {
tx.resume(suspendedResourcesHolder);
}
}
}
复制代码
3.1 确定分布式事务的发起者与参与者
根据本地线程中是否存在分布式事务id(保存在ThreadLocal中)判断当前是分布式事务的发起者或参与者
public class GlobalTransactionContext {
public static GlobalTransaction getCurrentOrCreate() {
//获取全局事务
GlobalTransaction tx = getCurrent();
//如果获取不到现有的全局事务 就创建一个
if (tx == null) {
//以全局事务发起者的身份发起全局事务
return createNew();
}
return tx;
}
private static GlobalTransaction getCurrent() {
//获取全局事务的xid
String xid = RootContext.getXID();
//xid不存在 表示不存在全局事务
if (xid == null) {
return null;
}
/**
* 否则以参与者的身份加入全局事务
*/
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
private static GlobalTransaction createNew() {
return new DefaultGlobalTransaction();
}
}
public class RootContext {
public static final String KEY_XID = "TX_XID";
public static final String KEY_XID_INTERCEPTOR_TYPE = "tx-xid-interceptor-type";
private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
public static String getXID() {
String xid = CONTEXT_HOLDER.get(KEY_XID);
if (StringUtils.isNotBlank(xid)) {
return xid;
}
String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
if (StringUtils.isNotBlank(xidType) && xidType.contains("_")) {
return xidType.split("_")[0];
}
return null;
}
}
复制代码
3.2 处理spring事务的传播行为
根据对应传播行为的特性进行相应处理
3.3 分布式事务
3.3.1 开启分布式事务
public class TransactionalTemplate {
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//回调钩子函数
triggerBeforeBegin();
//DefaultGlobalTransaction.begin() 真正开启事务
//底层将和seata服务器通信 创建全局事务
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//回调钩子函数
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
}
复制代码
向seata-server申请开启分布式事务
public class DefaultGlobalTransaction implements GlobalTransaction {
public void begin(int timeout, String name) throws TransactionException {
//全局事务发起者
//加了GlobalTransactional注解的方法是Launcher
if (role != GlobalTransactionRole.Launcher) {
//全局事务参与者
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
if (RootContext.getXID() != null) {
throw new IllegalStateException();
}
//向seata-server申请开启分布式事务,返回分布式事务id - xid
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
/**
* 将xid绑定到当前线程
*/
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
}
复制代码
到此,已经开启了分布式事务,下一步就是要执行目标方法了。