编写不易,转载请注明(http://shihlei.iteye.com/blog/2429846)!
一 概述
书接前篇,《Hystrix:断路器》 对断路器和Hystrix做了简单的介绍,旨在帮助读者做个简单入门。本文简单分下Hystrix的源码实现,帮助读者更好的了解Hystrix。
分析版本:
<dependency> <groupId>com.netflix.hystrix</groupId> <artifactId>hystrix-core</artifactId> <version>1.5.12</version> </dependency>
如之前所述:Hystrix 底层基于 RxJava,RxJava 是响应式编程开发库,因此Hystrix的整个实现策略简单说即:把一个HystrixCommand封装成一个Observable(待观察者),针对自身要实现的核心功能,对Observable进行各种装饰,并在订阅各步装饰的Observable,以便在指定事件到达时,添加自己的业务。
阅读之前,建议对RxJava做个了解,Hystrix 实现基于RxJava,大量使用RxJava的相关操作,推荐之前的文章《响应式编程 RxJava》,《RxJava2.x 操作Demo 》;
二 Hystrix的执行流程
流程详细说明参见《Hystrix:断路器》处理流程分析部分;
根据执行流程,将实现分为如下几块:
(1)主流程HystrixCommand 包装成 Observable 执行:AbstractCommand
(2)异步执行:HystrixContextScheduler
(3)超时中断:HystrixObservableTimeoutOperator
(4)断路器:HystrixCircuitBreaker
(5)滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()
(6)事件处理流:HystrixEventStream
(7)getFallback() 执行
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()
1)AbstractCommand<R> toObservable()
流程中的(1)(2)步HystrixCommand的所有执行方法 execute( ) , queue() , observe() , toObservable() 最终都归结于 AbstractCommand<R> toObservable() 方法调用,直接看该方法。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // 。。。。。。 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; // 。。。。。。 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 。。。。。。 // 返回一个冷Observable,有订阅才会执行,并且数据是最新的 Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 把observable 放进缓存 。。。。。。 if (requestCacheEnabled && cacheKey != null) { // wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { // another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { // we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } }); }
分析(删除了无用的代码):
(1)重点方法 applyHystrixSemantics() ,包装HystrixCommand; 最终的返回值,通过 Observable 的 defer 操作做延迟绑定,保证观察者获取HystrixCommand执行的最新数据,包装后的Observable:hystrixObservable
(2)hystrixObservable 再进行缓存包装 ,也是通过defer操作完成,提供了requestCache能力,因为不是本篇的重点,不做分析。
RxJava defer操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 1) defer操作符
2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { //。。。。。。 if (circuitBreaker.attemptExecution()) { //判断是否可执行 //。。。。。。 if (executionSemaphore.tryAcquire()) { // 判断是否具有信号量资源 try { //设置开始时间,用于监控执行超时。 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 继续包装HystrixCommand Observable,注册观察者,执行 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { //信号量资源不足,拒绝执行,执行getFallBack()方法 return handleSemaphoreRejectionViaFallback(); } } else { // 不可执行,快速失效,执行getFallBack()方法 return handleShortCircuitViaFallback(); } }
分析(删除了无用的代码):
(1)circuitBreaker.attemptExecution() 断路器状态判断,是否可以执行;可执行则继续包装;不可执行,直接调用getFallBack();关于 HystrixCircuitBreaker 后面详细介绍
(2)executionSemaphore.tryAcquire() 判断是否有信号量资源,没有则直接调用getFallBack();
(3)如果条件都满足,则 调用 executeCommandAndObserve(_cmd) 继续包装 HystrixCommand执行。
3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要,根据用户指定的隔离级别(线程,信号量),包装相应的执行环境,执行Command
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 。。。。。。 Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { // 如果开启超时功能,则包装超时中断能力 execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { //否则不需要包装中断能力 execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
分析(删除了无用的代码):
(1)本方法包括两部分
(a)注册观察者,监听Command执行事件,用于更新统计信息,先不详细介绍,忽略了
(b)包装超时熔断能力:HystrixObservableTimeoutOperator
(2)超时熔断能力通过 Observable 的 left 操作 在原有的 HystrixObservable 上进行包装实现的,核心 HystrixObservableTimeoutOperator
execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
RxJava left 操作讲解,参见 《RxJava2.x 操作Demo》:》 二 Observable 操作符 》 2)lift 操作符
(3)最终调用 executeCommandWithSpecifiedIsolation(_cmd) 根据隔离级别选择是“线程方式”隔离执行还是“信号量方式”隔离执行;
4) executeCommandWithSpecifiedIsolation(_cmd) 具体的执行方法:
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { //Hystrix 配置的隔离策略是线程 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { // 。。。。。。 }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { // Hystrx配置的隔离策略是信号量 // 。。。。。。 return Observable.defer(new Func0<Observable<R>>() { // 。。。。。。 }); } }
分析(删除了无用的代码):
(1)这里就两件事:
(a)如果隔离策略是线程,则“线程策略”执行
(b)如果隔离策略是信号量,则“信号量策略”执行
(2)咋启动的线程呢,RxJava本身就提供异步执行的能力,通过 Observable 的 subscribeOn 绑定执行方式,底层具体实现是 线程调度:Scheduler。
马上进入下一章节,详细讲解一下
四 异步执行:HystrixContextScheduler
1)概述
Hystrix 要完成超时中断,需要异步执行,调用线程才能隔离影响,该功能借助RxJava的 Scheduler 机制实现;
关于 RxJava Scheduler 可以提前阅读下:《RxJava2.x 操作Demo》:》 三 线程调度:Scheduler 做个了解;
Hystrix实现异步的类结构如下:
主要职能:
HystrixThreadPoolDefault:依赖BlockingQueue<Runnable> queue 和 ThreadPoolExecutor threadPool; 统一线程池和任务队列
(a)Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();
HystrixContextScheduler: 这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作
ThreadPoolScheduler:线程池调度器,负责产生ThreadPoolWorker 向线程池提交任务
(b)Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)
HystrixContextSchedulerWorker:代理ThreadPoolWorker
ThreadPoolWorker:负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力
各个实现组件做个比较
(1)执行代码
RxJava :observable.observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread())
Hystrix:
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } }));
(2)核心组件
(a)RxJava: Scheduler:调度器,用于调度一个Worker执行任务,核心方法:Worker createWorker();
Hystrix:HystrixContextScheduler --》 依赖:ThreadPoolScheduler 返回的worker 即 ThreadPoolWorker
(b)RxJava: Scheduler.Worker:执行线程任务,核心方法:schedule(@NonNull Runnable run)
Hystrix:HystrixContextSchedulerWorker --》 依赖:ThreadPoolScheduler --》 ThreadPoolWorker
2)具体实现
(1)Hystrix指定Scheduler:
通过executeCommandWithSpecifiedIsolation(_cmd) 里的subscribeOn()方法完成;
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call() { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } }));
分析:这里使用 HystrixThreadPool 获得 Scheduler(调度器) ; 默认实现 HystrixThreadPoolDefault:维护线程池和任务队列。
(2)HystrixThreadPoolDefault:HystrixThreadPool 内部类
static class HystrixThreadPoolDefault implements HystrixThreadPool { // 。。。。。。 // Hystrix线程池配置,demo中有指定 private final HystrixThreadPoolProperties properties; //任务队列 private final BlockingQueue<Runnable> queue; //线程池 private final ThreadPoolExecutor threadPool; //线程监控 private final HystrixThreadPoolMetrics metrics; // 队列大小 private final int queueSize; public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) { this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults); HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.queueSize = properties.maxQueueSize().get(); this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey, concurrencyStrategy.getThreadPool(threadPoolKey, properties), properties); this.threadPool = this.metrics.getThreadPool(); this.queue = this.threadPool.getQueue(); // 。。。。。。 } // 。。。。。。 @Override public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) { touchConfig(); return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread); } // 。。。。。。 }
分析(删除了无用的代码):HystrixThreadPoolDefault 主要维护一个线程池和任务队列,用于异步操作,最后会传给 HystrixContextScheduler 调度器使用。
(3)HystrixContextScheduler :上下文调度器
public class HystrixContextScheduler extends Scheduler { private final HystrixConcurrencyStrategy concurrencyStrategy; private final Scheduler actualScheduler; private final HystrixThreadPool threadPool; // 。。。。。。 public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) { this.concurrencyStrategy = concurrencyStrategy; this.threadPool = threadPool; this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread); } @Override public Worker createWorker() { return new HystrixContextSchedulerWorker(actualScheduler.createWorker()); } // 。。。。。。 }
分析(删除了无用的代码):这个调度器除了封装线程池和同步策略外,创建一个HystrixContextSchedulerWorker,没有其他特殊的操作。
(4)HystrixContextSchedulerWorker :
上下文Worker,只代理,具体的调度工作由传入的actualWorker 完成(即 ThreadPoolWorker)
private class HystrixContextSchedulerWorker extends Worker { private final Worker worker; private HystrixContextSchedulerWorker(Worker actualWorker) { this.worker = actualWorker; } //。。。。。。。 @Override public Subscription schedule(Action0 action) { if (threadPool != null) { if (!threadPool.isQueueSpaceAvailable()) { throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); } } return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action)); } }
分析(删除了无用的代码):HystrixContextSchedulerWorker,这个调度器除了封装线程池和同步策略外,只一个HystrixContextSchedulerWorker 外,没有特殊的操作。
(5) ThreadPoolScheduler:
真正的线程池调度Scheduler,创建一个 ThreadPoolWorker
private static class ThreadPoolScheduler extends Scheduler { private final HystrixThreadPool threadPool; private final Func0<Boolean> shouldInterruptThread; public ThreadPoolScheduler(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) { this.threadPool = threadPool; this.shouldInterruptThread = shouldInterruptThread; } @Override public Worker createWorker() { return new ThreadPoolWorker(threadPool, shouldInterruptThread); } }
(6) ThreadPoolWorker:
负责在线程池线ThreadPoolExecutor中提交任务,具有中断任务能力;
private static class ThreadPoolWorker extends Worker { private final HystrixThreadPool threadPool; private final CompositeSubscription subscription = new CompositeSubscription(); private final Func0<Boolean> shouldInterruptThread; public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) { this.threadPool = threadPool; this.shouldInterruptThread = shouldInterruptThread; } // 。。。。。。 @Override public Subscription schedule(final Action0 action) { if (subscription.isUnsubscribed()) { return Subscriptions.unsubscribed(); } ScheduledAction sa = new ScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask<?> f = (FutureTask<?>) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa; } // 。。。。。。 }
分析(删除了无用的代码):schedule() 方法向线程池提交任务,关键返回一个Subscription,用于调用端使用,或中断当前任务执行;
五 超时中断:HystrixObservableTimeoutOperator
1)概述:
Hystrix实现异步的类结构如下:
其中:
HystrixObservableTimeoutOperator:包装超时中断的主流程,借助下面的组件实现超时中断逻辑
(1)HystrixTimer:依赖ScheduledExecutor,最终使用JDK的ScheduledThreadPoolExecutor 实现指定时间后的回调
(2)TimerListener:实现超时的回调逻辑
(3)CompositeSubscription:RxJava 1.x 提供,通过unsubscribe()方法可以中断当前任务的执行。
2)看一下细节:
(1)回到之前讲解的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》 3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { // 。。。。。。 Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) // 添加超时中断逻辑 .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { // 。。。。。。 } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
分析(删除了无用的代码):超时中断主要通过:lift 操作,使用new HystrixObservableTimeoutOperator<R>(_cmd) 包装 HystrixObservable实现的。
(2)HystrixObservableTimeoutOperator 实现
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { final AbstractCommand<R> originalCommand; public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { this.originalCommand = originalCommand; } @Override public Subscriber<? super R> call(final Subscriber<? super R> child) { // 创建一个组合订阅,用于添加一组Disposable,快速解除订阅,这里用于超时,中断任务 final CompositeSubscription s = new CompositeSubscription(); child.add(s); //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread(); //创建一个超时回调类 TimerListener listener = new TimerListener() { @Override public void tick() { //设置TimedOutStatus if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { // 报告超时 originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); // 通过 CompositeSubscription 的 unsubscribe 中断正在异步执行的HystrixCommand s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override public void run() { child.onError(new HystrixTimeoutException()); } }); //启动timeoutRunnable,就是执行上面的 child.onError(new HystrixTimeoutException()); 发送error时间,通知观察者 timeoutRunnable.run(); } } //获取超时时间 @Override public int getIntervalTimeInMilliseconds() { return originalCommand.properties.executionTimeoutInMilliseconds().get(); } }; final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); // 设置当前时间,用于比对超时 originalCommand.timeoutTimer.set(tl); Subscriber<R> parent = new Subscriber<R>() { @Override public void onCompleted() { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onCompleted(); } } @Override public void onError(Throwable e) { if (isNotTimedOut()) { // stop timer and pass notification through tl.clear(); child.onError(e); } } @Override public void onNext(R v) { if (isNotTimedOut()) { child.onNext(v); } } private boolean isNotTimedOut() { // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED); } }; // if s is unsubscribed we want to unsubscribe the parent s.add(parent); }
分析(删除了无用的代码):
(1)创建 TimerListener 实例,添加监听到超时的回调逻辑,指定超时时间(用户通过executionTimeoutInMilliseconds 配置的超时时间)。
主要逻辑:
(a)设置HystrixCommand状态为超时
(b)创建 CompositeSubscription 绑定之前异步的Subscription 使 调用端可以根据超时中断 当前任务。
(c)若超时,异步发送超时事件,通知HystrixCommand执行的观察者
(2)CompositeSubscription 绑定父子Subscription ,用于中断任务
RxJava CompositeSubscription使用,可参见 《RxJava2.x 操作Demo》:》 四 解除订阅,中断任务:Disposable 》 (2) CompositeDisposable 尽管 1.x 和2.x 有差别
(3) HystrixTimer 和 HystrixTimerListener:比较简单,就是ScheduledThreadPoolExecutor 定时触发
public static interface TimerListener { //指定时间周期触发回调 public void tick(); //时间周期 public int getIntervalTimeInMilliseconds(); } public class HystrixTimer { private static final Logger logger = LoggerFactory.getLogger(HystrixTimer.class); private static HystrixTimer INSTANCE = new HystrixTimer(); // 。。。。。。 public Reference<TimerListener> addTimerListener(final TimerListener listener) { startThreadIfNeeded(); // add the listener Runnable r = new Runnable() { @Override public void run() { try { listener.tick(); } catch (Exception e) { logger.error("Failed while ticking TimerListener", e); } } }; // 使用 ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); return new TimerReference(listener, f); } static class ScheduledExecutor { volatile ScheduledThreadPoolExecutor executor; // 。。。。。。 public void initialize() { // 。。。。。。 executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory); initialized = true; } // 。。。。。。 } }
六 断路器:HystrixCircuitBreaker
1)概述
HystrixCircuitBreaker 提供了状态维护功能,其使用体现在如下部分:
(1)HystrixCommand 执行前 通过 HystrixCircuitBreaker 根据状态判断是否允许执行:
open:禁止执行;
close 可以执行;
half_close:进入open状态已超过等待重试时间,则可以执行,否则禁止执行。
(2)订阅HystrixCommandMetrics 滑动窗口的数据统计更新事件,根据统计数据(错误率),决定是否进入open状态。
HystrixCircuitBreaker的类结构如下:
2)看看细节:
(1)HystrixCircuitBreaker接口:定义了主要服务
public interface HystrixCircuitBreaker { /** * 是否允许HystrixCommand请求执行,如果断路器是half-open(半开)状态,则根据逻辑允许部分请求执行和改变状态。 */ boolean allowRequest(); /** * 断路器当前是否打开状态 */ boolean isOpen(); /** * half-open(半开)状态的反馈机制,执行成功时调用 */ void markSuccess(); /** * half-open(半开)状态的反馈机制,执行失败调用 */ void markNonSuccess(); /** * HystrixCommand执行行前,判断是否尝试执行 */ boolean attemptExecution(); }
分析:这里就几个方法,简单做了注释;
(2)内部类 HystrixCircuitBreakerImpl 断路器实现(重要):
HystrixCircuitBreakerImpl实现断路器的核心逻辑:状态定义,状态维护。
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { // 用户配置定义 private final HystrixCommandProperties properties; // HystrixCommand 执行指标数据 private final HystrixCommandMetrics metrics; // 断路器状态定义 enum Status { CLOSED, OPEN, HALF_OPEN; } // 断路器当前状态,利用原子操作保证线程安全 private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); // 断路器是否打开,利用原子操作保证线程安全,circuitOpened > 0 则为打开状态,同时该变量记录了打开的时间,用于间隔“circuitBreakerSleepWindowInMilliseconds”重试, private final AtomicLong circuitOpened = new AtomicLong(-1); //指标数据订阅 private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //订阅 metrics 指标数据,一旦HystrixCommand执行结束,触发监控更新,“断路器” HystrixCircuitBreaker 重新计算并进入“关闭”或“打开“状态 Subscription s = subscribeToStream(); activeSubscription.set(s); } /** * 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新 */ private Subscription subscribeToStream() { return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { // 。。。。。。 @Override public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态; // 指标由用户通过“statisticalWindowVolumeThreshold”定义; } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态 // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义; } else { // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间 // 特别注意:这里使用了 compareAndSet ,不会重复打开 if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } @Override public void markSuccess() { // 半开状态下,尝试执行成功,断路器关闭 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { // 。。。。。。 } } @Override public void markNonSuccess() { // 半开状态,尝试执行失败,断路器打开 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { // 。。。。。。 } } // 判断断路器是否打开,很简单,不解释 @Override public boolean isOpen() { if (properties.circuitBreakerForceOpen().get()) { return true; } if (properties.circuitBreakerForceClosed().get()) { return false; } //circuitOpened > 0 打开状态 return circuitOpened.get() >= 0; } //HystrixCommand执行行前,判断是否允许请求 @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { // 断路器关闭,可以执行 return true; } else { //断路器半开状态,不允许执行 if (status.get().equals(Status.HALF_OPEN)) { return false; } else { //根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间 return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + sleepWindowTime; } @Override public boolean attemptExecution() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { //断路器半开状态,不允许执行 return true; } else { //根判断是否经过了重试等待时间,即“当前时间”是否大于“断路器打开时间” + 用户指定的“circuitBreakerSleepWindowInMilliseconds” 等待重试时间 if (isAfterSleepWindow()) { //等待重试时间已到 if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { // 首次请求,断路器状态进入“半开状态”,可以尝试执行HystrixCommand return true; } else { // 非首次进入“半开状态”,禁止执行 return false; } } else { // 等待重试时间未到,继续禁止执行 return false; } } } }
分析:这里逻辑较多,直接在代码做相应的解释,这里就不再赘述了
七 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream()
1)概述
上面介绍 HystrixCircuitBreakerImpl 的实现,比较关键的一步,在构造函数中调用了subscribeToStream()方法;细看该方法,HystrixCircuitBreakerImpl 订阅了metrics.getHealthCountsStream(), 通过判断滑动窗口内的统计数据 HystrixCommandMetrics.HealthCounts 完成自身状态的更新;
/** * 本方法是核心:订阅 metrics 指标数据事件,计算 OPEN/CLOSED 状态,并更新 */ private Subscription subscribeToStream() { return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { // 。。。。。。 @Override public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // 判断一个滑动窗口内“触发熔断”要达到的最小访问次数,没有达到,则不改变状态; // 指标由用户通过“statisticalWindowVolumeThreshold”定义; } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { // 判断滑动窗口时间内,错误率是否达到用户指定的错误率,没有达到,则不改变状态 // 指标由用户通过“circuitBreakerErrorThresholdPercentage”定义; } else { // 滑动窗口内错误率超过用户指定阈值,断路器进入打开状态,记录打开时间 // 特别注意:这里使用了 compareAndSet ,不会重复打开 if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); }
下面介绍下Hystrix滑动窗口设计(类图):
(1)健康统计数据:
(a)HealthCounts:具体的统计信息,包括totalCount:执行总数;errorCount:错误数;errorPercentage:错误率;配合上面的代码供状态转换使用
(2)滑动窗口:
(a)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
(b)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出
(c)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据
2)看看细节
(1)BucketedCounterStream:提供基于Bucket统计及基于桶的统计事件订阅
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> { // 桶数量 protected final int numBuckets; // 基于Bucket的观察流 protected final Observable<Bucket> bucketedStream; // protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null); private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary; private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue()); protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket) { this.numBuckets = numBuckets; // Event事件按照一个bucket的时间周期的合并逻辑,其实是定义了一个开放两个抽象方法getEmptyBucketSummary() , appendRawEventToBucket 用于聚合 this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>(); for (int i = 0; i < numBuckets; i++) { emptyEventCountsToStart.add(getEmptyBucketSummary()); } this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() //借助windows()操作缓存Event事件,积累到一个bucket的时间周期,统一处理 .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //聚合一个bucket的时间周期Event数据 .flatMap(reduceBucketToSummary) .startWith(emptyEventCountsToStart); } }); } abstract Bucket getEmptyBucketSummary(); abstract Output getEmptyOutputValue(); // 。。。。。。 }
分析(删除了无用的代码):BucketedCounterStream 提供了基于Bucket的抽象,核心就是构造函数
1)构造函数可以看到 numBuckets 提供了桶的数量,bucketSizeInMs 提供一个桶的时间周期
2)然后 针对源 inputEventStream 是通过 windows() 操作指定了聚合事件的周期,flatMap()操作指定 聚合方法
3)最后通过抽象方法 appendRawEventToBucket 参数提供调用端指定具体的聚合策略
注:这里只抽象到桶的级别和聚合,具体输出类型 Output 需要子类继续重写实现
(2)BucketedRollingCounterStream:滑动窗口实现,一个滑动窗口包括 numBuckets 个bucket,聚合最近的 numBuckets 个bucket的数据,作为滑动窗口的时间统计输出
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> { private Observable<Output> sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); this.sourceStream = bucketedStream // 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定窗口数量 .window(numBuckets, 1) // 按照滑动窗口配置,对最近的numBuckets 个 bucket数据进行聚合,这指定聚合策略 .flatMap(reduceWindowToSummary) .doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true)) .doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false)) .share() .onBackpressureDrop(); } @Override public Observable<Output> observe() { return sourceStream; } // 。。。。。。 }
分析(删除了无用的代码):BucketedRollingCounterStream提供了滑动窗口的抽象,核心还是在构造函数中
1)然后 针对bucket源 bucketedStream 是通过 windows() 操作对最近的 numBuckets 个 bucket数据进行聚合,flatMap()操作指定 聚合方法
2)最后通过抽象方法 reduceBucket 参数提供调用端指定具体的聚合策略
(3)HealthCountsStream :真正的HystrixCommand处理结束,时间窗口周期到达,生成HealthCounts的统计数据
/** * 看泛型:HystrixCommandCompletion:command执行结束事件;long[]:bucket 类型;HystrixCommandMetrics.HealthCounts:滑动时间窗口内的执行统计数据 */ public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> { private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>(); private static final int NUM_EVENT_TYPES = HystrixEventType.values().length; private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() { @Override public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) { return healthCounts.plus(bucketEventCounts); } }; // 。。。。。。 private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs, Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) { // 特别注意 HystrixCommandCompletionStream 这里直接绑死了 这个事件流 super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator); } // 。。。。。。 }
分析(删除了无用的代码):
HealthCountsStream实现具体的滑动窗口业务,构造函数绑定 HystrixCommandCompletionStream 事件流,直接观察 HystrixCommandCompletion:HystrixCommand执行结束事件,聚合成 类型;滑动窗口内 HystrixCommandMetrics 更新统计需要的 HealthCounts
(4)HealthCounts:具体的统计信息 没有特殊的,不解释了
public static class HealthCounts { private final long totalCount; private final long errorCount; private final int errorPercentage; HealthCounts(long total, long error) { this.totalCount = total; this.errorCount = error; if (totalCount > 0) { this.errorPercentage = (int) ((double) errorCount / totalCount * 100); } else { this.errorPercentage = 0; } } //。。。。。。 public HealthCounts plus(long[] eventTypeCounts) { long updatedTotalCount = totalCount; long updatedErrorCount = errorCount; long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()]; long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()]; long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()]; long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()]; long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()]; updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); return new HealthCounts(updatedTotalCount, updatedErrorCount); } //。。。。。。 }
八 事件处理机制:HystrixEventStream
1)概述:
六 滑动窗口统计及断路器状态更新:metrics.getHealthCountsStream() 讲述了 “断路器”如何根据HystrixCommand处理结果更新状态。
核心:HealthCountsStream 是在构造函数中绑定了 HystrixCommandCompletionStream 事件流 ,关注HystrixCommand执行结束事件,那么 HystrixEventStream 事件流式如何产生的,我们下面就简单介绍下;
HystrixEventStream 的集成体系如下:
2)看看细节:
(1)回到之前的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable()
》 1)流程中的(1)(2)步HystrixCommand的所有执行方法execute(),queue(),observe(),toObservable()最终都是调用的都是 AbstractCommand<R> toObservable(),直接看该方法。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; final Action0 terminateCommandCleanup = new Action0() { @Override public void call() { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false); //user code never ran } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true); //user code did run } } }; // 。。。。。。 return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 。。。。。。 return afterCache .doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once .doOnCompleted(fireOnCompletedHook); } }); }
分析(删除了无用的代码):订阅了HystrixCommand的 结束事件,会执行 terminateCommandCleanup ;主要代码在 handleCommandEnd() 方法中;
(2)handleCommandEnd(): HystrixCommand 命令执行结束,根据执行结果做相应的处理
private void handleCommandEnd(boolean commandExecutionStarted) { // 。。。。。。 // 记录用户执行时间 long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp; executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency); if (executionResultAtTimeOfCancellation == null) { metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } // 。。。。。。 }
分析(删除了无用的代码):主要是调用 HystrixCommandMetrics 的 markCommandDone 方法
(3)HystrixCommandMetrics:通过HystrixThreadEventStream 执行 executionDone 执行命令结束的相关操作
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) { HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey); if (executionStarted) { concurrentExecutionCount.decrementAndGet(); } }
(4)HystrixThreadEventStream:线程事件流,事件和事件监听器采用观察者模式,互相进行了隔离
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { //根据处理结果,构造一个 HystrixCommandCompletion 事件 HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); //通知事件完成主题的监听者 writeOnlyCommandCompletionSubject.onNext(event); }
分析(删除了无用的代码):最终调用了 HystrixThreadEventStream 完成事件的生成和绑定客户端的情况
public class HystrixThreadEventStream { private final long threadId; private final String threadName; private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject; private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject; private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject; // 。。。。。。 HystrixThreadEventStream(Thread thread) { this.threadId = thread.getId(); this.threadName = thread.getName(); writeOnlyCommandStartSubject = PublishSubject.create(); writeOnlyCommandCompletionSubject = PublishSubject.create(); writeOnlyCollapserSubject = PublishSubject.create(); writeOnlyCommandStartSubject .onBackpressureBuffer() .doOnNext(writeCommandStartsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCommandCompletionSubject .onBackpressureBuffer() .doOnNext(writeCommandCompletionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); writeOnlyCollapserSubject .onBackpressureBuffer() .doOnNext(writeCollapserExecutionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); } // 。。。。。。 public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { //根据处理结果,构造一个 HystrixCommandCompletion 事件 HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); //通知事件完成主题的监听者 writeOnlyCommandCompletionSubject.onNext(event); } // 。。。。。。 }
分析(删除了无用的代码):
HystrixThreadEventStream 本身维护各种关注事件的Subject ,Subject在Rxjava 作为主题,提供观察者双向操作的能力。最终其实是向HystrixCommandCompletionStream 中 提交 HystrixCommandCompletion 事件,供观察者使用。
(5)事件流:HystrixEventStream
Hystrix 命令执行过程会触发响应的事件,用户观察者订阅使用,HystrixCommandCompletionStream 就是用于订阅 HystrixCommandCompletion 命令完成事件。
这里简单看看类结果,详情不再赘述,如开始,不再赘述;
九 getFallback() 执行
1)概述:
关于 getFallback() 方法的执行,之前在
《Hystrix:断路器》:》 三 Hystrix :》 4)处理流程分析 :》 8)执行fallback 做过总结,触发方式如下:
(1)断路器状态:断路器已经打开的时候
(2)由construct() or run()抛出了一个异常
没有空闲的线程池和队列或者信号量:RejectedExecutionException
一次命令执行超时:HystrixTimeoutException
2)这里分析下具体实现
(1) 断路器状态判断-断路器已经打开的时候:在HystrixCommand执行前,根据 circuitBreaker 状态判断是否执行还是快速失效
回到之前讲解的主流程:
》三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》(2)applyHystrixSemantics(_cmd):封装Hystrix的核心流程,根据 HystrixCircuitBreaker 提供熔断器状态,确定执行run() 还是getFallback();
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { //。。。。。。 if (circuitBreaker.attemptExecution()) { //判断是否可执行 //。。。。。。 // 继续包装HystrixCommand Observable,注册观察者,执行 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } else { // 不可执行,快速失效,执行getFallBack()方法 return handleShortCircuitViaFallback(); } }
(2) construct() or run()抛出了一个异常或超时:
在HystrixCommand执行后,根据结果处理接收到 RejectedExecutionException;HystrixTimeoutException,及非 HystrixBadRequestException 异常 ,执行getFallback()
回到之前讲解的主流程:
三 主流程HystrixCommand 包装成 Observable执行:AbstractCommand<R> toObservable() 中
》3)executeCommandAndObserve(_cmd) 执行HystrixCommand:最重要的是,根据用户指定的隔离级别(线程,信号量),包装响应的执行环境,执行
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 。。。。。。 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call(Throwable t) { circuitBreaker.markNonSuccess(); Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; // 。。。。。。 Observable<R> execution; return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
分析(删除了无用的代码):主要是:handleFallback ,因为涉及fallback 链的情况这里只简单概述
1)异常类型 RejectedExecutionException;HystrixTimeoutException,及非HystrixBadRequestException 会通过内部方法 getFallbackOrThrowException() 完成对getFallback() 的调用
2)异常类型 HystrixBadRequestException 不会调用 getFallback()
相关推荐
springcloud:Hystrix服务熔断demo源码案例演示
第四课: 断路器(Hystrix) 第五课: 路由网关(zuul) 第六课: 分布式配置中心(Spring Cloud Config) 第七课: 高可用的分布式配置中心(Spring Cloud Config) 第八课: 消息总线(Spring Cloud Bus) 第九课: 服务...
《深入理解Spring Cloud Netflix Hystrix:构建弹性微服务架构》 在当今的软件开发领域,微服务架构已经成为主流,而Spring Cloud作为Java生态中的微服务解决方案,深受开发者喜爱。其中,Spring Cloud Netflix ...
Spring Cloud使用的各种示例,以最简单、...spring-cloud-sleuth-zipkin: 利用Sleuth、Zipkin对Spring Cloud应用进行服务追踪分析 spring-boot-admin-eureka: 使用Spring Boot Admin 对Spring Cloud集群进行监控示例
史上最简单的SpringCloud教程 | 第十二篇: 断路器监控(Hystrix Dashboard)(Finchley版本) 史上最简单的SpringCloud教程 | 第十三篇: 断路器聚合监控(Hystrix Turbine)(Finchley版本) 史上最简单的SpringCloud教程 | ...
Spring Cloud Hystrix是一个由Netflix开源的Java框架,用于在分布式系统中提供延迟和容错功能,特别适用于对微服务架构中的远程过程调用(RPC)进行控制。其核心目标是通过添加一个延迟容忍层来隔离各个微服务之间的...
在分布式系统中,服务间的调用异常处理是至关重要的,Spring Cloud Hystrix 就是为了解决这一问题而设计的。Hystrix 是 Netflix 开源的一个延迟和容错库,用于隔离服务间的调用,防止因某个服务的不稳定导致整个系统...
在本篇文章中,我们将对 Spring Cloud OpenFeign 进行详细介绍,讨论其在声明式服务调用中的应用,包括与 Ribbon 和 Hystrix 的集成。 什么是 Spring Cloud OpenFeign? Spring Cloud OpenFeign 是一个声明式的...
7. **分布式追踪**:Spring Cloud Sleuth与Zipkin或Jaeger集成,提供全链路的请求追踪,帮助开发者分析和调试分布式系统中的问题。 8. **数据库集成**:项目可能会使用MySQL或其他关系型数据库存储书籍信息,通过...
在Spring Cloud生态系统中,Hystrix是一个至关重要的组件,它主要负责实现服务容错和断路器模式,以增强系统的稳定性和健壮性。本文将深入探讨如何在Spring Cloud项目中集成并使用Hystrix,以及如何将其与Feign...
《Hystrix Dashboard 1.5.12:构建弹性微服务的重要工具》 Hystrix Dashboard 是 Netflix 开源的一款强大的监控工具,主要用于监控微服务架构中的 Hystrix 库的性能和健康状况。在标题提到的 "hystrix-dashboar...
Spring Cloud使用的各种示例,以最简单、...spring-cloud-sleuth-zipkin: 利用Sleuth、Zipkin对Spring Cloud应用进行服务追踪分析 spring-boot-admin-eureka: 使用Spring Boot Admin 对Spring Cloud集群进行监控示例
Spring Cloud 是一个基于 Spring Boot 实现的云应用开发工具包,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)...
### Spring Cloud Hystrix & Dashboard 源码解读 #### 一、加载、初始化概要 ##### 1.1 @EnableCircuitBreaker 的生效 `@EnableCircuitBreaker` 注解是 Spring Cloud Hystrix 提供的一个关键注解,用于启动熔断器...
本教程将通过分析`microservice-hystrix-dashboard`、`microservice-client`和`microservice-springcloud`这三个项目来深入理解Spring Cloud Hystrix的使用。 首先,`microservice-hystrix-dashboard`是Hystrix的...
在给定的标题和描述中,我们看到了几个关键组件:Eureka、Zuul、Ribbon、Hystrix 和 Feign,这些都是Spring Cloud生态中的重要组成部分。下面将详细阐述这些组件及其在实际应用中的作用。 1. **Eureka**:它是...
《Spring Cloud项目源码深度解析》 在当前的微服务架构领域,Spring Cloud以其强大的功能和易用性,成为开发者构建分布式系统的重要选择。本文将深入探讨基于Spring Cloud的项目源码,帮助读者理解其核心原理,提升...
标题 "springcloud2-hystrix-feign-zuul.zip" 提示了我们这是一组关于Spring Cloud 2的实现,具体涉及Hystrix、Feign和Zuul组件的实践项目。Spring Cloud 是一个用于构建分布式系统的服务发现、配置管理和微服务连接...
尽管文档标题和描述较为简单,但可以从这些信息中提炼出关于Spring Cloud项目的源码下载、Spring Cloud的基本概念、用途及其核心组件等方面的内容。 ### Spring Cloud 概念 Spring Cloud 是一个基于Spring Boot...
Spring Cloud Gateway 配置 Hystrix 熔断、限流、后台调用注意点 Spring Cloud Gateway 是一种基于 Spring Boot 框架的 API 网关解决方案,提供了许多实用的功能来管理和保护微服务架构中的 API。其中,Hystrix ...