`
herman_liu76
  • 浏览: 99582 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

用AOP与Threadlocal实现超简单TCC事务框架

阅读更多
用AOP与Threadlocal实现一个mini的TCC事务框架

TCC是处理分布式事务的一种技术,每个服务提供者提供TRY/CONFIRM/CANCEL三个接口,分别对应资源锁定,提交,取消操作。看到github上有些复杂完善的TCC框架,本着简单用AOP与ThreadLocal来做一个简单的框架,验证下自己的想法是否可行,同时练练手。

其中的TCC三调用的方法切换,以及考虑后续要使用try返回值处理,本人采用了**一种投机取巧的方式**来实现。

## 一、主要目标

只考虑几个简单的目标

### 1. TCC调用的关系信息

每个TCC服务调用一般都要包装一下,而且要先定义好谁是TRY,谁是CONFIRM/CANCEL。比如在spring cloud的hystrix中,有注解中设定降级方法,另外写好降级方法,在扫描时,相关信息都记录下来备用。比如try失败了要找到执行哪个cancel。

### 2. 具体调用的参数

在运行中,具体的调用与参数,都需要在事务过程中存下来,可以通过在调用过程中增加AOP,得到相关信息。不仅当时的调用参数,包括try的结果要记录下来。

### 3. 事务中相关的调用

因为要么所有的try都成功,再全部执行confirm操作,如果有一个失败,所有已经成功的都调用cancel操作。所以已经执行的都要记录下来

### 4. 完整性保障

如果confirm/cancel操作失败了,可能记录,可能重试,要考虑。

## 二、设计方案

一般是对Try方法进行注解,而且注解中指明成功与失败执行什么。可是这些注解解析,保存,工作量不少,这个先不考虑。这里要删繁就简的处理。

在执行方法时,肯定有公共的信息要存,所以spring AOP来切入保存数据。对每个参与者方法与聚合方法都要处理,所以使用两个注解进行AOP即可。一个总控,一个在具体执行中。

由于在AOP的@Around过程中,可以得到当前调用的ProceedingJoinPoint对象,它包含了当前调用的所有信息的封装,可以很好的利用一下。

比如ProceedingJoinPoint可以得到当前调用的参数,而参数是可以修改后再执行的,那么切换TCC方法就可以利用这个特点。于是对一个参与者的TCC调用包装成一个单一方法,并用一个TccType的参数进行区分,switch内部一分为三,对应TCC。为了方便修改参数,这个参数排第一位置。

另外,Try方法可能返回参与者自己系统的ID,用于根据ID进行确认或者取消,这个参数正好也利用一下ProceedingJoinPoint。为了方便设置这个参数,就定在单一方法的第二个参数位置,Try调用时,这个位置置null即可。

至于每个调用都的调用保存,就把它们的ProceedingJoinPoint放在一个list中,这个list保存在ThreadLocal中就行了,一开始产生一个空的list,成功了就放进去,如果都成功,就取出来修改参数运行,失败了也是修改参数再运行。

另外考虑本地事务如果与多个参与者在一起怎么办?本地就用@Transactional吧,不管哪个步骤有问题,都抛出异常,让本地事务也回滚。

最后提到完整性,如果Confirm/Cancel失败了怎么办?同样为了简化,提供一个接口,把ProceedingJoinPoint传出来,由使用都来实现,是先记录呢,还是再重试呢,还是同时要分发进行不同的处理都不管了,外部实现。

## 三、主要代码

核心方法都已经注解

```java
	//两个AOP切点
	@Pointcut("@annotation(com.so_mini.tcc.annotation.Tcc)")
    public void tccAspect() {
        System.out.println("Tcc");//注解聚合方法
    }
    
    @Pointcut("@annotation(com.so_mini.tcc.annotation.TccEach)")
    public void tccEachAspect() {
        System.out.println("TccEach");//注解每一个TCC参与者方法
    }

	//对应的环绕AOP处理
    @Around("tccAspect()")
    public Object aroundMethod(ProceedingJoinPoint pjd) {
        Object result = null;
        String methodName = pjd.getSignature().getName();
        Map<String,Object> tccInfo=new HashMap<String,Object>();
        tccInfo.put("list", new ArrayList<ProceedingJoinPoint>());
        threadLocal.set(tccInfo);//当前线程局部变量中保存
        try {
            result = pjd.proceed();
            Map<String,Object> tccInfoRe=(Map<String, Object>) threadLocal.get();
            List<ProceedingJoinPoint> participant=(List<ProceedingJoinPoint>)(tccInfoRe.get("list"));
            System.out.println("【正常,需确认数】"+participant.size());
            //没有异常,执行所有try对应的confirm广场
            for(ProceedingJoinPoint pj:participant){
				toConfirm(pj, _CCFailListenerList);
            }
//            participant.forEach(joinPoint-->(org.aspectj.lang.JoinPoint)joinPoint);
            return result;
        } catch (Throwable e) {
            Map<String,Object> tccInfoRe=(Map<String, Object>) threadLocal.get();
            List<ProceedingJoinPoint> participant=(List<ProceedingJoinPoint>)(tccInfoRe.get("list"));
            System.out.println("【有问题,取消已完成数】"+participant.size());
            //有异常,已经成功的try方法,都要执行对应的cancel方法
            for(ProceedingJoinPoint pj:participant){
				toCancle(pj,_CCFailListenerList);
            }
            throw new RuntimeException("TCC异常");//抛异常,本地DB事务回滚。
        }
        finally{
        	threadLocal.remove();
			System.out.println("【threadLocal removed!】");
        }
    }


    @Around("tccEachAspect()")
    public Object aroundEachMethod(ProceedingJoinPoint pjd) {
        Object result = null;
        String methodName = pjd.getSignature().getName();
        Map tccInfo=(Map) threadLocal.get();
        List participant=(List)(tccInfo.get("list"));
        try {
        	participant.add(pjd);//预存下来
            result = pjd.proceed();
            System.out.println("["+methodName+"] return value:"+result);
            setTryResult(pjd,result);//【设置try执行的结果】
            return result;
        } catch (Throwable e) {
        	participant.remove(pjd);//从成功的列表中移除,抛异常让整体cancel
            throw new RuntimeException(e);
        }
        finally{
        }
    }

    //正常try,第二个参数记录结果
    public static void setTryResult(ProceedingJoinPoint point,Object result) {
		int i=0;
		Object[] args = point.getArgs();
		args[1]=result;
	}
    
    //需要确认时,第一个参数修改为confirm
    public static void toConfirm(ProceedingJoinPoint point,List<CCFailListener> cCFailListenerList)  {
		Object[] args = point.getArgs();
		args[0]=(Object)(TccPhaseEnum.Confirm);
		try {
			point.proceed(args);
		} catch (Throwable e) {
			cCFailListenerList.forEach(listener->listener.notify(point, TccPhaseEnum.Confirm));//出错由监听器处理,用户自己实现
		}
	}
