- 浏览: 222301 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
andy1015:
请教下楼主rtx问题 ,可以么
用HttpClient实现同步RTX -
cgp17:
请教:Chukwa支持Push数据吗?目前看到的都是Polli ...
基于Hadoo的日志收集框架---Chukwa的源码分析(适配器、代理) -
jimmee:
尼玛, 现在iteye的质量下降到何种水准了.
Mahout协同过滤框架Taste的源码分析 -
aubdiy:
aubdiy 写道我擦。。。。 这你叫分析才看到, 还有个 “ ...
Mahout协同过滤框架Taste的源码分析 -
aubdiy:
我擦。。。。 这你叫分析
Mahout协同过滤框架Taste的源码分析
1.接口、实现类简介
org.apache.hadoop.chukwa.datacollection.collector.CollectorStub
收集器服务类,使用jetty实现了一个webserver以处理连接器提交的数据块
org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector
收集器的servlet,用于处理http请求
org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter
chukwa写入接口
org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter
chukwa写入接口的实现类,做为一级管道使用,只实现了init(Configuration)方法,add(List<Chunk>)的实现放在了二级管道中
org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter
chukwa写入接口的实现类,继承自PipelineStageWriter,实现了add(List<Chunk>)方法
2.启动、处理流程
(1)收集服务的启动
org.apache.hadoop.chukwa.datacollection.collector.CollectorStub
// 线程数 static int THREADS = 120; // 使用jetty提供http服务 public static Server jettyServer = null; /** * 1. 创建守护进程"Collector",将进程号写入相应的pid文件,以便于运行stop命令时可根据此pid文件杀死进程 * 2. 校验启动参数,可使用portno、writer、servlet来指定端口、写入数据的接口实现类、处理请求的servlet实现类 * 3. 设置jetty服务的connector、处理相应请求的servlet,比如 * /* org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector * /acks org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet * /logs org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet * 4. 启动http服务 */ public static void main(String[] args)
(2)处理http请求
org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector
// 访问路径 public static final String PATH = "chukwa"; // 写数据的接口 ChukwaWriter writer /** * servlet初始化,实例化writer,首先使用配置项中"chukwaCollector.writerClass"的设置构造writer,如果失败则使用SeqFileWriter */ public void init(ServletConfig servletConf) /** * 对于get请求以html形式输出jetty服务的信息 */ protected void doGet(HttpServletRequest req, HttpServletResponse resp) /** * 对于post请求的处理放在了accept()方法中 */ protected void doPost(HttpServletRequest req, HttpServletResponse resp) /** * 1. 使用ServletDiagnostics封装当前请求中的数据信息 * 2. 从req中获取输入流,依次读取数据块数量(numEvents)、数据块(ChunkImpl) */ protected void accept(HttpServletRequest req, HttpServletResponse resp)
(3)写入数据信息
org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter
writer接口
/** * 初始化Writer */ public void init(Configuration c) /** * 将数据列表写入 */ public CommitStatus add(List<Chunk> chunks)
org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter
writer接口的实现类
// 将写入数据的实现有此对象完成 ChukwaWriter writer; /** * 1. 读取配置"chukwaCollector.pipeline",对数据块做依次处理 * 2. 将第一个管道赋给writer * 3. 遍历其它管道,如果是PipelineableWriter的子类则将其设置为下一步要处理数据块所使用的管道 * 4. 将最后一个管道设置为当前管道下一步要处理时所使用的管道 * 即此write并未做数据块的处理,只是使用类似unix管道的方式,在"chukwaCollector.pipeline"配置项中 * 配置数据块处理的先后顺序 */ public void init(Configuration conf)
org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter
管道处理的writer实现类,实现了ChukwaWriter接口,但做为抽象类,只定义了管道处理的实现方式
// 下一步处理时所使用的writer ChukwaWriter next; /** * 使用此方法设置writer */ public void setNextStage(ChukwaWriter next) { this.next = next; }
org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter
继承自PipelineableWriter,使用实现了管道处理数据块
/** * 数据格式化的三种类型:原始方式、写入方式、带头方式 */ static enum DataFormat { Raw, Writable, Header }; // 监听数据传入请求 SocketListenThread listenThread; // 把使用管道处理数据的过程放在Tee线程中 List<Tee> tees; // 下一步处理时所使用的writer ChukwaWriter next; /** * 启动监听线程,初始化Tee线程列表 */ public void init(Configuration c) /** * 1. 将数据块传递给下一个管道处理 * 2. 遍历Tee线程列表,循环处理数据块 */ public CommitStatus add(List<Chunk> chunks)
org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter$SocketListenThread
监听socket连接线程
/** * 以9094端口启动socketserver,端口可通过"chukwaCollector.tee.port"配置 */ public SocketListenThread(Configuration conf) /** * 接收到请求后交给Tee线程处理 */* public void run()
org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter$Tee
处理一个socket连接的线程
// 使用阻塞队列存储数据块 final BlockingQueue<Chunk> sendQ; /** * 1. 初始化, 从socket连接中获取输入输出 * 2. 从输入中以readline方式读取命令内容 * 3. 从命令中获取数据格式化类型,过滤类型,将其添加到SocketTeeWriter的tees中 */ public void setup() /** * 使用过滤器校验数据块,通过校验则添加到阻塞队列 */ public void handle(Chunk c) /** * 1. 调用setup方法完成初始化 * 2. 根据数据格式化类型进行数据处理 * RAW 以原始方式写入数据,即先写数据长度再写入数据内容 * Writable 以写入方式写入数据,即直接写入数据内容 * Header 以带头方式写入数据,即先构造由数据块属性(来源、类型、流名称、偏移大小)组成的头信息,然后写入长度,再写入数据 */ public void run()
org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter
继承自PipelineableWriter,实现了ChukwaWriter接口,其作为SocketTeeWriter的下一个管道(或认为其为管道流的最后一个 管道)
/** * 完成writer的初始化 * 1. 使用"chukwaCollector.outputDir"设置输出目录, 默认为/chukwa, 一般设为/chukwa/logs * 2. 使用"chukwaCollector.rotateInterval"设置生成文件的时间间隔, 默认为5分钟 * 3. 使用"chukwaCollector.stats.period"设置统计写入速度的时间间隔, 默认为30秒 * 4. 使用"writer.hdfs.filesystem"设置写入数据时使用的HDFS文件系统的URL, 如果没有设置则使用"fs.default.name", 即hadoop的默认配置 * 5. 设置此writer运行状态为true, 启动循环处理文件的定时任务, 启动循环报告统计信息的定时任务 */ public void init(Configuration conf) /** * 循环处理文件的定时任务, 放在TimerTask中循环调用自身 * 1. 获取当前时间, 转换为"yyyyddHHmmssSSS"格式, 并于本机名称以及使用java.rmi.server.UID()构造的UID拼接在一起, 构成如下形式的文件 * /chukwa/logs/yyyyddHHmmssSSS_localHostAddr_UID.chukwa * 2. 通过信号量控制访问的进程数, 这里的Semaphore lock被设置为只能有一个进程访问 * 获取许可 * 如果存在长度大于0的文件, 则将其改名为.done文件, 否则将当前文件删除 * 使用新生成的文件名创建.chukwa文件, 并更新相应变量的值 * 构造序列化写入数据的seqFileWriter, 使用/chukwa/logs做为输出目录, 以ChukwaArchiveKey为Key, 以ChunkImpl为value * 释放许可 */ void rotate() /** * 写入数据块内容 * 1. 获得许可 * 2. 遍历数据块列表 * 根据数据块信息设置ChukwaArchiveKey的各个属性值:时间、数据类型、流名称(cluster/source/streamName)、seqID * 使用seqFileWriter写入数据 * 将文件名由.chukwa修改为.done * 3. 释放许可 */ public CommitStatus add(List<Chunk> chunks)
发表评论
-
Mahout协同过滤框架Taste的源码分析
2014-06-12 16:12 1351推荐过程 主要分成了如下几步来完成推荐 1. 输入数据 ... -
将Chukwa 0.5部署在基于Cloudera CDH4的Hadoop集群上
2012-04-24 15:51 3153一、使用maven构建基于chukwa 0.5的项目 ... -
Hadoop 0.23 CDH4 高可用集群指南
2012-03-26 21:56 7163CDH4 高可用集群指南 ... -
基于Hadoo的日志收集框架---Chukwa的源码分析(数据处理)
2012-03-06 18:12 37071.工具类、接口简介 (1) // 用于对数据进行分 ... -
基于Hadoo的日志收集框架---Chukwa的源码分析(连接器)
2012-03-01 17:17 17471. 接口、实现类简介 (1)org.apache.hado ... -
基于Hadoo的日志收集框架---Chukwa的源码分析(适配器、代理)
2012-02-29 16:56 24661. 接口、实现类简介 (1)Chukwa使用适配器(Ada ... -
基于Hadoo的日志收集框架---Chukwa的处理流程
2012-02-27 17:50 24261. 模拟增量日志环境 /home/matrix/P ... -
基于Hadoo的日志收集框架---Chukwa的安装部署
2012-02-27 16:42 5699chukwa是解决在集群环境中收集各节点增量日志的一种基于ha ... -
使用Fuse挂载HDFS
2011-09-13 11:06 5790前提,已经安装好hadoop集群 1. 安装 ... -
海量日志分析处理系统 一、开篇
2011-04-25 18:05 5006目前正在写一个日志处理系统,虽然业务很简单,但是感 ... -
CentOSX86-64上测试Xen动态迁移
2011-02-21 11:38 23581. 机器环境: 操作系统:CentOS 5.5 X8 ... -
Hadoop(0.21.0) 常用基本配置
2011-01-06 15:06 13511. Master与Slaver部署在同一台机器 有时候可能 ... -
Hadoop (0.21.0)分布式部署笔记
2010-12-22 15:39 17651. 集群环境 操作系统: ... -
使用CloudStack构建云计算管理平台
2010-11-19 15:24 4672Cloud.com( 前身是VMOps) 提供了开 ... -
Java串口编程----使用SMSLib发送手机短信
2010-02-25 22:43 7323完整日志: SLF4J: Class path contai ... -
超级简单、超级实用的版本升级小工具----代码实现
2010-02-23 17:50 1846接上篇 webserver使用的是resin3.1.9,首先 ... -
超级简单、超级实用的版本升级小工具----功能预览
2010-02-23 17:15 1277项目做久了, ... -
以多线程、断点续传方式下载文件的实现
2009-12-22 16:16 2621以多线程、断点续传方式下载文件,经常出现下载下来的文件大小和服 ... -
Swing JTable 渲染器 进度条 事件线程 观察者模式
2009-12-07 13:48 3468Swing编程中JTable应该是个经常被用到的组件,进度条也 ... -
用swing做界面,写了个支持断点续传的ftp客户端
2009-12-03 15:47 2097最近在写一个支持多线程、断点续传方式(需要服务器支持)下 ...
相关推荐
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
Apache Hadoop是一个开源框架,主要用于分布式存储和计算大数据集。Hadoop 3.1.0是这个框架的一个重要版本,提供了许多性能优化和新特性。在Windows环境下安装和使用Hadoop通常比在Linux上更为复杂,因为Hadoop最初...
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
编译hadoophadoop-3.2.2-src的源码
Hadoop是一个开源框架,由Apache基金会维护,主要用于处理和存储大量数据。它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同构建了一个高度容错的分布式计算系统。 2. **HDFS*...
Hadoop是大数据处理领域的一个关键框架,它由Apache软件基金会维护,主要负责分布式存储和计算。`hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发...
然而,Flink 在处理大数据任务时,可能会依赖于 Hadoop 的某些组件,如 HDFS(Hadoop 分布式文件系统)或 YARN(Hadoop 资源管理器)。 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar` 文件的出现,主要是为了解决...
在大数据领域,Hadoop作为开源分布式计算框架,扮演着核心角色。为了方便开发者在Eclipse或MyEclipse这样的集成开发环境中高效地进行Hadoop应用开发,Hadoop-Eclipse-Plugin应运而生。这个插件允许开发者直接在IDE中...
Hadoop 2.4 是一个稳定且广泛采用的分布式存储和计算框架,而 Hive 则是基于 Hadoop 的数据仓库工具,用于处理和管理大规模数据。但在这个特定的版本中,Spark 并没有集成 Hive,使得它更适合那些不需要 Hive 支持,...
4. **数据转换**:Flink支持将Hadoop的MapReduce作业转换为流处理作业,使得基于批处理的传统作业能够无缝过渡到实时处理。 5. **容错机制**:Flink的容错机制与Hadoop的检查点机制相结合,可以提供高可用性和数据...
Flink以其低延迟、高吞吐量的流处理能力而受到广泛赞誉,而Hadoop则以其分布式存储和计算框架闻名。然而,随着Hadoop版本的迭代更新,有时会出现新版本的Hadoop与旧版本Flink之间的兼容性问题。本文将针对"flink-...
Hive 是一个基于 Hadoop 的数据仓库工具,用于数据ETL(提取、转换、加载)以及提供SQL接口进行数据分析。尽管如此,Spark 仍然可以通过 JDBC 或 Thrift 服务器连接到 Hive Metastore,从而访问 Hive 表。如果你的...
在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要。Hadoop-eclipse-plugin-2.7.2正是为了解决这个问题,它为Eclipse...
Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar
Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber
Hadoop 2.7.3 Windows64位 编译bin(包含winutils.exe, hadoop.dll),自己用的,把压缩包里的winutils.exe, hadoop.dll 放在你的bin 目录 在重启eclipse 就好了
hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03
hadoop-mapreduce-examples-2.7.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
在eclipse中搭建hadoop环境,需要安装hadoop-eclipse-pulgin的插件,根据hadoop的版本对应jar包的版本,此为hadoop3.1.2版本的插件。