在我看来 hadoop的核心是mapreduce,而mapreduce的核心则是 shuffle,在我们需要优化mapreduce,提高mapreduce效率时,需要优化的核心代码都在这个shuffle过程。
我从网上拉过来一张图,加上自己的标注来详细记录一下该过程,以便后期优化代码做一个记录
mapreduce整个执行过程如下如所示
其中1、2、3、4....是我自己加上的以便一步一步来分析,下面我们来根据源代码分析这一步一步的过程,在此我跟踪的源代码是 hadoop-1.2.1 版本
1:inputSplit 这个过程我们看JobClient 类的writeNewSplits 方法,此方法为根据获得到的输入文件,将文件分块放入map中,关键的一句代码
我们跟踪进去FileInputFormat 的 getSplits方法
这段代码也是划分文件块的关键所在,首先获得文件块大小,在配置 block.size中可以配置,默认为64MB,然后计算出split的大小,默认也是64MB(可跟踪computeSplitSize方法查看原因),然后开始划分文件(while循环),比如64M文件则默认为一个块,65m文件则为两个块。
2:这一步自不必说,就是map计算的过程
3:这一步中,map计算的输出结果首先是写到一个缓冲区中,当缓冲区数据大小超过一定阀值之后,则进行spill 溢写操作,即将缓冲区中的数据写入到本地磁盘,在此过程中还根据key的hash值进行键值对的排序和合并操作,核心实现代码在MapTask的MapOutputBuffer 类中的collect方法,该方法主要用于收集map输出数据并写入缓冲区,当缓冲区超出临界值则开启溢写线程。
4:这一步是溢写的过程,在这个过程中还进行partition和sort
该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。当kvstart == kvend条件成立时,表示没有要spill的记录。
SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ '/spill' + spillNumber + '.out'的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中 创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
注:在此系统存放文件的方式使用的是二级索引,在此没有做研究。
5:在此步骤将磁盘中的多个map输出文件具有相同key 的进行合并
核心代码
6:在这一步中reduce任务通过http方式将map的输出结果复制到reduce执行的节点上来,在此开始关注 RecudeTask 类中方法
下面的核心代码是对reduce输入数据进行混淆,涉及到的操作类似map段 进行合并和排序
7:这一步进行merge 合并数据 不做过多的关注了
8和9是reduce真正运行的逻辑过程并将最终结果输出
上述步骤中涉及到的混淆 shuffle过程为3、4、5、6、7,优化方面有很多方式,其中最主要的优化面就是reduce通过http获取map输出的过程。
我从网上拉过来一张图,加上自己的标注来详细记录一下该过程,以便后期优化代码做一个记录
mapreduce整个执行过程如下如所示
其中1、2、3、4....是我自己加上的以便一步一步来分析,下面我们来根据源代码分析这一步一步的过程,在此我跟踪的源代码是 hadoop-1.2.1 版本
1:inputSplit 这个过程我们看JobClient 类的writeNewSplits 方法,此方法为根据获得到的输入文件,将文件分块放入map中,关键的一句代码
List<InputSplit> splits = input.getSplits(job);
我们跟踪进去FileInputFormat 的 getSplits方法
long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); }
这段代码也是划分文件块的关键所在,首先获得文件块大小,在配置 block.size中可以配置,默认为64MB,然后计算出split的大小,默认也是64MB(可跟踪computeSplitSize方法查看原因),然后开始划分文件(while循环),比如64M文件则默认为一个块,65m文件则为两个块。
2:这一步自不必说,就是map计算的过程
3:这一步中,map计算的输出结果首先是写到一个缓冲区中,当缓冲区数据大小超过一定阀值之后,则进行spill 溢写操作,即将缓冲区中的数据写入到本地磁盘,在此过程中还根据key的hash值进行键值对的排序和合并操作,核心实现代码在MapTask的MapOutputBuffer 类中的collect方法,该方法主要用于收集map输出数据并写入缓冲区,当缓冲区超出临界值则开启溢写线程。
final boolean kvsoftlimit = ((kvnext > kvend) ? kvnext - kvend > softRecordLimit : kvend - kvnext <= kvoffsets.length - softRecordLimit); if (kvstart == kvend && kvsoftlimit) { LOG.info("Spilling map output: record full = " + kvsoftlimit); startSpill(); }
4:这一步是溢写的过程,在这个过程中还进行partition和sort
该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。当kvstart == kvend条件成立时,表示没有要spill的记录。
protected class SpillThread extends Thread { @Override public void run() { spillLock.lock(); spillThreadRunning = true; try { while (true) { spillDone.signal(); while (kvstart == kvend) { spillReady.await(); } try { spillLock.unlock(); //此处是进行排序溢写的核心方法 sortAndSpill(); } catch (Exception e) { sortSpillException = e; } catch (Throwable t) { sortSpillException = t; String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(t); reportFatalError(getTaskID(), t, logMsg); } finally { spillLock.lock(); if (bufend < bufindex && bufindex < bufstart) { bufvoid = kvbuffer.length; } kvstart = kvend; bufstart = bufend; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { spillLock.unlock(); spillThreadRunning = false; } } }
SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ '/spill' + spillNumber + '.out'的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中 创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); //创建溢出文件 格式为+ '/spill' + spillNumber + '.out out = rfs.create(filename); final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; try { long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null) {//如果为空则直接write // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else {如果不为空则先combiner在排序再输出 int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
注:在此系统存放文件的方式使用的是二级索引,在此没有做研究。
5:在此步骤将磁盘中的多个map输出文件具有相同key 的进行合并
核心代码
@SuppressWarnings("unchecked") RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, job.getInt("io.sort.factor", 100), new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, null, spilledRecordsCounter); //write merged output to disk long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } //close writer.close();
6:在这一步中reduce任务通过http方式将map的输出结果复制到reduce执行的节点上来,在此开始关注 RecudeTask 类中方法
URL url = mapOutputLoc.getOutputLocation(); HttpURLConnection connection = (HttpURLConnection)url.openConnection(); //通过http方式拉取map输出数据 InputStream input = setupSecureConnection(mapOutputLoc, connection);
下面的核心代码是对reduce输入数据进行混淆,涉及到的操作类似map段 进行合并和排序
MapOutput mapOutput = null; if (shuffleInMemory) {//判断混淆能在缓存中进行(此种方式效率比较高) if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into RAM from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleInMemory(mapOutputLoc, connection, input, (int)decompressedLength, (int)compressedLength); } else { if (LOG.isDebugEnabled()) { LOG.debug("Shuffling " + decompressedLength + " bytes (" + compressedLength + " raw bytes) " + "into Local-FS from " + mapOutputLoc.getTaskAttemptId()); } mapOutput = shuffleToDisk(mapOutputLoc, input, filename, compressedLength); } mapOutput.decompressedSize = decompressedLength; return mapOutput;
7:这一步进行merge 合并数据 不做过多的关注了
8和9是reduce真正运行的逻辑过程并将最终结果输出
上述步骤中涉及到的混淆 shuffle过程为3、4、5、6、7,优化方面有很多方式,其中最主要的优化面就是reduce通过http获取map输出的过程。
发表评论
-
Sort-based Shuffle的设计与实现
2016-03-15 08:49 807原文 http://www.cnblogs.com/hsea ... -
spark的几个重要概念
2015-12-04 14:09 0本节主要记录以下几个概念 一:RDD的五大特点 二:RDD 窄 ... -
spark部署安装调试
2015-12-02 11:28 735本节记录spark下载-->编译-->安装--&g ... -
spark基本概念
2015-11-12 10:45 783记录一下课堂笔记: ... -
hadoop计算能力调度器配置
2015-10-29 10:39 1012问题出现 hadoop默认调度器是FIFO,其原理就是先按照作 ... -
HBase在各大应用中的优化和改进
2015-10-28 14:59 688Facebook之前曾经透露过Facebook的hbase架构 ... -
一篇很好的解决系统问题过程描述文章
2015-09-23 08:40 498在网上看到的一篇解决h ... -
通过GeoHash核心原理来分析hbase rowkey设计
2015-09-08 15:49 3513注:本文是结合hbase ... -
从OpenTsdb来分析rowkey设计
2015-09-06 16:04 4942讨论此问题前,先理解 ... -
HBase中asynchbase的使用方式
2015-08-25 10:32 8187Hbase的原生java 客户端是完全同步的,当你使用原生AP ... -
Mapreduce优化的点滴
2015-07-16 15:18 821注:转载 1. 使用自定义Writable 自带的Text ... -
hadoop 如何自定义类型
2015-07-15 09:37 1236记录一下hadoop 数据类型章节的笔记,以便后期使用,本文是 ... -
ZooKeeper伪分布式集群安装及使用
2015-02-13 08:29 9161. zookeeper介绍 ZooKeeper是一个为分 ... -
hadoop-mahout 核心算法总结
2015-02-07 10:08 1551其实大家都知道hadoop为我们提供了一个大的框架,真正的 ... -
推荐引擎内部原理--mahout
2015-01-22 11:11 568转载自:https://www.ibm.com/devel ... -
hadoop 动态添加删除节点
2015-01-20 13:39 658转自:http://www.cnblogs.com/rill ... -
hbase hadoop zookeeper
2015-01-19 14:47 0hadoop 部署手册 http://www.iteblo ... -
mapreduce 开发以及部署
2015-01-16 13:56 833前面几篇文章的梳理让我对hadoop新yarn 框架有了一 ... -
hadoop yarn几个问题的记录
2015-01-13 11:48 651本文主要介绍以下几 ... -
hadoop集群部署时候的几个问题记录
2015-01-13 10:24 736本章部署一个hadoop 集群 ...
相关推荐
MapReduce详解Shuffle过程 MapReduce是Hadoop生态系统中的一种分布式计算框架,而Shuffle过程是MapReduce的核心部分。Shuffle过程是将map task的输出结果传送到reduce task的过程,顾名思义,Shuffle就是洗牌或弄乱...
Hadoop Mapreduce过程shuffle过程全解析,Shuffle过程
Hadoop Shuffle 过程详解 Hadoop 的 Shuffle 过程是 MapReduce 的核心,也被称为奇迹发生的地方。要想理解 MapReduce,Shuffle 是必须要了解的。Shuffle 的正常意思是洗牌或弄乱,可能大家更熟悉的是 Java API里的 ...
在Java中实现MapReduce的Shuffle过程,首先需要理解以下几个核心概念: 1. **Map阶段**:Map阶段是数据处理的起始点,输入数据被分割成多个小块(split),每个split由一个Mapper任务处理。Mapper接收键值对(key-...
简单说一下hadoop和spark的shuffle过程
MapReduce Shuffle 过程图解 Xmind文件
spark.shuffle.blockTransferService netty shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉 ...
8k程序让iPod shuffle拷歌真的舒服 (下载)iPod shuffle令人诟病的一点就是必须通过苹果的iTunes软件来导入音乐文件,如果想自己控制导入shuffle的歌曲的话操作比较繁琐。令人高兴的是,国外高手Martin Fiedler开发的...
**Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`, `reduceByKey`, `join`等操作中。Shuffle操作涉及到大量的磁盘I/...
这一章节强调了使用iPod shuffle时需要特别注意的安全信息,以及在操作过程中需要注意的要点。这是保证用户安全使用设备的重要部分。 ### 第8章:了解更多信息、服务及支持 在最后一章,用户可以了解到更多关于...
在MATLAB的开发过程中,优化性能是非常重要的,尤其是在处理大数据集时。`Shuffle`的C-MEX实现展示了如何利用低级编程语言的优势来加速MATLAB代码。对于那些对性能有严格要求的算法,使用C-MEX接口可以显著提高效率...
在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的
《Shuffle Transformer:对视觉变换器空间洗牌的再思考》 近年来,基于窗口的变换器在非重叠局部窗口内计算自注意力,已经在图像分类、语义分割和目标检测等任务上取得了令人鼓舞的结果。然而,对于改进表示能力的...
Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章SparkShuffle过程分析:Map阶段处理流程了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个...
标题中的“ipod shuffle顺序调整工具”指的是一个专门设计用于改变第一代iPod Shuffle中歌曲播放顺序的应用程序。这个工具的出现是因为第一代iPod Shuffle没有屏幕,用户不能直接在设备上排列歌曲顺序,通常需要依赖...
iPod shuffle是苹果公司推出的便携式数字音乐播放器,属于iPod系列,具有小巧轻便的特点,是苹果最小的mp3播放器。用户手册为用户提供了详细的使用说明,方便用户了解和掌握iPod shuffle的基本功能和操作方法。 在...
4. **动画效果** - 内置的动画使得元素的显示和隐藏过程平滑自然,提升了用户体验。 5. **API和事件** - Shuffle.js提供了丰富的API接口和事件,允许开发者自定义行为,如添加新的元素、改变排序规则等。 6. **...