MonitorFactory 创建 Monitor 的接口
@SPI("dubbo")
public interface MonitorFactory {
/**
* Create monitor.
*
* @param url
* @return monitor
*/
@Adaptive("protocol")
Monitor getMonitor(URL url);
}
MonitorService 接口定义收集和查询监控数据
Monitor 代表了监控对象,继承于 MonitorService 和 Node 接口.
MetricsService 这一个接口应该是留作扩展用的.
AbstractMonitorFactory 是一个抽象类,实现了大部分创建 MonitorFactory 的代码,留有扩展点.
MonitorFilter 采用责任链模式模式,收集调用信息.
class MonitorListener implements Listener {
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
/**
* The collector logic, it will be handled by the default monitor
*
* @param invoker
* @param invocation
* @param result the invoke result
* @param remoteHost the remote host address
* @param start the timestamp the invoke begin
* @param error if there is an error on the invoke
*/
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
Monitor monitor = monitorFactory.getMonitor(monitorUrl);
if (monitor == null) {
return;
}
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
monitor.collect(statisticsURL);
} catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
/**
* Create statistics url
*
* @param invoker
* @param invocation
* @param result
* @param remoteHost
* @param start
* @param error
* @return
*/
private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
// ---- service statistics ----
long elapsed = System.currentTimeMillis() - start; // invocation cost
int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
String application = invoker.getUrl().getParameter(APPLICATION_KEY);
String service = invoker.getInterface().getName(); // service name
String method = RpcUtils.getMethodName(invocation); // method name
String group = invoker.getUrl().getParameter(GROUP_KEY);
String version = invoker.getUrl().getParameter(VERSION_KEY);
int localPort;
String remoteKey, remoteValue;
if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
// ---- for service consumer ----
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
// ---- for service provider ----
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = remoteHost;
}
String input = "", output = "";
if (invocation.getAttachment(INPUT_KEY) != null) {
input = invocation.getAttachment(INPUT_KEY);
}
if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
output = result.getAttachment(OUTPUT_KEY);
}
return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
}
}
Statistics 类相当于一个 pojo 类,用于收集调用的基本信息,例如:
private URL url;
private String application;
private String service;
private String method;
private String group;
private String version;
private String client;
private String server;
在看代码的时候,发现 DubboMonitorFactory 类既有 getMonitor 方法,又有 createMonitor 方法,当时就赶到很奇怪,但是仔细看了下代码,发现 getMonitor 方法中调用了 createMonitor 方法,如下:
public Monitor getMonitor(URL url) {
url = url.setPath(MonitorService.class.getName()).addParameter(INTERFACE_KEY, MonitorService.class.getName());
String key = url.toServiceStringWithoutResolving();
Monitor monitor = MONITORS.get(key);
Future<Monitor> future = FUTURES.get(key);
if (monitor != null || future != null) {
return monitor;
}
LOCK.lock();
try {
monitor = MONITORS.get(key);
future = FUTURES.get(key);
if (monitor != null || future != null) {
return monitor;
}
final URL monitorUrl = url;
final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
FUTURES.put(key, completableFuture);
completableFuture.thenRunAsync(new MonitorListener(key), executor);
return null;
} finally {
// unlock
LOCK.unlock();
}
}
在看 DubboMonitorFactory 类的时候,发现即有 protocol 又有 proxyFactory 等的,不是很明白.
仔细品味下,其实会发现这段代码是不是像极了一个对一个协议的引用?例如 HttpProtocol 协议.
protected Monitor createMonitor(URL url) {
URLBuilder urlBuilder = URLBuilder.from(url);
urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
if (StringUtils.isEmpty(url.getPath())) {
urlBuilder.setPath(MonitorService.class.getName());
}
String filter = url.getParameter(REFERENCE_FILTER_KEY);
if (StringUtils.isEmpty(filter)) {
filter = "";
} else {
filter = filter + ",";
}
urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
REFERENCE_FILTER_KEY, filter + "-monitor");
Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
return new DubboMonitor(monitorInvoker, monitorService);
}
最后,我们来看下 DubboMonitor 的源码实现.
我们先看下构造方法,可以清晰的看到,dubbo 是每分钟上报一次.
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
this.monitorInvoker = monitorInvoker;
this.monitorService = monitorService;
this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
// collect timer for collecting statistics data
sendFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
try {
// collect data
send();
} catch (Throwable t) {
logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
}
}, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
}
update[0] :调用成功的次数
update[1] :调用失败的次数
update[2] :总调用流量(请求包的总大小)。
update[3] :总响应流量(响应包的总大小)。
update[4] :总响应时长(总服务调用开销)。
update[5] :一次收集周期的平均TPS。
update[6] :最大请求包大小。
update[7] :最大响应包大小。
update[8] :最大响应时间。
update[9] :最大TPS。
但是我没有看到向监控中心发送数据的代码了,为啥了???
<!--?xml version="1.0" encoding="UTF-8"?-->
其实是有的,我们上面不是分析了,在 DubboMonitorFactory 中不是像一个协议一样吗?那个 refer 其实就是引用的注册中心的服务.
相关推荐
这表明压缩包包含的是Dubbo监控中心控制台的源码或者部署文件。war文件是Web应用程序的标准打包格式,通常包含了Web应用的所有资源,如HTML、CSS、JavaScript、图片以及Servlet等。用户可以将此war文件部署到支持...
压缩包中的`dubbo-monitor-simple-2.5.4-SNAPSHOT`是Dubbo监控中心的简化版本。这个组件提供了基本的服务监控功能,允许管理员查看服务的运行状态、调用统计、异常信息等,有助于快速定位和解决问题。简单易用是其一...
《Dubbo源码分析》是一套深入探讨Apache Dubbo这一著名Java开源框架的书籍,旨在帮助开发者更好地理解和应用Dubbo。Dubbo是一个高性能、轻量级的服务治理框架,广泛应用于微服务架构中,以实现服务的发布、发现、...
《Dubbo源码分析系列》是一份深入探讨Java开源框架Dubbo核心原理和技术细节的资料。Dubbo,作为阿里巴巴的一款高性能、轻量级的服务治理框架,它为分布式系统提供了服务发现、调用、负载均衡、容错等关键功能。这份...
通过监控中心的建立和监控数据的收集,可以实时了解服务的运行状态,对于发现和解决问题具有重要的帮助。 12. 在整个学习过程中,读者应该注重知识储备。对于初学者,建议先行阅读相关的Java编程书籍,比如《java...
这部分源码展示了如何对接Dubbo的配置中心,如ZKConfigurator,实现配置的实时同步和更新。 八、异常处理与日志记录 在源码中,我们可以看到dubbo-admin如何处理各种异常情况,以及如何记录和上报日志。这部分内容...
Service是业务接口,Provider是服务提供方,Consumer是服务消费方,Registry是服务注册中心,而Monitor则是用于统计服务调用数据的服务监控中心。在这些组件之间,Dubbo利用了工厂模式、代理模式、观察者模式等多种...
- **Monitor**:监控中心,记录服务的调用次数、调用时间等数据,用于监控服务的健康状态。 #### 三、核心机制分析 1. **设计模式**:Dubbo中广泛使用了设计模式来实现其功能。例如,在扩展点机制中使用了工厂模式...
4. **监控中心(Monitor)**:统计服务的调用次数、调用时间等,用于系统调优和故障排查。 三、服务治理 1. **配置中心(Configuration Center)**:集中管理和动态调整服务配置,支持Zookeeper、Redis等多种方式。 2....
- **7.5.1 监控中心**:集中管理Dubbo服务的监控信息。 - **7.5.2 SimpleMonitorService** - **7.5.2.1 Monitor基础类**:监控服务的基础类。 - **7.5.2.2 SimpleMonitorService**:简单的监控服务实现。 - **7.5...
4. 运行时监控(Monitor):收集服务的运行数据,用于性能分析和故障排查。 三、服务治理机制 1. 服务注册与发现:服务提供者向注册中心注册服务,服务消费者通过注册中心获取服务提供者的地址信息。 2. 负载均衡...
- **服务监控**:通过监控中心收集服务调用的统计信息,分析服务的健康状况。 - **动态配置**:在运行时动态调整服务的配置,如修改服务版本、权重等。 - **服务路由**:根据条件进行服务路由,比如只调用特定...
【标题】"dubbo应用实例源码工程"指的是一个基于...通过分析和实践这个dubbo应用实例源码工程,开发者可以更深入地理解Dubbo的工作原理和使用技巧,从而在自己的项目中更好地利用Dubbo实现高效、稳定的微服务架构。
在早期的版本中,Dubbo提供了一个名为dubbo-admin的监控中心,用于可视化展示和管理Dubbo服务的运行状态。本文将详细介绍如何使用dubbo-admin老版本进行服务监控。 一、安装与启动dubbo-admin 1. 下载源码:根据...
通过对Dubbo源码的阅读和分析,我们可以更深入地理解服务治理的原理,学习到如何设计高可用、可扩展的微服务架构,这对于提升我们的Java编程技能和系统设计能力具有重要的实践意义。同时,Dubbo也是许多企业级项目的...
Dubbo内置了监控中心,可以收集服务的调用统计、异常信息等,通过可视化界面展示,帮助开发者实时了解服务运行状态,及时发现问题并进行优化。 通过对Dubbo 2.7.2源码的深度解析,我们可以了解到其设计思想和实现...
总的来说,"dubbo-控制台使用源码"涵盖了分布式服务治理、监控、源码分析等多个重要知识点,对于提升开发者在微服务架构下的设计和实现能力大有裨益。通过深入研究,不仅可以提升个人技术水平,也能为实际项目带来更...
源码分析可以帮助开发者深入理解其内部机制,提升开发和优化服务的能力。 1. **设计模式与架构** - **服务提供者(Provider)**:提供服务的模块,暴露服务接口供消费者调用。 - **服务消费者(Consumer)**:...
在 Apache Dubbo 服务发布的源码分析中,我们主要关注如何将服务暴露并注册到服务中心。这个过程涉及到了几个关键的步骤和 Spring 框架的集成。首先,我们需要了解 Spring 如何处理自定义标签以及 Dubbo 如何利用这...