1. 接口、实现类简介
(1)org.apache.hadoop.chukwa.datacollection.connector.Connector
连接器接口,旨在设置一个与收集器的长连接以反复发送数据
// 启动连接器
public void start();
// 停止连接器
public void shutdown();
// 重新加载配置
public void reloadConfiguration();
org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector
Connector接口的实现类,维护了一个负责连接收集器、发送由适配器收集到共享队列中数据的连接器客户端
// 共享队列,存储由agent下各个adaptor提交的数据块
ChunkQueue chunkQueue;
// 代理
ChukwaAgent agent;
// 连接器客户端
ChukwaSender connectorClient;
(2)org.apache.hadoop.chukwa.datacollection.ChunkReceiver
接收数据块接口
/**
* 添加chunk到队列,如果队列已满则阻塞
*/
public void add(Chunk event)
org.apache.hadoop.chukwa.datacollection.ChunkQueue
通用块队列接口,继承自ChunkReceiver
/**
* 将数据收集在chunks中,至少返回一个,但不会超过count个,如果队列已空则阻塞
*/
public void collect(List<Chunk> chunks, int count)
org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue
ChunkQueue接口的实现类,实现了add(Chunk chunk)、collect(List<Chunk> chunks, int count)方法
(3)org.apache.hadoop.chukwa.datacollection.sender.ChukwaSender
数据发送接口,封装了数据块与收集器间的通讯过程
/**
* 发送数据块并返回发送结果
*/
public List<CommitListEntry> send(List<Chunk> chunksToSend)
org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender
数据发送接口的实现类,实现了send(List<Chunk> toSend)方法
2. 启动、处理流程
(1)启动连接器
在ChukwaAgent中有如下成员
Connector connector = null;
在main()方法中会根据参数启动相应的Connector(HttpConnector)
if (args.length > 0) {
if (args[uriArgNumber].equals("local"))
agent.connector = new ConsoleOutConnector(agent);
else {
if (!args[uriArgNumber].contains("://"))
args[uriArgNumber] = "http://" + args[uriArgNumber];
agent.connector = new HttpConnector(agent, args[uriArgNumber]);
}
} else
agent.connector = new HttpConnector(agent);
agent.connector.start();
org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector
// 定时器,用于打印connector的状态信息
Timer statTimer = null;
// 用于记录数据块大小
volatile int chunkCount = 0;
// 可发送的最大数据量(默认2M)
int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
// 发送数据的时间间隔(默认5秒)
int MIN_POST_INTERVAL = 5 * 1000;
// 发送数据时间间隔配置项的名称
public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
// 最大发送数据量配置项的名称
public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
// 是否异步配置项的名称
public static final String ASYNC_ACKS_OPT = "httpConnector.asyncAcks";
// 默认不为异步
boolean ASYNC_ACKS = false;
// 存储数据块的队列
ChunkQueue chunkQueue;
// 代理
ChukwaAgent agent;
// 收集器列表
private Iterator<String> collectors = null;
// 连接器客户端
protected ChukwaSender connectorClient = null;
/**
* 1. 工厂类使用单例模式获取chunkQueue(MemLimitQueue),使同一个agent服务器上共享同一个数据块队列,大小为10KB
* 2. 从配置文件中配置每次发送的最大数据量、发送数据的间隔时间、是否异步,没有配置则使用默认值
* 3. 启动线程
*/
public void start()
/**
* 1. 加载配置文件($CHUKWA_HOME/conf/collectors),设置collectors
* 2. 根据异步与否设置相应的数据发送器(connectorClient)
* 3. 从共享数据块队列(chunkQueue)中提取数据列表
* 4. 使用发送器(connectorClient)发送数据
* 5. 运行时间小于间隔时间则休眠
*/
public void run()
(2)共享数据块队列
在Adaptor实现类CharFileTailingAdaptorUTF8的extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)方法中
由增量内容构造出的ChunkImpl会被添加到ChunkReceiver(即ChunkQueue的实现类)中
org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue
// 由LinkedList实现队列功能
private Queue<Chunk> queue = new LinkedList<Chunk>();
// 最大使用量,在HttpConnector启动时由工厂类的单例模式获取时设为10KB
private final long MAX_MEM_USAGE;
/**
* 1. 如果数据块大小与队列中的数据块大小之和超过了最大使用量则阻塞
* 如果数据块自身的大小就已经超过了最大使用量则直接返回
* 2. 否则将数据块添加到内部队列(queue)中,并重置队列的数据块大小
*/
public void add(Chunk chunk)
/**
* 1. 队列为空则阻塞
* 2. 逐次从队列中取出数据块,比较大小是否已经超过maxSize,没有则将其添加到events并重置队列的数据块大小
*/
public void collect(List<Chunk> events, int maxSize)
(3)数据块发送器
org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender
// 使用HttpClient提交请求
static HttpClient client = null;
// 创建连接
static MultiThreadedHttpConnectionManager connectionManager = null;
// 发送一次则加1
int postID = 0;
// 所有可用的收集器
protected Iterator<String> collectors;
/**
* 在构造器中初始化连接收集器的重试次数和超时时间
*/
public ChukwaHttpSender(Configuration c)
/**
* 1. 使用List<DataOutputBuffer> serializedEvents存储数据块
* 使用List<CommitListEntry> commitResults存储提交信息、
* 2. 遍历要发送的数据块列表
* 3. 得到数据块对象的预估序列化长度,字符串一个字符占2字节,int为4字节,long为8字节
* 2*{主机名(source) + 数据源(streamName) + 数据类型(dataType) + 调试信息(debuggingInfo)}
* data.length + 4
* 4 * (recordEndOffsets.length + 1)
* 8(seqID为long型)
* 根据数据块对象的预估序列化长度新建DataOutputBuffer对象,将数据块写入
* 4. 根据数据块的Adaptor(适配器实现)、seqID(数据块大小)、start(偏移量)构建CommitListEntry对象
* 5. 使用RequestEntity将serializedEvents封装,并最终封装到PostMethod
* 6. 使用post方式提交请求并解析响应,返回结果
*/
public List<CommitListEntry> send(List<Chunk> toSend)
/**
* 发送请求并直接返回List<CommitListEntry>
*/
public List<CommitListEntry> postAndParseResponse(PostMethod method, List<CommitListEntry> expectedCommitResults)
/**
* 从可用的收集器列表中获取一个进行发送请求,出现异常则尝试下使用一个收集器进行发送请求
*/
protected List<String> reliablySend(HttpMethodBase method, String pathSuffix)
/**
* 1. 获取请求方法中的参数,并添加重试次数、超时时间等参数,设置请求路径(chukwa)
* 2. 使用httpclient执行post请求
* 3. 从执行完成的method中获取响应体,并以其做为输入流逐行读取响应信息(用于debug)
*/
protected List<String> doRequest(HttpMethodBase method, String dest)
分享到:
相关推荐
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
Apache Hadoop是一个开源框架,主要用于分布式存储和计算大数据集。Hadoop 3.1.0是这个框架的一个重要版本,提供了许多性能优化和新特性。在Windows环境下安装和使用Hadoop通常比在Linux上更为复杂,因为Hadoop最初...
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop 序列化】---- 代码 Hadoop 3.x(MapReduce)----【Hadoop ...
编译hadoophadoop-3.2.2-src的源码
Hadoop是一个开源框架,由Apache基金会维护,主要用于处理和存储大量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同构建了一个高度容错的分布式计算系统。 2. **HDFS*...
在大数据领域,Hadoop作为开源分布式计算框架,扮演着核心角色。为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中...
Hadoop是大数据处理领域的一个关键框架,它由Apache软件基金会维护,主要负责分布式存储和计算。`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发...
然而,Flink 在处理大数据任务时,可能会依赖于 Hadoop 的某些组件,如 HDFS(Hadoop 分布式文件系统)或 YARN(Hadoop 资源管理器)。 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar` 文件的出现,主要是为了解决...
本文将深入探讨“flink-shaded-hadoop-2-uber-2.6.5-10.0.zip”这个压缩包,它是Flink与Hadoop 2.6.5版本的连接器,旨在帮助开发者更好地在Flink中利用Hadoop的资源。 首先,我们来了解“flink-shaded-hadoop-2-...
Hive 是一个基于 Hadoop 的数据仓库工具,用于数据ETL(提取、转换、加载)以及提供SQL接口进行数据分析。尽管如此,Spark 仍然可以通过 JDBC 或 Thrift 服务器连接到 Hive Metastore,从而访问 Hive 表。如果你的...
Flink以其低延迟、高吞吐量的流处理能力而受到广泛赞誉,而Hadoop则以其分布式存储和计算框架闻名。然而,随着Hadoop版本的迭代更新,有时会出现新版本的Hadoop与旧版本Flink之间的兼容性问题。本文将针对"flink-...
Hadoop 2.4 是一个稳定且广泛采用的分布式存储和计算框架,而 Hive 则是基于 Hadoop 的数据仓库工具,用于处理和管理大规模数据。但在这个特定的版本中,Spark 并没有集成 Hive,使得它更适合那些不需要 Hive 支持,...
6. **日志和监控**:Hadoop使用`org.apache.hadoop.log`和`org.apache.hadoop.metrics`包来收集和处理系统的日志和性能指标,帮助管理员监控和诊断系统状态。 **Hadoop 2.5.1源码解析** 从hadoop-2.5.1-src中,...
在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要。Hadoop-eclipse-plugin-2.7.2正是为了解决这个问题,它为Eclipse...
Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
在IT行业中,大数据处理是一个至关重要的领域,而Hadoop作为开源的大数据处理框架,为开发者提供了强大的工具。本文将深入探讨如何使用Eclipse IDE结合hadoop-eclipse-plugin-2.6.0.jar插件,实现在Windows环境下...
Hadoop-eclipse-plugin是Hadoop生态系统中的一个重要工具,它允许开发者使用Eclipse IDE直接在Hadoop集群上开发、测试和部署MapReduce程序。这个插件极大地简化了Hadoop应用程序的开发流程,使得Java开发者能够利用...
在eclipse中搭建hadoop环境,需要安装hadoop-eclipse-pulgin的插件,根据hadoop的版本对应jar包的版本,此为hadoop3.1.2版本的插件。