`
heipark
  • 浏览: 2101628 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop Combiner的几个调用时间点

 
阅读更多


  Combiner是在Map端被执行,共有两个时机会被触发:

         ① 从环形缓冲器溢写分区文件的时候

         ② 合并溢写分区文件的时候

 

1. 初始化combinerRunner和combineCollector

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

     ┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);

if(job.getNumReduceTasks() == 0) {
  output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  // 如果有reduce task,才会有Combiner task
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

       ┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);

           ┟ 初始化combinerRunner和combineCollector

combinerRunner = CombinerRunner.create(job, getTaskID(), 
				     combineInputCounter,
				     reporter, null);
if (combinerRunner != null) {
  combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
  combineCollector = null;
}

2. Combiner调用点1:磁盘溢写(spill)

磁盘溢写会触发Combiner,有两个地方会触发溢写操作:

  1. 输出Key-value到缓冲器
  2. 关闭map函数输出流,执行flush方法时

2.1 输出Key-Value到缓冲器

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ mapper.run(mapperContext);

      ┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数

        ┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
          ┟ NewOutputCollector.write()

            ┟ MapOutputBuffer.collect()

              ┟ startSpill();

if (kvstart == kvend && kvsoftlimit) {
  LOG.info("Spilling map output: record full = " + kvsoftlimit);
  startSpill(); //  缓冲器达到spill条件,溢写到磁盘
}

               ┟ spillReady.signal(); // 唤起spill线程

 

SpillThread.run()

  ┟ sortAndSpill();

public void run() {
try {
...
  while (kvstart == kvend) {
    spillReady.await(); // 等待被唤醒
  }
...
sortAndSpill(); 
...

     ┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner

 

int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end
while (spindex < endPosition &&
  kvindices[kvoffsets[spindex % kvoffsets.length]
	    + PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
// 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner
if (spstart != spindex) {
  combineCollector.setWriter(writer);
  RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
  combinerRunner.combine(kvIter, combineCollector);
}

 

2.2 map输出流flush方法

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ output.close(mapperContext); // 关闭map输出流

      ┟ NewOutputCollector.close();

        ┟ collector.flush();

          ┟ MapOutputBuffer.flush()

            ┟ sortAndSpill(); 运行combiner,同上

 

3. Combiner调用点2:map端分区文件合并

MapTask.run()

  ┟ runNewMapper(job, split, umbilical, reporter);

    ┟ output.close(mapperContext); // 关闭map输出流

      ┟ NewOutputCollector.close();

        ┟ collector.flush();

          ┟ MapOutputBuffer.flush()

              ┟ mergeParts();

// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化,
// numSpills 为mapTask已经溢写到磁盘spill文件数量
if (combinerRunner == null || numSpills < minSpillsForCombine) {
  Merger.writeFile(kvIter, writer, reporter, job);
} else {
  combineCollector.setWriter(writer);
  combinerRunner.combine(kvIter, combineCollector); <- 执行combiner
}

 

 --end

 

 

  • 大小: 193.3 KB
分享到:
评论

相关推荐

    hadoop Big matrix muiltiple on java source.zip

    此外,为了优化性能,还需要考虑以下几个关键点: 1. 数据划分策略:矩阵如何被分割成合适的块,直接影响计算效率。通常需要根据集群的节点数量和内存资源来决定。 2. Combiner的使用:在Reducer之前,Combiner...

    《Hadoop开发者》第四期

    Hadoop MapReduce框架提供了几种不同的Join实现方法。 **实现方法**: - **Map端Join**:适用于小表Join大表的场景,通过将小表数据加载到内存中,然后在Map阶段完成Join操作。 - **Reduce端Join**:适用于两个大表...

    (精品word)大数据实训方案.doc

    本方案主要涵盖了以下几个核心知识点: 1. **大数据概念与重要性**:首先,实训的目标是让学员理解大数据的概念,认识到大数据在现代企业中的关键作用,它能帮助企业从海量数据中挖掘价值,驱动业务决策。 2. **...

    HadoopMapReduceArch.pdf

    MapReduce 框架主要由以下几个组件构成: 1. **Mapper**:处理输入数据流中的键值对 (key1, value1),并生成新的键值对 (key2, value2)。 2. **Reducer**:接收键2和与之关联的一系列 value2 值,生成键值对 (key3,...

    MapReduceCSharp

    为了实现MapReduce,你需要了解以下几个关键概念: 1. **JobTracker**:在Hadoop中,JobTracker是调度和管理MapReduce作业的中心节点,负责分配任务,监控任务进度,以及处理故障恢复。 2. **TaskTracker**:每个...

    2021Java大厂面试题——大厂真题之唯品会-Java大数据开发工程师.pdf

    ### Java大数据开发工程师面试知识点详解 #### 一、Kafka Message结构详解 Kafka消息(Message)是Kafka中非常核心的概念之一,它承载着所有传递的信息内容。一个Kafka消息由固定长度的Header和变长的消息体Body两...

    java代码-大数据 03 实训4-1

    3. `public static void main(String[] args)`:这个方法是程序的起点,用于调用`run`方法,并传递命令行参数。 "README.txt"文件通常是项目说明文档,可能包含了如何构建、运行和测试项目的指南。内容可能包括以下...

Global site tag (gtag.js) - Google Analytics