```


## 四、结果

聚合方法及其中一个参与者Service

```java
	@Transactional
	@Tcc
	public void createTripTrans(String orderId){
		System.out.println("BEGIN db-trans...");
		staffService.addStaff(orderId);//简单的本地数据库操作
		System.out.println("BEGIN remote...");
		eatBook.bookEat(TccPhaseEnum.Try,null,orderId);//前两个参数固定!!!
		houseBook.bookHouse(TccPhaseEnum.Try,null,orderId);
		planeBook.bookPlane(TccPhaseEnum.Try,null,orderId);
	}


@Service
public class HouseBook {
	@TccEach
	public String bookHouse(TccPhaseEnum type,Object tryResult,String orderId){
		switch (type) {
		case Try:
			System.out.println("house book...!");
			if("222".equals(orderId))
				throw new RuntimeException("house failure");
			return "HS100083";//模拟try的返回值
		case Confirm:
			confirmHouse(tryResult);//必须使用前面的返回值,否则异常
			return null;
		case Cancel:
			System.out.println("house cancel...!"+tryResult);
			return null;
		}
		return null;
	}
	
	private void confirmHouse(Object houseId){
		if("HS100083".equals(houseId)) 
			System.out.println("house confirm...!"+houseId);
		else{
			throw new RuntimeException("houseId missing and confirme failure");
		}
	}
}

