`
y806839048
  • 浏览: 1128357 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Mycat 分布式事务的实现

阅读更多

 

Mycat 分布式事务的实现

 

引言:Mycat已经成为了一个强大的开源分布式数据库中间件产品。面对企业应用的海量数据事务处理,是目前最好的开源解决方案。但是如果想让多台机器中的数据保存一致,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。 
本文选自《分布式数据库架构及企业实践——基于Mycat中间件》。

mycat也是用相似的原理实现二阶段提交的,无论是raincat还是myth都会从线程的上下文找到分布式事物的事物群归属(messagegroup)(包括buddo,springcloud),无论是否是远程调用都会带上,其实mycat》raincat(raincat只有事物特性)

 

mycat相当于一个虚拟节点包含多数据节点,其对服务相当于一个虚拟数据库分布式系统公用这个数据库,外部看是一个数据库的分布式事物,其实mycat内部做了多个数据库(分片)的分布式事物,myth等的分布式事物是也可实现多个库之间的事物一致

  随着并发量、数据量越来越大及业务已经细化到不能再按照业务划分,我们不得不使用分布式数据库提高系统的性能。在分布式系统中,各个节点在物理上都是相对独立的,每个节点上的数据操作都可以满足 ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点的执行。

XA 规范

  X/Open 组织(即现在的 Open Group)定义了分布式事务处理模型。X/Open DTP 模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。事务管理器(TM)是交易中间件,资源管理器(RM)是数据库,通信资源管理器(CRM)是消息中间件。通常把一个数据库内部的事务处理看作本地事务,而分布式事务处理的对象是全局事务。全局事务是指在分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作就是一个全局事务。在一个事务中可能更新几个不同的数据库,此时一个数据库对自己内部所做操作的提交不仅需要本身的操作成功,还需要全局事务相关的其他数据库的操作成功。如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。XA就是X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束、提交、回滚等,XA 接口函数由数据库厂商提供,根据这一思想衍生出二阶段提交协议和三阶段提交协议。

二阶段提交

  所谓的两个阶段是指准备阶段和提交阶段。 
  准备阶段指事务协调者(事务管理器)向每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败消息(如权限验证失败),要么在本地执行事务,写本地的 redo 和undo日志但不提交,可以进一步将准备阶段分为以下三步。 
  (1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。 
  (2)参与者节点执行询问发起为止的所有事务操作,并将 undo 信息和 redo 信息写入日志。 
  (3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个“同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个“中止”消息。 
  提交阶段指如果协调者收到了参与者的失败消息或者超时,则直接向每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务在处理过程中使用的锁资源。 
  二阶段提交所存在的缺点如下。 
  (1)同步阻塞问题,在执行过程中所有参与节点都是事务阻塞型的,当参与者占用公共资源时,其他第三方节点访问公共资源时不得不处于阻塞状态。 
  (2)单点故障,由于协调者的重要性,一旦协调者发生故障,则参与者会一直阻塞下去。 
  (3)数据不一致,在二阶段提交的第 2 个阶段中,当协调者向参与者发送 commit 请求之后发生了局部网络异常或者在发送 commit 请求的过程中协调者发生了故障,则会导致只有一部分参与者接收到了 commit 请求,而在这部分参与者在接收到 commit 请求之后就会执行commit操作,其他部分未接收到 commit 请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据不一致的现象。 
  由于二阶段提交存在诸如同步阻塞、单点问题、数据不一致、宕机等缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交。

三阶段提交

  三阶段提交(Three-phase commit,3PC),也叫作三阶段提交协议(Three-phase commitprotocol),是二阶段提交(2PC)的改进版本。三阶段提交把二阶段提交的准备阶段再次一分为二,这样三阶段提交就有 CanCommit、PreCommit、DoCommit 三个阶段。 
  (1)CanCommit 阶段:三阶段提交的 CanCommit 阶段其实和二阶段提交的准备阶段很像,协调者向参与者发送 commit 请求,参与者如果可以提交就返回 Yes 响应,否则返回 No 响应。 
  (2)PreCommit 阶段:协调者根据参与者的反应情况来决定是否可以记录事务的 PreCommit操作。根据响应情况,有以下两种可能。

  • 假如协调者从所有参与者那里获得的反馈都是 Yes 响应,则执行事务。
  • 假如有任何一个参与者向协调者发送了 No 响应,或者等待超时之后协调者都没有接到参与者的响应,则执行事务的中断。

(3)DoCommit阶段:该阶段进行真正的事务提交,也可以分为执行提交、中断事务两种执行情况。

  执行提交的过程如下。

  • 协调者接收到参与者发送的ACK响应后,将从预提交状态进入提交状态,并向所有参与者发送doCommit请求。
  • 事务提交参与者接收到doCommit请求之后,执行正式的事务提交,并在完成事务提交之后释放所有的事务资源。
  • 事务提交完之后,向协调者发送ACK响应。
  • 协调者接收到所有参与者的ACK响应之后,完成事务。中断事务的过程如下。
  • 协调者向所有参与者发送abort请求。
  • 参与者接收到 abort 请求之后,利用其在第 2 个阶段记录的 undo 信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
  • 参与者完成事务回滚之后,向协调者发送 ACK 消息。
  • 协调者接收到参与者反馈的 ACK 消息之后,执行事务的中断。

Mycat 中分布式事务的实现

  Mycat在1.6版本以后已经完全支持 XA 分布式强事务类型了,先通过一个简单的示例来了解Mycat中XA的用法。 
  用户应用侧(AP)的使用流程如下: 
  (1)set autocommit=0 
  在应用层需要设置事务不能自动提交; 
  (2)set xa=on 
  在 SQL 中设置 XA 为开启状态; 
  (3)执行 SQL 
   insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’); 
  (4)commit 或者 rollback 
  对事务进行提交(提交成功或者回滚异常)。 
  完整的流程图如图所示。 
 
  Mycat 内部实现侧的实现流程如下: 
  (1)set autocommit=0 
  将 MysqlConnection 中的 autocommit 设置为 false; 
  (2)set xa=on 
  在Mycat中开启 XA 事务管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令进行 XA 事务开始标记,继续拼装 SQL 业务(Mycat 会将上面的 insert 数据分片到不同的节点上),拼装 XA END XID,XA PREPARE XID 最后进行 1pc 提交并记录日志到 tm.log 中,如果 1pc 阶段有异常,则直接回滚事务 XA ROLLBACK xid。 
  (3)在多节点 MySQL 中全部进行 2pc 提交(XA COMMIT),提交成功后,事务结束;如果有异常,则对事务进行重新提交或者回滚。 
  Mycat 中的 XA 分布式事务的异常处理流程如下: 
  (1)一阶段 commit 异常:如果 1pc 提交任意一个 mysql 节点无法提交或者异常,则全部节点的事务进行回滚,抛出异常给应用侧事务回滚。 
  (2)Mycat Crash Recovery 
  Mycat 崩溃以后,根据 tm.log 事务日志再进行重启恢复,mycat 启动后执行事务日志查找各个节点中已经 prepared 的 XA 事务,进行 commit 或者 rollback。

1. 相关类说明

  通过用户应用侧发送 set xa = on ; SQL 开启 Mycat 内部 XA 事务管理器的功能,事务管理器将对 MySQL 数据库进行 XA 方式的事务管理,具体事务管理功能的实现代码如下:

  • MySQLConnection:数据库连接。
  • NonBlockingSession:用户连接 Session。
  • MultiNodeCoordinator:协调者。
  • CommitNodeHandler:分片提交处理。
  • RollbackNodeHandler:分片回滚处理。

2. 代码解析

  XA 事务启动的源码如下:

public class MySQLConnection extends BackendAIOConnection {
    //设置开启事务
    private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {
        if (autoCommit) {
            sb.append("SET autocommit=1;");
        } else {
            sb.append("SET autocommit=0;");
        }
    }
    public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException {
        if(!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }
        //获取当前事务 ID
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);
    }
……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 MySQLConnection.java源码
}

  用户应用侧设置手动提交以后,Mycat 会在当前连接中加入

  SET autocommit=0;

  将该语句加入到 StringBuffer 中,等待提交到数据库。 
  用户连接 Session 的源码如下:

public class NonBlockingSession implements Session {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 NonBlockingSession.java 源码
}
SET XA = ON ;语句分析

  用户应用侧发送该语句到 Mycat 中,由 SQL 语句解析器解析后交由 SetHandle 进行处理c.getSession2().setXATXEnabled (true); 
  调用 NonBlockSession 中的 setXATXEnable d 方法设置 XA 开关启动,并生成 XID,代码如下:

public void setXATXEnabled(boolean xaTXEnabled) {
    LOGGER.info("XA Transaction enabled ,con " + this.getSource());
    if (xaTXEnabled && this.xaTXID == null) {
        xaTXID = genXATXID();
    }
}

  另外,NonBlockSession 会接收来自于用户应用侧的 commit, 调用 commit 方法进行处理事务提交的逻辑。 
  在 commit()方法中,首先会 check 节点个数,一个节点和多个节点分为不同的处理过程,这里只讲下多个节点的处理方法 checkDistriTransaxAndExecute(); 
  该方法会对多个节点的事务进行提交。 
  协调者的源码如下:

public class MultiNodeCoordinator implements ResponseHandler {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 MultiNodeCoordinator.java 源码
}

  在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 会话类会调用专门进行多节点协同的 MultiNodeCoordinator 类进行具体的处理,在 MultiNodeCoordinator类中,executeBatchNodeCmd 方法加入 XA 1PC 提交的处理,代码片段如下:

for (RouteResultsetNode rrn : session.getTargetKeys()) {
    ……
    if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){
        //recovery Log
        participantLogEntry[started] = new
        ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());
        String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort());
        }
    mysqlCon.execBatchCmd(cmds);
    }
……
}

  在 MultiNodeCoordinator 类的 okResponse 方法中,则进行 2pc 的事务提交

MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus()){
    case TxState.TX_STARTED_STATE:
    if (mysqlCon.batchCmdFinished()){
        String xaTxId = session.getXaTXID();
        String cmd = "XA COMMIT " + xaTxId;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort());
        }
        //recovery log
        CoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId);
        for(int i=0; i<coordinatorLogEntry.participants.length;i++){
            LOGGER.debug("[In MemoryCoordinatorLogEntry]"+coordinatorLogEntry.participants[i]);
            if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){
                coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE;
            }
        }
        inMemoryRepository.put(session.getXaTXID(),coordinatorLogEntry);
        fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
        //send commit
        mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
        mysqlCon.execCmd(cmd);
    }
    return;
……
}

  分片事务提交处理的源码如下:

public class CommitNodeHandler implements ResponseHandler {
    //结束 XA
    public void commit(BackendConnection conn) {
        ……
……//省略此处代码,建议读者参考 GitHub 仓库 MyCAT-Server 项目的 CommitNodeHandler.java源码
    }
    //提交 XA
    @Override
    public void okResponse(byte[] ok, BackendConnection conn) {
        ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 CommitNodeHandler.java 源码
}

  在 Mycat 中同样支持单节点 MySQL 数据库的 XA 事务处理,在 CommitNodeHandler 类中就是对单节点的 XA 二阶段处理,处理方式与 MultiNodeCoordinator 类同,通过 commit 方法进行 1pc 的提交,而通过 okResponse 的方法进行 2pc 阶段的事务提交。 
  分片事务回滚处理的源码如下:

public class RollbackNodeHandler extends MultiNodeHandler {
    ……
……//省略此处代码,建议读者参考 GitHub 仓库的 MyCAT-Server 项目的 RollbackNodeHandler.java 源码
}

  在 RollbackNodeHandler 的 rollback 方法中加入了对 XA 事务的 rollback 处理,用户应用侧发起的 rollback 会在这个方法中进行处理。

for (final RouteResultsetNode node : session.getTargetKeys()) {
    ……
    //support the XA rollback
    MySQLConnection mysqlCon = (MySQLConnection) conn;
    if(session.getXaTXID()!=null) {
        String xaTxId = session.getXaTXID();
        mysqlCon.execCmd("XA END " + xaTxId + ";");
        mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
    }else {
    conn.rollback();
    }
……
}

  同样,该方法会对所有的 MySQL 数据库节点发起 xa rollback 指令。 

分享到:
评论

相关推荐

    Mycat分布式架构之Mycat入门到精通.rar

    5. **分布式事务**:Mycat支持分布式事务,采用两阶段提交协议来保证跨节点的事务一致性,这对于保持数据完整性至关重要。 6. **高可用性**:Mycat提供了主备切换和集群管理功能,当某个节点出现故障时,能够自动...

    Mycat分布式事务的实现

    但是,各独立节点之间无法知道其他节点事务的执行情况,如果想让多台机器中的数据保存一致,就必须保证所有节点上的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有...

    关系型数据的分布式处理系统MyCAT

    - **数据一致性**:虽然 MyCAT 是一个分布式系统,但它仍然保持了事务处理的一致性。通过支持 ACID 特性,确保了数据的一致性和完整性。 #### 3. MyCAT 的应用场景 - **大型网站后端**:适用于处理高并发、大数据...

    基于Mycat中间件分布式数据库架构及企业实践

    尽管Mycat在分布式数据库领域展现出强大能力,但面临数据一致性、跨节点事务处理、复杂查询优化等挑战。企业需结合业务特点,合理设计分片策略,同时关注Mycat的持续更新和优化,以应对不断变化的技术环境。 总结,...

    SpringBoot整合多数据源,并实现本地分布式事务

    下面将详细介绍如何在Spring Boot中整合多数据源并实现本地分布式事务。 一、多数据源设计 1. **配置多数据源**:Spring Boot允许我们配置多个数据源,通过不同的配置类或YAML/Properties文件来区分。每个数据源...

    分布式数据库架构及企业实践-基于Mycat中间件

    本书对 Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心...

    微服务架构的分布式事务控制解决方案day02.zip

    5. **分布式事务API与框架**:例如,Apache的Atomikos、Seata(前身是MyCAT的SEATA)、Google的gRPC等,提供了实现分布式事务的工具和框架。 6. **分布式事务的挑战**:除了技术实现,分布式事务还面临着性能、可用...

    分布式事务书籍随书源码-distribute-transaction.zip

    5. **分布式事务中间件**:如阿里巴巴的Seata、Apache的Atomikos或MyCAT等,这些中间件提供了分布式事务管理的功能。源码可能涉及如何集成这些中间件并利用其特性。 6. **最终一致性**:在某些场景下,强一致性不...

    分布式数据库架构及企业实践-基于Mycat中间件.pdf

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    Mycat 大型分布式系统案例实战

    2. 数据一致性:在分布式环境中,事务处理需考虑CAP原理,Mycat提供了不同的事务隔离级别,以适应不同的业务需求。 3. 负载均衡:通过轮询、权重分配等方式,Mycat能动态调整请求的分配,避免热点数据出现,均衡各...

    分布式数据库架构及企业实践_基于mycat中间件

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践-基于Mycat中间件 学习书籍

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践 基于Mycat中间件

    Mycat 从入门到进阶、从高级技术实践到架构剖析、从网络通信协议解析到系统工作原理的方方面面进行了详细讲解,并剖析了 Mycat的 SQL 路由、跨库联合查询、分布式事务及原生 MySQL、PostgreSQL 协议等核心技术。通过...

    分布式数据库架构及企业实践 基于Mycat中间件.pdf

    Mycat通过两阶段提交、补偿事务(TCC)等方式保证分布式事务的ACID特性。 7. **容错与恢复**:Mycat具有节点失效检测和故障切换机制,当某个节点出现问题时,能够自动将流量切换到其他健康节点,确保服务的连续性。...

    mycat1.6.5源码,分库分表,分布式

    4. **事务一致性**:Mycat支持分布式事务,通过两阶段提交协议保证跨节点的数据一致性。 5. **高可用**:Mycat自身具有高可用性,可以通过集群部署实现故障切换,确保服务不中断。 6. **动态数据源**:可以在运行...

    mycat资料、分布式、mycat、数据库中间件

    1. **Mycat基础知识**:Mycat的核心功能是分库分表,通过路由规则将SQL请求分发到不同的数据库实例上,实现数据的水平扩展,减轻单个数据库的压力。它支持多种数据库类型,如MySQL、Oracle等,并具备事务一致性、...

    MySQL分布式集群之MyCAT资源下载

    MyCAT,全称MetaCat,是一款开源的数据库中间件,它能够将单一的MySQL数据库扩展为分布式数据库系统,支持分库分表,实现数据的水平扩展。在Windows平台上,MyCAT提供了便捷的部署和管理方式,使得在分布式环境中对...

    分布式数据库架构及企业实践__基于Mycat中间件

    4. **事务支持**:Mycat实现了分布式事务,保证了跨节点操作的一致性和原子性。 5. **高可用性**:通过心跳检测和故障切换,Mycat能够及时发现并处理节点故障,保证服务连续性。 6. **动态扩展**:在业务增长时,...

Global site tag (gtag.js) - Google Analytics