seata AT模式源码(一)

版本说明

<groupId>io.seata</groupId>
<artifactId>seata-parent</artifactId>
<version>1.2.0</version>
复制代码

1、seata使用

seata AT模式有三种角色:

  • TC 事务协调者 维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚
  • RM 事务参与者 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚
  • TM 事务发起者 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议

1.1 事务协调者-seata server

搭建seata server

三张表:

t_global_transaction:全局事务表
t_branch_transaction:分支事务表
t_global_lock:全局锁表
复制代码

1.2 事务发起者和事务参与者-seata client

  • 引入相关依赖
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-seata</artifactId>
</dependency>
复制代码
  • 事务发起者和事务参与者在本地新建undo_log表

  • 事务发起者和事务参与者配置seata-server的相关信息

  • 事务发起者的方法上加@GlobalTransactionl注解

2、seata整合springboot

在seata-spring-boot-starter模块类路径META-INF/spring.factories文件中,导入了 SeataAutoConfiguration配置类。

public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    //保存applicationContext的一个单例对象
    @Bean(BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER)
    @ConditionalOnMissingBean(name = {BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER})
    public SpringApplicationContextProvider springApplicationContextProvider() {
        return new SpringApplicationContextProvider();
    }
    
    //失败处理器
    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    //对加了@GlobalTransaction或@GlobalLock的方法所属的对象进行代理
    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

    //seat提供的代理DataSource的对象
    @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    @ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = {"enableAutoDataSourceProxy", "enable-auto-data-source-proxy"}, havingValue = "true", matchIfMissing = true)
    @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
        return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),seataProperties.getExcludesForAutoProxying());
    }
}
复制代码

GlobalTransactionScanner实现了InitializingBean接口的afterProperties方法,用来初始化RM和TM,RM、TM与RS之间的通信通过netty实现。

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean {

    //生命周期初始化方法
    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        //初始化netty客户端 TM RM
        initClient();
    }
    
    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }

        /**
         * TM (Transaction Manager):全局事务管理器,控制全局事务边界,负责全局事务开启、全局提交、全局回滚。
         * RM (Resource Manager):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
         * TC (Transaction Coordinator):事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
         */
        //初始化TM
        TMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        //初始化RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }
}
复制代码

同时,GlobalTransactionScanner还是一个BeanPostProcessor类型的bean,重写了AbstractAutoProxyCreatorwrapIfNecessary方法。AbstractAutoProxyCreator是spring实现aop的核心类。对spring aop原理感兴趣的同学见笔者的spring aop原理

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,
    DisposableBean {

    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        //是否禁用分布式事务 默认为flase
        if (disableGlobalTransaction) {
            //如果禁用了全局事务 直接返回
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                //如果已经代理过了 返回
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //判断是否开启TCC模式   默认不开启
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    //TCC实现的拦截器
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                } else {
                    //拿到bean的class类型
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    /**
                     * 判断此类中是否有方法 标注了GlobalTransactional或GlobalLock注解
                     */
                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        //不存在 直接返回
                        return bean;
                    }
                    //如果存在
                    if (interceptor == null) {
                        /**
                         *    实例化全局事务拦截器 目标方法执行时实际上会调用GlobalTransactionalInterceptor.invoke()方法
                         *    interceptor在getAdvicesAndAdvisorsForBean()方法中返回给父类
                         */
                        interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationFactory.getInstance().addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener) interceptor);
                    }
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                //判断当前bean是否已经是spring代理过的bean
                if (!AopUtils.isAopProxy(bean)) {
                    //如果还不是 走一轮 spring的代理逻辑   在父类中会把当前类的interceptor(GlobalTransactionalInterceptor)添加到拦截器链中(因为实现了父类的getAdvicesAndAdvisorsForBean方法)
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    /**
                     * 如果已经是spring的代理类了, 那么反射获取代理类中已经存在的拦截器集合
                     * 然后把interceptor(getAdvicesAndAdvisorsForBean()返回)添加到该集合
                     */
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    //判断class类里是否有方法加了GlobalTransactional或GlobalLock注解
    private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }

                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
        throws BeansException {
        return new Object[]{interceptor};
    }
}
复制代码

3、目标方法执行

