Hadoop C++ Streaming
可以直接读取压缩文件,提取其中的日志。程序流程如下:
每个 map 进程从 stdin 读取压缩的日志文件。产生 3 种不同的记录,每个记录一行,写到 stdout 。每种日志每行第一个字符不同,用来做日志种类的区分。
通过这种方式,可以有效地将计算分布到集群中不同的进程。因为: IP 相同的 ip 记录,一定会被分布到相同的进程; cookie 相同的 ck 记录,也会被分布的相同的进程; key 不同的 key 记录,也会被分布到不同的进程。从而,不同进程中的 ip 集合是不相交的, cookie 集合也是不相交的;因此,在不同的进程中,相同 Key 对应的独立 ip 和独立 cookie 数就可以直接相加了。
map 输出的记录格式:
格式标志
|
ck 表示 cookie ; isPv:1 表示 Pv, 2 表示 click
|
’k’
|
key->(cnt, fee,isPv)
|
’i’
|
ip->(key, cnt,isPv)
|
’u’
|
ck->(key, cnt,isPv)
|
kadd 读取 map 的输出,将结果累加, key.cnt 累加到 cnt , ip.cnt 忽略, ck.cnt 也忽略( ip.cnt 和 ck.cnt 可用于以后统计单个 ip 、用户某段时间内的 pv 和 click )。每遇到一个相同 key 的 cookie 记录, ck_cnt++ ;每遇到一个相同 key 的 ip 记录, ip_cnt++ ;每遇到一个 key 记录, cnt+=key->cnt,fee+=key->fee 。
这样,不管这些不同种类的记录怎么在集群上分布。每个类别都是和自己的 key-> 属性相加。永远不会加错。
reduce 的输出只有一种记录,并且尺寸很小( partition-num 个几十 KB 的文件)。因此 join 只需要单机运行,读取 reduce 的输出,对于 join 来说, key 相同的记录,必然映射到相同的相同的进程。
思路
因为 mapreduce 没有 schema ,这一点曾被 PostgreSQL 的一个大牛诟病过,被认为是 mapreduce 一大缺点。但是,在我们这里,这却成了一大优点,因为可以产生异构的记录。
一开始时,为了同时计算多个维度的 unique 计数,只能运行多次 mapreduce 。我一直在思考,怎样才能一次就完成这些计算呢?当时想的是,从通讯上讲,如果 hadoop 支持多通道的 map-output ,就可以完成这些,但是,它显然不支持,一个 map 的 output 只有一个通道。
就在前几天,有一会儿,忽然想通了,可以在记录中加入通道标记,使用这个标记,一个通道就可以模拟多个通道了。
并且,这样还有一个好处:更加有利于负载平衡。因为对于 mapreduce 系统来说,它并不知道这些模拟的通道,它只知道 key-value ,而模拟了多个通道, hash(key) 的分布就更加均匀,从而达到更好的负载均衡。
而如果是在 mapreduce 系统级别支持多通道,因为每个通道的数据量、计算量都有不同,尽管在一个通道内有可能达到较好的负载均衡,但是,多个通道之间很难均衡。
核心思想
Hadoop-MapReduce 的核心是 Scatter-Gather ,在一般的 MapReduce 中, Reduce 过程只是计算单独 Key 的集合。看看 reduce 的函数声明:
void reduce (K2 key,
Iterator <V2 > values,
OutputCollector <K3 ,V3 > output,
Reporter reporter)
throws IOException
原则上讲, reduce 只能处理分别的各个 key 对应的 valueset 。——除非在手工在 reduce 过程中保存一些 per-reducer 的全局信息。
目前的解决方式,打破了 MapReduce 的这个规则。在 reduce 阶段,不光是处理 per-key 对应的 valueset ,而是把整个 reducer 的数据集统一处理。原先的 {Key,{value}} 二维集合,在这里看成是一维的记录集合 {record} 。
这样,传统意义上的 MapReduce.Reduce 的依赖单元是 Per-Key 的,而这种方式的依赖单元是 Per-Reducer ,在计算上,需要的内存会大一些,和 Reduce 的总输入呈正相关。传统 Reduce 的理论最小内存用量和单个 Key 的 valueset 相关。
最最核心的是, Reduce 帮我们做了一件事:将 map 的输出【使用标记表达的多通道】,每个通道中的记录,都按照 Key 切分成互不相交的子集。这样,至少在进行集合计数的时候, Reduce 的输出可以直接相加。
MapReduce 做了多余的事情
在这种使用 Streaming 的方式中, Map 阶段的作用和传统方式相同。而 Reduce 阶段, MapReduce 框架却做了多余的事情。传统上, Reduce 阶段做三件事情:
l shuffle
把各个 Map 进程产生的结果,按 Hash(Key, nReduce) 产生各个 Reduce 进程的原始输入。
l sort
将每个 Reduce 进程原始输入( shuffle 产生的)进行排序,将 key 相同的记录对应的 value 集中到一起,产生 {key,{value}} 。
l reduce
读取 {key,{value}} ,为每个 (key,{value}) ,调用应用程序自定义的 reduce 函数。
对于我们的这种应用,有用的只有 shuffle 过程 。 sort 和 reduce 可以说是多于的。 reduce 过程至少是把输入转发到输出上, sort 则是完全多余的。很不幸的是, sort 过程占用了很多计算资源,并且,导致了依赖 ,因为直到 sort 结束,才能开始 reduce 。而如果没有 sort ,就可以一边读取输入,一边转发给 streaming.readuce 进程,从而大大提高并行度。然而这可能是个很复杂的问题,因为牵涉到 schedule 和 failover 。
通道标记的位置
为了简洁,目前的实现是:每行第一个字符是类型标记 ,最后一个字符是来源标记 。然而理论上,标记可以在任何位置,只要读取程序可以正确地识别它。
需要注意的是:
如果标记位于 key ( streaming 默认把 tab 分隔的第一个字段作为 key )中,同类记录(例如同为 ip 记录的 pv 日志和 click 日志)要使用相同的类型标记 。
灵活性
如果需要统计更多维度的 unique 计数,只需要再加一个记录类型。对于我们的应用, key 实际上是 (adzone_id,ad_id,member_id) 联合键。
如果要再加上:统计 adzone_id/ad_id/member_id 下的独立 IP 和独立用户。只需要:
1. map 不用变
2. kadd 产生多个输出(通道),每个维度一个
3. 使用不同的 kjoin
根据我们的数据模式,增加一个维度统计,几乎不会增加运行时间,因为瓶颈不在这里。
C 编程注意事项
读取记录的方式
glibc 中有个非 posix 标准的函数:
ssize_t getline(char **lineptr, size_t *n, FILE *stream);
该函数每次读一行,并且可以按需要分配内存(由 caller 释放),一定不会出现缓冲溢出。该函数一般用法是开始置 lineptr 和 n 为 NULL ,然后一直调用到返回值为 -1 。
size_t n;
ssize_t len;
char* line;
for (line = NULL, n = 0; (len = getline(&line, &n) != -1; )
{
int fields = sscanf(line, …..);
if (wanted == fields)
{
// got a valid record, process the record
}
}
getline 的详细用法请看相关文档。
读完一行后,然后可以使用 sscanf 解码。
每次读一行,而不是使用 fscanf ,是因为读取每行(每条记录)应该看成一个事务,要么完全成功,要么完全失败。如果使用 fscanf 时,如果发生失败,有可能失败点位于半行。导致后续读取失去同步。我开始时犯过这个错误。
feof
feof 本质上没有实现什么功能。因为,如果最后一次读取操作使 readptr 刚好到达文件末尾,并且也刚好返回了期望的读取数目。那么,接下来的 feof 将返回 false 。
详见: http://febird.iteye.com/admin/blogs/419790
最好的经验就是:从来不要使用 feof
使用其他语言
Hadoop-Streaming 是语言无关的,使用任何语言,只要能读取 stdin ,写 stdout ,就行。使用 awk/python/perl 等等,都是完全可行的。
- 大小: 17.9 KB
分享到:
相关推荐
(1)熟悉Hadoop开发包 (2)编写MepReduce程序 (3)调试和运行MepReduce程序 (4)完成上课老师演示的内容 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 二、实验内容 1.单词计数实验...
### Hadoop集群配置及MapReduce开发手册知识点梳理 #### 一、Hadoop集群配置说明 ##### 1.1 环境说明 本手册适用于基于CentOS 5系统的Hadoop集群配置,具体环境配置如下: - **操作系统**:CentOS 5 - **JDK版本...
在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨如何利用 Hadoop 的核心组件 MapReduce 对 NBA 球员的数据进行分析。 MapReduce 是一种编程模型,用于大规模数据集...
### Hadoop集群配置及MapReduce开发手册知识点梳理 #### 一、Hadoop集群配置 **1.1 环境说明** - **操作系统**: CentOS 5 - **JDK版本**: Sun Java 6 - **SSH**: 已安装 - **Eclipse版本**: 3.4.2 (Linux版) - **...
在大数据处理领域,Hadoop是一个不可或...随着技术的发展,现在还有更多高级工具如Spark、Flink等,但Hadoop作为基础,仍然是大数据领域不可或缺的一部分,理解其工作原理和实践操作对任何大数据从业者来说都是必要的。
文档较详尽的讲述了MR的简介,MR初学分析示例(有代码)、MR特性,MR的执行过程(有代码),MR单元测试介绍(有代码)、HA的架构和配置、同时也向大众推荐了两本书。其中部分有较为详尽的链接以供参考。
Hadoop介绍,HDFS和MapReduce工作原理
赠送jar包:hadoop-mapreduce-client-core-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-core-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-core-2.5.1-sources.jar; 赠送Maven依赖信息文件:...
hadoop-mapreduce-examples-2.7.1.jar
【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
赠送jar包:hadoop-mapreduce-client-jobclient-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-jobclient-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-jobclient-2.6.5-sources.jar; 赠送...
在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许用户编写并运行处理大量数据的程序。这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及...
详细介绍 Hadoop 家族中的 MapReduce 原理 MapReduce 是 Hadoop 家族中的核心组件之一,是一个分布式运算程序的编程框架。MapReduce 的核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式...
包org.apache.hadoop.mapreduce的Hadoop源代码分析
Hadoop的出现,特别是其中的MapReduce组件,更是将这一模型推广到更为广泛的领域,让更多的人能够享受到大规模数据处理带来的便利和高效。随着技术的不断发展,大数据的处理能力会越来越强大,谷歌MapReduce模型和...
在Windows操作系统中运行Hadoop和MapReduce程序,通常需要借助Cygwin来模拟Linux环境,因为Hadoop主要设计用于类Unix系统。Cygwin是一个提供Linux环境的开源工具,它允许用户在Windows上运行许多原本只能在Linux或...
【大数据Hadoop MapReduce词频统计】 大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的...