`

dubbo 监控中心源码分析

阅读更多
MonitorFactory 创建 Monitor 的接口
@SPI("dubbo")
public interface MonitorFactory {
 
    /**
     * Create monitor.
     *
     * @param url
     * @return monitor
     */
    @Adaptive("protocol")
    Monitor getMonitor(URL url);
 
}
 
 
MonitorService 接口定义收集和查询监控数据
 
Monitor 代表了监控对象,继承于 MonitorService 和 Node 接口.
 
MetricsService 这一个接口应该是留作扩展用的.
 
AbstractMonitorFactory 是一个抽象类,实现了大部分创建 MonitorFactory 的代码,留有扩展点.
 
MonitorFilter 采用责任链模式模式,收集调用信息.
 
class MonitorListener implements Listener {
 
        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
 
        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }
 
        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
 
        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);
 
            int localPort;
            String remoteKey, remoteValue;
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }
 
            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
        }
 
    }
 
 
Statistics 类相当于一个 pojo 类,用于收集调用的基本信息,例如:
 
    private URL url;
 
    private String application;
 
    private String service;
 
    private String method;
 
    private String group;
 
    private String version;
 
    private String client;
 
    private String server;
 
在看代码的时候,发现 DubboMonitorFactory 类既有 getMonitor 方法,又有 createMonitor 方法,当时就赶到很奇怪,但是仔细看了下代码,发现 getMonitor 方法中调用了 createMonitor 方法,如下:
 
public Monitor getMonitor(URL url) {
        url = url.setPath(MonitorService.class.getName()).addParameter(INTERFACE_KEY, MonitorService.class.getName());
        String key = url.toServiceStringWithoutResolving();
        Monitor monitor = MONITORS.get(key);
        Future<Monitor> future = FUTURES.get(key);
        if (monitor != null || future != null) {
            return monitor;
        }
 
        LOCK.lock();
        try {
            monitor = MONITORS.get(key);
            future = FUTURES.get(key);
            if (monitor != null || future != null) {
                return monitor;
            }
 
            final URL monitorUrl = url;
            final CompletableFuture<Monitor> completableFuture = CompletableFuture.supplyAsync(() -> AbstractMonitorFactory.this.createMonitor(monitorUrl));
            FUTURES.put(key, completableFuture);
            completableFuture.thenRunAsync(new MonitorListener(key), executor);
 
            return null;
        } finally {
            // unlock
            LOCK.unlock();
        }
    }
 
在看 DubboMonitorFactory 类的时候,发现即有 protocol 又有 proxyFactory 等的,不是很明白.
 
仔细品味下,其实会发现这段代码是不是像极了一个对一个协议的引用?例如 HttpProtocol 协议.
protected Monitor createMonitor(URL url) {
        URLBuilder urlBuilder = URLBuilder.from(url);
        urlBuilder.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL));
        if (StringUtils.isEmpty(url.getPath())) {
            urlBuilder.setPath(MonitorService.class.getName());
        }
        String filter = url.getParameter(REFERENCE_FILTER_KEY);
        if (StringUtils.isEmpty(filter)) {
            filter = "";
        } else {
            filter = filter + ",";
        }
        urlBuilder.addParameters(CHECK_KEY, String.valueOf(false),
                REFERENCE_FILTER_KEY, filter + "-monitor");
        Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, urlBuilder.build());
        MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
        return new DubboMonitor(monitorInvoker, monitorService);
    }
 
最后,我们来看下 DubboMonitor 的源码实现.
 
我们先看下构造方法,可以清晰的看到,dubbo 是每分钟上报一次.
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        this.monitorInvoker = monitorInvoker;
        this.monitorService = monitorService;
        this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
        // collect timer for collecting statistics data
        sendFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> {
            try {
                // collect data
                send();
            } catch (Throwable t) {
                logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
            }
        }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
    }
 
      update[0] :调用成功的次数
      update[1] :调用失败的次数
      update[2] :总调用流量(请求包的总大小)。
      update[3] :总响应流量(响应包的总大小)。
      update[4] :总响应时长(总服务调用开销)。
      update[5] :一次收集周期的平均TPS。
      update[6] :最大请求包大小。
      update[7] :最大响应包大小。
      update[8] :最大响应时间。
      update[9] :最大TPS。
 
但是我没有看到向监控中心发送数据的代码了,为啥了???

<!--?xml version="1.0" encoding="UTF-8"?-->

其实是有的,我们上面不是分析了,在 DubboMonitorFactory 中不是像一个协议一样吗?那个 refer 其实就是引用的注册中心的服务. 
0
1
分享到:
评论

相关推荐

    dubbo监控中心控制台

    这表明压缩包包含的是Dubbo监控中心控制台的源码或者部署文件。war文件是Web应用程序的标准打包格式,通常包含了Web应用的所有资源,如HTML、CSS、JavaScript、图片以及Servlet等。用户可以将此war文件部署到支持...

    Dubbo简易监控中心发布包(2.5.4开发版)

    压缩包中的`dubbo-monitor-simple-2.5.4-SNAPSHOT`是Dubbo监控中心的简化版本。这个组件提供了基本的服务监控功能,允许管理员查看服务的运行状态、调用统计、异常信息等,有助于快速定位和解决问题。简单易用是其一...

    dubbo源码分析pdf.zip

    《Dubbo源码分析》是一套深入探讨Apache Dubbo这一著名Java开源框架的书籍,旨在帮助开发者更好地理解和应用Dubbo。Dubbo是一个高性能、轻量级的服务治理框架,广泛应用于微服务架构中,以实现服务的发布、发现、...

    dubbo源码分析系列

    《Dubbo源码分析系列》是一份深入探讨Java开源框架Dubbo核心原理和技术细节的资料。Dubbo,作为阿里巴巴的一款高性能、轻量级的服务治理框架,它为分布式系统提供了服务发现、调用、负载均衡、容错等关键功能。这份...

    dubbo源码解析

    通过监控中心的建立和监控数据的收集,可以实时了解服务的运行状态,对于发现和解决问题具有重要的帮助。 12. 在整个学习过程中,读者应该注重知识储备。对于初学者,建议先行阅读相关的Java编程书籍,比如《java...

    dubbo-admin源码

    这部分源码展示了如何对接Dubbo的配置中心,如ZKConfigurator,实现配置的实时同步和更新。 八、异常处理与日志记录 在源码中,我们可以看到dubbo-admin如何处理各种异常情况,以及如何记录和上报日志。这部分内容...

    dubbo的源码分析

    Service是业务接口,Provider是服务提供方,Consumer是服务消费方,Registry是服务注册中心,而Monitor则是用于统计服务调用数据的服务监控中心。在这些组件之间,Dubbo利用了工厂模式、代理模式、观察者模式等多种...

    dubbo2.0-源码阅读

    - **Monitor**:监控中心,记录服务的调用次数、调用时间等数据,用于监控服务的健康状态。 #### 三、核心机制分析 1. **设计模式**:Dubbo中广泛使用了设计模式来实现其功能。例如,在扩展点机制中使用了工厂模式...

    dubbo-2.5.3源码

    4. **监控中心(Monitor)**:统计服务的调用次数、调用时间等,用于系统调优和故障排查。 三、服务治理 1. **配置中心(Configuration Center)**:集中管理和动态调整服务配置,支持Zookeeper、Redis等多种方式。 2....

    dubbo源码解析2

    - **7.5.1 监控中心**:集中管理Dubbo服务的监控信息。 - **7.5.2 SimpleMonitorService** - **7.5.2.1 Monitor基础类**:监控服务的基础类。 - **7.5.2.2 SimpleMonitorService**:简单的监控服务实现。 - **7.5...

    dubbo入门学习框架源码

    4. 运行时监控(Monitor):收集服务的运行数据,用于性能分析和故障排查。 三、服务治理机制 1. 服务注册与发现:服务提供者向注册中心注册服务,服务消费者通过注册中心获取服务提供者的地址信息。 2. 负载均衡...

    dubbo 分布式搭建源码

    - **服务监控**:通过监控中心收集服务调用的统计信息,分析服务的健康状况。 - **动态配置**:在运行时动态调整服务的配置,如修改服务版本、权重等。 - **服务路由**:根据条件进行服务路由,比如只调用特定...

    dubbo应用实例源码工程

    【标题】"dubbo应用实例源码工程"指的是一个基于...通过分析和实践这个dubbo应用实例源码工程,开发者可以更深入地理解Dubbo的工作原理和使用技巧,从而在自己的项目中更好地利用Dubbo实现高效、稳定的微服务架构。

    dubbo监控中心dubbo-admin老版本使用

    在早期的版本中,Dubbo提供了一个名为dubbo-admin的监控中心,用于可视化展示和管理Dubbo服务的运行状态。本文将详细介绍如何使用dubbo-admin老版本进行服务监控。 一、安装与启动dubbo-admin 1. 下载源码:根据...

    dubbo-master源码

    通过对Dubbo源码的阅读和分析,我们可以更深入地理解服务治理的原理,学习到如何设计高可用、可扩展的微服务架构,这对于提升我们的Java编程技能和系统设计能力具有重要的实践意义。同时,Dubbo也是许多企业级项目的...

    dubbo-dubbo-2.7.2源码

    Dubbo内置了监控中心,可以收集服务的调用统计、异常信息等,通过可视化界面展示,帮助开发者实时了解服务运行状态,及时发现问题并进行优化。 通过对Dubbo 2.7.2源码的深度解析,我们可以了解到其设计思想和实现...

    dubbo-控制台使用源码

    总的来说,"dubbo-控制台使用源码"涵盖了分布式服务治理、监控、源码分析等多个重要知识点,对于提升开发者在微服务架构下的设计和实现能力大有裨益。通过深入研究,不仅可以提升个人技术水平,也能为实际项目带来更...

    apache dubbo 3.0.7源码

    源码分析可以帮助开发者深入理解其内部机制,提升开发和优化服务的能力。 1. **设计模式与架构** - **服务提供者(Provider)**:提供服务的模块,暴露服务接口供消费者调用。 - **服务消费者(Consumer)**:...

    03-05-12-ApacheDubbo服务发布源码分析1

    在 Apache Dubbo 服务发布的源码分析中,我们主要关注如何将服务暴露并注册到服务中心。这个过程涉及到了几个关键的步骤和 Spring 框架的集成。首先,我们需要了解 Spring 如何处理自定义标签以及 Dubbo 如何利用这...

Global site tag (gtag.js) - Google Analytics