`
wxb880114
  • 浏览: 678930 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Java 分布式事务(多数据源)

阅读更多
最近这几天一直在整 怎么实现分布式事务。找了很多资料,不过大都相近类同。对Oracle、SQL Server、Mysql数已做过测试,其中Mysql5.0以上的才支持分布式事务。
对于这些,主要是之前根本没有接触过分布式事务,还错找了一些分布式事数据库的资料,呵呵,结果不是我目前所需要的。
测试过程中出现了很多错误,一直都通不过,以为是用户权限还有数据库服务的问题,但一切都配置良好的情况下还一直都通不过。结果发现,我导入的都是一些普通的JDBC连接包,于是狂搜实现XA事务的jar包。
Mysql:            mysql-connector-java-5.1.6-bin.jar
SQL Server:       sqljdbc.jar
Oracle:           ojdbc14.jar
用的是这些包才顺利通过运行。后面会附上这几个jar包。
好了,把源代码也附上:

import com.microsoft.sqlserver.jdbc.SQLServerXADataSource;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import java.sql.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.sql.*;
import javax.transaction.xa.*;
import oracle.jdbc.xa.client.OracleXADataSource;

public class Mutil_DataSource_Test {
    public static void main(String[] args){
        Mutil_DataSource_Test mdt = new Mutil_DataSource_Test();
        try {
            mdt.test1();
        } catch (Exception ex) {
            System.out.println("除SQLException、XAException之外的异常: \n");
            Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex);
         }
    }

    class MyXid implements Xid{
        int formatId;
        byte globalTransactionId[];
        byte branchQualifier[];
        public MyXid(){

        }
        public MyXid(int formatId,byte[] globalTransactionId,byte[] branchQualifier){
             this.formatId = formatId;
            this.globalTransactionId = globalTransactionId;
            this.branchQualifier = branchQualifier;
        }

        public int getFormatId() {
            return this.formatId;
        }
        public void setFormatId(int formatId){
            this.formatId = formatId;
        }
        public byte[] getGlobalTransactionId() {
            return this.globalTransactionId;
        }
        public void setGlobalTransactionId(byte[] globalTransactionId){
            this.globalTransactionId = globalTransactionId;
        }
        public byte[] getBranchQualifier() {
            return this.branchQualifier;
        }
        public void setBranchQualifier(byte[] branchQualifier){
            this.branchQualifier = branchQualifier;
        }
    }

    //多数据库测试
    public void test1() {
        //定义所需用到的变量
         Connection mysqlCn = null;
        Connection sqlCn = null;
        Connection mysqlCn2 = null;
        Connection oraCn = null;

        MysqlXADataSource mysqlDs = null;
        SQLServerXADataSource sqlDs = null;
        MysqlXADataSource mysqlDs2 = null;
        OracleXADataSource oraDs = null;

        XAConnection xamysqlCn = null;
        XAConnection xasqlCn = null;
        XAConnection xamysqlCn2 = null;
        XAConnection xaoraCn = null;

        XAResource xamysqlRes = null;
        XAResource xasqlRes = null;
        XAResource xamysqlRes2 = null;
        XAResource xaoraRes = null;

        Xid mysqlXid = null;
        Xid sqlXid = null;
        Xid mysqlXid2 = null;
        Xid oraXid = null;

        Statement mysqlpst = null;
        Statement sqlpst = null;
        Statement mysqlpst2 = null;
        Statement orapst = null;
    try{
        //获得数据源
        mysqlDs = new MysqlXADataSource();
        mysqlDs.setURL("jdbc:mysql://localhost:3306/test");
        mysqlDs2 = new MysqlXADataSource();
        mysqlDs2.setURL("jdbc:mysql://10.10.10.119:3306/test");
        sqlDs = new SQLServerXADataSource();
        sqlDs.setURL("jdbc:sqlserver://10.10.10.119:1433;DatabaseName=RTC;loginTimeout=20;user=sa;password=chgpwd122105");
//        sqlDs.setUser("sa");
//        sqlDs.setPassword("chgpwd122105");
//        sqlDs.setServerName("10.10.10.119");
//        sqlDs.setPortNumber(1433);
//        sqlDs.setDatabaseName("RTC");
        oraDs = new OracleXADataSource();
        oraDs.setURL("jdbc:oracle:thin:@10.10.10.119:1521:WMS");
        //获得连接
        xamysqlCn = mysqlDs.getXAConnection("root", "9999");
System.out.println("xamysqlCn: "+xamysqlCn);
        xasqlCn = sqlDs.getXAConnection();
System.out.println("xasqlCn: "+xasqlCn);
        xamysqlCn2 = mysqlDs2.getXAConnection("root", "9999");
System.out.println("xamysqlCn2: "+xamysqlCn2);
        xaoraCn = oraDs.getXAConnection("tiger", "tiger");
System.out.println("xaoraCn: "+xaoraCn);

        mysqlCn = xamysqlCn.getConnection();
        sqlCn = xasqlCn.getConnection();
        mysqlCn2 = xamysqlCn2.getConnection();
        oraCn = xaoraCn.getConnection();

        mysqlpst = mysqlCn.createStatement();
        sqlpst = sqlCn.createStatement();
        mysqlpst2 = mysqlCn2.createStatement();
        orapst = oraCn.createStatement();
        //定义XAResource
        xamysqlRes = xamysqlCn.getXAResource();
        xasqlRes = xasqlCn.getXAResource();
        xamysqlRes2 = xamysqlCn2.getXAResource();
        xaoraRes = xaoraCn.getXAResource();
        //定义Xid
        mysqlXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x02});
        sqlXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x03});
        mysqlXid2 = new MyXid(0, new byte[]{0x01}, new byte[]{0x04});
        oraXid = new MyXid(0, new byte[]{0x01}, new byte[]{0x05});
        //执行Mysql
        xamysqlRes.start(mysqlXid, XAResource.TMNOFLAGS);       
        mysqlpst.executeUpdate("insert into test values(4,'XA','F','Class4')");
        xamysqlRes.end(mysqlXid, XAResource.TMSUCCESS);
        //执行SQLServer
        xasqlRes.start(sqlXid, XAResource.TMNOFLAGS);
        sqlpst.executeUpdate("insert into test values('444')");
        xasqlRes.end(sqlXid, XAResource.TMSUCCESS);
        //执行Mysql
        xamysqlRes2.start(mysqlXid2, XAResource.TMNOFLAGS);
        mysqlpst2.executeUpdate("insert into test values(4,'XA','F','Class4')");
        xamysqlRes2.end(mysqlXid2, XAResource.TMSUCCESS);
        //执行Oracle
        System.out.println("xaoraRes: "+xaoraRes);
        xaoraRes.start(oraXid, XAResource.TMNOFLAGS);
        orapst.executeUpdate("insert into test123 values('4','44','444')");
        System.out.println("oraXid: "+oraXid);
        xaoraRes.end(oraXid, XAResource.TMSUCCESS);
        //准备
        int mysqlRea = xamysqlRes.prepare(mysqlXid);
        int sqlRea = xasqlRes.prepare(sqlXid);
        int mysqlRea2 = xamysqlRes2.prepare(mysqlXid2);
        int oraRea = xaoraRes.prepare(oraXid);
        //判断准备就绪与否  提交或回滚
        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK && oraRea == xaoraRes.XA_OK && sqlRea == xasqlRes.XA_OK){
//        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK && oraRea == xaoraRes.XA_OK){
//        if(mysqlRea == xamysqlRes.XA_OK && sqlRea == xasqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK){
//        if(mysqlRea == xamysqlRes.XA_OK && mysqlRea2 == xamysqlRes.XA_OK){
            xamysqlRes.commit(mysqlXid, false);
System.out.println("Mysql 事务提交成功!");
            xasqlRes.commit(sqlXid, false);
System.out.println("SQLServer 事务提交成功!");
            xamysqlRes2.commit(mysqlXid2, false);
System.out.println("Mysql2 事务提交成功!");
            xaoraRes.commit(oraXid, false);
System.out.println("Oracle 事务提交成功!");
        }else{
            xamysqlRes.rollback(mysqlXid);
            xasqlRes.rollback(sqlXid);
            xamysqlRes2.rollback(mysqlXid2);
            xaoraRes.rollback(oraXid);
System.out.println("事务回滚成功!");
        }
    }catch(SQLException ex){
        Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex);
         try{
            xamysqlRes.rollback(mysqlXid);
            xasqlRes.rollback(sqlXid);
            xamysqlRes2.rollback(mysqlXid2);
            xaoraRes.rollback(oraXid);
        }catch(XAException e){
            System.out.println("回滚也出错咯!~");
            e.printStackTrace();
        }
    }catch(XAException ex){
        Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex);
     }finally{
        try {
        //关闭
        mysqlpst.close();
        mysqlCn.close();
        xamysqlCn.close();
        sqlpst.close();
        sqlCn.close();
        xasqlCn.close();
        mysqlpst2.close();
        mysqlCn2.close();
        xamysqlCn2.close();
        orapst.close();
        oraCn.close();
        xaoraCn.close();
        } catch (SQLException ex) {
            Logger.getLogger(Mutil_DataSource_Test.class.getName()).log(Level.SEVERE, null, ex);
         }
    }
    }
}

分布式事务分为两个阶段,第一个阶段相当于是一个预提交,第二阶段才是正真的提交。
首先要实现的是Xid接口,formatId可以理解为一个全局事务的ID,不过我上面的代码没有去做一些异常的处理,还有正确的链接的关闭,只是自己做的一个小小的测试,以后项目中再去处理这些。第一阶段如果出错或是不顺利则不会提交,需要做一些回滚处理。如果顺利则准备提交,进入第二阶段,在第二阶段可能会出现问题,如果第一个分支事务提交成功了,而后有一个分支事务提交失败,这样则会造成数据不准确,目前还不知道有没有方法可以解决些问题,好像XAResource.recover()可以处理,但具体怎么解决,在项目中自己还是可以想到办法解决。
对XA分布式事务  我是初学者,而且也没有谁指点,只是在网上找些资料。我上面的理解可能也还有些不足或是错误。希望高手看到后能指点指点。

原文:http://blog.csdn.net/kangojian/article/details/6780305
分享到:
评论

相关推荐

    springboot多数据源即分布式事务解决方案

    SpringBoot作为一款轻量级的框架,提供了便捷的多数据源配置和分布式事务管理方案,使得开发者能够高效地管理和操作不同的数据库。本文将详细探讨SpringBoot如何实现多数据源以及分布式事务。 首先,我们要理解什么...

    SpringBoot+Atomikos分布式事务及多数据源动态切换,两种demo

    在现代企业级应用开发中,分布式事务处理和多数据源管理是常见的需求。Spring Boot作为轻量级的Java开发框架,结合Atomikos这样的分布式事务管理器,可以有效地解决这些问题。本文将深入探讨如何在Spring Boot项目中...

    springboot多数据源即分布式事务解决方案,添加对多线程的支持

    本教程将深入探讨如何在Spring Boot环境下实现多数据源操作及分布式事务管理,并加入对多线程的支持。 首先,我们来理解多数据源的概念。在大型系统中,往往需要连接多个数据库,如主库、从库、测试库等。Spring ...

    spring+druid+AtomikosDataSource实现多数据源切换及分布式事务控制

    在现代企业级应用开发中,多数据源切换和分布式事务管理是常见的需求,尤其是在大型分布式系统中。Spring框架因其强大的依赖注入和AOP(面向切面编程)特性,成为Java领域首选的轻量级框架。Druid是一个优秀的数据库...

    多数据源分布式事务管理调研报告.zip

    在现代企业级应用中,随着业务复杂性和...综上所述,多数据源分布式事务管理是一项复杂的任务,需要深入理解各种事务模型、解决方案和最佳实践,以便在保证数据一致性的前提下,构建高效、可靠且可扩展的Java应用程序。

    Springboot 动态多数据源 jta分布式事务

    本资源针对的是Spring Boot动态多数据源和JTA(Java Transaction API)分布式事务的实现,对于初学者来说非常实用。下面我们将深入探讨这些知识点。 首先,让我们了解一下Spring Boot的多数据源。在许多业务场景下...

    多数据源分布式事务管理调研报告.docx

    分布式事务是指在分布式环境下,跨越多个数据源的操作需要保证一致性,即所有操作要么全部成功,要么全部失败。这是因为业务功能往往需要横跨多个服务和数据库,而这些服务和数据库可能位于不同的资源服务器上。...

    Spring+Mybatis+Atomikos实现JAVA初始化并控制多个数据源+分布式事务

    在Java开发中,特别是在大型企业级应用中,处理多个数据源和分布式事务是常见的需求。本DEMO展示了如何使用Spring框架、Mybatis持久层框架以及Atomikos事务管理器来实现这一目标。以下是对这个DEMO中涉及的技术点的...

    jta分布式事务完成例子,测试通过

    例子虽小,可覆盖面广,设计spring载入.properties文件,spring配置jta和jotm分布式事务,设置数据源连接池,注解事务驱动。功能是在mysql上建立了两个数据库分别为dbone和dbtwo,在dbone里有表tb1,表中只有一个字段...

    Spring3.0+Hibernate+Atomikos多数据源分布式事务管理

    因此,多数据源分布式事务管理成为了必不可少的技术。本教程将聚焦于如何利用Spring 3.0、Hibernate ORM框架以及Atomikos这个开源事务管理器来实现高效、可靠的多数据源分布式事务处理。 **Spring 3.0**: Spring是...

    分布式事务源代码

    在大型网络应用中,数据往往分散在多个数据库或服务器上,为了保证这些数据的一致性,就需要使用分布式事务来协调跨多个节点的操作。本资料包“分布式事务源代码”将通过源代码的形式深入解析分布式事务的工作原理...

    java分布式系统架构源码

    Java分布式系统架构是一种将应用程序分布在多个计算节点上运行的技术,以提高系统的可伸缩性、容错性和性能。源码分析对于理解这种架构至关重要,尤其是对于开发者来说,它提供了深入学习和自定义系统的机会。本资源...

    java简单分布式架构,多个数据源,线程池多线程访问

    综上所述,这个项目涉及到的知识点包括:分布式系统设计、Java多线程与线程池、Spring框架的多数据源支持、MyBatis的使用以及Spring的事务管理。通过这些技术的组合,可以构建出一个高效、可扩展的分布式应用,以...

    多数据源动态切换及XA分布式事务Java实现v1.0.zip

    使用Atomikos支持分布式事务,Spring+Mybatis+Druid+AtomikosDataSource 使用手册: https://www.yuque.com/itman/wosfkn/mreame

    Spring多数据源分布式事务管理

    总的来说,Spring多数据源分布式事务管理是一项复杂的任务,涉及到Spring的AOP、数据源路由、事务管理等多个方面。通过Atomikos这样的JTA实现,我们可以有效地解决分布式环境下的事务一致性问题。同时,结合Druid和...

    分布式数据源,数据源的动态寻找,分布式事务JTA实现-spring-jta-mybatis.zip

    在分布式事务场景下,Spring的声明式事务管理可以简化代码,使得在处理多数据源事务时,只需在方法上添加@Transactional注解,事务的开始、提交、回滚等操作将自动进行。 具体到这个项目"spring-jta-mybatis-master...

    java+spring+mybatis+mysql+RuoYi-atomikos-实现分布式事务.zip

    本项目"java+spring+mybatis+mysql+RuoYi-atomikos-实现分布式事务.zip"是一个基于若依(RuoYi)框架改造的多模块分布式事务解决方案,它利用了Atomikos这一强大的分布式事务管理器。以下将详细解析这个项目的知识点...

    ejb3.0 分布式事务

    通过JNDI(Java Naming and Directory Interface)查找,EJB可以透明地访问这些数据源,参与分布式事务。 4. **JTA 和 XA 事务** JTA 是Java平台的标准事务API,支持分布式事务。在ejb3.0中,使用JTA进行事务管理...

    SpringBoot整合多数据源,并实现本地分布式事务

    总结,Spring Boot整合多数据源并实现本地分布式事务,需要理解数据源的配置、事务管理、读写分离策略以及分布式事务的协调机制。在实际项目中,结合Atomikos这样的工具,可以有效地处理复杂的事务场景,提高系统的...

Global site tag (gtag.js) - Google Analytics