`

Java实现saga分布式事务模型框架

阅读更多

 

调用第三方接口操作业务的时候经常要用到分布式事务,以保证事务的完整性,我们常用的分布式事务模型有saga、tcc、2pc等。2pc不属于接口调用的场景,所以我们调用第三方接口常用的模型有saga、tcc

 

tcc要保证comfirm时报所有dml操作请求写在comfirm实现里面,这个有一定的局限性,回滚的的实现写在cancel接口里面,这个对业务有一定的局限性

saga更适合长事务、业务比较复杂的场景

 

本文实现saga事务异常处理,如果需要处理那种接口comfirm超时的情况,需要将事务id及comfirm调用链持久化保存起来,异步检查,也可以写在缓存里面定时检查。这里没实现,应该比较容易实现。

 

  • saga的模型如下

saga模型

 

 

由上图模型不难看出我们在methodA就可以实现saga事务,只要serviceB、serviceC配合实现对应的cancel方法;本文实现java简易版本的sata框架模型代码。当然一定要实现阿里的seata框架或者其它也可以的,就是要另外搭建seata服务器跟相关数据库;

实现技术点:

1、lamdba

2、gclib动态代理

3、ThreadLocal

 

 

 

  •  saga使用测试

 

public class SagaTest {
	
	public void testFunc(){
	System.out.println("**********先测试 Func方法测试 开始***********");
	String failStr = "调用失败";
	//lambda表达式,jdk8才能执行。
	Test test1 = Func.proxy(new Test(),3, 
			Callback::success,//Callback::success方法调用
			(aa)->System.out.println(aa+failStr)
			);
//	try{
//	System.out.println("======== test start 测试动态代理,代理方法内部调用代理方法=========");
		test1.test();
	System.out.println("======== test end 测试动态代理,代理方法内部调用代理方法=========");
	System.out.println();
	System.out.println("======== successWithRetry3 start  测试动态代理失败重试三次=========");
		test1.successWithRetry3();
	System.out.println("======== successWithRetry3 end  测试动态代理失败重试三次=========");
	System.out.println();
//	System.out.println("======== successWithRetry4 start  测试动态代理失败重试四次=========");
//          因为动态代理对象test1最多重试4次,所以successWithRetry4重试4次后会报异常,不再往下执行。
//		test1.successWithRetry4();
//	System.out.println("======== successWithRetry4 end  测试动态代理失败重试四次=========");
//	}catch(Exception e){
//		
//	}
}	

/**
 * 测试saga事务
 * @author giant
 * @date 2021年12月17日
 */
public void methodA(){
		
		
		
		System.out.println("**********Gaga 测试 真正开始***********");
		Saga.begin();//saga开始。
		Test sagaTest = new Test();
//               每个saga事务节点有两个实现,
//               如下:1、业务实现comfirm方法:sagaTest.test()方法;2、cancel方法为lambda方法实现
		 Saga.service(sagaTest, (a)->{
			System.out.println("回滚t1");
		}).test();
		
		Saga.service(sagaTest, (a)->{
			System.out.println("回滚t2");
		}).test1();
		
		System.out.println("准备报错混滚:");
		Saga.service(sagaTest, (a)->{
			System.out.println("回滚t3");
		}).fail();
//         sagaTest.fail()方法抛出异常,会执行Saga.rollback(throwable)方法,
 //        rollback方法会执行所有执行过commit方法的cancel方法,回滚所有执行过的comfirm;
		
		Saga.end();//结束本次分布式事务
			
//			test1.test2();
//			Test test2 = Fun.proxy(new Test(),null,null);
//			test2.test1();
//			test2.test2();
		//successWithRetry4重试四次失败抛出异常,不往下跑了
		System.out.println("不抛出异常跑完;");
		
	}
	public static void main(String[] args) {
		new SagaTest().methodA();
	}
}
class Test{
	 int execCount = 0 ;
	public String test(){
		System.out.println("test");
		this.test1();
		return "test success";
	}
	public String test1(){
		String innerParam = "innerParam1";
		System.out.println("test1 exec,方法内部参数:"+innerParam);
		return "test1 success";
	}
	public void test2(String arg){
		System.out.println("test2:"+arg);
	}
	
	/**
	 * 跑第三次成功
	 * @author giant
	 * @date 2021年12月15日
	 * @return
	 */
	public String successWithRetry3(){
		
		if(execCount>=2){
			String str = "第"+(++execCount)+"次success";
			execCount = 0;
			return str;
		}else{
			System.out.println("fail:"+(++execCount));
			throw new RuntimeException("fail");
		}
	}
	
	
	/**
	 * 跑第第次成功
	 * @author giant
	 * @date 2021年12月15日
	 * @return
	 */
	public String successWithRetry4(){
		 execCount = 0 ;
		if(execCount>=3){
			String str = "第"+(++execCount)+"次success";
			execCount = 0;
			return str;
		}else{
			System.out.println("fail:"+(++execCount));
			throw new RuntimeException("fail");
		}
	}
	public String fail(){
			System.out.println("fail:");
			throw new RuntimeException("fail");
	}
	
	public String fail(String msg){
		System.out.println("fail:"+msg);
		throw new RuntimeException("fail");
	}
	
	
	
}



class Callback{
	 
	 public Callback(){}
		public static void success(Object obj){
			System.out.println(obj+"成功");
		}
		
		public void fail(Throwable e){
			System.out.println("执行失败");
		}
	}

 

 

 

最终执行fail方法异常报错:回滚Saga.service执行过cancel方法,输出内容如下:

 

**********Gaga 测试 真正开始***********
test
test1 exec,方法内部参数:innerParam1
test1 exec,方法内部参数:innerParam1
准备报错混滚:
fail:
回滚t1
回滚t2
回滚t3
Exception in thread "main" java.lang.RuntimeException: fail
	at com.framework.utils.func.Test.fail(SagaTest.java:132)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.CGLIB$fail$4(<generated>)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a$$FastClassByCGLIB$$c8387779.invoke(<generated>)
	at net.sf.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:228)
	at com.framework.utils.func.Func.lambda$0(Func.java:51)
	at com.framework.utils.func.Func.retry(Func.java:73)
	at com.framework.utils.func.Func.intercept(Func.java:48)
	at com.framework.utils.func.Test$$EnhancerByCGLIB$$6580e53a.fail(<generated>)
	at com.framework.utils.func.SagaTest.methodA(SagaTest.java:58)
	at com.framework.utils.func.SagaTest.main(SagaTest.java:75)

 

 

 

 

  • 代码实现

 

 

 maven导入gclib动态代理

 

<dependency>
	<groupId>cglib</groupId>
	<artifactId>cglib</artifactId>
	<version>3.2.12</version>
</dependency>

 

 

1、saga事务重试接口

 

/**
 * lambda表达式实现,方法失败重试接口
 * @author giant
 * @date 2021年12月15日
 * @param <T> 返回对象
 */
@FunctionalInterface
interface IRetry<T>{

	public T run() throws Throwable;
}

 

 

 2、成功或者失败后的回调接口

 

/**
 * lambda表达式实现,方法成功或者失败的回调接口
 * @author giant
 * @date 2021年12月15日
 * @param <T> 参数对象
 */
@FunctionalInterface
public interface ICallback<T> {

	public void run(T obj);
}

 

 

3、通过动态代理service中所有的方法,实现service的重试、成功及错误的回调

 

 

import java.lang.reflect.Method;

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;


/**
 * * 方法执行动态代理,实现代理类方法重试,方法调用成功或者失败后的反调函数实现
 * 代理对象调用方法时,方法与方法直接可嵌套使用。
 * @author giant
 * @date 2021年12月15日
 */
public class Func implements MethodInterceptor{
	
	/**
	 * 是否saga模式,如果是saga模式,建在报错的时候统一执行Saga.sagaFailCallBacks集合里面的方法
	 * 
	 */
	private boolean isSaga=false;
	
	private ICallback<Throwable> fail = null;
	private ICallback<Object> success  = null;
	private int retry = 1;
	private Object t = null;
	
	/**
	 * 不允许外表创建该对象
	 * @param t
	 * @param success
	 * @param fail
	 * @param retry
	 */
	 Func(Object t,ICallback<Object> success,ICallback<Throwable> fail,int retry,boolean isSaga) {
		// TODO Auto-generated constructor stub
		this.t = t;
		this.success = success;
		this.fail = fail;
		this.retry = retry;
		this.isSaga = isSaga;
	}
	

	
	@Override
	public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
		Object o2 = retry(()->{
			Object o1 = null;
			try{
				 o1 = methodProxy.invokeSuper(o, objects);
				return o1;
			}catch(Throwable e){
				throw e;
			}
//			return null;
		}, Object.class,this.retry,this.success,this.fail);
		return o2;
	}
	
	/**
	 * 失败重试三次
	 * @param run
	 * @param clz
	 * @return
	 */
	private <T> T retry(IRetry<T> run,Class <T> clz,int count,ICallback<Object> success,ICallback<Throwable>fail){
		int i=0;
		T result = null;
		boolean isSuccess = false;
		do{
			try {
				result = run.run();
				isSuccess = true;
				 if(success!=null){
					success.run(result);
				 }
			}catch(Throwable e){
				if(fail!=null&&!isSaga){
					fail.run(e);
				}
				if(i==retry-1){
					//向外抛出的异常只认最后一次
					if(isSaga){//如果是saga处理,统一最后回滚事务
						Saga.rollback(e);
					}
					throw e;
				}
			}
		}while((!isSuccess)&&++i<count);
		return result;
	}
	
	 //生成代理对象
	Object get(){
	  //new 一个Enhancer对象
	  Enhancer enhancer = new Enhancer();
	  //指定他的父类(注意这 是实现类,不是接口)
	  enhancer.setSuperclass(t.getClass());
	  //指定真正做事情的回调方法
	  enhancer.setCallback(this);
	  //生成代理类对象
	  Object o =  enhancer.create();
	  //返回
	  return o;
	 }
	 
	/**
	 * 给代理对象赋能,支持方法重试、成功及失败的回调
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 代理对象
	 * @param retry 代理对象失败重试次数
	 * @param success 代理对象所有方法成功回调函数(不报异常就算成功)
	 * @param fail 代理对象所有方法失败回调函数。(报异常就算失败)
	 * @return 返回代理
	 */
	 @SuppressWarnings("unchecked")
	public static <T2> T2 proxy(T2 obj,int retry,ICallback<Object> success,ICallback<Throwable> fail){
		 try {
			Func p = new Func(obj,success,fail,retry,false);
			return (T2)p.get();
		} catch (Exception e) {
			throw new RuntimeException(e);
			
		}
	 }
	 
	 /**
	  * 给代理对象赋能,支持方法成功及失败的回调
	  * @author giant
	  * @date 2021年12月15日
	  * @param obj 代理对象
	  * @param success 代理对象所有方法成功回调函数(不报异常就算成功)
	  * @param fail 代理对象所有方法失败回调函数(报异常就算失败)
	  * @return 返回代理
	  */
	 public static <T2> T2 proxy(T2 obj,ICallback <Object>success,ICallback<Throwable> fail){
		 return proxy(obj, 1, success, fail);
	 }
	 
	
}

 

 

4、Saga模型框架基于Func类实现

 

package com.framework.utils.func;

import java.util.ArrayList;
import java.util.List;
/**
 * saga分布式事务服务类
 * 所有Gaga.service()调用的方法必须在Gaga.begin()和Gaga.begin()中间。
 * xid就是threadid,可以通过Gaga.begin()获取
 * @author giant
 * @date 2021年12月15日
 */
public class Saga {
	/**
         * 
	 * 记录事务所有saga节点回滚函数,该对象代替了事件表,每次Gaga分布式事务必须调用Saga.begin()跟Saga.end();
         *
	 */
	static ThreadLocal<List<ICallback<Throwable>>> sagaFailCallBacks = new ThreadLocal<>();
	
        /**
	 * saga事务开始;每次Gaga分布式事务必须调用Saga.begin()跟Saga.end();
	 * @author giant
	 * @date 2021年12月15日
	 */
	public static long begin() {
		long xid = Thread.currentThread().getId();
		List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
		if (fails == null) {
			fails = new ArrayList<ICallback<Throwable>>();
			Saga.sagaFailCallBacks.set(fails);
		} else {
			throw new SagaException("程序未关闭saga,请在finally方法执行Saga.end()");
		}
		return xid;
	}

	
	
	/**
	 * 代理事务节点,该节点执行失败,会根据参数重试,如果方法报错,会回滚其它所有事务的failRollback方法
	 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次)
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 事务节点对象
	 * @param retry 事务失败重试次数
	 * @param failRollback 如果其中一个service模块失败,将在异常的时候回归,或者调用Gaga.rollback(null)回滚。
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static <T2> T2 service(T2 obj, int retry, ICallback<Throwable> failRollback) {
		try {
			List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
			if (fails == null) {
				throw new SagaException("请先开启Saga,调用Saga.begin()");
			}
			fails.add(failRollback);
			Func p = new Func(obj, null, failRollback, retry, true);
			return (T2) p.get();
		} catch (Exception e) {
			throw new SagaException(e);

		}
	}

	/**
	 * 代理事务节点,如果报错方法报错,会回滚其它所有事务的failRollback方法
	 * (注意:failRollback方法默认时所有都能执行成功的;建议回滚方法调用Func.proxy retry多次)
	 * @author giant
	 * @date 2021年12月15日
	 * @param obj 事务节点对象
	 * @param failRollback
	 * @return
	 */
	@SuppressWarnings("unchecked")
	public static <T2> T2 service(T2 obj, ICallback<Throwable> failRollback) {
		return service(obj, 1, failRollback);
	}

	/**
	 * 回滚所有saga执行的链路,在本地方方法抛异常时也可调用,回滚所有方法
	 * 
	 * @author giant
	 * @date 2021年12月15日
	 * @param e
	 */
	public static void rollback(Throwable e) {
		List<ICallback<Throwable>> failCallbacks = sagaFailCallBacks.get();
		if (failCallbacks != null) {
			// 这里认为所有方法fail调用都是成功的,如果失败,就是人工处理了。
			failCallbacks.forEach(f -> f.run(e));
			failCallbacks.clear();
		}
		sagaFailCallBacks.set(null);
	}
	
	/**
	 * 结束saga,这个方法是要在saga事务结束的时候调用,调用了Saga.begin()就必须调用该方法
	 * @author giant
	 * @date 2021年12月15日
	 */
	public static void end() {
		List<ICallback<Throwable>> fails = Saga.sagaFailCallBacks.get();
		if (fails != null) {
			fails.clear();
		}
		Saga.sagaFailCallBacks.set(null);
	}
}

 

 

saga异常实现

 

package com.framework.utils.func;

public class SagaException extends RuntimeException {
	
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public SagaException(String e){
		super(e);
	}
	public SagaException(Throwable e){
		super(e);
	}
}

 

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    Seata是一种易于使用高性能基于Java的开源分布式事务解决方案

    4. XA两阶段提交:遵循ACID(原子性、一致性、隔离性和持久性)原则的传统分布式事务模型。Seata也支持这种模式,但在高并发场景下可能存在性能瓶颈。 Seata的设计理念是轻量级和自治,这意味着它尽可能减少对业务...

    分布式事务源代码

    本资料包“分布式事务源代码”将通过源代码的形式深入解析分布式事务的工作原理及其实现方式。 分布式事务主要涉及以下知识点: 1. **两阶段提交(2PC, Two-Phase Commit)**:这是最基础的分布式事务协议,包括...

    分布式事务专题_java_分布式_

    8. **分布式事务解决方案**:现代Java框架如Spring Cloud Data Flow、Apache分布式事务组件(如Atomikos、Bitronix)提供了完善的分布式事务支持。此外,还有一些新兴技术,如Seata(前身是TCC-Action)、...

    Java 分布式事务(多数据源)

    3. **Spring框架的事务管理**:Spring提供了声明式事务管理,允许开发者通过注解或配置文件来控制事务边界,简化了分布式事务的编程模型。Spring支持JTA事务管理,可以与多种事务管理器如Atomikos、Bitronix等配合...

    多数据源分布式事务管理调研报告.zip

    - 一致性:不同的分布式事务模型在一致性和可用性之间有不同的权衡,需要根据业务需求选择合适方案。 4. **最佳实践**: - 事务设计:避免长时间持有事务,尽早提交或回滚,降低锁竞争。 - 事务隔离级别:合理...

    分布式事务管理书库库事务

    三、分布式事务模型 1. **两阶段提交(2PC)**:经典的分布式事务协议,分为准备阶段和提交阶段。但2PC存在单点故障、阻塞等问题。 2. **三阶段提交(3PC)**:为了解决2PC的问题,引入了预提交阶段,减少阻塞的...

    基于dubbo的分布式事务实现demo源码.zip

    Dubbo,作为阿里巴巴开源的高性能Java RPC框架,自然也提供了对分布式事务的支持。 在基于Dubbo的分布式事务实现中,通常有以下几种解决方案: 1. **两阶段提交(2PC)**:这是一个经典的分布式事务协议,包括准备...

    计算机课程大作业基于SpringBoot+Seata实现分布式事务管理系统.7z

    这是一个关于使用SpringBoot和Seata构建分布式...通过这个项目,学生将深入理解Spring Boot的使用,学习如何设计和实现微服务,以及如何利用Seata解决分布式事务的挑战。此外,还将锻炼到项目管理和文档编写的技能。

    分布式事务设计方案.docx

    本文档主要涵盖了基于Java的分布式事务设计,特别是利用可靠消息服务(如RocketMQ)来实现分布式事务的方案。 1.3 读者对象 本文档面向软件开发者、系统架构师以及对分布式系统和事务管理感兴趣的人员。 2. 分布式...

    浅谈常用的分布式事务选型

    本文将探讨几种常用的分布式事务处理方案,包括通用分布式事务规范XA、JAVA分布式事务规范(JTA)以及一些常见的分布式事务框架如2PC/3PC、TCC、MQ、Seata和Saga。 #### 二、分布式事务基础知识 ##### 2.1 事务的...

    Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务

    Seata 就是为了解决这个问题,它实现了基于柔性事务的TCC(Try-Confirm-Cancel)模式、Saga模式以及AT(Automatic Transaction)模式,这三种模式都是业界公认的分布式事务解决方案。 1. TCC 模式:TCC 是 Try-...

    XAtest分布式事务源代码

    10. **编程模型**:学习XAtest源码,可以了解如何在Java应用程序中嵌入分布式事务管理,包括如何声明和启动事务,以及如何处理事务边界。 通过对XAtest源代码的学习,开发者不仅可以理解分布式事务的原理,还能掌握...

    分布式事务的一种实现方式状态流转共5页.pdf.zip

    具体实现可能会结合具体的编程语言、框架和数据库系统,如Java的JTA、分布式数据库如MySQL的InnoDB Cluster,或是NoSQL数据库如Cassandra的分布式事务支持。通过深入理解和实践这些技术和概念,开发者能够构建出更...

    seata分布式事物模型代码

    Seata的默认模式是AT模式,它通过记录原始SQL和回滚SQL来实现分布式事务。在事务开始时,记录预执行的SQL和行级数据快照;如果事务提交,则执行提交SQL;如果事务回滚,则基于快照数据执行回滚SQL。 3. **TCC模式...

    分布式事务

    分布式事务是计算机科学中涉及数据库和网络通信的重要概念,它在多台计算机或者多个系统...开发者在设计和实现分布式系统时,需要根据业务需求和系统规模选择合适的分布式事务策略,并充分利用现有的开源工具和框架。

    Java 分布式应用程序设计

    - Saga:长事务模型,将大事务拆分为一系列小事务,通过回滚链路恢复。 7. **分布式锁**: - Redis和ZooKeeper:常用于实现分布式锁,提供高并发下的资源互斥访问。 - RedLock:Redis作者提出的一种分布式锁算法...

    Java面试宝典:包含Redis各种使用场景,分布式事务,分布式锁,DB,异步并发,JVM,微服务组件,常见的设计模式等

    这份宝典深入探讨了Redis的各种使用场景、分布式事务处理、分布式锁的实现、数据库操作、异步并发编程、JVM(Java虚拟机)优化、微服务组件的使用以及经典的设计模式。以下是对这些主题的详细阐述: 1. **Redis使用...

    分布式Java应用基础与实践源码.zip

    - Spring框架:Spring的分布式支持包括远程代理、消息驱动Bean和分布式事务管理等,是现代Java应用的基石。 - JMS:Java消息服务允许分布式系统中的组件通过异步消息传递进行通信。 - ZooKeeper:Apache ...

    学习总结 包括Java JVM MySQL NoSQL UML 缓存 消息 分布式事务 SOA 微服务 敏捷 架构设.zip

    这个名为"学习总结 包括Java JVM MySQL NoSQL UML 缓存 消息 分布式事务 SOA 微服务 敏捷 架构设.zip"的压缩包文件,显然包含了一系列IT领域的核心主题,这些主题都是现代软件开发和系统设计的基石。以下是对每个...

    [分布式Java应用:基础与实践].林昊.高清文字版.pdf

    6. 分布式事务管理:2PC、补偿事务(TCC)、Saga等事务管理策略在分布式环境中的应用。 7. 数据库分片技术:如Mycat,用于解决单个数据库性能瓶颈,实现数据水平扩展。 8. Linux基础与运维:书中可能包括Linux操作...

Global site tag (gtag.js) - Google Analytics