从MapTask类中分析下去,看一下map任务是如何被调用并执行的。
入口方法是MapTask的run方法,看一下run方法的相关介绍:
org.apache.hadoop.mapred.Task public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException Run this task as a part of the named job. This method is executed in the child process and is what invokes user-supplied map, reduce, etc. methods. Parameters: umbilical - for progress reports
可以看出,这个方法接收两个参数,jobconf就是任务的相关配置,而umbilical是作为任务进度reporter的身份存在的。
map任务根据是否有reduce部分,而将进度分成两种样式,
// If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); }
如果没有reduce任务,就没有必要为map的结果进行归并排序操作了,那么整个map过程将以100%进度执行;相反,如果其中含有reduce任务,那么map的任务被分成两部分,map函数执行的部分占整个进度的66.7%(此时我们在RecordReader中的getProgress仅仅给出的是相对这部分的百分比值),剩下的33.3%赋予归并排序的过程。
是否使用当前的new-api执行会有所不同,建议都使用最新的api,org.apache.hadoop.mapreduce是最新的api,org.apache.hadoop.mapred是旧的api,我们这里分析的也是new-api。
if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); }
整个map执行过程大概如下:
input.initialize(split, mapperContext); mapper.run(mapperContext); mapPhase.complete(); setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); input.close(); input = null; output.close(mapperContext); output = null;
首先,初始化RecordReader,执行map具体的函数,待所有的map函数执行完成之后,进入SORT阶段,排序完成后,整个map过程就执行完成了。
在new-api中的map函数执行的具体过程:
public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } }
执行完初始化方法之后,从RecordReader中依次执行nextKeyValue()方法,当返回true时,拿到其key和value,并执行map函数,直到该分片内的数据都已经读取完毕(nextKeyValue()返回false),最后执行清理操作。由于new-api与old-api的差异比较大,分析这段代码可以让我们更加深入地理解这个过程,避免陷入新的编程模型的坑。
在MapTask执行时,根据是否有Reduce任务,其RecordWriter(OutputCollector) 也会有所不同,如果没有直接返回不需要排序,否则构造一个NewOutputCollector类。在NewOutputCollector的构造函数中,会调用方法createSortingCollector(job, reporter),会根据job的参数:mapreduce.job.map.output.collector.class构造一个OutputCollector(需要实现接口:org.apache.hadoop.mapred.MapOutputCollector,主要负责Map端数据的收集,虽然好像很少有人定制这个),默认使用org.apache.hadoop.mapred.MapOutputBuffer类。
在初始化init方法中,mapreduce.map.sort.spill.percent用于控制缓冲区达到哪个百分比后,开始进行Spill操作,默认值80%,只能是[0, 1]中间的数;mapreduce.task.io.sort.mb确定缓冲区的大小,默认值100M,这部分是占用Map堆内存的,设置太大,Map端堆内存也需要设置较大,设置太小,spill的次数可能变多,最大不能设置操作32G,需要具体情况衡量;
//sanity checks final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); if (spillper > (float)1.0 || spillper <= (float)0.0) { throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + "\": " + spillper); } if ((sortmb & 0x7FF) != sortmb) { throw new IOException( "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); } sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
甚至map.sort.class排序的类都可以定制,默认使用的是快速排序。
SpillThread就是控制溢写的线程,是一个守护线程,在MapOutputBuffer中被调用:
spillThread.setDaemon(true); spillThread.setName("SpillThread"); spillLock.lock(); try { spillThread.start(); while (!spillThreadRunning) { spillDone.await(); } } catch (InterruptedException e) { throw new IOException("Spill thread failed to initialize", e); } finally { spillLock.unlock(); } if (sortSpillException != null) { throw new IOException("Spill thread failed to initialize", sortSpillException); }
spillLock是一个可重入锁对象,带有两种类型的条件,done以及ready,分别表示溢写完成和准备开始溢写。
final ReentrantLock spillLock = new ReentrantLock(); final Condition spillDone = spillLock.newCondition(); final Condition spillReady = spillLock.newCondition();
根据代码理解,当累积的index内存超出mapreduce.task.index.cache.limit.bytes的限制后(1024*1024),就会将索引的文件放到本地磁盘上,否则一直在内存里。
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, INDEX_CACHE_MEMORY_LIMIT_DEFAULT); if (totalIndexCacheMemory >= indexCacheMemoryLimit) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; }
在spill的过程中,还会同时进行Combine操作:
if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { final int kvoff = offsetFor(spindex % maxRec); int keystart = kvmeta.get(kvoff + KEYSTART); int valstart = kvmeta.get(kvoff + VALSTART); key.reset(kvbuffer, keystart, valstart - keystart); getVBytesForOffset(kvoff, value); writer.append(key, value); ++spindex; } } else { int spstart = spindex; while (spindex < mend && kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } }
如果combineRunner是空,表示没有设置Combine Class,这时就会直接溢写;否则,就可能进行Combiner过程。
注意下面的注释,”在某个partition的记录数量过少时,会避免使用Combiner”,可以理解为,为了性能考虑,避免无用的一些操作。
执行Combiner的方法,CombinerRunner.combine,由于要兼容老版本的类型,分为Old/NewCombineRunner,我们查看的是NewCombineRunner,
public void combine(RawKeyValueIterator iterator, OutputCollector<K,V> collector ) throws IOException, InterruptedException, ClassNotFoundException { // make a reducer org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer = (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>) ReflectionUtils.newInstance(reducerClass, job); org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, taskId, iterator, null, inputCounter, new OutputConverter(collector), committer, reporter, comparator, keyClass, valueClass); reducer.run(reducerContext); }
从代码中可以看出,Combiner就是Map端的Reducer,执行方式所有参数都与Reducer没有差别。
Combine会在两个时间点被调用,当map端缓冲区内存占用达到mapreduce.map.sort.spill.percent时,SpillThread进行,进行完成后,会执行一次Combine操作(合理地设置内存大小能够避免过多地spill,尽量放到内存操作);还有一次,就是当Map端Collector收集数据完成之后,在MapOutputBuffer.flush()方法刷新缓冲区之后,也会调用sortAndSpill操作。
但是要注意,combine调用的不确定性,一定不能根据combine执行的次数编程,要做到Combine是幂等操作(@Idempotence, hadoop的源码中这种操作非常多)。
Spill的时候可以打印出来的一些日志:
2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output 2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 328052900; bufend = 93461297; bufvoid = 536870912 2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 82013220(328052880); kvend = 50208864(200835456); length = 31804357/33554432 2014-11-27 21:06:35,113 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 125876129 kvi 31469028(125876112) 2014-11-27 21:07:04,801 INFO [main] org.apache.hadoop.mapred.MapTask: Finished spill 99
最后合理地使用Combiner可以大大减少中间结果的大小,同时带来Reducer端数据处理速度的提升,下面就是我们加入Combiner之后的输入输出纪录对比:
相关推荐
Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...
【标题】:“2022毕业设计,基于 Hadoop 的游戏数据分析系统源码” 这个毕业设计项目主要聚焦于使用Hadoop框架开发一个游戏数据分析系统。Hadoop是Apache软件基金会的一个开源分布式计算平台,专为处理和存储大规模...
4. **Map任务**:Map函数接收输入数据,将其切分为键值对,然后应用用户定义的逻辑进行处理。处理后的中间结果通过分区器(Partitioner)进行分区,并由Reducer消费。 5. **Reduce任务**:Reduce函数从多个Map任务...
针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...
在“hadoop1-2-1源码”中,你可以深入了解HDFS的文件操作、数据块管理、心跳机制等关键功能的实现,以及MapReduce如何调度任务、分配资源、处理数据分片和容错等复杂逻辑。此外,源码还包含了各种实用工具和接口,如...
- Shuffle与Sort:在Map任务完成后,系统自动进行数据排序,准备进入Reduce阶段。 - Reduce阶段:对中间结果进行聚合处理,生成最终结果。 3. YARN:资源调度器 - YARN(Yet Another Resource Negotiator)是...
基于 Hadoop 的游戏数据分析系统源码+项目说明.zip基于 Hadoop 的游戏数据分析系统源码+项目说明.zip基于 Hadoop 的游戏数据分析系统源码+项目说明.zip基于 Hadoop 的游戏数据分析系统源码+项目说明.zip基于 Hadoop ...
在Hadoop Map-Reduce中,输入数据被划分为小块,每个块被分配给不同的Map任务处理。Map任务将处理后的结果输出,这些输出会被排序后分配给Reduce任务进一步处理。最终,Reduce任务将整合的数据写回到文件系统中,...
3. **源码分析**:提供的源码可以让读者深入了解Hadoop内部的工作机制,包括数据流的处理、任务调度、故障恢复等。通过阅读和研究源码,开发者可以更好地理解和定制Hadoop,优化其性能,或者为Hadoop生态系统贡献新...
基于Hadoop的JAVA简易网盘项目源码+数据库.zip基于java的简易网盘项目 hdfs为 192.168.31.10:8020/Mycould 采用mysql本地 panuser 基于Hadoop的JAVA简易网盘项目源码+数据库.zip基于java的简易网盘项目 hdfs为 192....
Hadoop 源代码分析 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会维护。Hadoop 的核心组件包括 HDFS(Hadoop Distributed File System)和 MapReduce。HDFS 是一个分布式文件系统,可以存储大量的数据,而 ...
【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...
在Hadoop源码分析中,首先我们要关注的是JobTracker(在Hadoop 2.x中被YARN的ResourceManager取代)和TaskTracker(被NodeManager取代)的角色。它们负责管理和调度Map和Reduce任务,确保整个作业的顺利执行。在Map-...
通过阅读源码,读者可以学习到如何编写Hadoop MapReduce程序,理解HDFS的内部工作流程,以及HBase和Hive的数据处理逻辑。 总之,《Hadoop实战第二版》全面覆盖了Hadoop生态系统的各个重要组件,结合源码的学习,...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书推荐系统源码+数据库.zip基于Hadoop图书...
这个"Hadoop源码分析视频下载"提供了一种深入理解Hadoop内部工作原理的途径,这对于开发者、系统管理员以及对大数据技术感兴趣的人来说是非常有价值的。接下来,我们将详细探讨Hadoop的核心组件、其设计哲学、源码...
基于Hadoop Hive健身馆可视化分析平台项目源码+数据库文件.zip启动方式 环境启动 hadoop hive2元数据库 sql导入 导入hivesql脚本,修改application.yml 启动主程序 HadoopApplication 基于Hadoop Hive健身馆可视化...
这份“Hadoop源码分析完整版”资料深入剖析了Hadoop的核心组件及其工作原理,旨在帮助开发者理解并优化Hadoop系统。 1. **Hadoop概述** Hadoop由Apache基金会开发,其核心包括两个主要部分:Hadoop Distributed ...
1. **输入分片**:首先,输入的数据被分成多个分片(split),每个分片由一个 Map 任务处理。 2. **Map 阶段**: - 输入键值对:通常是从文件中读取的键值对。 - 处理逻辑:Map 函数根据特定的业务逻辑处理键值对...