`
m635674608
  • 浏览: 5026976 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Hystrix基本使用

 
阅读更多

添加依赖

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.8</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.5.8</version>
</dependency>

HelloWorld

import com.netflix.hystrix.*;
import rx.Observable;

import java.util.concurrent.Future;

/**
 * Created by gmou on 2016/12/15.
 */
public classA_CommandHelloWorldextendsHystrixCommand<String>{

    private final String name;

    public A_CommandHelloWorld(String name) {
        super(
                Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("CommandHelloWorld"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroupThreadPool"))
                        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                                .withCoreSize(20)
                                .withMaxQueueSize(40))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                // 设置隔离方式,默认线程池隔离,可以设置为信号量隔离
//                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                                // 添加熔断策略,服务不可用到一定的阈值直接使用降级方案,以免造成雪崩。
                                .withCircuitBreakerEnabled(true)
                                .withCircuitBreakerErrorThresholdPercentage(50)
                                .withCircuitBreakerSleepWindowInMilliseconds(5)
                                .withExecutionTimeoutInMilliseconds(40))
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        Thread.sleep(50);
        return String.format("Hello %s !", name);
    }

    @Override
    protected String getFallback() {
        return "fallback:" + name;
    }

    public static void main(String[] args) throws Exception {
        A_CommandHelloWorld command1 = new A_CommandHelloWorld("Jack1");
        // 同步
        command1.execute();

        // 异步
        Future<String> future = new A_CommandHelloWorld("Jack2").queue();
        future.get();

        //响应式
        // 1. 立即执行execute
        Observable observable1 = new A_CommandHelloWorld("Jack3").observe();
        // 2. subscribe后才会执行execute
        Observable observable2 = new A_CommandHelloWorld("Jack4").toObservable();

        observable1.subscribe(obj -> {
            System.out.println("next:" + obj);
        }, obj -> {
            System.out.println("error:" + obj);
        }, () -> {
            System.out.println("complete");
        });
    }

}

响应式,可以返回多次结果

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;

/**
 * 响应式
 * 可以返回多个结果,执行多次onNext
 */
public classB_ObservableCommandHelloWorldextendsHystrixObservableCommand<String>{

    private final String name;

    public B_ObservableCommandHelloWorld(String name) {
        super(
                HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                .withCircuitBreakerEnabled(true)
                                .withExecutionTimeoutInMilliseconds(40))
        );
        this.name = name;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(observer -> {
            try {
                if (!observer.isUnsubscribed()) {
                    // 执行onNext返回结果,可执行多次
                    observer.onNext("Hello");
                    observer.onNext(name + "!");
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
//                throw new RuntimeException(e.getMessage());
            }
        });
    }

    // 服务降级
    @Override
    protected Observable<String> resumeWithFallback() {
        // 忽略异常
        Observable.empty();
        // 直接抛出异常
        Observable.error(new Throwable("failure from Command fallback"));
        // 使用常量
        Observable.just(name);
        // 服务降级,使用备选方案(从缓存获取数据,调用数据库...)
        return Observable.create(observer -> {
            try {
                if (!observer.isUnsubscribed()) {
                    observer.onNext("Fallback.Hello");
                    observer.onNext("Fallback." + name + "!");
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        // 监听
        B_ObservableCommandHelloWorld observable1 = new B_ObservableCommandHelloWorld("Jack");
        observable1.toObservable().subscribe(obj -> {
            System.out.println("next:" + obj);
        }, obj -> {
            System.out.println("error:" + obj);
        }, () -> {
            System.out.println("complete");
        });

        // 同步阻塞
        B_ObservableCommandHelloWorld observable2 = new B_ObservableCommandHelloWorld("Jack2");
        // 当observable2只返回一个结果,即内部只调用一次onNext,throw Exception:Sequence contains too many elements
//        System.out.println(observable2.observe().toBlocking().toFuture().get());
        // 当observable2返回多个结果,只取第一个。没有结果则报错:Sequence contains no elements
        System.out.println(observable2.observe().toBlocking().first());

    }
}

设置缓存

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public classC_RequestCacheCommandextendsHystrixCommand<Integer> {

    private final Integer value;

    publicC_RequestCacheCommand(int value){
        super(
                HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheCommand"))
        );
        this.value = value;
    }

    @Override
    protected Integer run()throws Exception {
        System.out.println("run:" + value);
        return value;
    }

    // 通过控制key来动态缓存或者设置缓存时间
    @Override
    protected String getCacheKey(){
        return value + "";
    }

    publicstaticvoidmain(String[] args)throws Exception {

        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            C_RequestCacheCommand command1 = new C_RequestCacheCommand(1);
            // 首次执行,execute获取结果
            System.out.println(command1.execute());
            System.out.println(command1.isResponseFromCache());
            C_RequestCacheCommand command2 = new C_RequestCacheCommand(1);
            // 再次执行,从缓存获取(同一个context中)
            System.out.println(command2.execute());
            System.out.println(command2.isResponseFromCache());
        } finally {
            context.shutdown();
        }

        context = HystrixRequestContext.initializeContext();
        try {
            // 新的context后,所有数据从新执行execute获取
            C_RequestCacheCommand command3 = new C_RequestCacheCommand(1);
            System.out.println(command3.execute());
            System.out.println(command3.isResponseFromCache());
        } finally {
            context.shutdown();
        }
    }
}

请求合并

单位时间内多次请求内部合并成一个请求,具体逻辑有Command中做
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
 * Created by gmou on 2016/12/14.
 */
public class D_CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public D_CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
        System.out.println("createCommand:" + StringUtils.join(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()), ","));
        return new BatchCommand(collapsedRequests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> collapsedRequests) {
        System.out.println("mapResponseToRequests:" + StringUtils.join(batchResponse, ","));
        if (batchResponse == null || batchResponse.size() == 0) {
            return;
        }
        int count = 0;
        for (CollapsedRequest<String, Integer> request : collapsedRequests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {


        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            System.out.println("BatchCommand:" + StringUtils.join(requests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()), ","));
            this.requests = requests;
        }

        @Override

        protected List<String> run() throws Exception {
            ArrayList<String> response = new ArrayList<String>();
            for (CollapsedRequest<String, Integer> request : requests) {
                if (request.getArgument() == 1) {
//                    throw new RuntimeException("argument can not be 1");
                }
                response.add("ret-" + request.getArgument());
            }
            return response;
        }

        @Override
        protected List<String> getFallback() {
            return new ArrayList<>();
        }
    }

    public static void main(String[] args) throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try {
            Future<String> f1 = new D_CommandCollapserGetValueForKey(1).queue();
            Future<String> f2 = new D_CommandCollapserGetValueForKey(2).queue();
            Future<String> f3 = new D_CommandCollapserGetValueForKey(3).queue();
            Future<String> f4 = new D_CommandCollapserGetValueForKey(4).queue();
            System.out.println("1 : " + f1.get());
            System.out.println("2 : " + f2.get());
            System.out.println("3 : " + f3.get());
            System.out.println("4 : " + f4.get());

//            List<Future<String>> rets = new ArrayList<>();
//            for (int i = 0; i < 100; i++) {
//                rets.add(new D_CommandCollapserGetValueForKey(i).queue());
//            }
//            for (Future<String> ret : rets) {
//                System.out.println(ret.get());
//            }

            System.out.println("ExecutedCommands.size:" + HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
            HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
            System.out.println("GetValueForKey:" + command.getCommandKey().name());
            System.out.println(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
            System.out.println(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            context.shutdown();
        }
    }

}

管理HystrixRequestContext

通过配置拦截器统一配置管理HystrixRequestContext生命周期

 通过拦截器统一设置管理HystrixRequstContext
 public class HystrixRequestContextServletFilter implements Filter {

     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
     throws IOException, ServletException {
         HystrixRequestContext context = HystrixRequestContext.initializeContext();
         try {
            chain.doFilter(request, response);
         } finally {
            context.shutdown();
         }
     }
 }

 <filter>
     <display-name>HystrixRequestContextServletFilter</display-name>
     <filter-name>HystrixRequestContextServletFilter</filter-name>
     <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
 </filter>
 <filter-mapping>
     <filter-name>HystrixRequestContextServletFilter</filter-name>
     <url-pattern>/*</url-pattern>
 </filter-mapping>

强制熔断器开启

import com.netflix.config.ConfigurationManager;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;

/**
 * 强制开启熔断,服务降级,取消优先级低的服务依赖,提高主服务性能
 */
public classE_ForceCircuitBreakerCommandextendsHystrixCommand<Boolean>{

    public E_ForceCircuitBreakerCommand() {
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("ForceCommandKey")));
    }

    @Override
    protected Boolean run() throws Exception {
        return true;
    }

    @Override
    protected Boolean getFallback() {
        return false;
    }


    public static void main(String[] args) throws Exception {
        System.out.println(new E_ForceCircuitBreakerCommand().execute());
        System.out.println("before : " + new E_ForceCircuitBreakerCommand().isCircuitBreakerOpen());
        /**
         *  hystrix.command.HystrixCommandKey.circuitBreaker.forceOpen 强制设置熔断器开启。
         *  http://stackoverflow.com/questions/29165654/how-to-force-a-hystrix-circuit-breaker-open
         */
        ConfigurationManager.getConfigInstance().setProperty("hystrix.command.ForceCommandKey.circuitBreaker.forceOpen", true);
        System.out.println("after : " + new E_ForceCircuitBreakerCommand().isCircuitBreakerOpen());
        System.out.println(new E_ForceCircuitBreakerCommand().execute());
    }
}

https://my.oschina.net/gmouou/blog/807439

分享到:
评论

相关推荐

    15.Spring Cloud中使用Hystrix

    本文将深入探讨如何在Spring Cloud项目中集成并使用Hystrix,以及如何将其与Feign客户端结合。 首先,我们需要了解Hystrix的基本原理。Hystrix通过隔离请求,防止单个服务的故障蔓延至整个系统,避免雪崩效应。断路...

    spring-cloud-netflix-hystrix应用

    4. **降级策略**:当熔断器打开或服务不可用时,Hystrix会执行预先定义的降级逻辑,如返回默认值、缓存数据或展示静态页面,确保系统仍能提供基本功能。 5. **健康检查**:Hystrix通过定期发送心跳请求来监控服务的...

    hystrix-demo:Hystrix的使用demo

    4. 降级策略:当服务不可用或断路器打开时,Hystrix提供降级策略,即回退到备选方案,以确保系统的基本功能仍可正常运行。 三、Hystrix工作流程 1. 请求到达:客户端发起服务调用。 2. 创建命令:HystrixCommand被...

    Hystrix源码_可以跑起来

    在Hystrix中,命令(Command)是执行服务调用的基本单元,它实现了请求的封装和结果的缓存。用户可以通过继承`HystrixCommand`或`HystrixObservableCommand`来创建自定义命令,这两个类分别对应同步和异步执行模型。...

    Hystrix-dashboard+turbine-web+说明文档

    3. **降级策略**:在熔断后,Hystrix 允许应用提供降级策略,如返回缓存数据或显示备用界面,确保应用的基本功能不受影响。 4. **命令模式**:Hystrix 使用命令模式将业务逻辑封装在命令对象中,便于管理和监控。 *...

    spring cloud hystrix &&dashboard源码解读

    通过该配置类可以完成 Hystrix 断路器的基本设置和集成。 **配置类**: `org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration` - **主要功能**: 初始化 Hystrix 相关的组件和配置。 - **...

    Hystrix降级和熔断

    在这个场景下,可以使用Hystrix来确保即使某些服务出现故障,也不会导致整个订票系统无法使用。例如,当查询航班服务出现问题时,可以设置一个简单的降级逻辑返回一个预设的消息“暂时无法获取航班信息”,这样用户...

    hystrix接入.docx

    - **服务隔离与控制**:通过将调用封装在一个独立的执行单元中(通常是线程池或信号量),Hystrix可以有效地管理每个服务调用的资源使用情况,避免因某个服务出现问题而导致整个系统性能下降。 - **服务熔断与降级**...

    Hystrix熔断器简介及其工作原理

    4. **降级应用**:在服务不可用的情况下,Hystrix 提供了降级机制,可以返回一个简单的默认响应或备用数据,以保持应用程序的基本可用性。 #### 四、Hystrix的设计原则 1. **资源隔离**:为每个依赖服务分配独立的...

    Feign对Hystrix的支持-代码部分.zip

    1. **Feign的基本使用**:Feign通过接口定义了服务调用的方式,使用注解来指定HTTP请求的方法、URL、参数等信息。开发者只需要定义好接口,Feign会自动生成实现,实现了服务调用的声明式编程。 2. **Hystrix的集成*...

    SpringCloud10-Hystrix熔断器学习代码及指定默认的全局兜底方法

    这个标题"SpringCloud10-Hystrix熔断器学习代码及指定默认的全局兜底方法"揭示了我们要讨论的主题——如何使用Hystrix进行熔断操作,并配置全局的 fallback 方法来处理服务调用失败的情况。 Hystrix的工作原理基于...

    spring boot整合consul、fegin、HystrixCommand

    通过以上步骤,我们已经构建了一个基本的微服务架构,服务之间通过Feign进行透明化的调用,同时,HystrixCommand提供的熔断机制保障了系统的稳定性。这种架构可以很好地适应云环境,提高系统的可用性和容错性。 在...

    SpringCloud Hystrix服务熔断.docx

    5. **Hystrix Dashboard基本使用**: Hystrix Dashboard是监控Hystrix的一个可视化工具,可以实时展示服务的健康状况、请求统计、熔断情况等。通过在服务中添加Hystrix Stream依赖,并在前端页面引入Dashboard,...

    Hystrix学习讲义大全.pdf

    当与Hystrix集成时,可以定义一个fallback类,在服务调用失败时自动切换到该降级逻辑,保证基本服务的可用性。 例如,在OpenFeign中使用Hystrix,首先需要创建并配置服务提供者(如`provider-order-service`),...

    hello-hystrix:带有Spring-Boot的hystrix的基本示例

    Hystrix提供了强大的监控界面,我们可以通过`/actuator/hystrix.stream`端点获取实时的Hystrix指标,并使用Turbine或者Hystrix Dashboard进行可视化展示。 总结,通过Spring Boot与Hystrix的结合,我们可以构建出高...

    SpringCloud服务容错保护(Hystrix)介绍与使用示例

    1. **包裹请求**:使用 `HystrixCommand` 封装依赖调用,每个命令在单独线程中执行,应用了命令模式。 2. **跳闸机制**:当服务错误率超过预设阈值时,Hystrix 自动开启断路器,停止向故障服务发送请求。 3. **资源...

    SpringCloud——断路器(Hystrix)

    下面我们将深入探讨 Spring Cloud Hystrix 的核心概念、Ribbon 中的断路器使用以及 Feign 如何集成 Hystrix 来实现断路器功能。 **Hystrix 概述** Spring Cloud Hystrix 是 Netflix 开源的一个库,用于实现容错和...

    Hystrix dashboard

    通过学习和使用Hystrix Dashboard,开发者不仅可以监控微服务的健康状态,还可以及时发现和解决问题,提升系统的稳定性。对于初学者而言,这是一次宝贵的实践机会,有助于深入理解微服务架构和断路器设计模式。

    Feign屏蔽单个服务的Hystrix示例

    在这个"Feign屏蔽单个服务的Hystrix示例"中,我们将探讨如何在使用Feign进行服务调用时,为特定的服务添加Hystrix保护,以便在目标服务不可用或性能下降时,能够优雅地处理错误,避免整个系统崩溃。 首先,我们需要...

    Hystrix提供延迟和容错

    通过合理配置和使用Hystrix,开发者可以有效地应对服务间的不稳定性和延迟,提升系统的整体健壮性。 在实际应用中,Netflix-Hystrix-809104c这个版本可能包含了Hystrix的源代码、文档、示例以及其他相关资源。...

Global site tag (gtag.js) - Google Analytics