MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是Google 公司,而Google 的灵感则来自于函数式编程语言,如LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和Reduce。当你向MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map 任务,然后分配到不同的节点上去执行,每一个Map 任务处理输入数据中的一部分,当Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce的数据流图如图1 所示:
本文的重点是剖析MapReduce的核心过程----Shuffle和Sort。在本文中,Shuffle是指从Map产生输出开始,包括系统执行排序以及传送Map输出到Reducer作为输入的过程。在这里我们将去探究Shuffle是如何工作的,因为对基础的理解有助于对MapReduce程序进行调优。
首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。如图3 所示。这是一个多路归并过程,最大归并路数由io.sort.factor 控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output为true 启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程个数由tasktracker.http.threads 控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到Shuffle的Reduce部分。Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定mapred.reduce.parallel.copies来改变线程数。
Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除。
如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。在这一步之后,系统不再把5 个中间文件归并成一个,而是排序后直接“喂”给Reduce 函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40 个文件,并不是每次都归并10 个最终得到4 个文件,相反第一次只归并4 个文件,然后再实现三次归并,每次10 个,最终得到4 个归并好的文件和6 个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce 函数那里。
在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的Shuffle 和Sort 分析完毕。
相关推荐
Shuffle是MapReduce中的一个重要环节,负责在Map和Reduce之间进行数据的排序和分区,确保正确性。 在Java中实现MapReduce的Shuffle过程,首先需要理解以下几个核心概念: 1. **Map阶段**:Map阶段是数据处理的起始...
在MapReduce的工作流程中,Shuffle机制扮演着至关重要的角色,它确保了数据在传递到Reducer阶段之前被正确地分区和排序。本文将深入探讨MapReduce的Shuffle机制,并讨论如何通过自定义Partitioner来满足特定的业务...
4. Shuffle和Sort阶段在MapReduce中至关重要。Map任务的输出会被分区并排序,然后由Reduce任务消费。源码中可以看到这些过程的实现细节。 5. MapReduce的容错机制在YARN下得到了强化。如果某个任务失败,AM会重新向...
2. Shuffle和Sort阶段:Map的输出被排序并分区,准备进行Reduce操作。这个阶段确保了属于同一簇的数据点被传递给同一个Reducer。 3. Reduce阶段:Reducer接收到同一簇的所有数据点,重新计算该簇的中心点。这通常...
当发现多个Reduce作业的输入数据可以通过Map阶段直接生成时,可以省略不必要的Shuffle和Sort步骤。 优化前后的性能对比通常能直观地证明优化的效果。通过减少冗余操作,压缩作业链,可以显著提高查询的执行效率,...
3. Shuffle和Sort:这是MapReduce框架自动执行的步骤,确保所有相同键的值被归到一起,并按键排序。 4. Reduce函数:处理Map阶段生成的键值对,进行聚合操作。比如计算每个学生的总分、平均分,或者统计成绩分布区间...
MapReduce模型还包括一个 Shuffle 和 Sort 阶段,这是介于Map和Reduce之间的关键步骤。Shuffle负责将Map阶段产生的键值对按照键进行排序,以便Reduce函数可以按顺序处理。Sort则确保相同键的值被放在一起,为Reduce...
Shuffle和Sort阶段是MapReduce流程中的一个重要环节,它确保相同键的值会被分到同一个Reducer上。这个阶段会对Map阶段产生的键值对进行排序,以便Reduce函数可以按顺序处理。 接下来,Reduce函数会接收相同的键值对...
- Shuffle & Sort:Map产生的中间结果会根据键进行排序和分区,以便相同键的数据会被发送到同一个Reduce任务。 - Reduce:Reduce函数接收来自Map阶段的键值对,对每个键的所有值进行聚合操作,生成最终的输出结果...
在“MapReduce高阶实现”这一主题中,我们将深入探讨如何优化和扩展MapReduce框架,以应对更复杂的数据处理需求。 1. **Map阶段**: - 映射阶段是数据处理的起点,它接收输入数据,将大块数据分割成小片(键值对)...
在这个场景中,我们将讨论如何使用Hadoop的MapReduce来实现词统计和列式统计。 **一、MapReduce原理** MapReduce的工作流程主要包括三个主要阶段:Map、Shuffle(排序)和Reduce。在Map阶段,输入数据被分割成多个...
4. **Shuffle和Sort**:在Mapper和Reducer之间,系统会自动进行数据的Shuffle和Sort过程,将具有相同键的值归类到一起,为Reducer提供有序的输入。 5. **提交和运行作业**:完成Mapper和Reducer代码后,你需要将...
- JobTracker是MapReduce框架中的中心调度器,负责分配任务和监控任务状态。TaskTracker是运行在每个工作节点上的服务,接收JobTracker的指令,执行具体的Map和Reduce任务。 8. **应用程序开发**: - 开发者需要...
MapReduce通过shuffle和sort过程保证数据正确传输。尽管主要使用Java编写,但通过Hadoop Streaming,MapReduce也可支持其他编程语言。 Apache Spark是一个更快、更通用的大数据处理框架,它不仅支持批处理,还支持...
### MapReduce原理实现分析 #### 一、MapReduce概述与工作流程 MapReduce是一种编程模型,主要用于处理大规模数据集的并行运算。该模型由Google提出,并被广泛应用于分布式计算领域。Hadoop作为开源框架之一,实现...
4. **Shuffle & Sort**: Map 输出的键值对会被 Hadoop 自动进行分区、排序和合并。这一步是 MapReduce 的一个重要环节,确保相同键的值被聚集在一起,以便 Reduce 阶段处理。 5. **Reduce 阶段**: 在 Reduce ...
中间结果通过 Shuffle 和 Sort 阶段进行排序和分组,确保相同键值的数据被传递到同一个Reduce任务。 2. **编程接口**:MapReduce提供了编程接口,包括Mapper类和Reducer类,用户需要继承这两个类并重写其方法。...
在Hadoop中,MapReduce框架提供了一整套的生态系统,包括JobTracker(在Hadoop 2.x中被YARN取代)来调度和监控任务,TaskTracker(在Hadoop 2.x中被NodeManager取代)执行实际的Map和Reduce任务,以及DataNode存储...