经过spring 的代理后,目标方法(加了@GlobalTransaction或@GlobalLock注解的方法)执行的时候就会被GlobalTransactionalInterceptor‘的invoke`拦截。

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {


    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        //拿到注解的元数据
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            //处理GlobalTransactional
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            //处理GlobalLock
            return handleGlobalLock(methodInvocation);
        } else {
            //直接调用目标方法
            return methodInvocation.proceed();
        }
    }
    

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            //TransactionalTemplate#execute
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    //执行原始方法
                    return methodInvocation.proceed();
                }

                //自定义或者格式化生成事务的名称
                public String name() {
                    //如果用户指定了名字 就使用用户指定的
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    //没有指定就生成一个
                    return formatMethod(methodInvocation.getMethod());
                }

                //解析注解 将GlobalTransaction注解信息包装成TransactionInfo对象
                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));

            }
        }
    }

}
复制代码

GlobalTransactionalInterceptor#invoke针对各种可能的情况区分处理,这里我们比较关注加了@GlobalTransaction注解的处理逻辑(加了@GlobalLock注解的比较简单,放到后面说)

继续看TransactionalTemplate#execute方法

public class TransactionalTemplate {
   
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1 get transactionInfo
        //1、拿到TransactionInfo(包装注解信息的对象)
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 get or create a transaction
        //1.1获取或创建事务 如果是第一次调用 此处返回DefaultGlobalTransaction
        /**
         * 确定全局事务的发起者和参与者
         */
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.2 Handle the Transaction propatation and the branchType
        //1.2处理事务传播类型和分支类型
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }


            try {

                // 2. begin transaction
                /**
                 *  2 开始事务
                 *  向seata server申请zid
                 *  将xid绑定到当前线程
                 */

                 beginTransaction(txInfo, tx);

                Object rs = null;
                try {

                    // Do Your Business
                    /**
                     * 做你的事情 执行目标方法   最终会执行到PreparedStatementProxy.execute()
                     */
                    rs = business.execute();

                } catch (Throwable ex) {

                    // 3.the needed business exception to rollback.
                    /**
                     * 3 rollback全局事务
                     */
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }

                // 4. everything is fine, commit.
                /**
                 * 4提交事务
                 */
                commitTransaction(tx);

                return rs;
            } finally {
                //5. clear
                //回调钩子函数
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }

    }

}
复制代码

3.1 确定分布式事务的发起者与参与者

根据本地线程中是否存在分布式事务id(保存在ThreadLocal中)判断当前是分布式事务的发起者或参与者

public class GlobalTransactionContext {
   
    public static GlobalTransaction getCurrentOrCreate() {
        //获取全局事务
        GlobalTransaction tx = getCurrent();
        //如果获取不到现有的全局事务 就创建一个
        if (tx == null) {
            //以全局事务发起者的身份发起全局事务
            return createNew();
        }
        return tx;
    }
    
   
    private static GlobalTransaction getCurrent() {
        //获取全局事务的xid
        String xid = RootContext.getXID();
        //xid不存在 表示不存在全局事务
        if (xid == null) {
            return null;
        }
        /**
         * 否则以参与者的身份加入全局事务
         */
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }
    
    
    private static GlobalTransaction createNew() {
        return new DefaultGlobalTransaction();
    }

}

public class RootContext {

    public static final String KEY_XID = "TX_XID";

    public static final String KEY_XID_INTERCEPTOR_TYPE = "tx-xid-interceptor-type";


    private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();

    public static String getXID() {
        String xid = CONTEXT_HOLDER.get(KEY_XID);
        if (StringUtils.isNotBlank(xid)) {
            return xid;
        }

        String xidType = CONTEXT_HOLDER.get(KEY_XID_INTERCEPTOR_TYPE);
        if (StringUtils.isNotBlank(xidType) && xidType.contains("_")) {
            return xidType.split("_")[0];
        }

        return null;
    }
}

复制代码

3.2 处理spring事务的传播行为

根据对应传播行为的特性进行相应处理

3.3 分布式事务

3.3.1 开启分布式事务

public class TransactionalTemplate {
 
    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            //回调钩子函数
            triggerBeforeBegin();
            //DefaultGlobalTransaction.begin()  真正开启事务
            //底层将和seata服务器通信 创建全局事务
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            //回调钩子函数
            triggerAfterBegin();
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }
}
复制代码

向seata-server申请开启分布式事务

public class DefaultGlobalTransaction implements GlobalTransaction {
   
    public void begin(int timeout, String name) throws TransactionException {
        //全局事务发起者
        //加了GlobalTransactional注解的方法是Launcher
        if (role != GlobalTransactionRole.Launcher) {
            //全局事务参与者
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        //向seata-server申请开启分布式事务,返回分布式事务id - xid
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;

        /**
         * 将xid绑定到当前线程
         */
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
}
复制代码

到此,已经开启了分布式事务,下一步就是要执行目标方法了。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享