`
heipark
  • 浏览: 2094601 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop Combiner的几个调用时间点

 
阅读更多


  Combiner是在Map端被执行,共有两个时机会被触发:

         ① 从环形缓冲器溢写分区文件的时候

         ② 合并溢写分区文件的时候

 

1. 初始化combinerRunner和combineCollector

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

     ┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);

if(job.getNumReduceTasks() == 0) {
  output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  // 如果有reduce task,才会有Combiner task
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

       ┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);

           ┟ 初始化combinerRunner和combineCollector

combinerRunner = CombinerRunner.create(job, getTaskID(), 
				     combineInputCounter,
				     reporter, null);
if (combinerRunner != null) {
  combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
  combineCollector = null;
}

2. Combiner调用点1:磁盘溢写(spill)

磁盘溢写会触发Combiner,有两个地方会触发溢写操作:

  1. 输出Key-value到缓冲器
  2. 关闭map函数输出流,执行flush方法时

2.1 输出Key-Value到缓冲器

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ mapper.run(mapperContext);

      ┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数

        ┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
          ┟ NewOutputCollector.write()

            ┟ MapOutputBuffer.collect()

              ┟ startSpill();

if (kvstart == kvend && kvsoftlimit) {
  LOG.info("Spilling map output: record full = " + kvsoftlimit);
  startSpill(); //  缓冲器达到spill条件,溢写到磁盘
}

               ┟ spillReady.signal(); // 唤起spill线程

 

SpillThread.run()

  ┟ sortAndSpill();

public void run() {
try {
...
  while (kvstart == kvend) {
    spillReady.await(); // 等待被唤醒
  }
...
sortAndSpill(); 
...

     ┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner

 

int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end
while (spindex < endPosition &&
  kvindices[kvoffsets[spindex % kvoffsets.length]
	    + PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
// 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner
if (spstart != spindex) {
  combineCollector.setWriter(writer);
  RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
  combinerRunner.combine(kvIter, combineCollector);
}

 

2.2 map输出流flush方法

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ output.close(mapperContext); // 关闭map输出流

      ┟ NewOutputCollector.close();

        ┟ collector.flush();

          ┟ MapOutputBuffer.flush()

            ┟ sortAndSpill(); 运行combiner,同上

 

3. Combiner调用点2:map端分区文件合并

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ output.close(mapperContext); // 关闭map输出流

      ┟ NewOutputCollector.close();

        ┟ collector.flush();

          ┟ MapOutputBuffer.flush()

              ┟ mergeParts();

// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化,
// numSpills 为mapTask已经溢写到磁盘spill文件数量
if (combinerRunner == null || numSpills < minSpillsForCombine) {
  Merger.writeFile(kvIter, writer, reporter, job);
} else {
  combineCollector.setWriter(writer);
  combinerRunner.combine(kvIter, combineCollector); <- 执行combiner
}

 

 --end

 

 

  • 大小: 193.3 KB
分享到:
评论

相关推荐

    Hadoop Combiner使用方法详解

    Hadoop Combiner是MapReduce编程模型中的一个重要组件,它可以减少发送到Reducer的数据量,从而提高网络效率和Reduce端的效率。下面是Hadoop Combiner的使用方法详解: Combiner的优点 1. 减少发送到Reducer的数据...

    hadoop几个实例

    以下是关于Hadoop的一些核心知识点,以及可能在这些实例中涉及的内容。 1. **Hadoop架构**:Hadoop主要由两个核心组件构成,HDFS(Hadoop Distributed File System)和MapReduce。HDFS负责数据的分布式存储,而...

    远程调用执行Hadoop Map/Reduce

    远程调用执行Hadoop Map/Reduce主要涉及以下几个方面: 1. **Hadoop客户端**:开发人员通常使用Hadoop的Java API来编写MapReduce程序。客户端包含了提交作业到集群、监控作业状态和获取结果的功能。例如,`Job`类...

    web 工程调用hadoop集群

    web工程调用hadoop集群的实例,包括一个wordcount例子。 输入输入和输出路径点击提交即可提交任务到hadoop集群,同时含有map和reduce过程的监控。 注意点:要把hadoop相关包放入WEB_INF/lib下面;

    java web程序调用hadoop2.6

    Java Web程序调用Hadoop 2.6是一个关键的技术整合,它允许Web应用程序与Hadoop分布式文件系统(HDFS)和MapReduce框架交互,以处理大规模数据。在本示例中,我们将深入探讨如何实现这一集成,以及涉及的关键概念和...

    hadoop2.7.3 Winutils.exe hadoop.dll

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是这个框架的一个稳定版本,它包含了多个改进和优化,以提高性能和稳定性。在这个版本中,Winutils.exe和hadoop.dll是两...

    HadoopAPI使用

    Hadoop API 主要由多个包组成,每个包都提供了特定的功能。 org.apache.hadoop.conf 包定义了系统参数的配置文件处理 API,用于管理 Hadoop 集群的配置文件。org.apache.hadoop.fs 包定义了抽象的文件系统 API,...

    基于Hadoop的Java调用Matlab混合编程的车牌识别.pdf

    基于Hadoop的Java调用Matlab混合编程的车牌识别技术是现代交通管理领域的一个重要应用,它结合了Hadoop的大数据处理能力以及Matlab强大的数值计算和图像处理能力,为车牌识别的准确性和实时性提供了新的解决方案。...

    hadoop winutils hadoop.dll

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在普通硬件上高效处理大量数据。在Windows环境下,Hadoop的使用与Linux有所不同,因为它的设计最初是针对Linux操作系统的。"winutils"和"hadoop.dll...

    hadoop.dll & winutils.exe For hadoop-2.7.1

    本文将详细探讨与"Hadoop.dll"和"winutils.exe"相关的知识点,以及它们在Hadoop-2.7.1版本中的作用。 Hadoop.dll是Hadoop在Windows操作系统上的一个关键组件,它是Apache Hadoop对Windows平台的适配部分。由于...

    hadoop.dll等Windows调用合集

    Windows安装调用hadoop所需的dll,合集包含版本有2.6;2.6.6;2.6.4;2.7.1;2.8.0-RC3;2.8.1 将对应的版本文件夹内的所有文件都复制到hadoop安装路径下的bin目录下就下

    hadoop.dll & winutils.exe For hadoop-2.8.0

    1. **Hadoop.dll**:这是一个动态链接库(DLL)文件,主要用于Windows平台下的Hadoop本地系统调用。DLL文件是程序的一部分,其中包含了可供多个程序同时使用的代码和数据。在Hadoop中,它可能包含了与Hadoop通信、...

    hadoop2.7.x_winutils_exe&&hadoop_dll

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。标题"hadop2.7.x_winutils_exe&&hadoop_dll"暗示我们关注的是Hadoop 2.7.x版本在Windows环境下的两个关键组件:`winutils.exe`和`...

    hadoop2.7.3的hadoop.dll和winutils.exe

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是Hadoop发展中的一个重要版本,它包含了众多的优化和改进,旨在提高性能、稳定性和易用性。在这个版本中,`hadoop.dll`...

    hadoop的hadoop.dll和winutils.exe下载

    在下载这两个文件时,有几点需要注意: 1. 确保你从可信赖的源获取文件,因为这些文件可能会包含恶意软件,对你的系统造成风险。 2. 下载的版本应与你的Hadoop版本兼容,不兼容的版本可能导致各种问题,如运行错误、...

    Hadoop下载 hadoop-3.3.3.tar.gz

    Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...

Global site tag (gtag.js) - Google Analytics