`

解密@Transactional和@Async传播有效性

阅读更多

微信公众号文章列表:关注公众号(coding_song)阅读更清晰

微信原文地址:https://mp.weixin.qq.com/s/fx1tDzezuE3NWqGpIOU7og

 

现象

Controller类中调用Service类中标有@Async或@Transactional注解的方法,此方法上的@Async或@Transactional会生效,而在Service类中A方法直接调用标有@Async或@Transactional注解的B方法,B方法上的@Async或@Transactional注解不会生效;如果A方法中重新通过动态代理生成一个Service Bean类,再通过这个新生成的Bean调用B方法,则B方法上的@Async或@Transactional注解将会生效;当B方法上同时含有@Async和@Transactional注解时,只会有一个注解生效,@Async会生效而@Transactional不会生效

原因

Spring项目中,A类调用B类的方法,是A类通过CglibAopProxy动态代理来生成B类,而B类中方法之间的调用,是直接调用其方法,不会再通过动态代理重新生成一个相同的B类,再调用目标方法;CglibAopProxy动态代理生成B类时,会获取到B类目标方法上面的所有注解(@Async或@Transactional等),并获取这些注解的拦截器和拦截器切面(AsyncAnnotationAdvisor和BeanFactoryTransactionAttributeSourceAdvisor),然后判断这些切面的是否是PointcutAdvisor的子类,方法是否匹配注解方法匹配器AnnotataionMethodMatcher,如果是,则获取这个拦截器切面对应的方法拦截器(AnnotationAsyncExceutionInterceptor或TransactionInterceptor),获取到这些方法拦截器后(如果有多个),只取第一个拦截器,如果第一个是TransactionInterceptor拦截器,则会为该方法开启一个事物;如果第一个是AnnotationAsyncExceutionInterceptor,则会为这个方法创建一个线程,让此方法运行在新建的线程中,由此起到了异步执行的作用

样例代码

Controller层代码

  1. @RestController

  2. publicclassTestController{

  3.  

  4.    @Autowired

  5.    privateTestService testService;

  6.  

  7.    @GetMapping("/test")

  8.    publicvoid test(){

  9.        testService.getId();

  10.    }

  11.  

  12. }

服务层代码及@Async和@Transactional生效说明

  1. @Service

  2. publicclassTestServiceImplimplementsTestService,ApplicationContextAware{

  3.  

  4.    privateApplicationContext applicationContext;

  5.  

  6.    @Autowired

  7.    privateTestService testService;

  8.  

  9.    @Override

  10.    publicvoid setApplicationContext(ApplicationContext applicationContext)throwsBeansException{

  11.        this.applicationContext = applicationContext;

  12.    }

  13.  

  14.    /**

  15.     * 当Controller层调用此方法时,@Transactional生效,将会开启一个事物

  16.     * @return

  17.     */

  18.    @Override

  19.    @Transactional(rollbackFor =RuntimeException.class)

  20.    publicInteger getId(){

  21.        TestService testService =(TestService) applicationContext.getBean("testServiceImpl");

  22.        Integer integer = testService.testTransactional();

  23.        return integer;

  24.    }

  25.  

  26.    /**

  27.     * 1)本类中,如果是通过applicationContext.getBean创建一个TestService后,再调用此方法,

  28.     * 由于方法上同时有@Async和@Transactional注解,获取到这两个注解的拦截器后,只会取第一个拦截器进行后续的操作,

  29.     * 所以@Async会生效,而@Transactional不会生效

  30.     * 2)如果是直接调testTransactional()方法,而不是从新生成一个新的TestService Bean来调用此方法,

  31.     * 则此类上的@Async和@Transactional注解都不会生效,因为不会走AOP动态代理CglibAopProxy,

  32.     * 而@Async和@Transactional注解生效的实现,是在CglibAopProxy中实现的

  33.     * @return

  34.     */

  35.    @Override

  36.    @Async

  37.    @Transactional(rollbackFor =RuntimeException.class)

  38.    publicInteger testTransactional(){

  39.        Integer test = test();

  40.        return test;

  41.    }

  42.  

  43.    /**

  44.     * 此方法上的@Async和@Transactional都不会生效,因为在testTransactional()是直接调用此方法

  45.     * @return

  46.     */

  47.    @Async

  48.    @Transactional(rollbackFor =RuntimeException.class)

  49.    privateInteger test(){

  50.        return10;

  51.    }

  52. }

源码分析

在Controller类中注入Service时,是通过Cglib动态代理生成的Service,所以当Controller类调用Service中的方法时,会调用CglibAopProxy内部类DynamicAdvisedInterceptor的intercept()方法

  1. @Override

  2. @Nullable

  3. publicObject intercept(Object proxy,Method method,Object[] args,MethodProxy methodProxy)throwsThrowable{

  4.    Object oldProxy =null;

  5.    boolean setProxyContext =false;

  6.    Object target =null;

  7.    TargetSource targetSource =this.advised.getTargetSource();

  8.    try{

  9.        if(this.advised.exposeProxy){

  10.            // Make invocation available if necessary.

  11.            oldProxy =AopContext.setCurrentProxy(proxy);

  12.            setProxyContext =true;

  13.        }

  14.        // Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...

  15.        target = targetSource.getTarget();

  16.        Class<?> targetClass =(target !=null? target.getClass():null);

  17.        //获取方法上的所有注解的切面拦截器

  18.        List<Object> chain =this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

  19.        Object retVal;

  20.        //当在方法上没有获取到相应注解的拦截器时,直接进入方法执行相应逻辑

  21.        if(chain.isEmpty()&&Modifier.isPublic(method.getModifiers())){

  22.            // We can skip creating a MethodInvocation: just invoke the target directly.

  23.            // Note that the final invoker must be an InvokerInterceptor, so we know

  24.            // it does nothing but a reflective operation on the target, and no hot

  25.            // swapping or fancy proxying.

  26.            Object[] argsToUse =AopProxyUtils.adaptArgumentsIfNecessary(method, args);

  27.            retVal = methodProxy.invoke(target, argsToUse);

  28.        }else{

  29.            // 获取当方法拦截器后,通过反射调用具体的拦截器方法

  30.                        //接下来先调用ReflectiveMethodInvocation的proceed方法

  31.            retVal =newCglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();

  32.        }

  33.        retVal = processReturnType(proxy, target, method, retVal);

  34.        return retVal;

  35.    }

  36.    //省略finally代码

  37. }

ReflectiveMethodInvocation.proceed方法只获取拦截器切面列表中的第一个拦截器切面,执行相应的拦截器逻辑

  1. @Override

  2. @Nullable

  3. publicObject proceed()throwsThrowable{

  4.    //  We start with an index of -1 and increment early.

  5.    if(this.currentInterceptorIndex ==this.interceptorsAndDynamicMethodMatchers.size()-1){

  6.        return invokeJoinpoint();

  7.    }

  8.  

  9.    //如果方法上有多个拦截器或拦截器切面,只取第一个拦截器或拦截器切面

  10.    Object interceptorOrInterceptionAdvice =

  11.        this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);

  12.    if(interceptorOrInterceptionAdvice instanceofInterceptorAndDynamicMethodMatcher){

  13.        // Evaluate dynamic method matcher here: static part will already have

  14.        // been evaluated and found to match.

  15.        InterceptorAndDynamicMethodMatcher dm =

  16.                (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;

  17.        Class<?> targetClass =(this.targetClass !=null?this.targetClass :this.method.getDeclaringClass());

  18.        if(dm.methodMatcher.matches(this.method, targetClass,this.arguments)){

  19.            return dm.interceptor.invoke(this);

  20.        }else{

  21.            // Dynamic matching failed.

  22.            // Skip this interceptor and invoke the next in the chain.

  23.            return proceed();

  24.        }

  25.    }

  26.    else{

  27.        // It's an interceptor, so we just invoke it: The pointcut will have

  28.        // been evaluated statically before this object was constructed.

  29.        //调用具体的拦截器

  30.        return((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);

  31.    }

  32. }

如果第一个拦截器切面是TransactionInterceptor,则调用TransactionInterceptor相关的方法,执行相应的逻辑,开启事务等

  1. @Override

  2. @Nullable

  3. publicObject invoke(MethodInvocation invocation)throwsThrowable{

  4.    // Work out the target class: may be {@code null}.

  5.    // The TransactionAttributeSource should be passed the target class

  6.    // as well as the method, which may be from an interface.

  7.    Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);

  8.  

  9.    // Adapt to TransactionAspectSupport's invokeWithinTransaction...

  10.    //适配TransactionAspectSupport的invokeWithinTransaction方法

  11.    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);

  12. }

  13.  

  14. @Nullable

  15. protectedObject invokeWithinTransaction(Method method,@NullableClass<?> targetClass,

  16.        finalInvocationCallback invocation)throwsThrowable{

  17.  

  18.    // If the transaction attribute is null, the method is non-transactional.

  19.    TransactionAttributeSource tas = getTransactionAttributeSource();

  20.    finalTransactionAttribute txAttr =(tas !=null? tas.getTransactionAttribute(method, targetClass):null);

  21.    finalPlatformTransactionManager tm = determineTransactionManager(txAttr);

  22.    finalString joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  23.  

  24.    if(txAttr ==null||!(tm instanceofCallbackPreferringPlatformTransactionManager)){

  25.        // Standard transaction demarcation with getTransaction and commit/rollback calls.

  26.        //调用事务管理器的getTransaction和commit/rollback方法,获取一个事务对象,并开启事物

  27.        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

  28.        Object retVal =null;

  29.        try{

  30.            // This is an around advice: Invoke the next interceptor in the chain.

  31.            // This will normally result in a target object being invoked.

  32.            //调用目标类的方法

  33.            retVal = invocation.proceedWithInvocation();

  34.        }

  35.        catch(Throwable ex){

  36.            // target invocation exception

  37.            completeTransactionAfterThrowing(txInfo, ex);

  38.            throw ex;

  39.        }

  40.        finally{

  41.            //执行目标方法后,关闭事务,清除事物对象

  42.            cleanupTransactionInfo(txInfo);

  43.        }

  44.        //结果返回后提交事物状态等信息

  45.        commitTransactionAfterReturning(txInfo);

  46.        return retVal;

  47.    }else{

  48.        //省略其他代码

  49.    }

  50. }

  51. /**

  52. * 根据给定的事务参数创建一个事物.

  53. * @see #getTransactionAttributeSource()

  54. */

  55. protectedTransactionInfo createTransactionIfNecessary(@NullablePlatformTransactionManager tm,

  56.        @NullableTransactionAttribute txAttr,finalString joinpointIdentification){

  57.  

  58.    // If no name specified, apply method identification as transaction name.

  59.    if(txAttr !=null&& txAttr.getName()==null){

  60.        txAttr =newDelegatingTransactionAttribute(txAttr){

  61.            @Override

  62.            publicString getName(){

  63.                return joinpointIdentification;

  64.            }

  65.        };

  66.    }

  67.  

  68.    TransactionStatus status =null;

  69.    if(txAttr !=null){

  70.        if(tm !=null){

  71.            //开启事物

  72.            status = tm.getTransaction(txAttr);

  73.        }

  74.        else{

  75.            if(logger.isDebugEnabled()){

  76.                logger.debug("Skipping transactional joinpoint ["+ joinpointIdentification +

  77.                        "] because no transaction manager has been configured");

  78.            }

  79.        }

  80.    }

  81.    //准备事物对象信息

  82.    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

  83. }

  84. /**

  85. * 委托doGetTransaction方法,isExistingTransaction判断条件和doBegin方法处理事务传播行为

  86. * This implementation handles propagation behavior. Delegates to

  87. * {@code doGetTransaction}, {@code isExistingTransaction}

  88. * and {@code doBegin}.

  89. * @see #doGetTransaction

  90. * @see #isExistingTransaction

  91. * @see #doBegin

  92. */

  93. @Override

  94. publicfinalTransactionStatus getTransaction(@NullableTransactionDefinition definition)throwsTransactionException{

  95.    Object transaction = doGetTransaction();

  96.  

  97.    // Cache debug flag to avoid repeated checks.

  98.    boolean debugEnabled = logger.isDebugEnabled();

  99.  

  100.    if(definition ==null){

  101.        // Use defaults if no transaction definition given.

  102.        definition =newDefaultTransactionDefinition();

  103.    }

  104.    //如果已经有事务存在,直接处理已存在的事务

  105.    if(isExistingTransaction(transaction)){

  106.        // Existing transaction found -> check propagation behavior to find out how to behave.

  107.        return handleExistingTransaction(definition, transaction, debugEnabled);

  108.    }

  109.  

  110.    // Check definition settings for new transaction.

  111.    if(definition.getTimeout()<TransactionDefinition.TIMEOUT_DEFAULT){

  112.        thrownewInvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());

  113.    }

  114.  

  115.    // No existing transaction found -> check propagation behavior to find out how to proceed.

  116.    if(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_MANDATORY){

  117.        thrownewIllegalTransactionStateException(

  118.                "No existing transaction found for transaction marked with propagation 'mandatory'");

  119.    }

  120.    elseif(definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRED ||

  121.            definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_REQUIRES_NEW ||

  122.            definition.getPropagationBehavior()==TransactionDefinition.PROPAGATION_NESTED){

  123.        SuspendedResourcesHolder suspendedResources = suspend(null);

  124.        if(debugEnabled){

  125.            logger.debug("Creating new transaction with name ["+ definition.getName()+"]: "+ definition);

  126.        }

  127.        try{

  128.            boolean newSynchronization =(getTransactionSynchronization()!= SYNCHRONIZATION_NEVER);

  129.            DefaultTransactionStatus status = newTransactionStatus(

  130.                    definition, transaction,true, newSynchronization, debugEnabled, suspendedResources);

  131.            //开启事务

  132.            doBegin(transaction, definition);

  133.            prepareSynchronization(status, definition);

  134.            return status;

  135.        }

  136.        catch(RuntimeException|Error ex){

  137.            resume(null, suspendedResources);

  138.            throw ex;

  139.        }

  140.    }

  141.    else{

  142.        // Create "empty" transaction: no actual transaction, but potentially synchronization.

  143.        if(definition.getIsolationLevel()!=TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()){

  144.            logger.warn("Custom isolation level specified but no actual transaction initiated; "+

  145.                    "isolation level will effectively be ignored: "+ definition);

  146.        }

  147.        boolean newSynchronization =(getTransactionSynchronization()== SYNCHRONIZATION_ALWAYS);

  148.        return prepareTransactionStatus(definition,null,true, newSynchronization, debugEnabled,null);

  149.    }

  150. }

 

如果第一个拦截器切面是AsyncExecutionInterceptor,则会获取一个任务执行器,再创建一个任务,然后从线程池ThreadPoolExecutor中获取一个线程来执行这个任务

  1. @Override

  2. @Nullable

  3. publicObject invoke(finalMethodInvocation invocation)throwsThrowable{

  4.    Class<?> targetClass =(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null);

  5.    Method specificMethod =ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);

  6.    finalMethod userDeclaredMethod =BridgeMethodResolver.findBridgedMethod(specificMethod);

  7.    //获取一个异步任务执行器

  8.    AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);

  9.    if(executor ==null){

  10.        thrownewIllegalStateException(

  11.                "No executor specified and no default executor set on AsyncExecutionInterceptor either");

  12.    }

  13.    //创建一个任务

  14.    Callable<Object> task =()->{

  15.        try{

  16.            Object result = invocation.proceed();

  17.            if(result instanceofFuture){

  18.                return((Future<?>) result).get();

  19.            }

  20.        }

  21.        catch(ExecutionException ex){

  22.            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());

  23.        }

  24.        catch(Throwable ex){

  25.            handleError(ex, userDeclaredMethod, invocation.getArguments());

  26.        }

  27.        returnnull;

  28.    };

  29.    //最终会从ThreadPoolExecutor获取一个线程来提交需要执行的任务

  30.    return doSubmit(task, executor, invocation.getMethod().getReturnType());

  31. }

  32. /**

  33. * AsyncExecutionAspectSupport.doSubmit()

  34. * 提交任务

  35. */

  36. @Nullable

  37. protectedObject doSubmit(Callable<Object> task,AsyncTaskExecutor executor,Class<?> returnType){

  38.    if(CompletableFuture.class.isAssignableFrom(returnType)){

  39.        returnCompletableFuture.supplyAsync(()->{

  40.            try{

  41.                return task.call();

  42.            }

  43.            catch(Throwable ex){

  44.                thrownewCompletionException(ex);

  45.            }

  46.        }, executor);

  47.    }

  48.    elseif(ListenableFuture.class.isAssignableFrom(returnType)){

  49.        return((AsyncListenableTaskExecutor) executor).submitListenable(task);

  50.    }

  51.    elseif(Future.class.isAssignableFrom(returnType)){

  52.        return executor.submit(task);

  53.    }

  54.    else{

  55.        //继续提交任务

  56.        executor.submit(task);

  57.        returnnull;

  58.    }

  59. }

  60. /**

  61. * ThreadPoolTaskExecutor.submit() 从线程池中获取一个线程来执行任务

  62. */

  63. public<T>Future<T> submit(Callable<T> task){

  64.    ThreadPoolExecutor executor =this.getThreadPoolExecutor();

  65.  

  66.    try{

  67.        return executor.submit(task);

  68.    }catch(RejectedExecutionException var4){

  69.        thrownewTaskRejectedException("Executor ["+ executor +"] did not accept task: "+ task, var4);

  70.    }

  71. }

 

解决方案

在Service中A方法调用B方法,要使B方法上的@Async或@Transcational注解生效,可在A方法中通过ApplicationContext.getBean()来从新获取一个新的Service对象,代码实现如下

  1. @Service

  2. publicclassTestServiceImplimplementsTestService,ApplicationContextAware{

  3.    //省略其他代码

  4.    @Override

  5.    @Transactional(rollbackFor =RuntimeException.class)

  6.    publicInteger getId(){

  7.        TestService testService =(TestService) applicationContext.getBean("testServiceImpl");

  8.        Integer integer = testService.testTransactional();

  9.        return integer;

  10.    }

  11.  

  12.    @Override

  13.    @Transactional(rollbackFor =RuntimeException.class)

  14.    publicInteger testTransactional(){

  15.        Integer test = test();

  16.        return test;

  17.    }

  18. }

上述testTransactional()方法上的@Transactional将生效

0
0
分享到:
评论

相关推荐

    带有@Transactional和@Async的循环依赖问题

    在Spring框架中,`@Transactional` 和 `@Async` 是两个非常重要的注解,它们分别用于声明事务管理和异步执行。然而,当这两个注解同时出现在一个方法上时,可能会引发一些复杂的问题,特别是在存在循环依赖的情况下...

    后端 Java Spring Data Jpa @Transactional 介绍

    在Java后端开发中,Spring框架提供了强大的事务管理能力,特别是在使用Spring Data JPA时,`@Transactional`注解使得...通过合理使用和配置,我们可以有效地确保业务逻辑的正确执行,并保护数据库的完整性和一致性。

    @Transactional实现原理.txt

    @Transactional实现原理.txt

    Spring3事务管理——使用@Transactional 注解.rar

    通过理解并熟练运用`@Transactional`注解,开发者可以更方便地管理和控制Spring应用中的事务,确保数据一致性,同时减少手动事务管理带来的复杂性。在实际开发中,根据项目需求灵活配置和使用这个注解,将极大地提高...

    Spring @Transactional工作原理详解

    在Spring框架中,`@...然而,理解其背后的实现细节对于优化性能、避免潜在问题和进行有效调试至关重要。开发者应根据实际需求和场景,合理利用`@Transactional`的各种属性和模式,以实现高效且可靠的事务管理。

    什么情况会导致@Transactional事务失效?

    在Java编程中,`@...了解以上这些情况并避免它们,有助于确保`@Transactional`事务管理的正确性和可靠性。在开发过程中,应结合日志和调试工具来检查事务是否正常开始、提交或回滚,以便及时发现和解决问题。

    Spring声明式事务@Transactional知识点分享

    "Spring声明式事务@Transactional知识点分享" 在 Spring 框架中,@Transactional 注解是实现声明式...@Transactional 注解是 Spring 框架中实现声明式事务的关键,可以灵活地控制事务的传播行为、隔离级别和读写控制。

    Spring中@Transactional事务回滚(含实例

    了解其工作原理和回滚规则,有助于我们在实际开发中更有效地利用这一功能,避免数据不一致性和业务错误。通过实例分析和源码阅读,我们可以更深入地理解Spring事务管理机制,提升我们的编程技能。

    Spring @Transactional注解失效解决方案

    Spring @Transactional 注解失效解决方案 ...通过了解 @Transactional 注解的特性和事务传播模式,并遵循解决方案,我们可以解决 @Transactional 注解不回滚的问题,确保事务管理的正确性和可靠性。

    spring 自定义事务管理器,编程式事务,声明式事务@Transactional使用

    在Spring框架中,事务管理是核心功能之一,它确保了数据操作的一致性和完整性。本教程将深入探讨如何在Spring中实现自定义事务管理器、编程式事务处理以及声明式事务`@Transactional`的使用。 首先,让我们了解事务...

    Java注解@Transactional事务类内调用不生效问题及解决办法

    Java注解@Transactional事务类内调用不生效问题及解决办法 Java注解@Transactional是Java中的一种注解,主要用于标记事务边界。然而,在某些情况下,@Transactional注解可能不会生效,例如在同一个类中的方法调用...

    浅谈Spring中@Transactional事务回滚及示例(附源码)

    本文将详细介绍@Transactional的使用场景、checked异常和unchecked异常的概念、@Transactional的使用实例等内容。 一、使用场景 在了解@Transactional怎么用之前,我们必须要先知道@Transactional有什么用。例如,...

    spring @Transactional 无效的解决方案

    当我们在方法 A 上添加 @Transactional 注解时,事务将正常生效,方法 A 和方法 B 将自动参与到同一个事务中。 结论 ---------- 在这篇文章中,我们介绍了 Spring @Transactional 注解无效的问题,并提供了解决...

    Spring中的@Transactional事物回滚实例源码

    总的来说,`@Transactional`注解是Spring中实现事务管理的重要手段,通过合理的配置和使用,可以有效地保证业务操作的原子性和一致性。在`first_maven_project`中,我们可以通过查看源码,了解如何在实际应用中使用...

    深入学习Spring Boot排查 @Transactional 引起的 NullPointerException问题

    在 Spring Boot 应用程序中,@Transactional 注解是非常常用的,它能够帮助我们管理事务,使得数据库操作更加可靠和安全。然而,在某些情况下,使用 @Transactional 注解可能会引起 NullPointerException,这是一个...

    spring-@Transactional-jar

    spring事务管理注解jar,spring-tx-3.2.4.RELEASE.jar,导入项目即可

    spring的@Transactional注解详细用法1

    在Spring框架中,事务管理是实现企业级应用稳定性和数据一致性的重要组成部分。Spring提供了两种主要的事务管理方式:编程式事务管理和声明式事务管理。本文将重点介绍基于`@Transactional`注解的声明式事务管理。 ...

Global site tag (gtag.js) - Google Analytics