调用第三方接口操作业务的时候经常要用到分布式事务,以保证事务的完整性,我们常用的分布式事务模型有saga、tcc、2pc等。2pc不属于接口调用的场景,所以我们调用第三方接口常用的模型有saga、tcc
tcc要保证comfirm时报所有dml操作请求写在comfirm实现里面,这个有一定的局限性,回滚的的实现写在cancel接口里面,这个对业务有一定的局限性
saga更适合长事务、业务比较复杂的场景
本文实现saga事务异常处理,如果需要处理那种接口comfirm超时的情况,需要将事务id及comfirm调用链持久化保存起来,异步检查,也可以写在缓存里面定时检查。这里没实现,应该比较容易实现。
- saga的模型如下
由上图模型不难看出我们在methodA就可以实现saga事务,只要serviceB、serviceC配合实现对应的cancel方法;本文实现java简易版本的sata框架模型代码。当然一定要实现阿里的seata框架或者其它也可以的,就是要另外搭建seata服务器跟相关数据库;
实现技术点:
1、lamdba
2、gclib动态代理
3、ThreadLocal
- saga使用测试
public class SagaTest { public void testFunc(){ System.out.println("**********先测试 Func方法测试 开始***********"); String failStr = "调用失败"; //lambda表达式,jdk8才能执行。 Test test1 = Func.proxy(new Test(),3, Callback::success,//Callback::success方法调用 (aa)->System.out.println(aa+failStr) ); // try{ // System.out.println("======== test start 测试动态代理,代理方法内部调用代理方法========="); test1.test(); System.out.println("======== test end 测试动态代理,代理方法内部调用代理方法========="); System.out.println(); System.out.println("======== successWithRetry3 start 测试动态代理失败重试三次========="); test1.successWithRetry3(); System.out.println("======== successWithRetry3 end 测试动态代理失败重试三次========="); System.out.println(); // System.out.println("======== successWithRetry4 start 测试动态代理失败重试四次========="); // 因为动态代理对象test1最多重试4次,所以successWithRetry4重试4次后会报异常,不再往下执行。 // test1.successWithRetry4(); // System.out.println("======== successWithRetry4 end 测试动态代理失败重试四次========="); // }catch(Exception e){ // // } } /** * 测试saga事务 * @author giant * @date 2021年12月17日 */ public void methodA(){ System.out.println("**********Gaga 测试 真正开始***********"); Saga.begin();//saga开始。 Test sagaTest = new Test(); // 每个saga事务节点有两个实现, // 如下:1、业务实现comfirm方法:sagaTest.test()方法;2、cancel方法为lambda方法实现 Saga.service(sagaTest, (a)->{ System.out.println("回滚t1"); }).test(); Saga.service(sagaTest, (a)->{ System.out.println("回滚t2"); }).test1(); System.out.println("准备报错混滚:"); Saga.service(sagaTest, (a)->{ System.out.println("回滚t3"); }).fail(); // sagaTest.fail()方法抛出异常,会执行Saga.rollback(throwable)方法, // rollback方法会执行所有执行过commit方法的cancel方法,回滚所有执行过的comfirm; Saga.end();//结束本次分布式事务 // test1.test2(); // Test test2 = Fun.proxy(new Test(),null,null); // test2.test1(); // test2.test2(); //successWithRetry4重试四次失败抛出异常,不往下跑了 System.out.println("不抛出异常跑完;"); } public static void main(String[] args) { new SagaTest().methodA(); } } class Test{ int execCount = 0 ; public String test(){ System.out.println("test"); this.test1(); return "test success"; } public String test1(){ String innerParam = "innerParam1"; System.out.println("test1 exec,方法内部参数:"+innerParam); return "test1 success"; } public void test2(String arg){ System.out.println("test2:"+arg); } /** * 跑第三次成功 * @author giant * @date 2021年12月15日 * @return */ public String successWithRetry3(){ if(execCount>=2){ String str = "第"+(++execCount)+"次success"; execCount = 0; return str; }else{ System.out.println("fail:"+(++execCount)); throw new RuntimeException("fail"); } } /** * 跑第第次成功 * @author giant * @date 2021年12月15日 * @return */ public String successWithRetry4(){ execCount = 0 ; if(execCount>=3){ String str = "第"+(++execCount)+"次success"; execCount = 0; return str; }else{ System.out.println("fail:"+(++execCount)); throw new RuntimeException("fail"); } } public String fail(){ System.out.println("fail:"); throw new RuntimeException("fail"); } public String fail(String msg){ System.out.println("fail:"+msg); throw new RuntimeException("fail"); } } class Callback{ public Callback(){} public static void success(Object obj){ System.out.println(obj+"成功"); } public void fail(Throwable e){ System.out.println("执行失败"); } }
最终执行fail方法异常报错:回滚Saga.service执行过cancel方法,输出内容如下:
**********Gaga 测试 真正开始*********** test test1 exec,方法内部参数:innerParam1 test1 exec,方法内部参数:innerParam1 准备报错混滚: fail: 回滚t1 回滚t2 回滚t3 Exception in thread "main" java.lang.RuntimeException: fail at com.framework.utils.func.Test.fail(SagaTest.java:132) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.CGLIB$fail$4(<generated>) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a$$FastClassByCGLIB$$c8387779.invoke(<generated>) at net.sf.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228) at com.framework.utils.func.Func.lambda$0(Func.java:51) at com.framework.utils.func.Func.retry(Func.java:73) at com.framework.utils.func.Func.intercept(Func.java:48) at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.fail(<generated>) at com.framework.utils.func.SagaTest.methodA(SagaTest.java:58) at com.framework.utils.func.SagaTest.main(SagaTest.java:75)
- 代码实现
maven导入gclib动态代理
<dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>3.2.12</version> </dependency>
1、saga事务重试接口
/** * lambda表达式实现,方法失败重试接口 * @author giant * @date 2021年12月15日 * @param <T> 返回对象 */ @FunctionalInterface interface IRetry<T>{ public T run() throws Throwable; }
2、成功或者失败后的回调接口
/** * lambda表达式实现,方法成功或者失败的回调接口 * @author giant * @date 2021年12月15日 * @param <T> 参数对象 */ @FunctionalInterface public interface ICallback<T> { public void run(T obj); }
3、通过动态代理service中所有的方法,实现service的重试、成功及错误的回调
import java.lang.reflect.Method; import net.sf.cglib.proxy.Enhancer; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; /** * * 方法执行动态代理,实现代理类方法重试,方法调用成功或者失败后的反调函数实现 * 代理对象调用方法时,方法与方法直接可嵌套使用。 * @author giant * @date 2021年12月15日 */ public class Func implements MethodInterceptor{ /** * 是否saga模式,如果是saga模式,建在报错的时候统一执行Saga.sagaFailCallBacks集合里面的方法 * */ private boolean isSaga=false; private ICallback<Throwable> fail = null; private ICallback<Object> success = null; private int retry = 1; private Object t = null; /** * 不允许外表创建该对象 * @param t * @param success * @param fail * @param retry */ Func(Object t,ICallback<Object> success,ICallback<Throwable> fail,int retry,boolean isSaga) { // TODO Auto-generated constructor stub this.t = t; this.success = success; this.fail = fail; this.retry = retry; this.isSaga = isSaga; } @Override public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { Object o2 = retry(()->{ Object o1 = null; try{ o1 = methodProxy.invokeSuper(o, objects); return o1; }catch(Throwable e){ throw e; } // return null; }, Object.class,this.retry,this.success,this.fail); return o2; } /** * 失败重试三次 * @param run * @param clz * @return */ private <T> T retry(IRetry<T> run,Class <T> clz,int count,ICallback<Object> success,ICallback<Throwable>fail){ int i=0; T result = null; boolean isSuccess = false; do{ try { result = run.run(); isSuccess = true; if(success!=null){ success.run(result); } }catch(Throwable e){ if(fail!=null&&!isSaga){ fail.run(e); } if(i==retry-1){ //向外抛出的异常只认最后一次 if(isSaga){//如果是saga处理,统一最后回滚事务 Saga.rollback(e); } throw e; } } }while((!isSuccess)&&++i<count); return result; } //生成代理对象 Object get(){ //new 一个Enhancer对象 Enhancer enhancer = new Enhancer(); //指定他的父类(注意这 是实现类,不是接口) enhancer.setSuperclass(t.getClass()); //指定真正做事情的回调方法 enhancer.setCallback(this); //生成代理类对象 Object o = enhancer.create(); //返回 return o; } /** * 给代理对象赋能,支持方法重试、成功及失败的回调 * @author giant * @date 2021年12月15日 * @param obj 代理对象 * @param retry 代理对象失败重试次数 * @param success 代理对象所有方法成功回调函数(不报异常就算成功) * @param fail 代理对象所有方法失败回调函数。(报异常就算失败) * @return 返回代理 */ @SuppressWarnings("unchecked") public static <T2> T2 proxy(T2 obj,int retry,ICallback<Object> success,ICallback<Throwable> fail){ try { Func p = new Func(obj,success,fail,retry,false); return (T2)p.get(); } catch (Exception e) { throw new RuntimeException(e); } } /** * 给代理对象赋能,支持方法成功及失败的回调 * @author giant * @date 2021年12月15日 * @param obj 代理对象 * @param success 代理对象所有方法成功回调函数(不报异常就算成功) * @param fail 代理对象所有方法失败回调函数(报异常就算失败) * @return 返回代理 */ public static <T2> T2 proxy(T2 obj,ICallback <Object>success,ICallback<Throwable> fail){ return proxy(obj, 1, success, fail); } }
4、Saga模型框架基于Func类实现
package com.framework.utils.func; import java.util.ArrayList; import java.util.List; /** * saga分布式事务服务类 * 所有Gaga.service()调用的方法必须在Gaga.begin()和Gaga.begin()中间。 * xid就是threadid,可以通过Gaga.begin()获取 * @author giant * @date 2021年12月15日 */ public class Saga { /** * * 记录事务所有saga节点回滚函数,该对象代替了事件表,每次Gaga分布式事务必须调用Saga.begin()跟Saga.end(); * */ static ThreadLocal<List<ICallback<Throwable>>> sagaFailCallBacks = new ThreadLocal<>(); /** * saga事务开始;每次Gaga分布式事务必须调用Saga.begin()跟Saga.end(); * @author giant * @date 2021年12月15日 */ public static long begin() { long xid = Thread.currentThread().getId(); List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails == null) { fails = new ArrayList<ICallback<Throwable>>(); Saga.sagaFailCallBacks.set(fails); } else { throw new SagaException("程序未关闭saga,请在finally方法执行Saga.end()"); } return xid; } /** * 代理事务节点,该节点执行失败,会根据参数重试,如果方法报错,会回滚其它所有事务的failRollback方法 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次) * @author giant * @date 2021年12月15日 * @param obj 事务节点对象 * @param retry 事务失败重试次数 * @param failRollback 如果其中一个service模块失败,将在异常的时候回归,或者调用Gaga.rollback(null)回滚。 * @return */ @SuppressWarnings("unchecked") public static <T2> T2 service(T2 obj, int retry, ICallback<Throwable> failRollback) { try { List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails == null) { throw new SagaException("请先开启Saga,调用Saga.begin()"); } fails.add(failRollback); Func p = new Func(obj, null, failRollback, retry, true); return (T2) p.get(); } catch (Exception e) { throw new SagaException(e); } } /** * 代理事务节点,如果报错方法报错,会回滚其它所有事务的failRollback方法 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次) * @author giant * @date 2021年12月15日 * @param obj 事务节点对象 * @param failRollback * @return */ @SuppressWarnings("unchecked") public static <T2> T2 service(T2 obj, ICallback<Throwable> failRollback) { return service(obj, 1, failRollback); } /** * 回滚所有saga执行的链路,在本地方方法抛异常时也可调用,回滚所有方法 * * @author giant * @date 2021年12月15日 * @param e */ public static void rollback(Throwable e) { List<ICallback<Throwable>> failCallbacks = sagaFailCallBacks.get(); if (failCallbacks != null) { // 这里认为所有方法fail调用都是成功的,如果失败,就是人工处理了。 failCallbacks.forEach(f -> f.run(e)); failCallbacks.clear(); } sagaFailCallBacks.set(null); } /** * 结束saga,这个方法是要在saga事务结束的时候调用,调用了Saga.begin()就必须调用该方法 * @author giant * @date 2021年12月15日 */ public static void end() { List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get(); if (fails != null) { fails.clear(); } Saga.sagaFailCallBacks.set(null); } }
saga异常实现
package com.framework.utils.func; public class SagaException extends RuntimeException { /** * */ private static final long serialVersionUID = 1L; public SagaException(String e){ super(e); } public SagaException(Throwable e){ super(e); } }
相关推荐
4. XA两阶段提交:遵循ACID(原子性、一致性、隔离性和持久性)原则的传统分布式事务模型。Seata也支持这种模式,但在高并发场景下可能存在性能瓶颈。 Seata的设计理念是轻量级和自治,这意味着它尽可能减少对业务...
本资料包“分布式事务源代码”将通过源代码的形式深入解析分布式事务的工作原理及其实现方式。 分布式事务主要涉及以下知识点: 1. **两阶段提交(2PC, Two-Phase Commit)**:这是最基础的分布式事务协议,包括...
8. **分布式事务解决方案**:现代Java框架如Spring Cloud Data Flow、Apache分布式事务组件(如Atomikos、Bitronix)提供了完善的分布式事务支持。此外,还有一些新兴技术,如Seata(前身是TCC-Action)、...
3. **Spring框架的事务管理**:Spring提供了声明式事务管理,允许开发者通过注解或配置文件来控制事务边界,简化了分布式事务的编程模型。Spring支持JTA事务管理,可以与多种事务管理器如Atomikos、Bitronix等配合...
- 一致性:不同的分布式事务模型在一致性和可用性之间有不同的权衡,需要根据业务需求选择合适方案。 4. **最佳实践**: - 事务设计:避免长时间持有事务,尽早提交或回滚,降低锁竞争。 - 事务隔离级别:合理...
三、分布式事务模型 1. **两阶段提交(2PC)**:经典的分布式事务协议,分为准备阶段和提交阶段。但2PC存在单点故障、阻塞等问题。 2. **三阶段提交(3PC)**:为了解决2PC的问题,引入了预提交阶段,减少阻塞的...
Dubbo,作为阿里巴巴开源的高性能Java RPC框架,自然也提供了对分布式事务的支持。 在基于Dubbo的分布式事务实现中,通常有以下几种解决方案: 1. **两阶段提交(2PC)**:这是一个经典的分布式事务协议,包括准备...
本文档主要涵盖了基于Java的分布式事务设计,特别是利用可靠消息服务(如RocketMQ)来实现分布式事务的方案。 1.3 读者对象 本文档面向软件开发者、系统架构师以及对分布式系统和事务管理感兴趣的人员。 2. 分布式...
本文将探讨几种常用的分布式事务处理方案,包括通用分布式事务规范XA、JAVA分布式事务规范(JTA)以及一些常见的分布式事务框架如2PC/3PC、TCC、MQ、Seata和Saga。 #### 二、分布式事务基础知识 ##### 2.1 事务的...
Seata 就是为了解决这个问题,它实现了基于柔性事务的TCC(Try-Confirm-Cancel)模式、Saga模式以及AT(Automatic Transaction)模式,这三种模式都是业界公认的分布式事务解决方案。 1. TCC 模式:TCC 是 Try-...
10. **编程模型**:学习XAtest源码,可以了解如何在Java应用程序中嵌入分布式事务管理,包括如何声明和启动事务,以及如何处理事务边界。 通过对XAtest源代码的学习,开发者不仅可以理解分布式事务的原理,还能掌握...
具体实现可能会结合具体的编程语言、框架和数据库系统,如Java的JTA、分布式数据库如MySQL的InnoDB Cluster,或是NoSQL数据库如Cassandra的分布式事务支持。通过深入理解和实践这些技术和概念,开发者能够构建出更...
Seata的默认模式是AT模式,它通过记录原始SQL和回滚SQL来实现分布式事务。在事务开始时,记录预执行的SQL和行级数据快照;如果事务提交,则执行提交SQL;如果事务回滚,则基于快照数据执行回滚SQL。 3. **TCC模式...
分布式事务是计算机科学中涉及数据库和网络通信的重要概念,它在多台计算机或者多个系统...开发者在设计和实现分布式系统时,需要根据业务需求和系统规模选择合适的分布式事务策略,并充分利用现有的开源工具和框架。
- Saga:长事务模型,将大事务拆分为一系列小事务,通过回滚链路恢复。 7. **分布式锁**: - Redis和ZooKeeper:常用于实现分布式锁,提供高并发下的资源互斥访问。 - RedLock:Redis作者提出的一种分布式锁算法...
这份宝典深入探讨了Redis的各种使用场景、分布式事务处理、分布式锁的实现、数据库操作、异步并发编程、JVM(Java虚拟机)优化、微服务组件的使用以及经典的设计模式。以下是对这些主题的详细阐述: 1. **Redis使用...
- Spring框架:Spring的分布式支持包括远程代理、消息驱动Bean和分布式事务管理等,是现代Java应用的基石。 - JMS:Java消息服务允许分布式系统中的组件通过异步消息传递进行通信。 - ZooKeeper:Apache ...
这个名为"学习总结 包括Java JVM MySQL NoSQL UML 缓存 消息 分布式事务 SOA 微服务 敏捷 架构设.zip"的压缩包文件,显然包含了一系列IT领域的核心主题,这些主题都是现代软件开发和系统设计的基石。以下是对每个...
6. 分布式事务管理:2PC、补偿事务(TCC)、Saga等事务管理策略在分布式环境中的应用。 7. 数据库分片技术:如Mycat,用于解决单个数据库性能瓶颈,实现数据水平扩展。 8. Linux基础与运维:书中可能包括Linux操作...
- 两阶段提交(2PC)、三阶段提交(3PC):经典的分布式事务协议,但存在缺点,如阻塞和单点故障。 - TCC(Try-Confirm-Cancel):补偿型事务处理模式,避免了2PC的缺点。 - Saga:长事务处理,通过一系列原子...