`

大众点评的实时监控系统分析(一)【转】

 
阅读更多

原文地址:http://www.dengshenyu.com/%E5%90%8E%E7%AB%AF%E6%8A%80%E6%9C%AF/2016/09/05/cat.html

今天我们从使用和实现两个方面来聊一聊大众点评的Java应用实时监控系统–CAT,它目前已成为一个开源项目,见Github

目录

CAT能做些什么?

在此之前,先来想一想对于线上应用我们希望能监控些什么?可能有如下这些:

  • 机器状态信息。CPU负载、内存信息、磁盘使用率这些是必需的,另外可能还希望收集Java进程的数据,例如线程栈、堆、垃圾回收等信息,以帮助出现问题时快速debug。
  • 请求访问情况。例如请求个数、响应时间、处理状态,如果有处理过程中的时间分析那就更完美了。
  • 异常情况。譬如缓存服务时不时出现无响应,我们希望能够监控到这种异常,从而做进一步的处理。
  • 业务情况。例如订单量统计,销售额等等。

以上这些CAT都能支持。根据其官方文档,CAT支持如下5种监控消息:

  • Transaction。记录跨越系统边界的程序访问行为,比如远程调用,数据库调用,也适合执行时间较长的业务逻辑监控。
  • Event。用来记录一件事发生的次数,比如记录系统异常,它和transaction相比缺少了时间的统计,开销比transaction要小。
  • Heartbeat。表示程序内定期产生的统计信息, 如CPU%, MEM%, 连接池状态, 系统负载等。
  • Metric。用于记录业务指标、指标可能包含对一个指标记录次数、记录平均值、记录总和,业务指标最低统计粒度为1分钟。
  • Trace。用于记录基本的trace信息,类似于log4j的info信息,这些信息仅用于查看一些相关信息

在一个请求处理中可能产生有多种消息,CAT将其组织成消息树的形式。在处理开始时,默认开始一个类型为URL的Transaction,在这个Transaction中业务本身可以产生子消息。例如,产生一个数据库访问的子Transaction或者一个订单统计的Metric。结构如下所示:

message-tree

CAT的使用比较简单,接口也比较清晰,关于其使用请参考官方文档,这里不再赘述。本文主要讨论其客户端的设计与实现。

CAT客户端的设计

作为一个日志上报的通用客户端,考虑点至少有如下这些:

  • 为了尽可能减少对业务的影响,需要对消息进行异步处理。即业务线程将消息交给CAT客户端与CAT客户端上报这两个过程需要异步。
  • 为了达到实时的目的以及适应高并发的情况,客户端上报应该基于TCP而非HTTP开发。
  • 在线程安全的前提下尽可能的资源低消耗以及低延时。我们知道,线程竞争的情况是由于资源共享造成的,要达到线程安全通常需要减少资源共享或者加锁,而这两点则会导致系统资源冗余和高延时。

CAT客户端实现并不复杂,但这些点都考虑到了。它的架构如下所示:

cat-architecture

大概步骤为:

  1. 业务线程产生消息,交给消息Producer,消息Producer将消息存放在该业务线程消息栈中;
  2. 业务线程通知消息Producer消息结束时,消息Producer根据其消息栈产生消息树放置在同步消息队列中;
  3. 消息上报线程监听消息队列,根据消息树产生最终的消息报文上报CAT服务端。

下面我们来一步一步分析其源码。

CAT客户端的实现

CAT客户端实现在源码目录cat-client下,而cat-client的主要实现则依赖于它的com.dianping.cat.message包。该包结构如下:

category

com.dianping.cat.message中主要包含了internal、io、spi这三个目录:

  • internal目录包含主要的CAT客户端内部实现类;
  • io目录包含建立服务端连接、重连、消息队列监听、上报等io实现类;
  • spi目录为上报消息工具包,包含消息二进制编解码、转义等实现类。

其uml图如下所示(可以放大看):

uml

类的功能如下:

  • Message为所有上报消息的抽象,它的子类实现有Transaction、Metric、Event、HeartBeat、Trace这五种。
  • MessageProducer封装了所有接口,业务在使用CAT时只需要通过MessageProducer来操作。
  • MessageManager为CAT客户端核心类,相当于MVC中的Controller。
  • Context类保存消息上下文。
  • TransportManager提供发送消息的sender,具体实现有DefaultTransportManager,调用其getSender接口返回一个TcpSocketSender。
  • TcpSocketSender类负责发送消息。

Message

上面说到,Message有五类,分别为Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,保存的数据都为一个字符串;而Transaction则保存一个Message列表。换句话说,Transaction的结构为一个递归包含的结构,其他结构则为原子性结构。

下面为DefaultTransaction的关键数据成员及操作:

public class DefaultTransaction extends AbstractMessage implements Transaction {
    private List<Message> m_children;
    private MessageManager m_manager;
    ...

    //添加子消息
    public DefaultTransaction addChild(Message message) {
        ...
    }

    //Transaction结束时调用此方法
    public void complete() {
        ...
        m_manager.end(this); //调用MessageManager来结束Transaction 
        ...
    }

值得一提的是,Transaction(或者其他的Message)在创建时自动开始,消息结束时需要业务方调用complete方法,而在complete方法内部则调用MessageManager来完成消息。

MessageProducer

MessageProducer对业务方封装了CAT内部的所有细节,它的主要方法如下:

public void logError(String message, Throwable cause);
public void logEvent(String type, String name, String status, String nameValuePairs);
public void logHeartbeat(String type, String name, String status, String nameValuePairs);
public void logMetric(String name, String status, String nameValuePairs);
public void logTrace(String type, String name, String status, String nameValuePairs);
...
public Event newEvent(String type, String name);
public Event newEvent(Transaction parent, String type, String name);
public Heartbeat newHeartbeat(String type, String name);
public Metric newMetric(String type, String name);
public Transaction newTransaction(String type, String name);
public Trace newTrace(String type, String name);
...

logXXX方法为方法糖(造词小能手呵呵),这些方法在调用时需要传入消息数据,方法结束后消息自动结束。

newXXX方法返回相应的Message,业务方需要调用Message方法设置数据,并最终调用Message.complete()方法结束消息。

MessageProducer只是接口封装,消息处理主要实现依赖于MessageManager这个类。

MessageManager

MessageManager为CAT的核心类,但它只是定义了接口,具体实现为DefaultMessageManager。DefaultMessageManager这个类里面主要包含了两个功能类,ContextTransportManager,分别用于保存上下文和消息传输。TransportManager运行期间为单例对象,而Context则包装成ThreadLocal为每个线程保存上下文。

我们通过接口来了解DefaultMessageManager的主要功能:

public void add(Message message);
public void start(Transaction transaction, boolean forked);
public void end(Transaction transaction);

public void flush(MessageTree tree);

add()方法用来添加原子性的Message,也就是Metric、Event、HeartBeat、Trace。

start()和end()方法用来开始和结束Transaction这种消息。

flush()方法用来将当前业务线程的所有消息刷新到CAT服务端,当然,是异步的。

Context

Context用来保存消息上下文,我们可以通过它的主要接口来了解它功能:

public void add(Message message) {
    if (m_stack.isEmpty()) {
         MessageTree tree = m_tree.copy();

         tree.setMessage(message);
         flush(tree);
    } else {
         Transaction parent = m_stack.peek();

         addTransactionChild(message, parent);
     }
 }

add方法主要添加原子性消息,它先判断该消息是否有上文消息(即判断是否处于一个Transaction中)。如果有则m_stack不为空并且将该消息添加到上文Transaction的子消息队列中;否则直接调用flush来将此原子性消息刷新到服务端。

public void start(Transaction transaction, boolean forked) {
    if (!m_stack.isEmpty()) {
        ...
        Transaction parent = m_stack.peek();
        addTransactionChild(transaction, parent);
    } else {
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        m_stack.push(transaction);
    }
}

start方法用来开始Transaction(Transaction是消息里比较特殊的一种),如果当前消息栈为空则证明该Transaction为第一个Transaction,使用消息树保存该消息,同时将该消息压栈;否则将当前Transaction保存到上文Transaction的子消息队列中,同时将该消息压栈。

public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
        Transaction current = m_stack.pop();
        ...
        if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();

            m_tree.setMessageId(null);
            m_tree.setMessage(null);
            ...
            manager.flush(tree); //刷新消息到CAT服务端
            return true;
        }
    }

    return false;
}

end方法用来结束Transaction,每次调用都会pop消息栈,如果栈为空则调用flush来刷新消息到CAT服务端。

综上,Context的m_stack的结构如下:

message-stack

Transaction之间是有引用的,因此在end方法中只需要将第一个Transaction(封装在MessageTree中)通过MessageManager来flush,在拼接消息时可以根据这个引用关系来找到所有的Transaction :)。

TransportManager和MessageSender

这两个类用来发送消息到服务端。MessageManager通过TransportManager获取到MessageSender,调用sender.send()方法来发送消息。 TransportManager和MessageSender关系如下:

transport

TCPSocketSender为MessageSender的具体子类,它里面主要的数据成员为:

private MessageCodec m_codec;
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
private ChannelManager m_manager;
  • MessageCodec:CAT基于TCP传输消息,因此在发送消息时需要对字符消息编码成字节流,这个编码的工作由MessageCodec负责实现。

  • MessageQueue:还记得刚才说业务方在添加消息时,CAT异步发送到服务端吗?在添加消息时,消息会被放置在TCPSocketSender的m_queue中,如果超出queue大小则抛弃消息。

  • ChannelManager:CAT底层使用netty来实现TCP消息传输,ChannelManager负责维护通信Channel。通俗的说,维护连接。

TCPSocketSender主要方法为initialize、send和run,分别介绍如下:

public void initialize() {
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    ...
}

initialize方法为初始化方法,在执行时主要创建两个线程,一个用来运行自身run方法(TCPSocketSender实现了Runnable接口)监听消息队列;另一个则用来执行ChannelManager维护通信Channel。

public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

send方法被MessageManager调用,把消息放置在消息队列中。

public void run() {
    m_active = true;

    while (m_active) {
        ChannelFuture channel = m_manager.channel();

        if (channel != null && checkWritable(channel)) {
            try {
                MessageTree tree = m_queue.poll();

                if (tree != null) {
                    sendInternal(tree);
                    tree.setMessage(null);
                }

            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore it
                m_active = false;
            }
        }
    }
}

private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

    m_codec.encode(tree, buf);

    int size = buf.readableBytes();
    Channel channel = future.channel();

    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}

run方法会一直执行直到进程退出,在循环里先获取通信Channel,然后发送消息。值得注意的是,sendInternal方法在执行时调用m_codec.encode(tree, buf),参数为消息树缓冲区。消息树里面其实只保存了一个消息,还记得刚才说的Transaction上下文引用吗?m_codec在encode的时候会判断消息类型是否为Transaction,如果为Transaction则会递归获取子Transaction,否则直接将该消息编码。具体实现可以参考源代码的PlainTextMessageCodec类的encode方法,此处不再赘述。

最后

本文主要分享了大众点评的Java应用监控系统–CAT的客户端实现,如有错漏恳请指正。以上。

分享到:
评论

相关推荐

    大众点评实时监控系统CAT

    大众点评实时监控系统CAT是一个强大的工具,它不仅能够实时监测系统的健康状况,还能够帮助开发和运维团队快速定位问题,优化系统性能。通过其独特的设计理念和技术实现,CAT已成为大众点评及其他采用该系统的组织不...

    大众点评CAT监控系统

    大众点评CAT监控系统是一款专为分布式应用设计的性能监控平台,尤其在互联网行业中广泛应用。CAT,全称为"Chinase Application Tracking",旨在提供实时、全面的系统运行状况监控,帮助开发者快速定位并解决问题,...

    大众点评CAT监控解析

    大众点评CAT监控系统是一种基于Java开发的实时应用监控平台,旨在为大众点评网络提供全面的监控服务和决策支持。它作为大众点评网的基础监控组件,已在中间件框架(包括MVC、RPC、数据库、缓存框架等)中得到了广泛...

    大众点评数据架构之道

    大众点评在工具与自动化运维方面投入了大量的资源,采用了一系列工具和技术来提高运维效率,包括但不限于自动化监控、故障诊断、日志分析等。这些工具和技术的应用,使得大众点评能够及时发现并解决系统问题,保证了...

    数据抓包工具,可以抓取大众点评的所有数据

    数据抓包工具是一种在网络通信中获取传输数据的实用程序,它们可以帮助我们监控、记录和分析网络上的信息流。在本例中,我们将讨论如何利用抓包工具来抓取大众点评网上的数据,如图片信息和链接地址。这些工具通常被...

    大众点评网计划书创业项目

    【大众点评网计划书创业项目】是一份详细阐述如何创建并运营类似大众点评网的创业项目的计划书。该项目的核心在于利用手机二维码技术,整合手机网站、互联网和电子会员卡,为中小企业提供促销和广告平台,同时也为...

    仿大众点评网PHP程序.zip

    5. **数据统计与分析**:对于网站管理员,程序应提供数据分析工具,以便监控网站流量、用户行为、热门搜索等,从而优化网站内容和推广策略。 总的来说,《仿大众点评网PHP程序》为构建一个类似于大众点评的本地生活...

    SpringMVC监控系统案例(cat)

    而Cat则是由Dianping(大众点评)开源的一款实时分布式服务监控系统,它专注于服务和系统的性能监控,帮助开发者及时发现并处理问题。在"SpringMVC监控系统案例(cat)"中,我们将探讨如何结合这两者来构建一个全面...

    基于Java的类似大众点评的app源码.zip

    这是一个基于Java技术栈开发的类似大众点评的移动应用的源码项目,主要采用了Spring Boot、Spring Cloud和SSM(Spring Framework、Spring MVC、MyBatis)等框架。下面将详细介绍这些关键技术及其在项目中的作用。 ...

    餐饮美食点评网站PHP程序(类似大众点评网)_utf8版

    3. 性能监控:通过日志分析和性能测试工具(如New Relic)持续优化代码性能。 五、扩展与升级 随着用户需求和市场变化,网站需具备良好的可扩展性。例如,添加预订功能、积分系统、会员等级等。同时,保持代码整洁...

    大众点评数据架构之道---ppt

    例如,自动化部署、监控告警系统、性能分析工具等都是必不可少的部分。 综上所述,大众点评的数据架构设计不仅充分考虑到了当前的技术发展趋势,还在不断优化迭代中逐步形成了自己独特的体系,为公司业务的持续发展...

    运维架构全览和平台自动化演变

    1. 全方位监控系统:对运维环境中的各项指标进行实时监控,包括服务器性能、网络流量、应用状态等。 2. 自动化平台系统:实现运维工作的自动化,减少重复劳动,提高工作效率。 3. 配置类和管理系统:对服务器环境...

    架构师大会

    大众点评网的监控系统对于保证服务质量和用户体验至关重要。尤勇的分享可能涉及了监控系统的整体设计,包括指标收集、报警策略、性能优化等方面。他可能会讲解如何通过实时监控来预测和预防故障,以及如何利用大...

    时代凌宇skywalking链路监控1

    总结,SkyWalking 作为一款现代的 APM 工具,为分布式环境提供了强大的链路监控和性能分析能力,帮助企业提升系统稳定性,降低运维复杂度。通过 Java Agent 技术实现无侵入式监控,使得集成和维护变得更加简单高效。

    CAT框架的服务端和客户端的配置

    CAT(Coding Analysis Toolkit)框架是大众点评(Dianping)开源的一款实时监控系统,主要用于分布式应用的性能监控和故障排查。这个框架可以帮助开发者在复杂的分布式环境中实时了解服务的状态,包括服务调用的耗时...

    dianping-cat v3.0.0源码包

    猫眼(Cat)是一款由大众点评团队开发的企业级实时监控系统,它主要用于收集、聚合、展示系统的运行状态,为运维人员提供强大的监控能力。dianping-cat v3.0.0是该系统的最新版本,其源码包包含了丰富的功能模块和...

    FlinkForwardChina2018基于Flink的美团点评实时计算平台实践和应用.pdf

    实时计算平台在风控、流量分析、业务监控等领域的应用,能够帮助企业快速响应市场变化,实现数据驱动的决策。 #### 关于挑战与未来 虽然实时计算平台已经在美团点评取得了一定的成绩,但仍然面临着包括系统稳定性、...

Global site tag (gtag.js) - Google Analytics