`

使用Hystrix守护应用(2)

阅读更多
接上篇(http://ningandjiao.iteye.com/blog/2171185),
执行方式
HystrixCommand提供了3种执行方式:

同步执行:即一旦开始执行该命令,当前线程就得阻塞着直到该命令返回结果,然后才能继续执行下面的逻辑。当调用命令的execute()方法即为同步执行, 示例:
  
@Test
    public void synchronousExecute() throws Exception {
        ThreadEchoCommand command = new ThreadEchoCommand("xianlinbox");
        String result = command.execute();
        assertThat(result,equalTo("Echo: xianlinbox"));
    }

异步执行:命令开始执行会返回一个Future<T>的对象,不阻塞后面的逻辑,开发者自己根据需要去获取结果。当调用HystrixCommand的queue()方法即为异步执行
   
@Test
    public void asynchronousExecute() throws Exception {
        ThreadEchoCommand command = new ThreadEchoCommand("xianlinbox");
        Future<String> result = command.queue();
        while (!result.isDone()){
            System.out.println("Do other things ...");
        }
        assertThat(result.get(),equalTo("Echo: xianlinbox"));
    }

响应式执行:命令开始执行会返回一个Observable<T> 对象,开发者可以给给Obeservable对象注册上Observer或者Action1对象,响应式地处理命令执行过程中的不同阶段。当调用HystrixCommand的observe()方法,或使用Observable的工厂方法(just(),from())即为响应式执行,这个功能的实现是基于Netflix的另一个开源项目RxJava(https://github.com/Netflix/RxJava)来的,更细节的用法可以参考:https://github.com/Netflix/Hystrix/wiki/How-To-Use#wiki-Reactive-Execution。 示例:
   
@Test
    public void reactiveExecute1() throws Exception {
        ThreadEchoCommand command1 = new ThreadEchoCommand("xianlinbox");
        Observable<String> result = command1.observe();
        result.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                logger.info("Command called. Result is:{}", s);
            }1
        });
        Thread.sleep(1000);
    }

    @Test
    public void reactiveExecute2() throws Exception {
        ThreadEchoCommand command = new ThreadEchoCommand("xianlinbox");
        Observable<String> result = command.observe();
        result.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                logger.info("Command Completed");
            }

            @Override
            public void onError(Throwable e) {
                logger.error("Command failled", e);
            }

            @Override
            public void onNext(String args) {
                logger.info("Command finished,result is {}", args);
            }
        });
        Thread.sleep(1000);
    }

隔离方式(Thread Pool和Semaphores)
Hystrix支持2种隔离方式:
ThreadPool:即根据配置把不同的命令分配到不同的线程池中,这是比较常用的隔离策略,该策略的优点是隔离性好,并且可以配置断路,某个依赖被设置断路之后,系统不会再尝试新起线程运行它,而是直接提示失败,或返回fallback值;缺点是新起线程执行命令,在执行的时候必然涉及到上下文的切换,这会造成一定的性能消耗,但是Netflix做过实验,这种消耗对比其带来的价值是完全可以接受的,具体的数据参见Hystrix Wiki(https://github.com/Netflix/Hystrix/wiki/How-it-Works#wiki-Isolation)。 本文前面的例子都是使用的TheadPool隔离策略。

Semaphores:信号量,顾名思义就是使用一个信号量来做隔离,开发者可以限制系统对某一个依赖的最高并发数。这个基本上就是一个限流的策略。每次调用依赖时都会检查一下是否到达信号量的限制值,如达到,则拒绝。该隔离策略的优点不新起线程执行命令,减少上下文切换,缺点是无法配置断路,每次都一定会去尝试获取信号量。示例:

public class SemaphoreEchoCommand extends HystrixCommand<String> {
    private Logger logger = LoggerFactory.getLogger(ThreadEchoCommand.class);
    private String input;

    protected SemaphoreEchoCommand(String input) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Semaphore Echo"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("Echo"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(2)));
        this.input = input;
    }

    @Override
    protected String run() throws Exception {
        logger.info("Run command with input: {}", input);
        Thread.currentThread().sleep(100);
        return "Echo: " + input;
    }
}
    @Test
    public void semaphoresCommandExecute() throws Exception {
        SemaphoreEchoCommand command = new SemaphoreEchoCommand("xianlinbox");
        assertThat(command.execute(), equalTo("Echo: xianlinbox"));
    }

    @Test
    public void semaphoresCommandMultiExecute() throws Exception {
        for (int i = 0; i < 5; i++) {
            final SemaphoreEchoCommand command = new SemaphoreEchoCommand("xianlinbox-" + i);
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    command.queue();
                }
            });
            thread.start();
        }
        Thread.sleep(1000);
    }

第一个测试的运行日志如下:
23:10:34.996 [main] INFO  c.n.c.DynamicPropertyFactory - DynamicPropertyFactory is initialized with configuration sources: com.netflix.config.ConcurrentCompositeConfiguration@2224df87
23:10:35.045 [main] INFO  d.ThreadEchoCommand - Run command with input: xianlinbox
从运行日志可以看到,HystrixCommand一样是在主线程中执行。

第二个测试的运行日志如下:
14:56:22.285 [Thread-5] INFO  d.ThreadEchoCommand - Run command with input: xianlinbox-4
14:56:22.285 [Thread-1] INFO  d.ThreadEchoCommand - Run command with input: xianlinbox-0
Exception in thread "Thread-2" Exception in thread "Thread-4" com.netflix.hystrix.exception.HystrixRuntimeException: Echo could not acquire a semaphore for execution and no fallback available.
示例中,设置的信号量最大值为2, 因此可以看到有2个线程可以成功运行命令,第三个则会得到一个无法获取信号量的HystrixRuntimeException。
优雅降级
在调用第三方服务时,总是无可避免会出现一些错误(fail,timeout等),再加上上面提到的线程池大小,信号量的限制等等,在执行HystrixComamnd的过程中,总难免会抛出一些异常。而Hystrix为执行过程中的异常情况提供了优雅的降级方案,只需要在自己的HystrixCommand中实现getFallback()方法,当异常出现时,就会自动调用getFallback()方法的值. 示例:为第一小节中的AddressHystrixCommand和ContactHystrixCommand添加getFallback()方法, 当有异常发生的时候,直接返回null:
   
@Override
    protected Contact getFallback() {
        logger.info("Met error, using fallback value: {}", customerId);
        return null;
    }

然后,停掉Stub的Contact和Address服务, 再次调用GetCustomer服务(http://localhost:8080/HystrixDemo/customers/1),得到结果如下:
{"id":"1","name":"xianlinbox","contact":null,"address":null}
运行日志:
15:22:08.847 [hystrix-Contact-1] INFO  c.x.h.d.ContactHystrixCommand - Get contact for customer 1
15:22:09.098 [hystrix-Contact-1] INFO  c.x.h.d.ContactHystrixCommand - Met error, using fallback value: 1
15:22:09.101 [hystrix-Address-1] INFO  c.x.h.d.AddressHystrixCommand - Get address for customer 1
15:22:09.103 [hystrix-Address-1] INFO  c.x.h.d.AddressHystrixCommand - Met error, using fallback value: 1
请求作用域特性
作用域设置:要想要使用请求作用域特性,首先必须把HystrixCommand置于HystrixRequestContext的生命周期管理中,其典型用法是在Web应用中增加一个ServletFilter,把每个用户Request用HystrixRequestContext包起来。示例:
public class HystrixRequestContextServletFilter implements Filter {
	...
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
     throws IOException, ServletException {
     	//启动HystrixRequestContext	
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            chain.doFilter(request, response);
        } finally {
        	//关闭HystrixRequestContext
            context.shutdown();
        }
    }
}
然后把该SevletFilter配置到web.xml中:
   
<filter>
        <display-name>HystrixRequestContextServletFilter</display-name>
        <filter-name>HystrixRequestContextServletFilter</filter-name>
        <filter-class>com.xianlinbox.hystrix.filter.HystrixRequestContextServletFilter</filter-class>
    </filter>

    <filter-mapping>
        <filter-name>HystrixRequestContextServletFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

设置了请求作用域之后,接下来看看,我们从中可以得到哪些好处:

请求缓存(Request Cache):即当用户调用HystrixCommand时,HystrixCommand直接从缓存中取而不需要调用外部服务。HystrixCommand从缓存中取需要3个条件:
1. 该HystrixCommand被包裹一个HystrixRequestContext中
2. 该HystrixCommand实现了getCacheKey()方法
3. 在HystrixRequestContext中已有相同Cache Key值的缓存
示例:
public void requestCache() throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            ThreadEchoCommand command1 = new ThreadEchoCommand("xianlinbox");
            ThreadEchoCommand command2 = new ThreadEchoCommand("xianlinbox");

            assertThat(command1.execute(),equalTo("Echo: xianlinbox"));
            assertThat(command1.isResponseFromCache(),equalTo(false));
            assertThat(command2.execute(),equalTo("Echo: xianlinbox"));
            assertThat(command2.isResponseFromCache(),equalTo(true));
        } finally {
            context.shutdown();
        }

        context = HystrixRequestContext.initializeContext();
        try {
            ThreadEchoCommand command3 = new ThreadEchoCommand("xianlinbox");
            assertThat(command3.execute(),equalTo("Echo: xianlinbox"));
            assertThat(command3.isResponseFromCache(),equalTo(false));
        } finally {
            context.shutdown();
        }
    }
从上面的例子看以得到,一旦重新初始化了RequestContext,Cache也都全部失效了。另外,从Cache中获取值不会去执行HystrixCommand的run()方法。

除了重新初始化RequestContext,Hystrix还提供了另外一种方式来刷新Cache,该方式需要使用HystrixRequestCache的clear()方法,示例:在ThreadEchoCommand中实现一个静态方法flushCache( ),该方法会调用HystrixRequestCache的clear方法清理Cache
  
 public static void flushCache(String cacheKey) {
        HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("Echo"),
                HystrixConcurrencyStrategyDefault.getInstance()).clear(cacheKey);
    }
    
    @Test
    public void flushCacheTest() throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            ThreadEchoCommand command1 = new ThreadEchoCommand("xianlinbox");
            ThreadEchoCommand command2 = new ThreadEchoCommand("xianlinbox");

            assertThat(command1.execute(), equalTo("Echo: xianlinbox"));
            assertThat(command1.isResponseFromCache(), equalTo(false));
            assertThat(command2.execute(), equalTo("Echo: xianlinbox"));
            assertThat(command2.isResponseFromCache(), equalTo(true));

            ThreadEchoCommand.flushCache("xianlinbox");
            ThreadEchoCommand command3 = new ThreadEchoCommand("xianlinbox");
            assertThat(command3.execute(), equalTo("Echo: xianlinbox"));
            assertThat(command3.isResponseFromCache(), equalTo(false));
        } finally {
            context.shutdown();
        }
    }

通过这个机制,开发者可以实现Get-Set-Get的Cache验证机制,防止因为Cache导致的不一致状况。

批量执行请求(Request Collapsing):即用户可以把多个命令封装到一个HystrixCommand中执行以提升效率,这多个命令会在一个线程中依次执行(注:经笔者测试,在JDK6下线程数固定,但是在JDK7下的运行线程数不固定)。要使用该特性需要把依赖调用封装到一个HystrixCollapser<BatchReturnType,ResponseType,RequestArgumentType>中, 该抽象类的主要作用有3个:
1. 把所有的依赖调用封装到一个CollapseRequest的集合中
2. 以第一步得到的CollapseRequest集合为参数创建一个HystrixCommand
3. 把第二步得到的结果集一一对应的设置到对应的CollapseRequest中

为了支持上面的功能,该抽象类提供了3个泛型参数:
BatchReturnType:即BatchCommand的返回值,通常为ResponseType的集合。
ResponseType:依赖调用的返回值。
RequestArgumentType:依赖调用的参数,如果有多个参数,需封装为一个对象或使用集合。
示例:
public class CollapseEchoHystrixCommand extends HystrixCollapser<List<String>, String, String> {
    private Logger logger = LoggerFactory.getLogger(CollapseEchoHystrixCommand.class);
    private String input;

    public CollapseEchoHystrixCommand(String input) {
        super(HystrixCollapser.Setter
                .withCollapserKey(HystrixCollapserKey.Factory.asKey("Echo Collapse")));
        this.input = input;
    }

    @Override
    public String getRequestArgument() {
        return input;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
        return new BatchCommand(collapsedRequests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
        logger.info("Mapping response to Request");
        int count = 0;
        for (CollapsedRequest<String, String> request : collapsedRequests) {
            request.setResponse(batchResponse.get(count++));
        }

    }

    private class BatchCommand extends HystrixCommand<List<String>> {
        private Collection<CollapsedRequest<String, String>> requests;

        public BatchCommand(Collection<CollapsedRequest<String, String>> requests) {
            super(HystrixCommandGroupKey.Factory.asKey("Batch"));
            this.requests = requests;
        }

        @Override
        protected List<String> run() throws Exception {
            logger.info("Run batch command");
            List<String> responses = new ArrayList<String>();
            for (CollapsedRequest<String, String> request : requests) {
                logger.info("Run request: {}", request.getArgument());
                responses.add("Echo: " + request.getArgument());
            }
            return responses;
        }
    }
}

    @Test
    public void collapseCommandTest() throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try {
            Future<String> result1 = new CollapseEchoHystrixCommand("xianlinbox-1").queue();
            Future<String> result2 = new CollapseEchoHystrixCommand("xianlinbox-2").queue();
            Future<String> result3 = new CollapseEchoHystrixCommand("xianlinbox-3").queue();

            assertThat(result1.get(),equalTo("Echo: xianlinbox-1"));
            assertThat(result2.get(),equalTo("Echo: xianlinbox-2"));
            assertThat(result3.get(),equalTo("Echo: xianlinbox-3"));

            assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
        } finally {
            context.shutdown();
        }
    }

运行日志:
03:10:58.584 [main] INFO  d.CollapseEchoHystrixCommand - Get argument
03:10:58.597 [main] INFO  d.CollapseEchoHystrixCommand - Get argument
03:10:58.597 [main] INFO  d.CollapseEchoHystrixCommand - Get argument
03:10:58.598 [HystrixTimer-1] INFO  d.CollapseEchoHystrixCommand - Create batch command
03:10:58.637 [hystrix-Batch-1] INFO  d.CollapseEchoHystrixCommand - Run batch command
03:10:58.637 [hystrix-Batch-1] INFO  d.CollapseEchoHystrixCommand - Run request: xianlinbox-1
03:10:58.639 [hystrix-Batch-1] INFO  d.CollapseEchoHystrixCommand - Run request: xianlinbox-2
03:10:58.639 [hystrix-Batch-1] INFO  d.CollapseEchoHystrixCommand - Run request: xianlinbox-3
03:10:58.644 [RxComputationThreadPool-1] INFO  d.CollapseEchoHystrixCommand - Mapping response to Request

从运行日志可以看到,整个Collapser的运行过程:
1. 获取调用参数,封装到CollapseRequest中
2. 以封装后的List<CollapseRequest>为参数创建Batch HystrixComand
3. Batch HystrixCommand运行所有的请求,把所有的返回放到List<Response>中
4. 把Response设置到对应的CollapseRequest中,返回给调用者。
1
0
分享到:
评论

相关推荐

    15.Spring Cloud中使用Hystrix

    本文将深入探讨如何在Spring Cloud项目中集成并使用Hystrix,以及如何将其与Feign客户端结合。 首先,我们需要了解Hystrix的基本原理。Hystrix通过隔离请求,防止单个服务的故障蔓延至整个系统,避免雪崩效应。断路...

    spring-cloud-netflix-hystrix应用

    4. **使用HystrixCommand**:在服务调用处使用HystrixCommand,确保服务调用被正确地封装和隔离。 5. **实现降级逻辑**:编写降级方法,当服务不可用时自动执行。 6. **集成监控**:集成Hystrix Dashboard和Turbine...

    spring cloud hystrix原理介绍及使用

    降级逻辑可以通过HystrixCommand类中的fallbackMethod方法来指定,或者在响应式调用中使用fallback操作符。 Hystrix的入门相对简单,其提供的注解编程模型允许开发者通过极简的配置就能实现上述功能。对于开发者而...

    springcloud hystrix 断路由

    在 `SpringCloudDemo-Hystrix` 压缩包中,通常包含了示例代码,演示了如何在 Spring Boot 应用中配置和使用 Hystrix。开发者可以通过阅读这些代码,学习如何将断路器集成到自己的服务中。 总的来说,Spring Cloud ...

    Hystrix Dashboard的使用-代码部分.zip

    2. **Hystrix Dashboard 工程**: 包含了启用 Hystrix Dashboard 的配置和相关设置,以及如何暴露监控端点的示例。 3. **Turbine 工程**: 实现了如何聚合多个服务实例的 Hystrix 流数据,以及如何配置 Eureka 服务...

    hystrix-dashboar1.5.12

    2. **Hystrix Dashboard 1.5.12 版本特性**: - 该版本可能包含了对前一版本的优化和bug修复,提供了更稳定的监控体验。 - 可能新增或改进了一些功能,比如增强的数据分析能力或者用户体验改进。 3. **部署与使用...

    断路器hystrix实现.rar

    2. **创建Hystrix命令**:Hystrix的核心是命令(Command)模式,我们需要为每个服务调用创建一个HystrixCommand子类。在命令类中,定义执行业务逻辑的`run()`方法和备用的`fallback()`方法。`run()`方法是正常的业务...

    微服务断路器hystrix应用实例java工程源码.zip

    本压缩包“微服务断路器hystrix应用实例java工程源码.zip”包含了一个基于Java实现的Hystrix应用实例,这个实例旨在帮助开发者理解和掌握如何在实际项目中集成和使用Hystrix。通过分析源码,我们可以深入学习以下几...

    hystrix公司内部分享ppt

    Hystrix是一个由Netflix开源的延迟和容错库,旨在隔离远程系统、服务和第三方库的访问点,停止级联失败,提供后备选项,并实现优雅降级。...这说明了使用Hystrix进行服务调用时,资源隔离与限流的重要性。

    SpringCloud -Hystrix监控面板及数据聚合(Turbine)介绍与使用示例

    要使用 Hystrix Dashboard,首先需要在项目中引入 `spring-cloud-starter-netflix-hystrix-dashboard` 依赖,并在启动类上添加 `@EnableHystrixDashboard` 注解以开启监控功能。配置文件中指定应用名称和监听端口,...

    hystrix简介

    例如,对于一个依赖于 30 个服务的应用程序,每个服务的正常运行时间为 99.99%,那么整体系统的预期正常运行时间将降至 99.7%,这意味着每月可能会有超过 2 小时的潜在停机时间。此外,即使所有依赖服务都保持极高的...

    hystrix-dashboard.zip

    【描述】提到的内容表明,这个压缩包可能包含了一个完整的示例项目,用于演示如何在Spring Cloud应用中集成和使用Hystrix Dashboard。在文章《Spring Cloud Hystrix Dashboard实战》...

    Hystrix-dashboard+turbine-web+说明文档

    3. **降级策略**:在熔断后,Hystrix 允许应用提供降级策略,如返回缓存数据或显示备用界面,确保应用的基本功能不受影响。 4. **命令模式**:Hystrix 使用命令模式将业务逻辑封装在命令对象中,便于管理和监控。 *...

    Hystrix 熔断、降级实例

    "HystrixComma"可能是压缩包中的一个示例文件,它可能包含了一些Hystrix的使用代码或者配置示例。通过分析和学习这些代码,我们可以更好地理解Hystrix的用法和配置。 总结,Hystrix通过HystrixCommand实现了服务...

    熔断器Hystrix的使用与原理.pdf

    • 1. 服务雪崩效应 • 2. 服务雪崩应对策略 ... 使用Hystrix预防服务雪崩 • 4. 预售中Hystrix的运用 • 5. Hystrix的实现 • 6. Hystrix的运维 • 7. 题外话: Netflix的技术栈 • 8. 题外话: 响应式编程

    Hystrix是如何保护应用验证1

    2. **线程池隔离策略**:Hystrix使用线程池来隔离命令执行,限制每个服务的并发请求量。在压力测试中,当线程池大小设置为10时,在250个并发请求下线程池已满,表明线程池设置过小。调整线程池大小至20后,250个并发...

    springcloud hystrix的使用

    2. **配置Hystrix**:在`application.yml`或`application.properties`文件中配置Hystrix的默认属性,例如超时时间、熔断阈值等。 3. **创建Hystrix命令**:通过实现`HystrixCommand`接口或使用注解`@HystrixCommand...

    微服务springcloud之feign和hystrix使用demo

    feign中包含了hystrix以及ribbon,即feign在不导入hystrix和ribbon的依赖下也能完成他们所能实现的功能,当然,如果想使用hystrix和ribbon自带的注解以及配置,必须导入依赖才可以,feign结合hystrix实现熔断+降级,...

    APM之hystrix的使用

    APM之hystrix的使用,改造老的项目,没有使用spring cloud全家桶的情况下如何使用

    Hystrix源码_可以跑起来

    在源代码中找到示例应用,理解其工作流程,通过调试器设置断点,逐步跟踪命令的执行过程。 在源码中,你会看到如`HystrixCommand.execute()`或`HystrixCommand.queue().get()`这样的方法调用,它们是发起服务请求的...

Global site tag (gtag.js) - Google Analytics