`

napreduce shuffle 过程记录

阅读更多
           在我看来 hadoop的核心是mapreduce,而mapreduce的核心则是 shuffle,在我们需要优化mapreduce,提高mapreduce效率时,需要优化的核心代码都在这个shuffle过程。
       我从网上拉过来一张图,加上自己的标注来详细记录一下该过程,以便后期优化代码做一个记录
mapreduce整个执行过程如下如所示


其中1、2、3、4....是我自己加上的以便一步一步来分析,下面我们来根据源代码分析这一步一步的过程,在此我跟踪的源代码是 hadoop-1.2.1 版本
1:inputSplit 这个过程我们看JobClient 类的writeNewSplits 方法,此方法为根据获得到的输入文件,将文件分块放入map中,关键的一句代码
List<InputSplit> splits = input.getSplits(job);

我们跟踪进去FileInputFormat  的 getSplits方法
 
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }
        
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
                     blkLocations[blkLocations.length-1].getHosts()));
        }

这段代码也是划分文件块的关键所在,首先获得文件块大小,在配置 block.size中可以配置,默认为64MB,然后计算出split的大小,默认也是64MB(可跟踪computeSplitSize方法查看原因),然后开始划分文件(while循环),比如64M文件则默认为一个块,65m文件则为两个块。

2:这一步自不必说,就是map计算的过程

3:这一步中,map计算的输出结果首先是写到一个缓冲区中,当缓冲区数据大小超过一定阀值之后,则进行spill 溢写操作,即将缓冲区中的数据写入到本地磁盘,在此过程中还根据key的hash值进行键值对的排序和合并操作,核心实现代码在MapTask的MapOutputBuffer 类中的collect方法,该方法主要用于收集map输出数据并写入缓冲区,当缓冲区超出临界值则开启溢写线程。
 final boolean kvsoftlimit = ((kvnext > kvend)
              ? kvnext - kvend > softRecordLimit
              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
          if (kvstart == kvend && kvsoftlimit) {
            LOG.info("Spilling map output: record full = " + kvsoftlimit);
            startSpill();
   }

4:这一步是溢写的过程,在这个过程中还进行partition和sort
该Thread会检查内存中的输出缓存区,在满足一定条件的时候将缓冲区中的内容spill到硬盘上。这是一个标准的生产者-消费者模型,MapTask的collect方法是生产者,spillThread是消费者,它们之间同步是通过spillLock(ReentrantLock)和spillLock上的两个条件变量(spillDone和spillReady)完成的。当kvstart == kvend条件成立时,表示没有要spill的记录。
  protected class SpillThread extends Thread {

      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              //此处是进行排序溢写的核心方法
              sortAndSpill();
            } catch (Exception e) {
              sortSpillException = e;
            } catch (Throwable t) {
              sortSpillException = t;
              String logMsg = "Task " + getTaskID() + " failed : " 
                              + StringUtils.stringifyException(t);
              reportFatalError(getTaskID(), t, logMsg);
            } finally {
              spillLock.lock();
              if (bufend < bufindex && bufindex < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }

SpillThread线程的run方法中调用sortAndSpill把缓存中的输出写到格式为+ '/spill' + spillNumber + '.out'的spill文件中。索引(kvindices)保持在spill{spill号}.out.index中,数据保存在spill{spill号}.out中 创建SpillRecord记录,输出文件和IndexRecord记录,然后,需要在kvoffsets上做排序,排完序后顺序访问kvoffsets,也就是按partition顺序访问记录。按partition循环处理排完序的数组,如果没有combiner,则直接输出记录,否则,调用combineAndSpill,先做combin然后输出。循环的最后记录IndexRecord到SpillRecord。
 private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        //创建溢出文件 格式为+ '/spill' + spillNumber + '.out
        out = rfs.create(filename);

        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
            if (combinerRunner == null) {//如果为空则直接write
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - 
                           kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;
              }
            } else {如果不为空则先combiner在排序再输出
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }

            // close the writer
            writer.close();

            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }

        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }

注:在此系统存放文件的方式使用的是二级索引,在此没有做研究。

5:在此步骤将磁盘中的多个map输出文件具有相同key 的进行合并
核心代码
  @SuppressWarnings("unchecked")
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

          //write merged output to disk
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }

          //close
          writer.close();


6:在这一步中reduce任务通过http方式将map的输出结果复制到reduce执行的节点上来,在此开始关注 RecudeTask 类中方法
 URL url = mapOutputLoc.getOutputLocation();
 HttpURLConnection connection = (HttpURLConnection)url.openConnection();
  //通过http方式拉取map输出数据
 InputStream input = setupSecureConnection(mapOutputLoc, connection);


下面的核心代码是对reduce输入数据进行混淆,涉及到的操作类似map段 进行合并和排序
    MapOutput mapOutput = null;
        if (shuffleInMemory) {//判断混淆能在缓存中进行(此种方式效率比较高)
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into RAM from " + mapOutputLoc.getTaskAttemptId());
          }

          mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
                                      (int)decompressedLength,
                                      (int)compressedLength);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Shuffling " + decompressedLength + " bytes (" + 
                compressedLength + " raw bytes) " + 
                "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
          }
          
          mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
              compressedLength);
        }
        mapOutput.decompressedSize = decompressedLength;    
        return mapOutput;


7:这一步进行merge 合并数据  不做过多的关注了

8和9是reduce真正运行的逻辑过程并将最终结果输出


上述步骤中涉及到的混淆  shuffle过程为3、4、5、6、7,优化方面有很多方式,其中最主要的优化面就是reduce通过http获取map输出的过程。
  • 大小: 53 KB
分享到:
评论

相关推荐

    MapReduce详解Shuffle过程

    MapReduce详解Shuffle过程 MapReduce是Hadoop生态系统中的一种分布式计算框架,而Shuffle过程是MapReduce的核心部分。Shuffle过程是将map task的输出结果传送到reduce task的过程,顾名思义,Shuffle就是洗牌或弄乱...

    Hadoop Shuffle过程全解析

    Hadoop Mapreduce过程shuffle过程全解析,Shuffle过程

    详解shuffle过程

    Hadoop Shuffle 过程详解 Hadoop 的 Shuffle 过程是 MapReduce 的核心,也被称为奇迹发生的地方。要想理解 MapReduce,Shuffle 是必须要了解的。Shuffle 的正常意思是洗牌或弄乱,可能大家更熟悉的是 Java API里的 ...

    自己实现MapReduce-Shuffle过程.zip

    在Java中实现MapReduce的Shuffle过程,首先需要理解以下几个核心概念: 1. **Map阶段**:Map阶段是数据处理的起始点,输入数据被分割成多个小块(split),每个split由一个Mapper任务处理。Mapper接收键值对(key-...

    简单说一下hadoop和spark的shuffle过程.md

    简单说一下hadoop和spark的shuffle过程

    MapReduce Shuffle 过程图解 Xmind文件

    MapReduce Shuffle 过程图解 Xmind文件

    Spark的shuffle调优

    spark.shuffle.blockTransferService netty shuffle过程中,传输数据的方式,两种选项,netty或nio,spark 1.2开始,默认就是netty,比较简单而且性能较高,spark 1.5开始nio就是过期的了,而且spark 1.6中会去除掉 ...

    让iPod shuffle拷歌真的舒服

    8k程序让iPod shuffle拷歌真的舒服 (下载)iPod shuffle令人诟病的一点就是必须通过苹果的iTunes软件来导入音乐文件,如果想自己控制导入shuffle的歌曲的话操作比较繁琐。令人高兴的是,国外高手Martin Fiedler开发的...

    Spark-shuffle机制.pdf

    **Spark Shuffle**是指在Spark应用程序执行过程中,数据从一个节点或分区移动到另一个节点或分区的过程。这种数据重分布通常发生在诸如`groupByKey`, `reduceByKey`, `join`等操作中。Shuffle操作涉及到大量的磁盘I/...

    matlab开发-Shuffle

    在MATLAB的开发过程中,优化性能是非常重要的,尤其是在处理大数据集时。`Shuffle`的C-MEX实现展示了如何利用低级编程语言的优势来加速MATLAB代码。对于那些对性能有严格要求的算法,使用C-MEX接口可以显著提高效率...

    shuffle中文说明书.pdf

    这一章节强调了使用iPod shuffle时需要特别注意的安全信息,以及在操作过程中需要注意的要点。这是保证用户安全使用设备的重要部分。 ### 第8章:了解更多信息、服务及支持 在最后一章,用户可以了解到更多关于...

    pixel_shuffle.pdf

    Pixel Shuffle 技术的实现过程如下: 1. 特征提取:使用 CNN 从低分辨率图像或视频中提取特征。 2. 特征 upsampling:使用 Pixel Shuffle 层将低分辨率特征 upsampling 到高分辨率空间。 3. 图像或视频生成:使用 ...

    Spark的Shuffle总结分析

    在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。所以shuffle性能的高低也直接决定了整个程序的性能高低。而Spark也会有自己的shuffle实现过程。 1.2 Spark中的 shuffle 介绍 在DAG调度的过程中,Stage 阶段的

    ipod shuffle顺序调整工具

    标题中的“ipod shuffle顺序调整工具”指的是一个专门设计用于改变第一代iPod Shuffle中歌曲播放顺序的应用程序。这个工具的出现是因为第一代iPod Shuffle没有屏幕,用户不能直接在设备上排列歌曲顺序,通常需要依赖...

    Shuffle Transformer重新思考视觉转换器的空间洗牌_Shuffle Transformer Rethinking

    《Shuffle Transformer:对视觉变换器空间洗牌的再思考》 近年来,基于窗口的变换器在非重叠局部窗口内计算自注意力,已经在图像分类、语义分割和目标检测等任务上取得了令人鼓舞的结果。然而,对于改进表示能力的...

    SparkShuffle过程分析:Reduce阶段处理流程

    Spark在Map阶段调度运行的ShuffleMapTask,最后会生成.data和.index文件,可以通过我的这篇文章SparkShuffle过程分析:Map阶段处理流程了解具体流程和详情。同时,在Executor上运行一个ShuffleMapTask,返回了一个...

    iPod_shuffle用户手册

    iPod shuffle是苹果公司推出的便携式数字音乐播放器,属于iPod系列,具有小巧轻便的特点,是苹果最小的mp3播放器。用户手册为用户提供了详细的使用说明,方便用户了解和掌握iPod shuffle的基本功能和操作方法。 在...

    筛选插件Shuffle.js和响应式网格分类、排序

    4. **动画效果** - 内置的动画使得元素的显示和隐藏过程平滑自然,提升了用户体验。 5. **API和事件** - Shuffle.js提供了丰富的API接口和事件,允许开发者自定义行为,如添加新的元素、改变排序规则等。 6. **...

Global site tag (gtag.js) - Google Analytics