MapReduce作业的运行过程包含4个独立的实体:
1)Client:提交MapReduce作业.(2)JobTracker:协调作业的运行。(3)TaskTracker:运行作业划分后的Map任务或Reduce任务。(4)Shared FileSystem(一般为HDFS),用来在其他实体间共享作业文件。
1.作业的提交
Job的waitForCompletion(true)方法所实现的作业提交过程如下:向jobtracker请求一个新的作业ID,见步骤2。检查作业的输出说明。计算作业的InputSplit。将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入文件)复制到一个以作业ID命名的目录下jobtracker的文件系统。作业JAR的副本由mapred.submit.replication属性控制(默认值为10),见步骤3。告知jobtracker作业准备执行,见步骤4。
2.作业初始化
JobTracker会把作业放入一个内部队列中,交由job scheduler进行调度,并对其进行初始化(见步骤5)。为了创建任务运行列表,作业调度器首先从共享文件系统中获取Job已计算好的InputSplit的信息(见步骤6)。然后为每个InputSplit创建一个map任务。
3.任务的分配
TaskTracker定期发送“心跳”(heartbeat)给JobTracker.“心跳”告知jobtracker,tasktracker是否还存活,同时也充当两者之间的消息通道(见步骤7)。在jobtracker为tasktracker选择任务之前,jobtracker必须先选定任务所在的作业。在Hadoop中,MapReduce的调度器可以选择,默认的调度器是原始的基于队列的FIFO调度器,还有两个多用户调度器,分别名为Fair Scheduler和 Capacity Scheduler。一旦选择好作业,jobtracker就可以为该作业选定一个任务。对于map任务和reduce任务,tasktracker有固定数量的任务槽。
4.任务的执行
tasktracker已经被分配一个任务,下一步是运行该任务。第一步,通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系统。同时,tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘(见步骤8)。第二步,tasktracker为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下。第三步,tasktracker新建一个TaskRunner实例来运行该任务。TaskRunner启动一个新的child JVM(见步骤9)来运行每个任务。
shuffle和排序
map函数开始产生输出时,它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。图6-4展示了这个过程。
每个map任务都有一个环形内存缓冲区,用于存储任务的输出。
首先从Map端开始分析,当Map开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个Map任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把缓冲区中的内容spill 到磁盘。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞直道spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。如图3 所示。这是一个多路归并过程,最大归并路数由io.sort.factor 控制(默认是10)。如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine 属性控制),那么Combiner 将在输出文件被写入磁盘前运行以压缩数据。
对写入到磁盘的数据进行压缩(这种压缩同Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到Reducer 的数据量。默认输出是不被压缩的, 但可以很简单的设置mapred.compress.map.output为true 启用该功能。压缩所使用的库由mapred.map.output.compression.codec来设定
当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,并告知TaskTracker 任务已完成。Reducers 通过HTTP 来获取对应的数据。用来传输partitions 数据的工作线程个数由tasktracker.http.threads 控制,这个设定是针对每一个TaskTracker 的,并不是单个Map,默认值为40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到Shuffle的Reduce部分。Map的输出文件放置在运行Map任务的TaskTracker的本地磁盘上(注意:Map输出总是写到本地磁盘,但是Reduce输出不是,一般是写到HDFS),它是运行Reduce任务的TaskTracker所需要的输入数据。Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中一个Map任务完成,Reduce任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过设定mapred.reduce.parallel.copies来改变线程数。
Reduce是怎么知道从哪些TaskTrackers中获取Map的输出呢?当Map任务完成之后,会通知他们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker知道Map输出与tasktrackers的映射关系。Reducer中有一个线程会间歇的向JobTracker询问Map输出的地址,直到把所有的数据都取到。在Reducer取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reducer可能会失败,他们会在整个作业完成之后,JobTracker告知他们要删除的时候才去删除。
如果Map输出足够小,他们会被拷贝到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percnet控制),或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据将会被归并然后spill到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有50 个Map 输出(可能有保存在内存中的),并且归并因子是10(由io.sort.factor控制,就像Map 端的merge 一样),那最终需要5 次归并。每次归并会把10个文件归并为一个,最终生成5 个中间文件。在这一步之后,系统不再把5 个中间文件归并成一个,而是排序后直接“喂”给Reduce 函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有40 个文件,并不是每次都归并10 个最终得到4 个文件,相反第一次只归并4 个文件,然后再实现三次归并,每次10 个,最终得到4 个归并好的文件和6 个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到Reduce 函数那里。在Reduce 阶段,Reduce 函数会作用在排序输出的每一个key 上。这个阶段的输出被直接写到输出文件系统,一般是HDFS。在HDFS 中,因为TaskTracker 节点也运行着一个DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的Shuffle 和Sort 分析完毕。
相关推荐
官方MapReduce运行机制动画图,详细描述Job的提交流程
MapReduce的工作机制可以分为五个主要步骤: 1. **准备阶段**:首先,原始数据会被分成多个块,每个块都会被分配给不同的Map任务处理。 2. **Map阶段**:每个Map任务都会读取分配给它的数据块,并应用`Map`函数处理...
4. **容错机制**:MapReduce也内置了容错机制,如果某个Map或Reduce任务失败,系统会重新调度任务。 通过HDFS和MapReduce的协同工作,Hadoop能够处理PB级别的数据,广泛应用于数据分析、日志处理、推荐系统、机器...
MapReduce工作流是一种在Hadoop生态系统中处理大数据的机制,它允许多个MapReduce作业(MR作业)按照特定的依赖顺序依次执行,以完成更复杂的计算任务。这些作业之间的依赖关系通常形成一个有向无环图(DAG),其中...
本文旨在深入剖析Hadoop中的两大核心组件——HDFS(Hadoop Distributed File System)和MapReduce的工作原理及其实现机制。首先,我们将介绍Hadoop NameNode与DataNode的基本运行模式;随后,将重点分析MapReduce的...
- **容错性**:MapReduce框架内置了容错机制,当某个任务失败时,框架会自动重试该任务,确保数据处理的完整性。 #### 六、MapReduce的限制 尽管MapReduce是一种强大的数据处理模型,但它也有一定的局限性: - **...
8. 计数模式(Counting with Counters):计数器是MapReduce中用于记录任务执行过程中特定事件次数的机制。它可以用来监控MapReduce作业的性能,例如计算错误数据的数量或特定数据的出现频率。 9. 过滤模式...
接下来的部分(原文未完全提供)可能会深入探讨MapReduce的基本编程模型,给出具体的应用案例,解释其内部架构,如数据分片、任务调度、容错机制等,并可能涉及MapReduce如何适应不同规模和类型的数据集,以及如何...
MapReduce工作原理详解 Hadoop是一个开源的分布式计算框架,起源于Apache项目,专注于大规模数据的分布式存储和处理。它的核心特性包括可扩展性、经济性、高效性和可靠性,使得处理PB级别的数据变得可能,同时利用...
#### 二、MapReduce的工作机制 1. **文件切片**:在MapReduce任务启动之前,首先对输入文件进行逻辑上的切片处理。每个切片对应一个独立的Map任务。切片的大小默认与HDFS块大小一致,但可以通过配置调整。 2. **Map...
MapReduce是Google开发的一种分布式计算模型,广泛应用于大数据处理领域。它将大规模的数据处理任务分解为许多小的子任务,这些子任务...理解MapReduce的工作流程和InputFormat机制对于优化Hadoop作业性能至关重要。
#### 二、MapReduce的工作原理 MapReduce的基本思想是将大规模的数据集分割成较小的部分,通过并行处理的方式在多台计算机上进行计算。整个计算过程可以分为两个主要阶段:**Map** 和 **Reduce** 阶段。 1. **Map...
1. **理解MapReduce的工作原理**:深入学习MapReduce的工作机制,理解其分布式计算的优势。 2. **实际编程经验积累**:通过编写MapReduce程序,积累了实际编程经验,熟悉了Hadoop和MapReduce的API。 3. **分布式计算...
在这个过程中,Hadoop MapReduce通过并行化处理和容错机制,能够高效地处理大规模数据,即使在硬件故障的情况下也能确保数据完整性。同时,MapReduce的编程模型相对简单,使得开发者能够专注于业务逻辑,而不是底层...
首先,我们要理解MapReduce的基本工作原理。Map阶段负责数据的拆分和处理,将原始输入数据分解为键值对,并发送到各个工作节点进行并行处理。Reduce阶段则负责整合Map阶段的结果,执行聚合操作,最终生成所需的输出...
通过理解并动手实现这个框架,开发者不仅可以深入理解MapReduce的工作机制,还能掌握如何在实际项目中应用分布式计算解决大数据问题。对于那些关注人工智能和大数据处理的开发者,这将是一次宝贵的实践经历。
通过《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》这本书,读者不仅可以了解到MapReduce的基本工作原理,还能学习到如何设计和优化MapReduce作业,以及如何在实际项目中应用MapReduce解决复杂问题。...