我们接下来分析下EurekaClientAutoConfiguration文件下的EurekaClient bean的注入,这个类从名字我们就知道了是一个Eureka客户端的生成类,至于传入的参数bean ApplicationInfoManager和EurekaClientConfig在前面已经介绍过了,我们直接进入EurekaClient的实现类CloudEurekaClient中,可以看到它继承了netflix的DiscoveryClient类
我们首先看下CloudEurekaClient类,它有如图所示的方法和字段
这里主要说明下cancelOverrideStatus()方法(主要是为了了解字段的数据来源)
public void cancelOverrideStatus(InstanceInfo info) {
getEurekaHttpClient().deleteStatusOverride(info.getAppName(), info.getId(), info);
}
进入到该方法,我们可以看到首先有一个getEurekaHttpClient()方法,我们看下这个方法
EurekaHttpClient getEurekaHttpClient() {
if (this.eurekaHttpClient.get() == null) {
try {
Object eurekaTransport = this.eurekaTransportField.get(this);
Field registrationClientField = ReflectionUtils
.findField(eurekaTransport.getClass(), "registrationClient");
ReflectionUtils.makeAccessible(registrationClientField);
this.eurekaHttpClient.compareAndSet(null,
(EurekaHttpClient) registrationClientField.get(eurekaTransport));
}
catch (IllegalAccessException e) {
log.error("error getting EurekaHttpClient", e);
}
}
return this.eurekaHttpClient.get();
}
我们首先得看下eurekaTransportField这个字段的实现,可以看到它是在构造函数中的实现是
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
通过这段代码进入到DiscoveryClient类找到eurekaTransport字段,我们发现eurekaTransport字段是一个对应的类型为静态的私有final内部类,回到getEurekaHttpClient()方法,通过反射方法获取到的registrationClient字段正好是EurekaHttpClient接口类,这个接口类中有如图的方法
最终回到cancelOverrideStatus()方法,我们这里就可以明确了解到它调用了一个EurekaHttpClient接口的deleteStatusOverride()方法,从其实现类中了解到它是调用服务端的一个TYPE类型为DELETE的方法。
在这个地方说明下EurekaHttpClient的实现,默认情况下eureka提供的是Jersey来实现的restful调用,如果在不引用Jersey框架的情况,spring cloud提供了RestTemplate的方式,我们可以如图所示的方式来查看到这个引用结果
在RestTemplateTransportClientFactory中引用了RestTemplateEurekaHttpClient类,在RestTemplateTransportClientFactories类引用了RestTemplateTransportClientFactory类,在RestTemplateDiscoveryClientOptionalArgs类应用了RestTemplateTransportClientFactory类,最后在DiscoveryClientOptionalArgsConfiguration类定义了该类的bean的生效条件是
@ConditionalOnMissingClass("com.sun.jersey.api.client.filter.ClientFilter")
因此从后往前推就可以看到RestTemplateEurekaHttpClient生效的条件,当然在EurekaClientAutoConfiguration自动配置类中我们可以看到DiscoveryClientOptionalArgsConfiguration是通过@import注解注入的。
以上就主要说明了在CloudEurekaClient类中的实现,我们接下来看下在《spring cloud eureka源码分析(一)》提到的在父类DiscoveryClient类中如何实现向服务器注册客户端,如何让客户端与服务端保持联系等实现。通过
CloudEurekaClient构造函数进入到DiscoveryClient的构造函数中我们可以明确了解到在里面实现了很多值的初始化,我们先来看下这段代码
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
这段代码判断的是不需要向注册中心注册并且不需要向注册中心获取注册信息,把注册的实例对象赋值为当前对象就可以了,然后直接返回,后面不用做处理,这个也说明这段代码实现的有可能就是注册中心服务器的配置实现需要。
接下来我们看到在构造函数中实现了一个定时任务定义和心跳线程池及缓存刷新的线程池,
scheduleServerEndpointTask(eurekaTransport, args);
这段代码是为了在内部类
EurekaTransport中记录当前对象的属性值,而且该内部类只有一个方法那就是shutdown(),从名字就可以知道是一个关闭操作。
然后在看看这段代码
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
这段代码其实判断的是是否需要向服务器端注册同时是否强制在初始化的时候注册,通过shouldEnforceRegistrationAtInit这个字段的描述说明,我们可以知道在初始的时候使用有可能会出现异常,通过后面的代码也可以看到,看下register()方法
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
从上面的代码可以看出它直接通过调用API restful的方式进行注册了,我们在看下在构造函数中的initScheduledTasks()方法
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
在方法中我们启动了定时任务,我们首先看下cacheRefresh定时任务,在该定时任务中实现了CacheRefreshThread()线程类,进入到该线程类它调用了refreshRegistry()方法,进入该方法我们看下它调用的fetchRegistry()方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
......
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
在getAndStoreFullRegistry()方法中
private void getAndStoreFullRegistry() throws Throwable {
....
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
......
localRegionApps.set(this.filterAndShuffle(apps));
.....
}
通过方法看到我们去服务器端获取了所有的注册信息然后保存起来,在看下
getAndUpdateDelta()方法
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
.....
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
.....
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
.....
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
......
}
}
在上面方法中我们通过去获取服务器上面的增量实例,然后判断是否有新增的,没有则去调用getAndStoreFullRegistry()方法来全量获取信息,否则通过锁机制去更新本地的实例数据,经过上面的步骤我们就完成了缓存的更新。
我们在看下heartbeat的定时任务,在这个定时任务中实现了HeartbeatThread线程类,该线程类提供了一个renew()方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
......
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
.....
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
.....
}
在方法中我们首先发送一个心跳信息到服务端,如果返回正常则成功,如果没有发现,则调用register()方法进行注册。
通过上面源码的分析我们就知道了如何向服务端注册以及保持连接,至于和ribbon的负载均衡就暂时不整理了,客户端的介绍就到这里,接下来我们一起分析服务端的源码实现。
- 大小: 16.5 KB
- 大小: 35.7 KB
- 大小: 46.5 KB
分享到:
相关推荐
《Spring Cloud Eureka源码分析》 Spring Cloud Eureka是Netflix公司开源的一个服务发现组件,它是基于REST的服务,用于在分布式系统中定位服务,以实现负载均衡和中间层服务器的故障转移。Eureka的设计目标是提供...
为什么要看源码: 1、提升技术功底:学习源码里的优秀设计思想,比如一些疑难问题的解决思路,还有一些优秀的设计模式,整体提 升自己的技术功底 2、深度掌握技术框架:源码看多了,对于一个新技术或框架的掌握速度会有...
1. Eureka源码:研究Eureka的工作机制,包括服务注册、心跳机制、服务发现等,理解如何维护服务实例的健康状态。 2. Ribbon源码:查看Ribbon如何根据服务发现的信息选择合适的服务器,并实现负载均衡策略。 3. ...
3. `springcloud-consumer-hystrix-dashboard`可能是一个使用Hystrix Dashboard的消费者服务,用于监控服务的运行状态和熔断情况。 4. `springcloud-Eurake-7001`可能是Eureka服务器的实例,负责服务注册与发现,...
你可以在这个项目的基础上,按照上述步骤和源码分析,扩展到多节点的Eureka集群。 总的来说,构建Spring Cloud Eureka集群是一项关键任务,它涉及到服务注册、发现、健康检查和治理等多个方面。通过源码实现,不仅...
《Spring Cloud实战源码》是针对Java开发者的一本深度指南,旨在帮助读者全面理解并熟练掌握Spring Cloud框架的使用和核心原理。Spring Cloud是构建分布式系统的服务发现、配置管理、负载均衡、熔断机制等解决方案的...
SpringCloud的源码分析有助于开发者了解其实现机制,从而更好地定制和优化自己的服务。源码中包含了Eureka服务发现、Zuul边缘服务、Hystrix断路器、 Ribbon客户端负载均衡、Feign声明式客户端、Spring Cloud Config...
### 一、SpringCloud简介与核心组件 #### 1.1 SpringCloud概述 SpringCloud是一套基于Spring Boot实现的微服务云应用开发工具集,它提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线...
**源码分析** 对于深入理解 Eureka 的工作原理,可以查看其源码。Eureka Server 的核心类包括 `EurekaServerContext` 和 `EurekaServerConfig`,它们负责处理服务的注册、注销和心跳等功能。Eureka Client 的关键类...
9. **源码分析**: 分享的源码是作者手敲并通过编译验证的,意味着你可以深入研究每个服务的实现细节,理解Spring Cloud的原理和最佳实践。通过阅读和运行这些代码,开发者可以提升自己在实际项目中的应用能力。 ...
总结来说,"SpringCloud+SpringBoot+Eureka源码"项目旨在通过源码学习,深入了解如何利用SpringBoot快速构建微服务,并借助SpringCloud的Eureka实现服务间的注册与发现。通过对源码的探索,我们可以更好地掌握微服务...
在压缩包文件中,`SpringCloud-Consumer`可能包含了服务消费者项目的源代码,`EureKaserver`则是Eureka Server的源码,而`SpringCloud-Service`则可能是服务提供者的源码。通过分析这些代码,你可以更深入地理解...
综上所述,"尚硅谷周阳老师SpringCloud学习源码"涵盖了SpringCloud的多个关键组件,通过学习这些源码,开发者能够深入理解微服务架构中的服务治理、API网关、断路器、配置中心、消息驱动等核心概念,进一步提升...
本篇将围绕SpringCloud的核心组件进行源码分析,包括Eureka服务注册与发现、Ribbon客户端负载均衡、Hystrix断路器、Zuul路由网关以及Config分布式配置中心,旨在帮助读者深入理解这些组件的工作原理。 一、Eureka:...
SpringCloud是中国Java开发者广泛使用的微服务框架,它包含了一系列组件,用于构建分布式系统。这个压缩包文件"SpringCloud 15个完整例子"提供了一系列从基础到进阶的示例项目,帮助用户深入理解并实践SpringCloud的...
这些视频课程结合源码分析和课件学习,可以帮助开发者深入理解SpringBoot和SpringCloud的核心原理,并掌握实际应用中的最佳实践。对于想要在微服务领域深入发展的IT专业人士来说,这是一个非常有价值的资源。通过...
源码分析** 在"demo1"中,我们可能看到以下关键组件的实现: - 服务启动类:包含`@EnableEurekaClient`注解,表示该服务会注册到Eureka服务器。 - API接口定义:定义了微服务对外提供的业务接口。 - 控制器:实现...
本项目为基于Java的SpringCloud注册中心Eureka设计源码,包含54个文件,其中Java源文件26个,PNG图片文件11个,Git忽略文件6个,XML和YAML配置文件各5个,Markdown文档1个。该源码旨在构建一个适用于SpringCloud...
《基于SpringCloud的电商平台源码解析》 在现代互联网应用开发中,微服务架构已经成为主流。Spring Cloud作为一套完整的微服务解决方案,为开发者提供了构建分布式系统所需的工具集合,包括服务发现、配置中心、...
尚硅谷提供的SpringCloud源码分析资源,为开发者深入理解这一框架提供了宝贵的资料。本文将围绕SpringCloud的核心组件、设计模式以及源码解析进行详细介绍,并结合思维导图,帮助读者构建清晰的知识体系。 首先,...