```


正常与回滚的运行结果

```java
//第三个TCC出错,前两个执行CANCEL,本地DB事务回滚
BEGIN db-trans...
Hibernate: select staffbo0_.id .....
BEGIN remote...
Eat book...!//第一个参与者 try
[bookEat] return value:null//try 的 返回值
house book...!//第二个参与者 try
[bookHouse] return value:HS100083//try 的 返回值
【有问题,取消已完成数】2//第三个失败了,取消前两个成功的try
Eat cnacel...!//第一个参与者
house cancel...!HS100083//第二个参与者 cancel使用了try的结果
【threadLocal removed!】//all finished
java.lang.RuntimeException: TCC异常
```

```java
//正常情况
BEGIN db-trans...
Hibernate: select staffbo0_.id ...
BEGIN remote...
Eat book...!
[bookEat] return value:null
house book...!
[bookHouse] return value:HS100083
plane booked!
[bookPlane] return value:null
【正常,需确认数】3
Eat confirm...!
house confirm...!HS100083
plane confirmed!
【threadLocal removed!】
Hibernate: insert into staff (age, name, id) values (?, ?, ?)
```


## 五、源码

https://github.com/herriman76/tcc-mini
3
4
分享到:
评论

相关推荐

    aop-log:项目正式命名为aop-log,基于Spring AOP,ThreadLocal实现方法埋点,埋点信息记录和自定义收集

    AopLogAopLog是基于SpringAop和ThreadLocal实现的一个对请求方法埋点记录与处理的日志工具包。设计目的和场景:使用Spring Aop拦截程序,基本上都是同一个小异,不想日后每个项目都柏林都写一份这样的Aop拦截处理...

    java事务 - threadlocal

    当Java事务与ThreadLocal结合使用时,可以在不同的线程中维护各自的事务状态,比如在Spring框架中,每个线程的ThreadLocal可以存储一个TransactionStatus对象,这样就可以在线程内部管理当前事务的状态,而不会影响...

    Asp.net Core 3.1基于AspectCore实现AOP实现事务、缓存拦截器功能

    最近想给我的框架加一种功能,就是比如给一个方法加一个事务的特性Attribute,那这个方法就会启用事务处理。给一个方法加一个缓存特性,那这个方法就会进行缓存。 这个也是网上说的面向切面编程AOP。 AOP的概念也很...

    dubbo分布式tcc事务demo

    【压缩包子文件的文件名称列表】"tcc-transaction-master-1.2.x"表明这是一个开源项目的源码仓库,版本为1.2.x,包含了整个TCC事务实现的代码结构。我们可以从中学习到如何定义服务、编写TCC的尝试、确认和取消操作...

    C#自己实现AOP的事务操作

    总的来说,AOP事务实现结合C#的特性机制,能够帮助开发者更优雅地管理事务,同时保持代码的简洁和清晰。通过创建自定义的事务特性,我们可以将事务管理的复杂性隐藏起来,专注于业务逻辑,从而提高代码的可复用性...

    Spring事务处理-ThreadLocal的使用

    在Spring事务管理中,连接池与ThreadLocal结合,确保每个线程在事务内使用的是同一连接,避免了事务间的干扰。例如,`HikariCP`和`C3P0`等流行的连接池实现都支持与Spring事务管理的无缝集成。 在实际应用中,理解...

    spring基于AOP实现事务

    本文将深入探讨如何基于AOP(面向切面编程)来实现Spring的事务管理,特别是通过TransactionProxyFactoryBean。让我们一起探索这个主题。 首先,了解什么是AOP。AOP是Spring框架的核心特性,它允许我们在不修改业务...

    C# 实现 AOP微型框架

    6. **C#实现**:在VS2008中,实现AOP框架可能涉及到使用`System.Reflection`来获取类型信息,然后利用`System.Reflection.Emit`或第三方库创建动态代理类,实现对方法调用的拦截和通知的插入。 7. **微型框架**:与...

    手写简单实现ioc、aop事务demo

    在这个“手写简单实现ioc、aop事务demo”中,我们将探讨如何通过工厂模式和动态代理来实现这些概念。 首先,让我们了解IOC的核心思想。在传统的编程中,对象创建和管理的控制权在程序员手中,而IOC则是将这种控制权...

    Spring Aop的简单实现

    Spring AOP,全称Aspect-Oriented Programming(面向切面编程),是Spring框架的重要组成部分,它为应用程序提供了声明式的企业级服务,如日志、事务管理等。在本项目中,我们将探讨如何通过配置文件实现Spring AOP...

    C# 实现的AOP框架

    在C#中实现AOP,我们可以使用.NET提供的System.Reflection.Emit或System.Linq.Expressions库来动态生成代理类,或者使用IL weaving工具如PostSharp进行编译时织入。同时,我们也可以利用.NET Framework中的`System....

    基于tcc的Java分布式事务框架.zip

    在实际应用中,Java开发人员可以利用Spring框架的扩展点,如AOP(面向切面编程)和Spring Cloud Data Flow等工具,实现TCC模式。例如,使用Spring的@Compensable注解可以标记一个方法为TCC操作,而@Try、@Confirm和@...

    从ThreadLocal的使用到Spring的事务管理

    本文将深入探讨ThreadLocal的使用以及Spring框架中的事务管理,这两个主题都是Java开发人员必须掌握的关键技能。 首先,让我们了解ThreadLocal。ThreadLocal是Java提供的一种线程绑定变量的工具类,它允许我们在一...

    C#实现的IOC和AOP框架,供学习

    这个名为“GreeFramOfficial”的压缩包文件,很可能是提供了一个基于C#实现的IOC和AOP框架,供开发者学习和使用。 IOC(Inversion of Control)的核心思想是将对象的创建和管理交给一个容器来处理,而不是由对象...

    Spring AOP框架实现的结构分析

    "Spring AOP 框架实现的结构分析" 本文主要目标是从实现的角度来认识 SpringAOP 框架。首先,我们需要了解 AOP 的基本概念,包括关注点、核心关注点、横切关注点、方面、连接点、切入点、增强、引介、混入继承和织...

    简易的AOP框架

    在Java中,Spring框架是最著名的AOP实现之一,但这里我们讨论的是一个简易的AOP框架,它可以帮助理解AOP的基本概念和工作原理。 该简易AOP框架包含以下几个关键组件: 1. **配置文件**:这是定义切面(aspect)和...

    Hibernate编程式事务与Spring Aop的声明式事务(spring与hibernate集成)

    Spring作为一款全功能的轻量级框架,提供了丰富的功能,包括依赖注入、面向切面编程(AOP)以及事务管理等。而Hibernate则是一款强大的对象关系映射(ORM)工具,它简化了数据库操作,使开发者可以使用面向对象的...

    aop与spring事务处理

    ### AOP与Spring事务处理详解 #### 一、引言:为什么使用框架和设计模式? 在软件开发领域,设计模式和框架是两个重要的概念。设计模式作为一种指导思想,能够帮助开发者更好地解决常见的软件设计问题,确保系统...

    AOP实现事务的代码

    8. **AOP框架集成**:如果你的项目使用了某种IoC(Inversion of Control)容器(如Autofac、Unity或Ninject),那么可以利用它们的AOP支持将事务切面集成到依赖注入系统中,实现更灵活的事务管理。 9. **性能考虑**...

    反射实现 AOP 动态代理模式(Spring AOP 的实现原理)

    Spring框架中的AOP模块使用了动态代理来实现AOP概念。Spring AOP允许开发者定义切面,并在这些切面中指定拦截的方法。Spring AOP支持不同的代理策略,包括JDK动态代理和CGLIB代理。如果被代理的类没有实现接口,...

Global site tag (gtag.js) - Google Analytics