`

SpringCloud(四)番外篇(二):Hystrix 1.5.12 源码分析

阅读更多

  

编写不易,转载请注明(http://shihlei.iteye.com/blog/2429846)!

一 概述

书接前篇,《Hystrix:断路器》 对断路器和Hystrix做了简单的介绍,旨在帮助读者做个简单入门。本文简单分下Hystrix的源码实现,帮助读者更好的了解Hystrix。

 

分析版本:

 

<dependency>  
    <groupId>com.netflix.hystrix</groupId>  
    <artifactId>hystrix-core</artifactId>  
    <version>1.5.12</version>  
</dependency>
 

 

如之前所述:Hystrix 底层基于 RxJava,RxJava 是响应式编程开发库,因此Hystrix的整个实现策略简单说即:把一个HystrixCommand封装成一个Observable(待观察者),针对自身要实现的核心功能,对Observable进行各种装饰,并在订阅各步装饰的Observable,以便在指定事件到达时,添加自己的业务。

 

阅读之前,建议对RxJava做个了解,Hystrix 实现基于RxJava,大量使用RxJava的相关操作,推荐之前的文章《响应式编程 RxJava》《RxJava2.x 操作Demo 》

 

二 Hystrix的执行流程

 

 

 

流程详细说明参见《Hystrix:断路器》处理流程分析部分;

 

根据执行流程,将实现分为如下几块:

(1)主流程HystrixCommand 包装成 Observable 执行:AbstractCommand

(2)异步执行:HystrixContextScheduler

(3)超时中断:HystrixObservableTimeoutOperator

(4)断路器:HystrixCircuitBreaker

(5)滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()

(6)事件处理流:HystrixEventStream

(7)getFallback() 执行

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()

 

1)AbstractCommand<R> toObservable() 

流程中的(1)(2)步HystrixCommand的所有执行方法 execute( ) , queue() , observe() , toObservable() 最终都归结于 AbstractCommand<R> toObservable() 方法调用,直接看该方法。

 

public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        // 。。。。。。

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

	// 。。。。。。
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {

                // 。。。。。。

                // 返回一个冷Observable,有订阅才会执行,并且数据是最新的
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 把observable 放进缓存 。。。。。。
                if (requestCacheEnabled && cacheKey != null) {
                    // wrap it for caching
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // another thread beat us so we'll use the cached value instead
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // we just created an ObservableCommand so we cast and return it
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
 

 

    分析(删除了无用的代码):

(1)重点方法 applyHystrixSemantics() ,包装HystrixCommand; 最终的返回值,通过 Observable 的 defer 操作做延迟绑定,保证观察者获取HystrixCommand执行的最新数据,包装后的Observable:hystrixObservable

(2)hystrixObservable 再进行缓存包装 ,也是通过defer操作完成,提供了requestCache能力,因为不是本篇的重点,不做分析。

 

RxJava defer操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 1) defer操作符 

 

 

    2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();

 

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       	  	
    	//。。。。。。

        if (circuitBreaker.attemptExecution()) { //判断是否可执行
	//。。。。。。        	
            if (executionSemaphore.tryAcquire()) { // 判断是否具有信号量资源
                try {
                    
                    //设置开始时间,用于监控执行超时。
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

                    // 继续包装HystrixCommand Observable,注册观察者,执行
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else { //信号量资源不足,拒绝执行,执行getFallBack()方法
                return handleSemaphoreRejectionViaFallback();
            }
        } else { // 不可执行,快速失效,执行getFallBack()方法
            return handleShortCircuitViaFallback();
        }
}
 

    分析(删除了无用的代码):

(1)circuitBreaker.attemptExecution() 断路器状态判断,是否可以执行;可执行则继续包装;不可执行,直接调用getFallBack();关于 HystrixCircuitBreaker 后面详细介绍

(2)executionSemaphore.tryAcquire() 判断是否有信号量资源,没有则直接调用getFallBack();

(3)如果条件都满足,则 调用 executeCommandAndObserve(_cmd)  继续包装 HystrixCommand执行。

 

    3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要,根据用户指定的隔离级别(线程,信号量),包装相应的执行环境,执行Command

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

       	// 。。。。。。

        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) { // 如果开启超时功能,则包装超时中断能力
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

        } else { //否则不需要包装中断能力
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
}

 

分析(删除了无用的代码):

(1)本方法包括两部分

(a)注册观察者,监听Command执行事件,用于更新统计信息,先不详细介绍,忽略了

(b)包装超时熔断能力:HystrixObservableTimeoutOperator

 

(2)超时熔断能力通过 Observable 的 left 操作 在原有的 HystrixObservable 上进行包装实现的,核心 HystrixObservableTimeoutOperator

execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));

 

RxJava left 操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 2)lift 操作符 

 

(3)最终调用  executeCommandWithSpecifiedIsolation(_cmd) 根据隔离级别选择是“线程方式”隔离执行还是“信号量方式”隔离执行;

 

 4)  executeCommandWithSpecifiedIsolation(_cmd)  具体的执行方法:

 

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //Hystrix 配置的隔离策略是线程 
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
          
            return Observable.defer(new Func0<Observable<R>>() {

            // 。。。。。。

            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else { // Hystrx配置的隔离策略是信号量
            // 。。。。。。
            return Observable.defer(new Func0<Observable<R>>() {
                // 。。。。。。
            });
        }
}

 

分析(删除了无用的代码):

(1)这里就两件事:

(a)如果隔离策略是线程,则“线程策略”执行

(b)如果隔离策略是信号量,则“信号量策略”执行

 

(2)咋启动的线程呢,RxJava本身就提供异步执行的能力,通过 Observable 的 subscribeOn 绑定执行方式,底层具体实现是 线程调度:Scheduler。

 

马上进入下一章节,详细讲解一下

 

四 异步执行:HystrixContextScheduler

 

1)概述

 

Hystrix 要完成超时中断,需要异步执行,调用线程才能隔离影响,该功能借助RxJava的 Scheduler 机制实现;

 

关于 RxJava Scheduler 可以提前阅读下:《RxJava2.x 操作Demo》:》 三 线程调度:Scheduler  做个了解;

 

Hystrix实现异步的类结构如下:



  

主要职能:

HystrixThreadPoolDefault:依赖BlockingQueue<Runnable> queue 和 ThreadPoolExecutor threadPool; 统一线程池和任务队列

 

(a)Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();

HystrixContextScheduler: 这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作

ThreadPoolScheduler:线程池调度器,负责产生ThreadPoolWorker 向线程池提交任务

 

(b)Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)

HystrixContextSchedulerWorker:代理ThreadPoolWorker

ThreadPoolWorker:负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力

 

各个实现组件做个比较

 

(1)执行代码

RxJava :observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())

 

Hystrix: 

.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
   @Override
        public Boolean call() {
            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
        }
}));

 

(2)核心组件

(a)RxJava: Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();

Hystrix:HystrixContextScheduler --》 依赖:ThreadPoolScheduler  返回的worker 即 ThreadPoolWorker

 

(b)RxJava: Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)

Hystrix:HystrixContextSchedulerWorker --》 依赖:ThreadPoolScheduler --》 ThreadPoolWorker

 

2)具体实现 

(1)Hystrix指定Scheduler:

通过executeCommandWithSpecifiedIsolation(_cmd)  里的subscribeOn()方法完成;

 
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
	        @Override
	        public Boolean call() {
	            return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
	        }
    }));
 

分析:这里使用 HystrixThreadPool 获得 Scheduler(调度器) ; 默认实现 HystrixThreadPoolDefault:维护线程池和任务队列。

 

(2)HystrixThreadPoolDefault:HystrixThreadPool 内部类
static class HystrixThreadPoolDefault implements HystrixThreadPool {

  	// 。。。。。。

 	// Hystrix线程池配置,demo中有指定
        private final HystrixThreadPoolProperties properties;

        //任务队列
        private final BlockingQueue<Runnable> queue;

        //线程池
        private final ThreadPoolExecutor threadPool;

        //线程监控
        private final HystrixThreadPoolMetrics metrics;

        // 队列大小
        private final int queueSize;

        public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
            this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
            HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            this.queueSize = properties.maxQueueSize().get();

            this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                    concurrencyStrategy.getThreadPool(threadPoolKey, properties),
                    properties);
            this.threadPool = this.metrics.getThreadPool();
            this.queue = this.threadPool.getQueue();

            // 。。。。。。
        }

       	// 。。。。。。

        @Override
        public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
            touchConfig();
            return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
        }

        // 。。。。。。
}

 

分析(删除了无用的代码):HystrixThreadPoolDefault 主要维护一个线程池和任务队列,用于异步操作,最后会传给 HystrixContextScheduler 调度器使用。

 

(3)HystrixContextScheduler :上下文调度器

 

public class HystrixContextScheduler extends Scheduler {

    private final HystrixConcurrencyStrategy concurrencyStrategy;
    private final Scheduler actualScheduler;
    private final HystrixThreadPool threadPool;

    // 。。。。。。

    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
        this.concurrencyStrategy = concurrencyStrategy;
        this.threadPool = threadPool;
        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
    }

    @Override
    public Worker createWorker() {
        return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
    }

   	// 。。。。。。
}

 

分析(删除了无用的代码):这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作。

 

(4)HystrixContextSchedulerWorker :

 

上下文Worker,只代理,具体的调度工作由传入的actualWorker 完成(即 ThreadPoolWorker)

 

    private class HystrixContextSchedulerWorker extends Worker {

        private final Worker worker;

        private HystrixContextSchedulerWorker(Worker actualWorker) {
            this.worker = actualWorker;
        }

        //。。。。。。。

        @Override
        public Subscription schedule(Action0 action) {
            if (threadPool != null) {
                if (!threadPool.isQueueSpaceAvailable()) {
                    throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
                }
            }
            return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
        }

}

 

分析(删除了无用的代码):HystrixContextSchedulerWorker,这个调度器除了封装线程池和同步策略外,只一个HystrixContextSchedulerWorker 外,没有特殊的操作。

    

(5) ThreadPoolScheduler:

真正的线程池调度Scheduler,创建一个 ThreadPoolWorker

 

private static class ThreadPoolScheduler extends Scheduler {

        private final HystrixThreadPool threadPool;
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

        @Override
        public Worker createWorker() {
            return new ThreadPoolWorker(threadPool, shouldInterruptThread);
        }

 }

 

(6) ThreadPoolWorker:

负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力;

    

private static class ThreadPoolWorker extends Worker {

        private final HystrixThreadPool threadPool;
        private final CompositeSubscription subscription = new CompositeSubscription();
        private final Func0<Boolean> shouldInterruptThread;

        public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
            this.threadPool = threadPool;
            this.shouldInterruptThread = shouldInterruptThread;
        }

      	// 。。。。。。

        @Override
        public Subscription schedule(final Action0 action) {
            if (subscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            ScheduledAction sa = new ScheduledAction(action);

            subscription.add(sa);
            sa.addParent(subscription);

            ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
            FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
            sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

            return sa;
        }

        // 。。。。。。
}

 

 分析(删除了无用的代码):schedule() 方法向线程池提交任务,关键返回一个Subscription,用于调用端使用,或中断当前任务执行;

 

五 超时中断:HystrixObservableTimeoutOperator

 

1)概述:

 

Hystrix实现异步的类结构如下:



 

 

其中:

HystrixObservableTimeoutOperator:包装超时中断的主流程,借助下面的组件实现超时中断逻辑

(1)HystrixTimer:依赖ScheduledExecutor,最终使用JDK的ScheduledThreadPoolExecutor 实现指定时间后的回调

(2)TimerListener:实现超时的回调逻辑

(3)CompositeSubscription:RxJava 1.x 提供,通过unsubscribe()方法可以中断当前任务的执行。

 

2)看一下细节:

(1)回到之前讲解的主流程:

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》 3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行

 

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
   	// 。。。。。。

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
        		// 添加超时中断逻辑
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

    } else {
    	// 。。。。。。
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
 

 

分析(删除了无用的代码):超时中断主要通过:lift 操作,使用new HystrixObservableTimeoutOperator<R>(_cmd) 包装 HystrixObservable实现的。

 

(2)HystrixObservableTimeoutOperator 实现

 

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {

    	// 创建一个组合订阅,用于添加一组Disposable,快速解除订阅,这里用于超时,中断任务
        final CompositeSubscription s = new CompositeSubscription();

        child.add(s);

        //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
        final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

        //创建一个超时回调类
        TimerListener listener = new TimerListener() {

            @Override
            public void tick() {
                //设置TimedOutStatus
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // 报告超时
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                    // 通过 CompositeSubscription 的 unsubscribe 中断正在异步执行的HystrixCommand
                    s.unsubscribe();

                    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                        @Override
                        public void run() {
                            child.onError(new HystrixTimeoutException());
                        }
                    });

                    //启动timeoutRunnable,就是执行上面的 child.onError(new HystrixTimeoutException()); 发送error时间,通知观察者
                    timeoutRunnable.run();
                }
            }

            //获取超时时间
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

       	// 设置当前时间,用于比对超时
        originalCommand.timeoutTimer.set(tl);

       Subscriber<R> parent = new Subscriber<R>() {

            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (isNotTimedOut()) {
                    // stop timer and pass notification through
                    tl.clear();
                    child.onError(e);
                }
            }

            @Override
            public void onNext(R v) {
                if (isNotTimedOut()) {
                    child.onNext(v);
                }
            }

            private boolean isNotTimedOut() {
                // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }

        };

        // if s is unsubscribed we want to unsubscribe the parent
        s.add(parent);
}
 

分析(删除了无用的代码):

(1)创建 TimerListener 实例,添加监听到超时的回调逻辑,指定超时时间(用户通过executionTimeoutInMilliseconds 配置的超时时间)

主要逻辑:

(a)设置HystrixCommand状态为超时

(b)创建 CompositeSubscription 绑定之前异步的Subscription 使 调用端可以根据超时中断 当前任务

(c)若超时,异步发送超时事件,通知HystrixCommand执行的观察者

 

(2)CompositeSubscription 绑定父子Subscription ,用于中断任务

 

RxJava CompositeSubscription使用,可参见 《RxJava2.x 操作Demo》:》 四 解除订阅,中断任务:Disposable 》 (2) CompositeDisposable 尽管 1.x 和2.x 有差别

 

(3) HystrixTimer 和 HystrixTimerListener:比较简单,就是ScheduledThreadPoolExecutor 定时触发

 

public static interface TimerListener {
  
    //指定时间周期触发回调
    public void tick();

    //时间周期
    public int getIntervalTimeInMilliseconds();
}

public class HystrixTimer {

	private static final Logger logger = LoggerFactory.getLogger(HystrixTimer.class);

	private static HystrixTimer INSTANCE = new HystrixTimer();

	// 。。。。。。

	public Reference<TimerListener> addTimerListener(final TimerListener listener) {
	    startThreadIfNeeded();
	    // add the listener

	    Runnable r = new Runnable() {

	        @Override
	        public void run() {
	            try {
	                listener.tick();
	            } catch (Exception e) {
	                logger.error("Failed while ticking TimerListener", e);
	            }
	        }
	    };

	    // 使用
	    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
	    return new TimerReference(listener, f);
	}


	static class ScheduledExecutor {
	    volatile ScheduledThreadPoolExecutor executor;
	    
	    // 。。。。。。

	    public void initialize() {

	    	// 。。。。。。
	        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
	        initialized = true;
	    }

	   // 。。。。。。
	}
}

 

六 断路器:HystrixCircuitBreaker

 

1)概述

 

HystrixCircuitBreaker 提供了状态维护功能,其使用体现在如下部分:

(1)HystrixCommand 执行前 通过 HystrixCircuitBreaker 根据状态判断是否允许执行:

open:禁止执行;

close 可以执行;

half_close:进入open状态已超过等待重试时间,则可以执行,否则禁止执行。

 

(2)订阅HystrixCommandMetrics 滑动窗口的数据统计更新事件,根据统计数据(错误率),决定是否进入open状态。

 

HystrixCircuitBreaker的类结构如下:

 

 

 2)看看细节:

(1)HystrixCircuitBreaker接口:定义了主要服务

 

 

public interface HystrixCircuitBreaker {

    /**
     * 是否允许HystrixCommand请求执行,如果断路器是half-open(半开)状态,则根据逻辑允许部分请求执行和改变状态。
     */
    boolean allowRequest();

    /**
     * 断路器当前是否打开状态
     */
    boolean isOpen();

    /**
     * half-open(半开)状态的反馈机制,执行成功时调用
     */
    void markSuccess();

    /**
     * half-open(半开)状态的反馈机制,执行失败调用
     */
    void markNonSuccess();

    /**
     * HystrixCommand执行行前,判断是否尝试执行
     */
    boolean attemptExecution();
}
  

 

 分析:这里就几个方法,简单做了注释;

 
(2)内部类 HystrixCircuitBreakerImpl 断路器实现(重要):

 

HystrixCircuitBreakerImpl实现断路器的核心逻辑:状态定义,状态维护。

 

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
	// 用户配置定义
    private final HystrixCommandProperties properties;

    // HystrixCommand 执行指标数据
    private final HystrixCommandMetrics metrics;

    // 断路器状态定义
    enum Status {
        CLOSED, OPEN, HALF_OPEN;
    }

    // 断路器当前状态,利用原子操作保证线程安全
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

    // 断路器是否打开,利用原子操作保证线程安全,circuitOpened > 0 则为打开状态,同时该变量记录了打开的时间,用于间隔“circuitBreakerSleepWindowInMilliseconds”重试,
    private final AtomicLong circuitOpened = new AtomicLong(-1);

    //指标数据订阅
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //订阅 metrics 指标数据,一旦HystrixCommand执行结束,触发监控更新,“断路器” HystrixCircuitBreaker 重新计算并进入“关闭”或“打开“状态
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }

    /**
    * 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
    */ 
    private Subscription subscribeToStream() {

        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                   
                    // 。。。。。。

                    @Override
                    public void onNext(HealthCounts hc) {
                        
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

                        // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
                        // 指标由用户通过“statisticalWindowVolumeThreshold”定义;  

                        } else {
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

                                // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
                                // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;  

                            } else {

                                // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间  
                                // 特别注意:这里使用了 compareAndSet ,不会重复打开
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {                                   
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }

    @Override
    public void markSuccess() {
    	// 半开状态下,尝试执行成功,断路器关闭
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {                
            // 。。。。。。
        }
    }

    @Override
    public void markNonSuccess() {
    	// 半开状态,尝试执行失败,断路器打开
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            // 。。。。。。
        }
    }

    // 判断断路器是否打开,很简单,不解释
    @Override
    public boolean isOpen() {
        if (properties.circuitBreakerForceOpen().get()) {
            return true;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return false;
        }
        //circuitOpened > 0 打开状态
        return circuitOpened.get() >= 0;
    }

    //HystrixCommand执行行前,判断是否允许请求
    @Override
    public boolean allowRequest() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        
        if (circuitOpened.get() == -1) { 
        	// 断路器关闭,可以执行
            return true;
        } else {
        	//断路器半开状态,不允许执行
            if (status.get().equals(Status.HALF_OPEN)) {
                return false;
            } else {
            	//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
                return isAfterSleepWindow();
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    @Override
    public boolean attemptExecution() {
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
        	//断路器半开状态,不允许执行
            return true;
        } else {
        	//根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间
            if (isAfterSleepWindow()) { 
            	//等待重试时间已到
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    // 首次请求,断路器状态进入“半开状态”,可以尝试执行HystrixCommand
                    return true;
                } else {
                	// 非首次进入“半开状态”,禁止执行
                    return false;
                }
            } else {
            	// 等待重试时间未到,继续禁止执行
                return false;
            }
        }
    }
}
 

 

 分析:这里逻辑较多,直接在代码做相应的解释,这里就不再赘述了

 

 七 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()

 

1)概述

上面介绍 HystrixCircuitBreakerImpl 的实现,比较关键的一步,在构造函数中调用了subscribeToStream()方法;细看该方法,HystrixCircuitBreakerImpl 订阅了metrics.getHealthCountsStream(), 通过判断滑动窗口内的统计数据 HystrixCommandMetrics.HealthCounts 完成自身状态的更新;

 

/**
* 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新
*/ 
private Subscription subscribeToStream() {

	return metrics.getHealthCountsStream()
	        .observe()
	        .subscribe(new Subscriber<HealthCounts>() {
	           
	            // 。。。。。。

	            @Override
	            public void onNext(HealthCounts hc) {
	                
	                if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

	                // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态;
	                // 指标由用户通过“statisticalWindowVolumeThreshold”定义;  

	                } else {
	                    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

	                        // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态
	                        // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义;  

	                    } else {

	                        // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间  
	                        // 特别注意:这里使用了 compareAndSet ,不会重复打开
	                        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {                                   
	                            circuitOpened.set(System.currentTimeMillis());
	                        }
	                    }
	                }
	            }
	        });
}

 

下面介绍下Hystrix滑动窗口设计(类图):

 

 
 

(1)健康统计数据:

(a)HealthCounts:具体的统计信息,包括totalCount:执行总数;errorCount:错误数;errorPercentage:错误率;配合上面的代码供状态转换使用

 

(2)滑动窗口:

(a)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅

(b)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出

(c)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据

 

2)看看细节

(1)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
	// 桶数量
    protected final int numBuckets;
    // 基于Bucket的观察流
    protected final Observable<Bucket> bucketedStream;
    // 
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;

    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;

       	// Event事件按照一个bucket的时间周期的合并逻辑,其实是定义了一个开放两个抽象方法getEmptyBucketSummary() , appendRawEventToBucket 用于聚合
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        //借助windows()操作缓存Event事件,积累到一个bucket的时间周期,统一处理
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) 
                        //聚合一个bucket的时间周期Event数据
                        .flatMap(reduceBucketToSummary)               
                        .startWith(emptyEventCountsToStart);           
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    // 。。。。。。
}

 

分析(删除了无用的代码):BucketedCounterStream 提供了基于Bucket的抽象,核心就是构造函数

1)构造函数可以看到 numBuckets 提供了桶的数量,bucketSizeInMs 提供一个桶的时间周期

2)然后 针对源 inputEventStream 是通过 windows() 操作指定了聚合事件的周期,flatMap()操作指定 聚合方法

3)最后通过抽象方法 appendRawEventToBucket 参数提供调用端指定具体的聚合策略

 

注:这里只抽象到桶的级别和聚合,具体输出类型 Output 需要子类继续重写实现

 

(2)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出

 

public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
                                           final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
                                           final Func2<Output, Bucket, Output> reduceBucket) {
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
        this.sourceStream = bucketedStream
        		// 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定窗口数量
                .window(numBuckets, 1) 
                // 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定聚合策略      
                .flatMap(reduceWindowToSummary) 
                .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
                .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
                .share()                       
                .onBackpressureDrop();  
    }
    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }
    
    // 。。。。。。
}

  

分析(删除了无用的代码):BucketedRollingCounterStream提供了滑动窗口的抽象,核心还是在构造函数中

1)然后 针对bucket源 bucketedStream 是通过 windows() 操作对最近的 numBuckets 个 bucket数据进行聚合,flatMap()操作指定 聚合方法

2)最后通过抽象方法 reduceBucket 参数提供调用端指定具体的聚合策略

 
(3)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据

 

/**
* 看泛型:HystrixCommandCompletion:command执行结束事件;long[]:bucket 类型;HystrixCommandMetrics.HealthCounts:滑动时间窗口内的执行统计数据
*/
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {

    private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>();

    private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;

    private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
        @Override
        public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
            return healthCounts.plus(bucketEventCounts);
        }
    };

    // 。。。。。。

    private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                               Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
        // 特别注意 HystrixCommandCompletionStream 这里直接绑死了 这个事件流
        super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
    }

   // 。。。。。。
}

 

分析(删除了无用的代码):

HealthCountsStream实现具体的滑动窗口业务,构造函数绑定 HystrixCommandCompletionStream 事件流,直接观察 HystrixCommandCompletion:HystrixCommand执行结束事件,聚合成 类型;滑动窗口内 HystrixCommandMetrics 更新统计需要的 HealthCounts

 

(4)HealthCounts:具体的统计信息 没有特殊的,不解释了
 public static class HealthCounts {
        private final long totalCount;
        private final long errorCount;
        private final int errorPercentage;

        HealthCounts(long total, long error) {
            this.totalCount = total;
            this.errorCount = error;
            if (totalCount > 0) {
                this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
            } else {
                this.errorPercentage = 0;
            }
        }

 		//。。。。。。

        public HealthCounts plus(long[] eventTypeCounts) {
            long updatedTotalCount = totalCount;
            long updatedErrorCount = errorCount;

            long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
            long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
            long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
            long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
            long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

            updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
            return new HealthCounts(updatedTotalCount, updatedErrorCount);
        }

  		//。。。。。。
}

 

八 事件处理机制:HystrixEventStream

 

1)概述:

 

 六 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream() 讲述了 “断路器”如何根据HystrixCommand处理结果更新状态。

核心:HealthCountsStream 是在构造函数中绑定了 HystrixCommandCompletionStream 事件流 ,关注HystrixCommand执行结束事件,那么 HystrixEventStream 事件流式如何产生的,我们下面就简单介绍下;

 

HystrixEventStream 的集成体系如下:

 

 

 

2)看看细节:

(1)回到之前的主流程:

 

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()

》 1)流程中的(1)(2)步HystrixCommand的所有执行方法execute(),queue(),observe(),toObservable()最终都是调用的都是 AbstractCommand<R> toObservable(),直接看该方法。

 

 public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    // 。。。。。。

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // 。。。。。。

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

 

分析(删除了无用的代码):订阅了HystrixCommand的 结束事件,会执行 terminateCommandCleanup ;主要代码在 handleCommandEnd() 方法中;

 

(2)handleCommandEnd(): HystrixCommand 命令执行结束,根据执行结果做相应的处理

 

private void handleCommandEnd(boolean commandExecutionStarted) {

	// 。。。。。。

    // 记录用户执行时间
    long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
    executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
    if (executionResultAtTimeOfCancellation == null) {
        metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
    } else {
        metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
    }

    // 。。。。。。
}

 

分析(删除了无用的代码):主要是调用 HystrixCommandMetrics 的 markCommandDone 方法

 

(3)HystrixCommandMetrics:通过HystrixThreadEventStream 执行 executionDone 执行命令结束的相关操作

 

void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
    HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
    if (executionStarted) {
        concurrentExecutionCount.decrementAndGet();
    }
}

 

(4)HystrixThreadEventStream:线程事件流,事件和事件监听器采用观察者模式,互相进行了隔离

 

public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
    	//根据处理结果,构造一个 HystrixCommandCompletion 事件
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);

        //通知事件完成主题的监听者
        writeOnlyCommandCompletionSubject.onNext(event);
}

 

分析(删除了无用的代码):最终调用了 HystrixThreadEventStream 完成事件的生成和绑定客户端的情况

 

public class HystrixThreadEventStream {
    private final long threadId;
    private final String threadName;

    private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
    private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;

    // 。。。。。。

    HystrixThreadEventStream(Thread thread) {
        this.threadId = thread.getId();
        this.threadName = thread.getName();
        writeOnlyCommandStartSubject = PublishSubject.create();
        writeOnlyCommandCompletionSubject = PublishSubject.create();
        writeOnlyCollapserSubject = PublishSubject.create();

        writeOnlyCommandStartSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandStartsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCommandCompletionSubject
                .onBackpressureBuffer()
                .doOnNext(writeCommandCompletionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());

        writeOnlyCollapserSubject
                .onBackpressureBuffer()
                .doOnNext(writeCollapserExecutionsToShardedStreams)
                .unsafeSubscribe(Subscribers.empty());
    }

    // 。。。。。。

    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        //根据处理结果,构造一个 HystrixCommandCompletion 事件
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);

        //通知事件完成主题的监听者
        writeOnlyCommandCompletionSubject.onNext(event);
    }

    // 。。。。。。
}

 

分析(删除了无用的代码):

HystrixThreadEventStream 本身维护各种关注事件的Subject ,Subject在Rxjava 作为主题,提供观察者双向操作的能力。最终其实是向HystrixCommandCompletionStream 中 提交 HystrixCommandCompletion 事件,供观察者使用。

 

(5)事件流:HystrixEventStream

Hystrix 命令执行过程会触发响应的事件,用户观察者订阅使用,HystrixCommandCompletionStream 就是用于订阅 HystrixCommandCompletion 命令完成事件。

 

这里简单看看类结果,详情不再赘述,如开始,不再赘述;

 

九 getFallback() 执行

 

1)概述:

关于 getFallback() 方法的执行,之前在

《Hystrix:断路器》:》 三 Hystrix :》 4)处理流程分析 :》 8)执行fallback  做过总结,触发方式如下:

 

(1)断路器状态:断路器已经打开的时候

 

(2)由construct() or run()抛出了一个异常

没有空闲的线程池和队列或者信号量:RejectedExecutionException

一次命令执行超时:HystrixTimeoutException

 

2)这里分析下具体实现

(1) 断路器状态判断-断路器已经打开的时候:在HystrixCommand执行前,根据 circuitBreaker 状态判断是否执行还是快速失效

回到之前讲解的主流程:

》三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》(2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();

 

 

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
   	  	
	//。。。。。。

    if (circuitBreaker.attemptExecution()) { //判断是否可执行

		//。。。。。。 
		// 继续包装HystrixCommand Observable,注册观察者,执行
        return executeCommandAndObserve(_cmd)
            .doOnError(markExceptionThrown)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);          
    } else { // 不可执行,快速失效,执行getFallBack()方法
        return handleShortCircuitViaFallback();
    }
}

 

(2) construct() or run()抛出了一个异常或超时:
在HystrixCommand执行后,根据结果处理接收到 RejectedExecutionException;HystrixTimeoutException,及非 HystrixBadRequestException 异常 ,执行getFallback()

 

回到之前讲解的主流程:

三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中

》3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行

 

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

   	// 。。。。。。

   	final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    // 。。。。。。

    Observable<R> execution;
   
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

 

分析(删除了无用的代码):主要是:handleFallback ,因为涉及fallback 链的情况这里只简单概述 

1)异常类型 RejectedExecutionException;HystrixTimeoutException,及非HystrixBadRequestException 会通过内部方法 getFallbackOrThrowException() 完成对getFallback() 的调用

2)异常类型 HystrixBadRequestException 不会调用 getFallback()

  • 大小: 192.8 KB
  • 大小: 297.9 KB
  • 大小: 717.5 KB
  • 大小: 159.3 KB
  • 大小: 391.6 KB
分享到:
评论

相关推荐

    springcloud:Hystrix服务熔断demo源码案例演示

    springcloud:Hystrix服务熔断demo源码案例演示

    springCloud项目练习

    第四课: 断路器(Hystrix) 第五课: 路由网关(zuul) 第六课: 分布式配置中心(Spring Cloud Config) 第七课: 高可用的分布式配置中心(Spring Cloud Config) 第八课: 消息总线(Spring Cloud Bus) 第九课: 服务...

    spring-cloud-netflix-hystrix应用

    《深入理解Spring Cloud Netflix Hystrix:构建弹性微服务架构》 在当今的软件开发领域,微服务架构已经成为主流,而Spring Cloud作为Java生态中的微服务解决方案,深受开发者喜爱。其中,Spring Cloud Netflix ...

    springcloud微服务框架+服务模版

    Spring Cloud使用的各种示例,以最简单、...spring-cloud-sleuth-zipkin: 利用Sleuth、Zipkin对Spring Cloud应用进行服务追踪分析 spring-boot-admin-eureka: 使用Spring Boot Admin 对Spring Cloud集群进行监控示例

    SpringCloudLearning_forezp.tar.gz

    史上最简单的SpringCloud教程 | 第十二篇: 断路器监控(Hystrix Dashboard)(Finchley版本) 史上最简单的SpringCloud教程 | 第十三篇: 断路器聚合监控(Hystrix Turbine)(Finchley版本) 史上最简单的SpringCloud教程 | ...

    spring cloud hystrix原理介绍及使用

    Spring Cloud Hystrix是一个由Netflix开源的Java框架,用于在分布式系统中提供延迟和容错功能,特别适用于对微服务架构中的远程过程调用(RPC)进行控制。其核心目标是通过添加一个延迟容忍层来隔离各个微服务之间的...

    springcloud hystrix 断路由

    在分布式系统中,服务间的调用异常处理是至关重要的,Spring Cloud Hystrix 就是为了解决这一问题而设计的。Hystrix 是 Netflix 开源的一个延迟和容错库,用于隔离服务间的调用,防止因某个服务的不稳定导致整个系统...

    05Spring Cloud OpenFeign:基于Ribbon和Hystrix的声明式服务调用1

    在本篇文章中,我们将对 Spring Cloud OpenFeign 进行详细介绍,讨论其在声明式服务调用中的应用,包括与 Ribbon 和 Hystrix 的集成。 什么是 Spring Cloud OpenFeign? Spring Cloud OpenFeign 是一个声明式的...

    微服务书籍管理系统springcloud.rar

    7. **分布式追踪**:Spring Cloud Sleuth与Zipkin或Jaeger集成,提供全链路的请求追踪,帮助开发者分析和调试分布式系统中的问题。 8. **数据库集成**:项目可能会使用MySQL或其他关系型数据库存储书籍信息,通过...

    15.Spring Cloud中使用Hystrix

    在Spring Cloud生态系统中,Hystrix是一个至关重要的组件,它主要负责实现服务容错和断路器模式,以增强系统的稳定性和健壮性。本文将深入探讨如何在Spring Cloud项目中集成并使用Hystrix,以及如何将其与Feign...

    hystrix-dashboar1.5.12

    《Hystrix Dashboard 1.5.12:构建弹性微服务的重要工具》 Hystrix Dashboard 是 Netflix 开源的一款强大的监控工具,主要用于监控微服务架构中的 Hystrix 库的性能和健康状况。在标题提到的 "hystrix-dashboar...

    spring-cloud-examples

    Spring Cloud使用的各种示例,以最简单、...spring-cloud-sleuth-zipkin: 利用Sleuth、Zipkin对Spring Cloud应用进行服务追踪分析 spring-boot-admin-eureka: 使用Spring Boot Admin 对Spring Cloud集群进行监控示例

    spring cloud视频教程

    Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具包,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...

    spring cloud hystrix &&dashboard源码解读

    ### Spring Cloud Hystrix & Dashboard 源码解读 #### 一、加载、初始化概要 ##### 1.1 @EnableCircuitBreaker 的生效 `@EnableCircuitBreaker` 注解是 Spring Cloud Hystrix 提供的一个关键注解,用于启动熔断器...

    Spring Cloud(Hystrix)使用.zip

    本教程将通过分析`microservice-hystrix-dashboard`、`microservice-client`和`microservice-springcloud`这三个项目来深入理解Spring Cloud Hystrix的使用。 首先,`microservice-hystrix-dashboard`是Hystrix的...

    spring cloud eureka zuul ribbon hystrix feign config 示例

    在给定的标题和描述中,我们看到了几个关键组件:Eureka、Zuul、Ribbon、Hystrix 和 Feign,这些都是Spring Cloud生态中的重要组成部分。下面将详细阐述这些组件及其在实际应用中的作用。 1. **Eureka**:它是...

    基于spring cloud项目源码源码.rar

    《Spring Cloud项目源码深度解析》 在当前的微服务架构领域,Spring Cloud以其强大的功能和易用性,成为开发者构建分布式系统的重要选择。本文将深入探讨基于Spring Cloud的项目源码,帮助读者理解其核心原理,提升...

    springcloud2-hystrix-feign-zuul.zip

    标题 "springcloud2-hystrix-feign-zuul.zip" 提示了我们这是一组关于Spring Cloud 2的实现,具体涉及Hystrix、Feign和Zuul组件的实践项目。Spring Cloud 是一个用于构建分布式系统的服务发现、配置管理和微服务连接...

    SpringCloud项目源码下载.docx

    尽管文档标题和描述较为简单,但可以从这些信息中提炼出关于Spring Cloud项目的源码下载、Spring Cloud的基本概念、用途及其核心组件等方面的内容。 ### Spring Cloud 概念 Spring Cloud 是一个基于Spring Boot...

    spring cloud gateway配置Hystrix 熔断、限流、后台调用注意点.pdf

    Spring Cloud Gateway 配置 Hystrix 熔断、限流、后台调用注意点 Spring Cloud Gateway 是一种基于 Spring Boot 框架的 API 网关解决方案,提供了许多实用的功能来管理和保护微服务架构中的 API。其中,Hystrix ...

Global site tag (gtag.js) - Google Analytics