`
y806839048
  • 浏览: 1120801 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

rainCat/myth事物框架核心原理解析

阅读更多

 

mycat也是用相似的原理实现二阶段提交的,无论是raincat还是myth都会通过切面,从线程的上下文找到分布式事物的事物群归属(messagegroup)(包括buddo,springcloud--远程调用会带上组信息的),其实mycat》raincat(raincat只有事物特性)

 

在每个角色-发起方,参与方都有事物service层注解

1,一个线程去执行切点任务,扫master的协调操作信息   ----第一阶段:操作session持久化状态

2,主线程等待,等步骤1的线程返回指令后,提交事物或回滚事物  ----第二阶段:从session最总提交commit数据库,或回滚就是直接变为游离态

 

业务切面中的方法操作的还是在数据库内存中持久态的对象,最终commit才会清除之后到数据库中持久化

 

 

提交,回滚--transactionStatus(用这个对象)

TransactionStatus transactionStatus = platformTransactionManager.getTransaction(def);  自动复制为当前方法中的事物对象(获取当前事物对象),然后操作这个事物对象就是操作本方法的事物对象

 

 

 

数据         session     数据库

瞬时状态      不关联      无数据

持久状态       关联        commit之后才有数据

移除状态       关联        无数据

游离状态       不关联      有数据

 

 

1.瞬时状态 
也就是实体对象处于new的状态 
它是在session缓存区中不存在的,也不会跟session有关联 
更不会跟数据库有关联 
2.持久状态 
持久状态是最重要的。、 
当你用save(),load(),get(),update(), 
list,iterater,scroll,saveOrUpdate方法时, 
都会编程持久状态,会把对象放在session缓存区中 
当session进行缓存区清理的时候,也就是commit的时候 
会把数据与数据库进行同步。 

3.移除状态 
用remove(),delete就会从持久状态变成移除状态 
移除状态的对象还是存在缓存区中的,只是数据库中不在有 
此对象,不在和它关联 
一旦session关闭,这个对象也就没有意义了 

4.游离状态 
当session关闭后,存在session缓存区中的对象就变成了游离状态 
。他们还是可以访问各自的数据,但是他们不在和数据库关联起来, 
他们只是曾经与session有过一段绯闻,而最后被抛弃的东西, 
而他们拥有对象标识符

  

具体分析过程:

 

 

StartTxTransactionHandler

 

这里参与者(消费端)自己的事物最后提交需要等待

  final Boolean commit = txManagerMessageService.preCommmitTxTransaction(groupId); ---netty阻塞等待 所有参与者的事物正常提交,之后提交自己的事物

整个事物连中有一个事物异常发起者通知这个消息事物的提交回滚--发起者调用参与者

 //通知tm整个事务组失败,需要回滚,(回滚那些正常提交的模块,他们正在等待通知。。。。)

txManagerMessageService.rollBackTxTransaction(groupId);

 

 

  ActorTxTransactionHandler  提供方提交事物

 

 

 

大致的思路都是用了自定义的锁

final BlockTask task = BlockTaskHelper.getInstance().getTask(taskKey);

 

 

主线程等待,一个定时线程池轮询协调者状态可以就向下执行,提交---这就是阻塞等待的原理

 

 

//线程轮询

 transactionThreadPool

                .newFixedThreadPool()

                .execute(() -> {

                    final String waitKey = IdWorkerUtils.getInstance().createTaskKey();

                    final BlockTask waitTask = BlockTaskHelper.getInstance().getTask(waitKey);

                    DefaultTransactionDefinition def = new DefaultTransactionDefinition();

                    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);

                    TransactionStatus transactionStatus = platformTransactionManager.getTransaction(def);

                      /*

                      1 设置返回数据res

                      2 唤醒主线程

                      3  向TxManager 发送提交整个事务组请求

                      4 自身进入阻塞,等待TxManager通知是否提交 超时会唤醒  我觉得这里应该是继续提交

                      5 TxManager 发送提交通知,唤醒线程, 进行提交

                      6 提交完成后,想txManager 发送事务已经完成的通知

                     */

                    try {

 

                        //添加事务组信息

                        final Boolean success = txManagerMessageService.addTxTransaction(info.getTxGroupId(),

                                build(waitKey, info));

                        if (success) {

                            //发起调用

                            final Object res = point.proceed();

 

                            //设置返回数据,并唤醒之前等待的主线程

                            task.setAsyncCall(objects -> res);

                            task.signal();

 

                            //调用成功 保存本地补偿信息

                            String compensateId = txCompensationCommand.saveTxCompensation(info.getInvocation(),

                                    info.getTxGroupId(), waitKey);

 

                            /*

                             *

                             *等待txManager通知(提交或者回滚) 此线程唤醒(txManager通知客户端,然后唤醒)

                             * 如果此时TxManager down机或者网络通信异常 需要再开一个调度线程来唤醒

                             */

                            final ScheduledFuture scheduledFuture = transactionThreadPool.multiScheduled(() -> {

                                LogUtil.info(LOGGER, "事务组id:{},自动超时处理", info::getTxGroupId);

                                final BlockTask blockTask = BlockTaskHelper.getInstance().getTask(waitKey);

                                if (!blockTask.isNotify()) {

                                    //如果获取通知超时了,那么就去获取事务组的状态

                                    final int transactionGroupStatus = txManagerMessageService.findTransactionGroupStatus(info.getTxGroupId());

                                    if (TransactionStatusEnum.PRE_COMMIT.getCode() == transactionGroupStatus ||

                                            TransactionStatusEnum.COMMIT.getCode() == transactionGroupStatus) {

                                        //如果事务组是预提交,或者是提交状态

                                        //表明事务组是成功的,这时候就算超时也应该去提交

                                        waitTask.setAsyncCall(objects -> TransactionStatusEnum.COMMIT.getCode());

                                        waitTask.signal();

 

                                    } else {

                                        LogUtil.info(LOGGER, "事务组id:{},自动超时进行回滚!", info::getTxGroupId);

                                        waitTask.setAsyncCall(objects -> NettyResultEnum.TIME_OUT.getCode());

                                        waitTask.signal();

                                    }

                                    LOGGER.error("============通过定时任务来唤醒线程!事务状态为:{}", transactionGroupStatus);

                                    return true;

                                } else {

                                    return false;

                                }

 

                            }, info.getWaitMaxTime());

 

                            waitTask.await();

 

                            LogUtil.info(LOGGER, "已经成功接收txManager指令,并执行!{}",

                                    () -> info.getTxGroupId() + ":" + waitKey);

                            try {

 

                                //如果已经被唤醒,就不需要去执行调度线程了 ,关闭上面的调度线程池中的任务

                                if (!scheduledFuture.isDone()) {

                                    scheduledFuture.cancel(false);

                                }

                                final Integer status = (Integer) waitTask.getAsyncCall().callBack();

                                if (TransactionStatusEnum.COMMIT.getCode() == status) {

 

                                    //提交事务

                                    platformTransactionManager.commit(transactionStatus);

 

                                    //通知tm 自身事务提交

                                    asyncComplete(info.getTxGroupId(), waitKey,

                                            TransactionStatusEnum.COMMIT.getCode(), res);

 

                                } else {

                                    //回滚当前事务

                                    platformTransactionManager.rollback(transactionStatus);

 

 

                                    //通知tm 自身事务回滚

                                    asyncComplete(info.getTxGroupId(), waitKey,

                                            TransactionStatusEnum.ROLLBACK.getCode(), res);

 

                                }

                            } catch (Throwable throwable) {

                                platformTransactionManager.rollback(transactionStatus);

                                throwable.printStackTrace();

                            } finally {

                                BlockTaskHelper.getInstance().removeByKey(waitKey);

                                //删除本地补偿信息

                                txCompensationCommand.removeTxCompensation(compensateId);

                            }

 

                        } else {

                            platformTransactionManager.rollback(transactionStatus);

                        }

 

                    } catch (final Throwable throwable) {

                        throwable.printStackTrace();

                        //如果有异常 当前项目事务进行回滚

                        platformTransactionManager.rollback(transactionStatus);

                        //通知tm 自身事务失败

                        asyncComplete(info.getTxGroupId(),

                                waitKey, TransactionStatusEnum.FAILURE.getCode(), throwable.getMessage());

 

                        task.setAsyncCall(objects -> {

                            throw throwable;

                        });

                        task.signal();

 

                    }

 

                });

 

 

//主线程等待

 

        task.await();

 

 

 

 

 

 

 

 

回滚(也用消息日志中的信息回滚),提交成功,同时也删除消息事物日志  txCompensationCommand.removeTxCompensation(compensateId);

补偿回滚(也用消息日志中的信息提交)----宕机

 

//调用成功 保存本地补偿信息

                            String compensateId = txCompensationCommand.saveTxCompensation(info.getInvocation(),

                                    info.getTxGroupId(), waitKey);

 

专门有一个线程不断轮询queue中的补偿信息,只要状态对,就执行补偿

 

StartCompensationHandler:补偿就是哪个事物回滚就去除这个事物消息组中的补偿信息(根据状态)(TxCompensationServiceImpl)

 

 

 

 

 

检查补偿队列补偿信息(根据状态)就进入补偿处理器:再一次执行业务代码,提交事物

 

@Service

public class TxTransactionFactoryServiceImpl implements TxTransactionFactoryService {

 

    @Override

    public Class factoryOf(TxTransactionInfo info) throws Throwable {

        if (StringUtils.isNoneBlank(info.getCompensationId())) {

            return StartCompensationHandler.class;

        }

        if (StringUtils.isBlank(info.getTxGroupId())) {

            return StartTxTransactionHandler.class;

        } else {

            if (Objects.equals(CommonConstant.COMPENSATE_ID, info.getTxGroupId())) {

                return InsideCompensationHandler.class;

            }

            return ActorTxTransactionHandler.class;

        }

 

    }

}

 

补偿处理器再次处理业务方法,提交一次

 

 final Object proceed = point.proceed();

            platformTransactionManager.commit(transactionStatus);

 

 

最后删除整个补偿信息

 TxTransactionLocal.getInstance().removeTxGroupId();

            CompensationLocal.getInstance().removeCompensationId();

 

 

    

 

分享到:
评论

相关推荐

    raincat:强一致分布式事务框架

    高度一致的分布式交易框架模组raincat-admin:事务日志管理背景raincat-annotation:框架通用注释raincat-common:框架通用类raincat-core:框架核心软件包(注释处理,日志存储...) raincat-dashboard:管理后台...

    基于Java的2阶段提交分布式事务中间件Raincat设计源码

    本源码为基于Java的2阶段提交分布式事务中间件Raincat设计,包含261个Java文件、39个XML文件等,共366个文件。该项目旨在为用户提供一个全面、便捷的分布式事务解决方案,通过Java、JavaScript、Vue、HTML技术的结合...

    raincat-snap:基于Haskell的Raincat游戏的Snap软件包

    这是Raincat的诀窍, “益智游戏中有一只猫扮演主角。” 。 它适用于Ubuntu,Fedora,Debian和其他主要Linux发行版。 发布时间: 和 :heart_with_ribbon: 由Snapcrafters 安装 sudo snap install raincat-test ( ...

    tcc-transaction:tcc-transaction是TCC型事务java实现

    使用指南1.1.x: : 1.1.x源码分支: : 使用指南1.2.x: : 1.2.x源码分支: : 1.2.x版本不向下兼容1.1.x,主要是在声明中tcc服务方法的注解有改变。1.2.x版本1.1.x主要的地方在于发布...在rpc框架为dubbo情况下,可利

    .net 二维码识别开源组件

    在二维码识别组件中,这两个文件可能包含了一些核心的类库或API,用于支持二维码的生成和解析。 综上所述,.NET二维码识别开源组件为开发者提供了强大的工具,简化了二维码的生成和识别过程。通过利用这些组件,...

    联想移动保险箱V2.4.

    联想移动保险箱V2.4联想移动保险箱V2.4.

Global site tag (gtag.js) - Google Analytics