`
SavageGarden
  • 浏览: 221653 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于Hadoo的日志收集框架---Chukwa的源码分析(适配器、代理)

 
阅读更多

1. 接口、实现类简介
(1)Chukwa使用适配器(Adaptor)实现对各种输入(Streaming、Log File)的监控

    org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor

    适配器接口

// 启动适配器
void start(String adaptorID, String type, long offset, ChunkReceiver dest)

// 返回适配器状态
String getCurrentStatus()

// 返回适配器类型
String getType()

// 参数处理,返回流的名称
String parseArgs(String datatype, String params, AdaptorManager c)

// 关闭适配器,关闭之前应推送所有数据
long shutdown(AdaptorShutdownPolicy shutdownPolicy)

 

    org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor
    适配器的实现类,是一个抽象类,只实现了适配器接口中的start方法和parseArgs方法

// 只是做了对属性的赋值操作,启动方法放在了重载的start(long offset)方法中
void start(String adaptorID, String type, long offset, ChunkReceiver dest)

// 抽象方法start由AbstractAdaptor的子类实现适配器的启动过程
abstract void start(long offset)

// 同样的方式实现了parseArgs方法
String parseArgs(String d, String s, AdaptorManager c)
abstract String parseArgs(String s)

// 使用AdaptorManager实现Adaptor的注销和停止
void deregisterAndStop()

 

    org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.LWFTAdaptor
    继承自AbstractAdaptor,实现了使用tail的方式监测文件增量内容的功能

// 启动一个文件监测线程(FileTailer),并设置当前读取位置(fileReadOffset)
void start(long offset)

// 停止文件监测线程(FileTailer),并返回已经读取的长度
long shutdown(AdaptorShutdownPolicy shutdownPolicy)

// 监测文件增量内容
synchronized boolean tailFile()

// 从字节数组中提取数据
extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)

 

    org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor
    继承自LWFTAdaptor

// 重写了停止的方法
long shutdown(AdaptorShutdownPolicy shutdownPolicy)

// 重写了监测文件增量内容的方法
synchronized boolean tailFile()

 

    org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8
    继承自FileTailingAdaptor

// 重写了提取数据的方法
int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)

 

(2)Chukwa使用代理(Agent)实现对各种适配器(Adaptor)的管理

    org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager
    适配器管理者接口

// 返回属于此管理者的适配器数量
int adaptorCount()

// 根据ID停止某个适配器
long stopAdaptor(String id, AdaptorShutdownPolicy mode)

// 根据ID获取适配器对象
Adaptor getAdaptor(String id)

// 命令处理
String processAddCommand(String cmd)

// 以map形式返回适配器列表
Map<String, String> getAdaptorList()

// 提交报告
String reportCommit(Adaptor src, long uuid)

 

    org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent
    适配器管理者的实现类

 

2. 启动、处理流程

(1)代理的启动流程
    org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent

// 存储adaptor及其偏移量
Map<Adaptor, Offset> adaptorPositions

// 存储adaptor及其ID
Map<String, Adaptor> adaptorsByName

// 记录恢复的次数
int checkpointNumber

// 是否启用checkpoint机制
boolean DO_CHECKPOINT_RESTORE	

// checkpoint文件前缀名
String CHECKPOINT_BASE_NAME	

// checkpoint存储目录
File checkpointDir

/**
 * 1. 创建守护进程"Agent",将进程号写入相应的pid文件
 * 2. 校验参数,如果参数为"-help",则显示帮助信息后退出
 * 3. 根据系统设置(CHUKWA_HOME、CHUKWA_CONF_DIR)加载配置文件(chukwa-agent-conf.xml),并根据配置文件新建ChukwaAgent实例
 * 4. 检测是否有另外一个Agent在运行,如果有则退出
 * 5. 启动Connector,如果第一个参数中为"local",则启动ConsoleOutConnector,否则启动HttpConnector
 */
public static void main(String[] args)

/**
 * 根据配置文件初始化ChukwaAgent
 * 1. 从配置文件中加载配置项
 *    chukwaAgent.checkpoint.enabled 		是否启动checkpoint机制
 *    chukwaAgent.checkpoint.name			checkpoint文件前缀名
 *    chukwaAgent.checkpoint.interval		checkpoint间隔时间
 * 2. 如果启动了checkpoint机制则从checkpoint文件恢复启动adaptor
 *	  如果initialAdaptors不为空且存在,则执行文件中的加载adaptors的命令
 * 3. 获取AgentControlSocketListener实例,将服务绑定在127.0.0.1:9093上 
 * 4. 如果设置了checkpoint间隔时间且checkpoint目录存在,则启动定时器,以间隔一定时间将adaptors信息写入到checkpoint文件
 */
public ChukwaAgent(Configuration conf)

/**
 * 1. 遍历存储checkpoint文件的目录,查找以CHECKPOINT_BASE_NAME为前缀的文件
 * 2. 截取文件名中的数字,对其加一做为下一次checkpoint的编号
 * 3. 从文件中加载启动adaptor 
 */
private boolean restoreFromCheckpoint()

/**
 * 1. 逐行读取文件内容并执行
 */
private void readAdaptorsFile(File checkpoint)

/**
 * 1. 使用正则表达式匹配命令是否符合如下格式
 * 	  "add [name =] <adaptor_class_name> <datatype> <adaptor specific params> <initial offset>"	
 *    [name = ]为可选参数,名称中不能有空格或'=', 要以'adaptor_'开头
 *	  checkpoint文件示例:
 *	  ADD adaptor_90ac9ec5db2de67ebd7ad5a5ec260d26 = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8 TestLog1 0 /home/matrix/Program/project/log/testlog1 57810
 *	  initial_adaptors文件示例:
 *	  add filetailer.CharFileTailingAdaptorUTF8 TestLog1 0 /home/matrix/Program/project/log/testlog1 0		
 * 2. 分别获取adaptorID、adaptorClassName、dataType、params、offset
 * 3. 根据adaptorClassName由适配器工厂得到适配器对象
 * 4. 根据dataType、params获取流名称,并由此和adaptorClassName、dataType构建adaptorID(如果adaptorID为null)
 *	  以adaptorClassName、dataType、params作为摘要信息完成哈希计算,并将结果以十六进制无符号整数值的字符串形式做为adaptorID
 * 5. 如果adaptorsByName中已有此adaptorID为key的对象则直接返回adaptorID
 * 	  否则将其添加到adaptorsByName、adaptorPositions,然后启动此adaptor,返回adaptorID
 */
public String processAddCommandE(String cmd)

/**
 * 停止代理的方法
 * 1. 停止AgentControlSocketListener,不再处理telnet连接请求
 * 2. 如果启动了checkpoint定时器则将其停止并将最后一次文件增量信息写入checkpoint文件
 * 3. 遍历adaptorsByName,依次停止adaptor 
 */
public void shutdown(boolean exit)

/**
 * 将adaptor信息写入checkpoint文件
 * 1. 遍历adaptorsByName,格式化adaptor信息
 * 2. 将adaptor信息写入新的checkpoint文件
 * 3. 删除旧的checkpoint文件并将执行次数加1
 */
private void writeCheckpoint()

 

(2)代理控制线程
    org.apache.hadoop.chukwa.datacollection.agent.AgentControlSocketListener
    此线程提供了可通过telnet连接到agent进行控制的功能

/**
 * 启动一个线程,循环监听连接请求,有请求到达则使用内部类ListenThread处理
 * 默认telnet端口为9093,可通过"chukwaAgent.control.port"设置
 */
public void run()

    org.apache.hadoop.chukwa.datacollection.agent.AgentControlSocketListener$ListenThread
    用于处理用户对agent的操作命令

/**
 * 构造器,设置超时时间为60秒
 */
ListenThread(Socket conn)

/**
 * 接收输入,并调用processCommand()方法处理命令
 */
public void run()

/**
 * 处理命令,支持如下操作:
 * add [adaptorname] [args] [offset] -- start an adaptor
 * shutdown [adaptornumber]  -- graceful stop
 * stop [adaptornumber]  -- abrupt stop
 * list -- list running adaptors
 * close -- close this connection
 * stopagent -- stop the whole agent process
 * stopall -- stop all adaptors
 * reloadCollectors -- reload the list of collectors
 * help -- print this message
 */
public void processCommand(String cmd, PrintStream out)
 

 

 

(3)适配器的启动流程
    org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8
    以此类为例:

/**
 * 1. 此方法继承自org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor
 *	  在AbstractAdaptor中的实现只是做了赋值操作,具体实现交给了重载的方法start(long offset)	
 */
void start(String adaptorID, String type, long offset, ChunkReceiver dest)

/**
 * 1. 此方法继承自org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor
 * 	  在FileTailingAdaptor中的实现又调用了其父类的实现,启动了一个FileTailer实例	
 */
public void start(long offset) 

/**
 * 1. 此方法继承自org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailingAdaptor
 * 2. 检测被监测的文件是否存在、是否可读
 * 3. 获取被监测的文件长度,与adaptor初始化时设置的已读文件长度(fileReadOffset)做比较	  	
 * 4. 如果存在增量内容则调用slurp(long len, RandomAccessFile reader)方法
 */
public synchronized boolean tailFile()

/**
 * 1. 此方法继承自org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.LWFTAdaptor
 * 2. 获取已读文件长度和现有文件长度的差值,即增量内容大小,判断是否已超过最大读取大小(MAX_READ_SIZE)
 * 	  MAX_READ_SIZE 默认为 128*1024,可通过"chukwaAgent.fileTailingAdaptor.maxReadSize"设置	
 * 3. 新建一个字节数组,长度即为增量内容的长度,如果超过了最大值则设为最大值
 *	  用此数组在reader(监测文件)中以fileReadOffset(已读长度)为起点读取数据
 * 4. 调用extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)方法提取数据
 */
protected boolean slurp(long len, RandomAccessFile reader)

/**
 * 1. 遍历buf,记录换行符的位置,获取最后一个换行符的位置
 * 2. 使用数据类型、监测文件路径、文件偏移量、字节数组、adaptor构建ChunkImpl对象,添加到ChunkReceiver中
 */
protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)

 

(4)文件监测线程
    org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.FileTailer
    共享线程(针对一个agent服务器上的所有adaptor)

// 存储adaptor,因为此列表中的数据大多在启动时添加完成且每隔一定时间就要遍历列表
// 以获取所有监测文件的增量内容,所以这里使用CopyOnWriteArrayList
List<LWFTAdaptor> adaptors

// 默认间隔时间为2秒
int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2

// 间隔时间,由配置项"chukwaAgent.adaptor.context.switch.time"获取
int SAMPLE_PERIOD_MS

// 最大间隔时间为1分钟
public static final int MAX_SAMPLE_PERIOD = 60 * 1000

/**
 * 遍历adaptors,调用adaptor的tailFile方法
 */
public void run()

 

(5)基本数据结构
    org.apache.hadoop.chukwa.Chunk
    Chukwa中最基本的数据结构,用于收集数据,记录所收集数据的来源、格式、内容、大小等信息

 

    org.apache.hadoop.chukwa.ChunkImpl

    实现了Chunk接口

// 生成当前对象的主机名
String source
// 数据源名称
String streamName
// 数据类型
String dataType
// 标签
String tags
// 数据内容
byte[] data
// 记录了每行记录的偏移量
int[] recordEndOffsets
// 序列号(文件偏移量)
long seqID
0
0
分享到:
评论
1 楼 cgp17 2014-09-12  
请教:
Chukwa支持Push数据吗?目前看到的都是Polling方式拉取数据;
如果有Push方式推送数据,是如何做到的?
如果是Polling方式拉取数据,是否支持到远程节点拉取数据?如果可以,是如何做到的?

相关推荐

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop插件apache-hadoop-3.1.0-winutils-master.zip

    Apache Hadoop是一个开源框架,主要用于分布式存储和计算大数据集。Hadoop 3.1.0是这个框架的一个重要版本,提供了许多性能优化和新特性。在Windows环境下安装和使用Hadoop通常比在Linux上更为复杂,因为Hadoop最初...

    基于Hadoop网站流量日志数据分析系统.zip

    基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...

    Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码

    Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop ...

    编译hadoophadoop-3.2.2-src源码

    编译hadoophadoop-3.2.2-src的源码

    hadoop2.6-common-bin.zip

    标签 "hadoop common" 暗示这个压缩包包含的是Hadoop的公共组件,这些组件是Hadoop分布式文件系统(HDFS)和MapReduce计算框架等核心服务的基础。 压缩包内的文件列表如下: 1. `hadoop.dll`:这是一个动态链接库...

    hadoop-eclipse-plugin-3.1.1.tar.gz

    Hadoop是一个开源框架,由Apache基金会维护,主要用于处理和存储大量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同构建了一个高度容错的分布式计算系统。 2. **HDFS*...

    hadoop-common-2.6.0-bin-master.zip

    Hadoop是大数据处理领域的一个关键框架,它由Apache软件基金会维护,主要负责分布式存储和计算。`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发...

    hadoop-eclipse-plugin-2.7.3和2.7.7

    hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包 hadoop-eclipse-plugin-2.7.3和2.7.7的jar包

    hadoop-eclipse-plugin1.2.1 and hadoop-eclipse-plugin2.8.0

    在大数据领域,Hadoop作为开源分布式计算框架,扮演着核心角色。为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中...

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip

    Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz

    Flink以其低延迟、高吞吐量的流处理能力而受到广泛赞誉,而Hadoop则以其分布式存储和计算框架闻名。然而,随着Hadoop版本的迭代更新,有时会出现新版本的Hadoop与旧版本Flink之间的兼容性问题。本文将针对"flink-...

    spark-1.6.3-bin-hadoop2.4-without-hive.tgz

    Hadoop 2.4 是一个稳定且广泛采用的分布式存储和计算框架,而 Hive 则是基于 Hadoop 的数据仓库工具,用于处理和管理大规模数据。但在这个特定的版本中,Spark 并没有集成 Hive,使得它更适合那些不需要 Hive 支持,...

    Hadoop-eclipse-plugin-2.7.2

    在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要。Hadoop-eclipse-plugin-2.7.2正是为了解决这个问题,它为Eclipse...

    flink-shaded-hadoop-2-uber-2.7.2-10.0.jar

    Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    6. **日志和监控**:Hadoop使用`org.apache.hadoop.log`和`org.apache.hadoop.metrics`包来收集和处理系统的日志和性能指标,帮助管理员监控和诊断系统状态。 **Hadoop 2.5.1源码解析** 从hadoop-2.5.1-src中,...

    hadoop-eclipse-plugin-3.1.3.jar

    hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03

    flink-shaded-hadoop-2-uber-2.6.5-10.0.zip

    4. **数据转换**:Flink支持将Hadoop的MapReduce作业转换为流处理作业,使得基于批处理的传统作业能够无缝过渡到实时处理。 5. **容错机制**:Flink的容错机制与Hadoop的检查点机制相结合,可以提供高可用性和数据...

    hadoop.dll-and-winutils.exe-for-hadoop2.7.3-on-windows_X64

    Hadoop 2.7.3 Windows64位 编译bin(包含winutils.exe, hadoop.dll),自己用的,把压缩包里的winutils.exe, hadoop.dll 放在你的bin 目录 在重启eclipse 就好了

    好用hadoop-eclipse-plugin-1.2.1

    hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1hadoop-eclipse-plugin-1.2.1

Global site tag (gtag.js) - Google Analytics