Shuffle过程剖析及性能优化
MapReduce确保每个reducer的输入都按键排序。
Shuffle:系统执行排序的过程—将map输出作为输入传给reducer(如图1、图2)。
图1
图2
如图1、图2所示,从map输出到reduce输入就是shuffle阶段。但实际执行过程远比上图所示复杂。
Shuffle 是指从Map 产生输出开始,包括系统执行排序以及传送Map 输出到Reducer 作为输入的过程。
详细执行如图3所示:
图3
1. Map端
当Map 开始产生输出时,它并不是简单的把数据写到磁盘,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先是写到内存中的一个缓冲区,并做了一些预排序,以提升效率。
每个Map 任务都有一个用来写入输出数据的循环内存缓冲区。这个缓冲区默认大小是100MB,可以通过io.sort.mb属性来设置具体大小。当缓冲区中的数据量达到一个特定阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill 过程中,Map 的输出将会继续写入到缓冲区,但如果缓冲区已满,Map 就会被阻塞直到spill 完成。spill 线程在把缓冲区的数据写到磁盘前,会对它进行一个二次快速排序,首先根据数据所属的partition 排序,然后每个partition 中再按Key 排序。输出包括一个索引文件和数据文件。
如果设定了Combiner,将在排序输出的基础上运行。Combiner 就是一个Mini Reducer,它在执行Map 任务的节点本身运行,先对Map 的输出做一次简单Reduce,使得Map 的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。
spill 文件保存在由mapred.local.dir指定的目录中,Map 任务结束后删除。
每当内存中的数据达到spill 阀值的时候,都会产生一个新的spill 文件,所以在Map任务写完它的最后一个输出记录时,可能会有多个spill 文件。在Map 任务完成前,所有的spill 文件将会被归并排序为一个索引文件和数据文件。这是一个多路归并过程,最大归并路数由io.sort.factor控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩,通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output为true 启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定。
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程数由tasktracker.http.threads控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
2. Reduce端
2.1 copy阶段
Map 的输出文件放置在运行Map 任务的TaskTracker 的本地磁盘上(注意:Map 输出总是写到本地磁盘,但Reduce 输出不是,一般是写到HDFS),它是运行Reduce 任务的TaskTracker 所需要的输入数据。Reduce 任务的输入数据分布在集群内的多个Map 任务的输出中,Map 任务可能会在不同的时间内完成,只要完成的Map 任务数达到占总Map任务数一定比例(mapred.reduce.slowstart.completed.maps默认0.05),Reduce 任务就开始拷贝它的输出。
Reduce 任务拥有多个拷贝线程, 可以并行的获取Map 输出。可以通过设定mapred.reduce.parallel.copies来改变线程数,默认是5。
如果Map 输出足够小,它们会被拷贝到Reduce TaskTracker 的内存中(缓冲区的大小
由mapred.job.shuffle.input.buffer.percent控制,指定了用于此目的的堆内存的百分比);如果缓冲区空间不足,会被拷贝到磁盘上。当内存中的缓冲区用量达到一定比例阀值(由mapred.job.shuffle.merge.percent控制),或者达到了Map 输出的阀值大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill 到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
2.2 sort阶段
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。
注:每趟合并的文件数实际上比示例中展示的更微妙。目标是合并最小数量的文件以便满足最后一趟的合并系数。因此如果是40个文件,我们不会在四趟中,每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后三趟合并所有十个文件。在最后一趟中,4个已合并的文件和余下的6个(未合并的)文件合计10个文件。这并没有改变合并的次数,它只是一个优化措施,尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到reduce。
2.3 reduce阶段
在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。
3. 配置调优
该配置调优方案主要是对以上Shuffle整个过程中涉及到的配置项按流程顺序一一呈现并给以调优建议。
1. Map端
1) io.sort.mb
用于map输出排序的内存缓冲区大小
类型:Int
默认:100mb
备注:如果能估算map输出大小,就可以合理设置该值来尽可能减少溢出写的次数,这对调优很有帮助。
2)io.sort.spill.percent
map输出排序时的spill阀值(即使用比例达到该值时,将缓冲区中的内容spill 到磁盘)
类型:float
默认:0.80
3)io.sort.factor
归并因子(归并时的最多合并的流数),map、reduce阶段都要用到
类型:Int
默认:10
备注:将此值增加到100是很常见的。
4)min.num.spills.for.combine
运行combiner所需的最少溢出写文件数(如果已指定combiner)
类型:Int
默认:3
5)mapred.compress.map.output
map输出是否压缩
类型:Boolean
默认:false
备注:如果map输出的数据量非常大,那么在写入磁盘时压缩数据往往是个很好的主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。
6)mapred.map.output.compression.codec
用于map输出的压缩编解码器
类型:Classname
默认:org.apache.hadoop.io.compress.DefaultCodec
备注:推荐使用LZO压缩。Intel内部测试表明,相比未压缩,使用LZO压缩的 TeraSort作业,运行时间减少60%,且明显快于Zlib压缩。
7) tasktracker.http.threads
每个tasktracker的工作线程数,用于将map输出到reducer。
(注:这是集群范围的设置,不能由单个作业设置)
类型:Int
默认:40
备注:tasktracker开http服务的线程数。用于reduce拉取map输出数据,大集群可以将其设为40~50。
2. reduce端
1)mapred.reduce.slowstart.completed.maps
调用reduce之前,map必须完成的最少比例
类型:float
默认:0.05
2)mapred.reduce.parallel.copies
reducer在copy阶段同时从mapper上拉取的文件数
类型:int
默认:5
3)mapred.job.shuffle.input.buffer.percent
在shuffle的复制阶段,分配给map输出的缓冲区占堆空间的百分比
类型:float
默认:0.70
4)mapred.job.shuffle.merge.percent
map输出缓冲区(由mapred.job.shuffle.input.buffer.percent定义)使用比例阀值,当达到此阀值,缓冲区中的数据将会被归并然后spill 到磁盘。
类型:float
默认:0.66
5)mapred.inmem.merge.threshold
map输出缓冲区中文件数
类型:int
默认:1000
备注:0或小于0的数意味着没有阀值限制,溢出写将有mapred.job.shuffle.merge.percent单独控制。
6)mapred.job.reduce.input.buffer.percent
在reduce过程中,在内存中保存map输出的空间占整个堆空间的比例。
类型:float
默认:0.0
备注:reduce阶段开始时,内存中的map输出大小不能大于该值。默认情况下,在reduce任务开始之前,所有的map输出都合并到磁盘上,以便为reducer提供尽可能多的内存。然而,如果reducer需要的内存较少,则可以增加此值来最小化访问磁盘的次数,以提高reduce性能。
3.性能调优补充
相对于大批量的小文件,hadoop更合适处理少量的大文件。一个原因是FileInputFormat生成的InputSplit是一个文件或该文件的一部分。如果文件很小,并且文件数量很多,那么每次map任务只处理很少的输入数据,每次map操作都会造成额外的开销。
问题一:大批量的小文件处理(如果可能,应该尽量避免该情况)
解决方案:用CombineFileInputFormat输入文件,它能把多个文件打包到一个分片中以便每个mapper可以处理更多的数据。关键是,决定哪些块放入同一个分片时,CombineFileInputFormat会考虑到节点和机架的因素,所以在典型MapReduce作业中处理输入的速度并不会下降。
问题二:如何控制map数量以及设置合适的reduce数量
参考:
http://chenwq.iteye.com/blog/1535809
http://blog.csdn.net/strongerbit/article/details/7440111
http://blog.csdn.net/wf1982/article/details/6672607
文中图片详见附件
若有说的不恰当的地方欢迎指教。。。
相关推荐
本文将从MapReduce优化的角度,深入剖析MapReduce平台的优化方法,涵盖Combiner、Partitioner、数据压缩等方面。 一、Combiner优化 Combiner是MapReduce中的一种特殊组件,位于Mapper和Reducer之间,负责对Mapper...
在Java中实现MapReduce的Shuffle过程,首先需要理解以下几个核心概念: 1. **Map阶段**:Map阶段是数据处理的起始点,输入数据被分割成多个小块(split),每个split由一个Mapper任务处理。Mapper接收键值对(key-...
同时,也会涵盖中间数据排序、分区策略以及Shuffle过程,这些都是MapReduce执行过程中不可或缺的环节。 除了基本概念和原理,手册还可能包含MapReduce的优化技巧,比如通过本地化减少网络传输,或者利用Combiner...
第6章:MapReduce优化 本章讨论MapReduce性能优化的策略,如Combiner的使用、分区策略调整、数据本地性和 speculative tasks。优化MapReduce作业可以显著提高大数据处理的效率和资源利用率。 第7章:实战编程案例 ...
通过MapReduce,我们可以高效地分析大量的学生成绩数据,提取出有价值的信息,帮助教育管理者了解教学效果,优化教学策略。而与Hadoop的结合,使得这种分析能够适应不断增长的数据规模,满足大数据时代的需求。
通过深入理解SQL查询到MapReduce程序的转换过程,分析作业间的关系和数据处理逻辑,我们可以设计出更高效的数据处理流程,提高大数据系统的整体性能。这不仅有助于提升数据处理的速度,还有助于降低运行成本,为企业...
根据给定的文件信息,我们可以深入探讨Apache Pig的性能优化及其在大数据处理中的角色与优势。首先,让我们从Apache Pig的基本概念入手。 ### Apache Pig概述 Apache Pig是一种高生产力的数据流语言和执行框架,...
总结,MapReduce的源码分析涵盖了数据分片、Map函数、Shuffle过程、Reduce函数、输入输出格式、任务调度等多个关键部分。理解这些核心组件的工作原理,有助于我们更高效地利用Hadoop MapReduce处理大数据,同时也...
《MapReduce2.0源码分析与实战编程》是一本深度解析Hadoop MapReduce框架的书籍,其中包含详细的源码注释,旨在帮助读者深入理解MapReduce的工作原理,并能进行实际编程应用。这本书的重点在于剖析MapReduce的核心...
《深入解析MapReduce架构设计与实现原理》是针对Hadoop技术的一本专业指南,尤其侧重于MapReduce这一核心组件的深度剖析。MapReduce是Google提出的一种分布式计算模型,被广泛应用于大数据处理领域,如日志分析、...
除了基本的MapReduce模型,还有一些优化策略可以提高性能,例如Combiner(本地化Reduce)、Partitioner(自定义分区)和Secondary Sort(二次排序)等。这些技巧可以在不改变最终结果的情况下,减少数据传输量,提高...
理解Hadoop MapReduce的源码分析对于优化大数据处理性能至关重要。通过深入研究,开发者可以更好地定制和优化Job,提升系统效率,满足复杂的数据处理需求。无论是系统架构师、数据工程师还是Hadoop开发者,都应该对...
通过深入分析这些源代码,开发者可以更好地理解Hadoop MapReduce的工作流程,从而定制化处理逻辑、优化性能,甚至开发新的功能。对于大数据处理和分布式计算领域的研究者和工程师来说,这是一个极有价值的学习材料。
- **Combiner优化**:Combiner是MapReduce中的一个可选组件,它可以在Map阶段就对局部结果进行部分聚合,减少网络传输的数据量,提高整体性能。 - **MapReduce库的扩展**:通过自定义InputFormat、OutputFormat、...
然而,需要注意的是,由于MapReduce模型的天然局限,如数据通信开销、延迟等问题,可能会对算法性能造成影响。因此,在实际应用中,需要结合具体场景,灵活调整算法参数和优化策略,以达到最佳效果。
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成...在实际应用中,根据具体需求,你可能还需要考虑其他因素,如数据本地化、容错机制、优化性能等,这些都是MapReduce编程中不可忽视的部分。
- **优化策略**:可以通过调整MapReduce的配置参数,如减少shuffle的数据传输量,提高并行度,优化磁盘I/O等,来提升计算效率。 最后,`AlgorithmProject`可能是该项目的源代码或者文档,里面可能包含了具体的实现...