`
kissmett
  • 浏览: 40200 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hadoop mr数据流总结

阅读更多
hadoop mr数据流
/*
符注:
()内为数据;[]内为处理;
{}内为框架模块;
()数据若无说明则为在内存;
->本机数据流;=>网络数据流;~>分布式-本地读写数据流;
/**/为标注;
*/
(分布式源文件)~>{JobTracker分配到各TaskTracker本机上}=>
-------------------------------- @TaskTracker(Map) machine
(split)->[InputFormat]->
[RecordReader]迭代begin
	(k1,v1)->[map]->
	(k2,v2)->[partition]->
[RecordReader]迭代end->
(k2,v2内存集合1)->[sort,merge]->
(k2,v2内存排序集合)->
[combine]迭代begin/*若有partition则按其规则*/
	(k2,iter(v2))->[combine]->
	(k3,v3)->
[combine]迭代end->
(k3,iter(v3)本地文件)->
-------------------------------- @TaskTracker(Map) machine
[shuffle]=>
-------------------------------- @TaskTracker(Reduce) machine
(k3,iter(v3)来自各mapper的子集)->[sort,merge]->
(k3,iter(v3)来自各mapper的合集)->[reduce]->
(k4,v4)->[OutputFormat]~>
-------------------------------- @TaskTracker(Reduce) machine
(分布式结果文件)


======================================
总结mapreduce数据处理流程:
所谓分布式计算在hadoop的实现里可表达为:
1.基于hdfs分布式存储的各存储节点的map运算过程;
2.之后的在少量(甚至唯一)节点上的reduce运算过程;
3.以及连接map运算输出和reduce输入的shuffle过程;
下图表达在某datanode机器节点进行map和shuffle的过程:


InputFomat:从block到split到(k1,v1)
hdfs中文件是按照配置大小(默认64M)分block存储n份(一般为)到n个独立datanode节点的;
当要解析某分布式文件,要执行map任务的TaskTracker将读取存储于本机的相关block进行本地文件解析;
个人理解是一个block可以被拆分为多个split,每个split作为一个map tasktracker的输入(但如何切分还没搞清楚),我所涉及的所有mr测试中都是一个block对应一个map任务.
这个split可以通过FSDataInputStream输入流读取,这是一个本地文件读取操作.
此过程中可以编写自己的InputFormat进行自定义的读取,此类功能的核心是返回一个RecordReader,RR是具体解析文件逻辑实现类:
对于hadoop0.20及以前版本,对应hadoop core jar里的 org.apache.hadoop.mapred包,RR我理解是一种被动模式,从其接口函数命名看,主要实现如下函数:
createKey()
createValue()
getPos()
next(Text key, Text value)

这个RecordReader接口具体实现类的调用者,应是先调用createKey和createValue来实例化key,value对象(这里是为初始化自定义对象考虑的),然后再调用next(key,value)来填充这两个对象而得到解析的结果.
对于hadoop1.0及之后的版本,对应hadoop core jar里的 org.apache.hadoop.mapreduce包,RR是一种主动模式,主要实现如下函数:
getCurrentKey()
getCurrentValue()
nextKeyValue()
getPos()
isSplitable(JobContext context, Path file)

此接口调用者,应是直接调用nextKeyValue()来获取key value实例,然后通过两个getCurrent方法由调用者获取其对应实例;
无论是next(Text key, Text value),还是nextKeyValue(),split被RR解析为一条的(k1,v1)
MapClass.map():从(k1,v1)到(k2,v2)
不多说.

Partition.partition():给(k2,v2)盖章(指定reduce)
经过此函数数据流中的k,v是不变的,函数相当于给此k,v对盖个章,指定其要去的reduce.
值得注意的是,经过RR出来的(k1,v1)是先顺序经过map和partition,而非全过完map之后再全过partition.

所有(k2,v2)写入membuffer.
经过InputFormat中RR的next/nextKeyValue迭代,形成的系列(k2,v2)会被写入到内存的buffer中,此buffer大小通过io.sort.mb参数指定.
另为了控制资源,会有另一个进程来监控此buffer容量,当实际容量达到/超过此buffer某百分比时,将发生spill操作,此百分比通过io.sort.spill.percent参数设定.
此监控进程与写入独立,所以不会影响写入速度.

spill:mem buffer中的(k2,v2)落地为local disk的文件
当buffer实际容量超过门槛限制,或者split所有数据经RR next/nextKeyValue迭代完成时,均触发spill操作,所以每个map任务流中至少会有一次spill.
spill作用是将buffer里的kv对sort到本地磁盘文件.
sort:针对spill kv数据的排序
在spill时,sort会参考partition时给数据所盖的章,即reduce号,另在reduce号相同的情况下再按照k2排序.这个排序至少有两个目的:
1.使得在有可能的combine中进行同key的迭代变的很容易;
2.使得shuffle后的reduce大流程中基于同key合并变得相对容易;

combine:从(k2,iter(v2))到(k3,v3)
combine类建立的初衷是尽量减少spill发生的磁盘写操作的量(拿网上通用的说法就是本地的reduce,也确实是实现了Reduce的接口/父类)
combine基于业务规则处理key及同key的所有value.这(k2,iter(v2))经处理后,仅返回一个(k3,v3),减少了spill磁盘写入量.
combine很有意思,特别是当它跟partition联合来使用时,在此我在理解上了也颇费了翻功夫:
partition给(k2,v2)盖章,决定了这个(k2,v2)具体要到哪个reduce去.而其后的combine处理,我完全随便写逻辑来处理同key的(k2,iter(v2)),大家是否也有跟我一样的疑问:假设我在partition里是根据value的某种规则来决定reduce号,但到了combine中,某个(k2,v2)却要跟其他很多(k2,v2')进行处理,而其他(k2,v2')并不一定会在partition根据规则归到同一个reduce号上,那combine却笼统的返回了一个(k3,v3),那么这个(k3,v3)到底是会给哪个reduce呢?
我理解hadoop在这里的机制是,在spill时,首先按照partition的reduce号来将分到同reduce的(k2,v2)放到一起,在此前提下,再进行同key的sort,最终写入到一个与reduce号相关的spill file中(位于local disk).我还没有去看具体代码,暂时把这个理解写这里.
另对于这个问题,网上有文章(http://langyu.iteye.com/blog/992916)提及:"Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。"
这位作者文章很精彩,我很赞同以上的说法,但,实际上我认为运用spill的这种特性,可以完成二级(或者多级)统计的功能(见后面一个测试的例子).

merge and sort:将多个map的spill文件按找partition分配的reduce号归并
首先是按照reduce号进行分组,分组内将按照key(即k3)排序.

shuffle:洗牌,不如称之为摸牌
当一个TaskTracker执行完了map任务,他将完成状态汇报给JobTracker.这时(我认为)JobTracker会让分配reduce的TaskTracker来获取这个map生成的文件,每个TaskTracker获取它所执行reduce对应的那份.

merge and sort:reduce端的
当TaskTracker从各map机上取得属于自己的文件后,要执行merge和sort过程,即按照k3进行排序,这样有利于通过一次遍历就可以达到输入(k3,iter(v3))的效果.

reduce:从(k3,iter(v3))到(k4,v4)并写入至分布式文件系统
每个reduce会生成一个分布式文件,放置于任务目录下.





附:一个测试的例子:
...经过map的处理到partition,
partition的输入(k2,v2)其中k2的含义是书籍出版的年份m个(比如,1999/2010/2012/...),v2的含义是书的分类n个(比如,技术/文学/艺术/...)
partition的处理是根据value来映射到reduce,即每中书籍分类让一个reduce处理.
combine的输入(k2,iter(v2)),输出(k3,v3)其中k3=k2;v3=count(iter(v2));
reduce的输入是(k3,iter(v3)),输出(k4,v4),其中k4=k3;v4=sum(iter(v2));
整个mr执行完之后,按书籍分类输出n个文件,每个文件里根据年份输出该年份(该分类)的书籍总数;
达到做二级分类统计的目的







  • 大小: 101.4 KB
分享到:
评论

相关推荐

    IT面试-Hadoop总结-云计算

    2. 数据流系统:数据流系统是Hadoop中的一个高性能数据处理工具,提供了实时数据处理能力。 3. Streaming Storm:Streaming Storm是Hadoop中的一个流式数据处理工具,提供了高性能和高可靠性的数据处理能力。 七、...

    Hadoop mapreduce 实现MR_DesicionTreeBuilder 决策树

    总之,MR_DesicionTreeBuilder 是一个利用 Hadoop MapReduce 平台构建分布式决策树的解决方案,它在处理海量数据时具有优势,但也需要解决并行化和效率问题。理解和掌握这一技术对于大数据环境下的机器学习应用至关...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和提交集群运行.avi 05-mr程序的本地运行模式.avi 06-job提交的逻辑及YARN框架的技术机制.avi 07-MR程序的几种提交...

    大数据之路选择Hadoop还是MaxCompute?Hadoop开源与MaxCompute对比材料

    - **数据流控制**:监控数据流动,防止未授权的数据泄露。 - **用户作业的隔离运行**:确保不同用户的作业之间相互隔离。 - **作业运行时使用最小权限**:限制作业执行过程中的权限范围,提高安全性。 - **数据访问...

    Hadoop 大数据学习ppt

    Pig是Hadoop上的一个高级数据流语言和执行框架,主要用于数据分析。Pig Latin是Pig的语言,通过抽象的脚本描述数据处理流程,使得复杂的数据处理任务变得更加简洁。Pig可以自动将这些脚本转化为一系列MapReduce任务...

    hadoop段海涛老师八天实战视频

    02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和提交集群运行.avi 05-mr程序的本地运行模式.avi 06-job提交的逻辑及YARN框架的技术机制.avi 07-MR程序的几种提交...

    2020年hadoop简历模板.doc

    设计Storm实时处理方案和数据落地完整性需求,意味着具备实时数据流处理和数据完整性保障的知识。 7. **HBase数据仓库**:基于Hadoop的分布式NoSQL数据库,支持高并发读写操作。简历中提到的HBase二级索引和批量写...

    基于hadoop的应用开发.zip

    《基于Hadoop的应用开发》 在当今大数据时代,Hadoop作为开源的分布式计算框架,...通过深入学习“HadoopMR-master”这样的项目,我们可以更好地理解和运用Hadoop,从而在大数据和人工智能的交叉领域实现更大的创新。

    HadoopMR-CombineLocalFiles:它将目录中的所有本地文件合并为一个文件

    总之,"HadoopMR-CombineLocalFiles"项目是针对Hadoop MapReduce的一种优化策略,它利用Java编程语言实现了将多个小文件合并成一个大文件的功能,以减少map任务数量,提升大数据处理的效率。在处理大量小文件的场景...

    HADOOP生态系统.docx

    Hadoop生态系统是大数据处理的核心组成部分,它包含了多个组件,如HDFS(Hadoop Distributed File System)、Spark、Hive、Elasticsearch和Kafka,这些工具共同构建了一个高效、可扩展的数据处理平台。 HDFS是...

    spark-2.4.0-bin-hadoop2.7.tgz

    它能处理来自多种源的数据流,如Kafka、Flume等,并且可以与其他Spark组件无缝集成。 4. **MLlib**:Spark的机器学习库,包含各种机器学习算法,如分类、回归、聚类、协同过滤等,以及模型选择和评估工具。在2.4.0...

    hadoop实战源代码Java

    通过`FileSystem.open()`方法打开文件,然后读取输入流的数据,通常结合`FSDataInputStream.read()`方法,将数据写入到本地文件系统。完成后,记得关闭输入流。 3. **文件删除**:Hadoop提供了`FileSystem.delete()...

    googleMR Hadoop

    Hadoop生态系统还包含了其他组件,比如用于数据仓库的Hive、数据流处理的Storm和Spark、数据索引的Solr等。Hadoop也支持多种编程语言,如Java、Python和C++等,使得开发者可以从容地使用熟悉的语言开发MapReduce作业...

    (最终版)大数据Hadoop与Spark学习经验谈.pdf

    2. 数据收集:Flume、HDFS、MR/Spark Core等 3. 数据存储:SQL和NoSQL、HBase等 4. 批处理:MapReduce、Spark Core等 5. 交互式分析:Presto、Impala、Spark SQL等 6. 流处理:Storm、Spark Streaming等 7. 数据挖掘...

    hadoop-data

    4. **Hadoop生态**:除了HDFS和MapReduce,Hadoop生态系统还包括许多其他组件,如HBase(分布式NoSQL数据库)、Hive(数据仓库工具)、Pig(数据流处理)、Spark(快速通用的大数据处理引擎)、Oozie(工作流调度...

    hadoop面试题汇总1

    5. 其他技术:Pig Latin 是一种用于 Hadoop 的数据流语言,用于数据分析;Hive 提供了 SQL-like 查询接口;HBase 是一个分布式列式数据库,常与 ZooKeeper 配合使用;下一代 MapReduce(MR2)引入了 Tez 和 YARN ...

    spark-3.2.4-bin-hadoop3.2-scala2.13 安装包

    综上所述,“spark-3.2.4-bin-hadoop3.2-scala2.13”安装包是构建和运行Spark应用程序的基础,涵盖了大数据处理、流处理、机器学习等多个领域,为开发者提供了高效、灵活的数据处理平台。通过深入理解和熟练运用,...

    Hadoop权威指南中文版

    5. Hadoop其他项目:除以上核心组件外,Hadoop项目还包括一些子项目,如HBase(分布式列式存储数据库)、ZooKeeper(协调服务)、Avro(数据序列化系统)、Hive(数据仓库工具)、Pig(高级数据流语言)和Sqoop...

    【Hadoop篇03】Hadoop配置历史服务1

    通过历史服务器,用户可以追踪作业的执行状态,分析性能瓶颈,以及进行问题排查,这对于优化和管理大规模数据处理工作流非常有用。 ### 配置历史服务器 配置Hadoop历史服务器主要涉及`mapred-site.xml`文件的修改...

    mr 集成ECLIPSE

    总的来说,通过"mr集成eclipse",我们可以将Eclipse的强大开发能力与Hadoop MapReduce的分布式计算能力相结合,使得大数据应用的开发变得更加便捷高效。这对于大数据工程师来说是一个非常实用的工作流,能够极大地...

Global site tag (gtag.js) - Google Analytics