`
y806839048
  • 浏览: 1121657 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Hystrix自定义并发策略实现ThreadLocal上下文的传递

阅读更多

 

总括:

 

自定义策略就是为了实现包装callable,包装的目的就是为了能调用前cpoy  threadlocal变量,子线程被调用设置的就在子线程中

所以一个策略可以包装多个callable,在spring容器中,一次设置都有效---类似初始化思想

效果就是只要用了这个策略,里面的callable都是有自定义用自定义的,没有用默认的(总和就是自定义+默认的)

 

@HystrixCommand设置的属性   隔离策略,fallback,线程池

注解往往和切面联合使用

 

前言

Hystrix提供了基于信号量和线程两种隔离模式,通过在Hystrix基础章节中已经验证过,通过@HystrixCommand注解的方法体将在新的线程中执行,这样会带来些什么意想不到的意外呢,先来看一个示例:

1、定义一个webapi,通过RequestContextHolder设定一个当前线程的上下文:

@GetMapping(value = "/getServerInfo/{serviceName}")

public String getServer1Info(@PathVariable(value = "serviceName") String serviceName) {

    LOGGER.info("当前线程ID:" + Thread.currentThread().getId() + "当前线程Name" + Thread.currentThread().getName());

    RequestContextHolder.currentRequestAttributes().setAttribute("context", "main-thread-context", SCOPE_REQUEST);

    return consumeService.getServerInfo(serviceName);

}

2、在@HystrixCommand注解的方法中再次通过RequestContextHolder获取当前上下文设定的value值:

@Override

@HystrixCommand(fallbackMethod = "getServerInfoFallback",

        commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD")},

        commandKey = "cust2GetServerInfo",

        threadPoolKey = "cust2ThreadPool",

        groupKey = "cust2")

public String getServerInfo(String serviceName) {

    LOGGER.info(RibbonFilterContextHolder.getCurrentContext().get("TAG"));

    LOGGER.info(RequestContextHolder.currentRequestAttributes().getAttribute("context", SCOPE_REQUEST).toString());

    //如果是service1则需要添加http认证头,service1暂时添加了认证机制;反之service2不需要认证直接发出请求即可

    if ("service1".equals(serviceName)) {

        HttpEntity<String> requestEntity = new HttpEntity<String>(getHeaders());

        ResponseEntity<String> responseEntity = restTemplate.exchange("http://" + serviceName + "/getServerInfo?userName=shuaishuai", HttpMethod.GET, requestEntity, String.class);

        return responseEntity.getBody();

    } else

        return restTemplate.getForObject("http://" + serviceName + "/getServerInfo?userName=shuaishuai", String.class);

}

 

public String getServerInfoFallback(String serviceName, Throwable e) {

    if (e != null) {

        LOGGER.error(e.getMessage());

    }

    return "Maybe the server named " + serviceName + " is not normal running";

}

3、启动服务请求1中定义的API:

 

可以看到上图中上下文的赋值与取值在不同的线程中执行,TAG信息被正常获取,而RequestContextHolder设定的上线文信息获取失败,并进入回退方法并打印出了对应的异常信息,首先来看下为何TAG信息被正常获取,在RibbonFilterContextHolder中定义变量如下

而在RequestContextHolder中变量定义如下

 

其区别在于是采用ThreadLocal与InheritableThreadLocal的差异,InheritableThreadLocal能够在子线程中继续传播父线程的上线文,而ThreadLocal只能在保存在当前线程中,但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。

 

 

本章概要

1、资料搜索;

2、源码分析;

3、扩展HystrixConcurrencyStrategy解决前言中的意外;

4、提高HystrixConcurrencyStrategy包装扩展性;

 

资料搜索

既然遇到了问题,就到springcloud的官方文档先检索下,找到如下对应的描述

红色框部分主要意思是,我们可以声明一个定制化的HystrixConcurrencyStrategy实例,并通过HystrixPlugins注册。先找到HystrixConcurrencyStrategy类,其有下面一段类注释

For example, every {@link Callable} executed by {@link HystrixCommand} will call {@link #wrapCallable(Callable)} to give a chance for custom implementations to decorate the {@link Callable} with additional behavior.

被@HystrixCommand注解的方法,其执行源Callable可以通过wrapCallable方法进行定制化装饰,加入附加的行为,继续来看看wrapCallable方法的定义

其同样提供了非常详细的注释,该方法提供了在方法被执行前进行装饰的机会,可以用来复制线程状态等附加行为,这个貌似就是我们需要的,很合意。

同样在Hystrix官方文档提供了更加详细的说明(https://github.com/Netflix/Hystrix/wiki/Plugins#concurrency-strategy),Concurrency Strategy作为了Plugin的一种类别,描述如下

可以看到红色框中的重点描述,其已经说了非常明确,可以从父线程复制线程状态至子线程。自定义的Plugin如何被HystrixCommand应用呢,继续查看官方的描述

其提供了HystrixPlugins帮助我们注册自定义的Plugin,除了我们本章节重点关注的Concurrency Strategy类别plugin,还有如下类别以及对应的抽象实现

类别

抽象实现

Event Notifier                        HystrixEventNotifier

Metrics Publisher                  HystrixMetricsPublisher

Properties Strategy               HystrixPropertiesStrategy

Concurrency Strategy           HystrixConcurrencyStrategy

Command Execution Hook   HystrixCommandExecutionHook

 

 

源码分析

在springcloud中还有如下一段话

既然提高了定制化的实现,不如来看看官方已经提供了哪些默认实现

首先来看看HystrixConcurrencyStrategyDefault,

很精简的一段代码,并没有任何方法重写,其作为了一个标准提供默认实现。继续来看看SecurityContextConcurrencyStrategy实现,直接找到wrapCallable方法

其对Callabe进行了二次包装,继续跟进来看看DelegatingSecurityContextCallable的定义

其主要实现均在call方法中,红色框中标出了重点,在调用call方法前,我们可以将当前上下文信息放入SecurityContextHolder中,在执行完成后清空SecurityContextHolder对应的设置。再来看看SecurityContextConcurrencyStrategy是如何被应用的,在HystrixSecurityAutoConfiguration中有如下代码段

在启动注册配置过程中机会通过HystrixPlugins注册当前扩展的HystrixConcurrencyStrategy实现。

 

小节:自定义扩展类实现Callable接口,并传入当前Callable变量delegate,在delegate执行call方法前后进行线程上线文的操作即可实现线程状态在父线程与子线程间的传播。

 

扩展HystrixConcurrencyStrategy解决前言中的意外

通过源码部分的解读,基本了解springcloud是如何实现扩展的,又是如何被应用的,照葫芦画瓢下。

 

1、定义一个RequestContextHystrixConcurrencyStrategy实现HystrixConcurrencyStrategy接口,并重写其wrapCallable方法:

public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

 

    @Override

    public <T> Callable<T> wrapCallable(Callable<T> callable) {

        return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.getRequestAttributes());

    }

 

    static class RequestAttributeAwareCallable<T> implements Callable<T> {

 

        private final Callable<T> delegate;

        private final RequestAttributes requestAttributes;

 

        public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {

            this.delegate = callable;

            this.requestAttributes = requestAttributes;

        }

 

        @Override

        public T call() throws Exception {

            try {

                RequestContextHolder.setRequestAttributes(requestAttributes);

                return delegate.call();

            } finally {

                RequestContextHolder.resetRequestAttributes();

            }

        }

    }

}

其中定义RequestAttributeAwareCallable装饰类,通过构造函数传入当前待执行Callable代理和当前待传播的RequestAttributes值,并在delegate的call方法执行前对RequestContextHolder的RequestAttributes赋值,在finally块中重置。

 

2、同样在任意配置类中添加如下代码段即可,通过HystrixPlugins注册RequestContextHystrixConcurrencyStrategy:

@PostConstruct

public void init() {

    HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy());

}

3、启动服务验证,子线程取值成功:

 

小节:以上参考SecurityContextConcurrencyStrategy的实现,完成了Hystrix中RequestContextHolder上下文信息传播。

 

提高HystrixConcurrencyStrategy包装扩展性

上一个小节介绍了如果在Hystrix线程隔离场景下实现ThreadLocal定义的上下文传播,根据示例,在实际应用过程中如果我们有多个类似RequestContextHystrixConcurrencyStrategy策略,需要将每个自定义HystrixConcurrencyStrategy示例注册至HystrixPlugins中,这在扩展性方面显然是缺失的,借鉴spring的实践,我们可以定义对Callable的包装接口HystrixCallableWrapper,根据实际的业务只需要对HystrixCallableWrapper进行实现,并注册对应的实现bean即可。具体实现如下:

 

1、定义用于包装hystrix中Callable实例的接口:

public interface HystrixCallableWrapper {

 

    /**

     * 包装Callable实例

     *

     * @param callable 待包装实例

     * @param <T>      返回类型

     * @return 包装后的实例

     */

    <T> Callable<T> wrap(Callable<T> callable);

 

}

 

2、通过之前的源码阅读与实践,基本已经发现实现线程上线文传播的核心在于对Callable进行包装,通过多次对Callable包装即实现了一个链式包装过程,如下扩展HystrixConcurrencyStrategy接口实现RequestContextHystrixConcurrencyStrategy,其中定义CallableWrapperChain类对所有注入的HystrixCallableWrapper包装实现进行装配:

public class RequestContextHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private final Collection<HystrixCallableWrapper> wrappers;

 

    public RequestContextHystrixConcurrencyStrategy(Collection<HystrixCallableWrapper> wrappers) {

        this.wrappers = wrappers;

    }

 

    @Override

    public <T> Callable<T> wrapCallable(Callable<T> callable) {

        return new CallableWrapperChain(callable, wrappers.iterator()).wrapCallable();

    }

 

    private static class CallableWrapperChain<T> {

 

        private final Callable<T> callable;

 

        private final Iterator<HystrixCallableWrapper> wrappers;

 

        CallableWrapperChain(Callable<T> callable, Iterator<HystrixCallableWrapper> wrappers) {

            this.callable = callable;

            this.wrappers = wrappers;

        }

 

        Callable<T> wrapCallable() {

            Callable<T> delegate = callable;

            while (wrappers.hasNext()) {

                delegate = wrappers.next().wrap(delegate);

            }

            return delegate;

        }

    }

}

3、实现HystrixCallableWrapper接口,定义一个包装RequestContextHolder上下文处理的实现类:

public final class RequestAttributeAwareCallableWrapper implements HystrixCallableWrapper {

    @Override

    public <T> Callable<T> wrap(Callable<T> callable) {

        return new RequestAttributeAwareCallable(callable, RequestContextHolder.getRequestAttributes());

    }

 

    static class RequestAttributeAwareCallable<T> implements Callable<T> {

 

        private final Callable<T> delegate;

        private final RequestAttributes requestAttributes;

 

        RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {

            this.delegate = callable;

            this.requestAttributes = requestAttributes;

        }

 

        @Override

        public T call() throws Exception {

            try {

                RequestContextHolder.setRequestAttributes(requestAttributes);

                return delegate.call();

            } finally {

                RequestContextHolder.resetRequestAttributes();

            }

        }

    }

}

4、实现HystrixCallableWrapper接口,定义一个包装Mdc日志处理上下文的实现类:

public class MdcAwareCallableWrapper implements HystrixCallableWrapper {

    @Override

    public <T> Callable<T> wrap(Callable<T> callable) {

        return new MdcAwareCallable<>(callable, MDC.getCopyOfContextMap());

    }

 

    private class MdcAwareCallable<T> implements Callable<T> {

 

        private final Callable<T> delegate;

 

        private final Map<String, String> contextMap;

 

        public MdcAwareCallable(Callable<T> callable, Map<String, String> contextMap) {

            this.delegate = callable;

            this.contextMap = contextMap != null ? contextMap : new HashMap();

        }

 

        @Override

        public T call() throws Exception {

            try {

                MDC.setContextMap(contextMap);

                return delegate.call();

            } finally {

                MDC.clear();

            }

        }

    }

}

5、最后通过在Configuration配置类中注册如下HystrixCallableWrapper 实现类的bean实例,并通过HystrixPlugins注册扩展包装实现:

@Bean

public HystrixCallableWrapper requestAttributeAwareCallableWrapper() {

    return new RequestAttributeAwareCallableWrapper();

}

 

@Bean

public HystrixCallableWrapper mdcAwareCallableWrapper(){

    return new MdcAwareCallableWrapper();

}

 

@Autowired(required = false)

private List<HystrixCallableWrapper> wrappers = new ArrayList<>();

 

@PostConstruct

public void init() {

    HystrixPlugins.getInstance().registerConcurrencyStrategy(new RequestContextHystrixConcurrencyStrategy(wrappers));

}

 

总结

本章从官方网站与源码出发,逐步实现了hystrix中如何进行线程上下文的传播。同时为了更好的扩展性,提供了基于自定义接口并注入实现的方式。

 

原文:https://blog.csdn.net/songhaifengshuaige/article/details/80345012 

 

分享到:
评论

相关推荐

    详解Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失

    我们可以使用Hystrix的Context对象来传递数据,避免ThreadLocal数据丢失的问题。 知识点6:分布式Session 分布式Session是一种存储机制,用于存储和传递数据。我们可以使用分布式Session来存储数据,避免...

    springboot feign+Hystrix实现简单 微服务之间的调用 熔断的功能

    maven依赖 : &lt;groupId&gt;org.springframework.cloud &lt;artifactId&gt;spring-cloud-starter-...项目启动后 访问 http://127.0.0.1:8764/ 来实现项目接口调用 简单的demo 具备feign 接口调用 以及Hystrix简单熔断的功能

    Hystrix跨线程传递数据解决方案:HystrixRequestContext.docx

    通过 `HystrixRequestContext` 和相关的 API,Hystrix 能够有效地管理跨线程的数据传递,使得开发人员能够在使用线程池隔离模式时轻松地处理上下文数据。这种机制不仅提高了代码的可维护性和可读性,而且也增强了...

    Hystrix实现容错

    5. **启用Hystrix Dashboard**:在Spring Boot启动类上添加@EnableHystrixDashboard注解,然后在浏览器中访问暴露的Hystrix Dashboard端点,查看服务调用的实时监控数据。 6. **集成Turbine**:如果有多个服务实例...

    Hystrix组件学习笔记及debug调试截图

    在Spring Cloud中,Hystrix提供了`HystrixRequestContext`来实现请求上下文的跨线程传播。`HystrixRequestContext`存储了当前请求的相关信息,如命令执行的元数据等,它在请求开始时被创建,并在请求结束时销毁。 ...

    互联网高并发解决方案-基于Hystrix实现服务隔离与降级

    互联网高并发解决方案-基于Hystrix实现服务隔离与降级互联网高并发解决方案-基于Hystrix实现服务隔离与降级互联网高并发解决方案-基于Hystrix实现服务隔离与降级互联网高并发解决方案-基于Hystrix实现服务隔离与降级...

    断路器hystrix实现.rar

    本教程将深入探讨如何使用Hystrix在微服务中实现断路器功能。 首先,让我们理解断路器的工作原理。断路器在正常情况下处于“关闭”状态,允许请求通过。当服务出现故障,如连续多次调用失败或调用超时,断路器会...

    hystrix-config-issue:基于Spring Cloud Tests项目展示一个请求上下文属性的问题

    我创建了一个自定义并发策略,以允许传递 MDC 参数,如。 这导致请求日志被打开。 我无法修复请求上下文,因为当我尝试将其添加到过滤器时,我还有其他 aysnc 代码似乎丢失了该上下文。 我应该能够关闭请求日志,...

    spring-cloud-netflix-hystrix应用

    2. **创建HystrixCommand**:定义并实现自定义的HystrixCommand,封装服务调用逻辑。 3. **配置Hystrix**:设置命令的属性,如超时时间、熔断器阈值等。 4. **使用HystrixCommand**:在服务调用处使用HystrixCommand...

    Hystrix源码_可以跑起来

    Hystrix是一款由Netflix开发的开源库,主要目标是实现服务间的容错管理,通过提供断路器模式来防止服务雪崩,确保系统的稳定性和弹性。在这个“Hystrix源码_可以跑起来”的主题中,我们将深入探讨Hystrix的工作原理...

    Hystrix 熔断、降级实例

    当你创建一个自定义的HystrixCommand子类时,需要重写三个关键方法:run()、fallback()和getFallback()。 1. run():这是执行核心业务逻辑的地方,当HystrixCommand被调用时,这个方法会被执行。如果该方法抛出异常...

    Hystrix系列之信号量、线程池

    * 因为涉及到跨线程,那么就存在 ThreadLocal 数据的传递问题,比如在主线程初始化的 ThreadLocal 变量,在线程池线程中无法获取。 Hystrix 默认配置 Hystrix 默认使用了线程池模式,对于每个 Command,在初始化的...

    15.Spring Cloud中使用Hystrix

    在Spring Cloud生态系统中,Hystrix是一个至关重要的组件,它主要...在实际开发中,根据具体需求,还可以配置Hystrix的更多高级特性,如线程池、信号量隔离策略、自定义断路器规则等,以进一步优化系统的性能和稳定性。

    hystrix公司内部分享ppt

    Hystrix是一个由Netflix开源的延迟和容错库,旨在隔离远程系统、服务和第三方库的访问点,停止级联失败,提供后备选项,并实现优雅降级。它在高并发的分布式系统中尤为重要,可以在复杂的系统中保证服务调用的稳定性...

    spring cloud hystrix &&dashboard源码解读

    综上所述,`HystrixCommandAspect` 通过 AOP 技术实现了对标注了 `@HystrixCommand` 和 `@HystrixCollapser` 的方法的拦截和增强,通过不同的代理对象生成策略确保了正确地执行 Hystrix 的命令和合并逻辑。...

    14.Netflix之Hystrix详细分析

    线程池隔离每个服务调用都在独立的线程上执行,而信号量则限制并发请求数量。这两种方式都可以有效地防止单个服务故障导致整个系统瘫痪。 3. **断路器状态监控**:Hystrix维护了一个断路器,通过统计成功、失败、...

    hystrix-dashboar1.5.12

    在标题提到的 "hystrix-dashboar1.5.12" 中,我们关注的是 Hystrix Dashboard 的一个特定版本——1.5.12,这是一个可以直接部署到Web服务器上的WAR(Web ARchive)文件。 Hystrix 是一个用于处理分布式系统中延迟和...

    Hystrix服务高可用保障框架.pdf

    综上所述,Hystrix是一个功能强大的容错框架,它能够帮助开发者在开发复杂分布式系统时,有效地管理服务间的依赖关系,提升系统的健壮性和可用性。通过Hystrix提供的各项策略和工具,可以在服务故障发生时,采取适当...

    spring cloud hystrix原理介绍及使用

    对于开发者而言,只需在需要的服务调用上添加相应的Hystrix注解即可将该调用纳入Hystrix的管理之下。 简而言之,Spring Cloud Hystrix是微服务架构中不可或缺的一部分,它解决了服务间调用时可能出现的多种异常情况...

Global site tag (gtag.js) - Google Analytics