Combiner是在Map端被执行,共有两个时机会被触发:
① 从环形缓冲器溢写分区文件的时候
② 合并溢写分区文件的时候
1. 初始化combinerRunner和combineCollector
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);
if(job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { // 如果有reduce task,才会有Combiner task output = new NewOutputCollector(taskContext, job, umbilical, reporter); }
┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
┟ 初始化combinerRunner和combineCollector
combinerRunner = CombinerRunner.create(job, getTaskID(), combineInputCounter, reporter, null); if (combinerRunner != null) { combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter); } else { combineCollector = null; }
2. Combiner调用点1:磁盘溢写(spill)
磁盘溢写会触发Combiner,有两个地方会触发溢写操作:
- 输出Key-value到缓冲器
- 关闭map函数输出流,执行flush方法时
2.1 输出Key-Value到缓冲器
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ mapper.run(mapperContext);
┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数
┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
┟ NewOutputCollector.write()
┟ MapOutputBuffer.collect()
┟ startSpill();
if (kvstart == kvend && kvsoftlimit) { LOG.info("Spilling map output: record full = " + kvsoftlimit); startSpill(); // 缓冲器达到spill条件,溢写到磁盘 }
┟ spillReady.signal(); // 唤起spill线程
SpillThread.run()
┟ sortAndSpill();
public void run() { try { ... while (kvstart == kvend) { spillReady.await(); // 等待被唤醒 } ... sortAndSpill(); ...
┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner
int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition // 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); }
2.2 map输出流flush方法
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ sortAndSpill(); 运行combiner,同上
3. Combiner调用点2:map端分区文件合并
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ mergeParts();
// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化, // numSpills 为mapTask已经溢写到磁盘spill文件数量 if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); <- 执行combiner }
--end
相关推荐
此外,为了优化性能,还需要考虑以下几个关键点: 1. 数据划分策略:矩阵如何被分割成合适的块,直接影响计算效率。通常需要根据集群的节点数量和内存资源来决定。 2. Combiner的使用:在Reducer之前,Combiner...
Hadoop MapReduce框架提供了几种不同的Join实现方法。 **实现方法**: - **Map端Join**:适用于小表Join大表的场景,通过将小表数据加载到内存中,然后在Map阶段完成Join操作。 - **Reduce端Join**:适用于两个大表...
本方案主要涵盖了以下几个核心知识点: 1. **大数据概念与重要性**:首先,实训的目标是让学员理解大数据的概念,认识到大数据在现代企业中的关键作用,它能帮助企业从海量数据中挖掘价值,驱动业务决策。 2. **...
MapReduce 框架主要由以下几个组件构成: 1. **Mapper**:处理输入数据流中的键值对 (key1, value1),并生成新的键值对 (key2, value2)。 2. **Reducer**:接收键2和与之关联的一系列 value2 值,生成键值对 (key3,...
为了实现MapReduce,你需要了解以下几个关键概念: 1. **JobTracker**:在Hadoop中,JobTracker是调度和管理MapReduce作业的中心节点,负责分配任务,监控任务进度,以及处理故障恢复。 2. **TaskTracker**:每个...
### Java大数据开发工程师面试知识点详解 #### 一、Kafka Message结构详解 Kafka消息(Message)是Kafka中非常核心的概念之一,它承载着所有传递的信息内容。一个Kafka消息由固定长度的Header和变长的消息体Body两...
3. `public static void main(String[] args)`:这个方法是程序的起点,用于调用`run`方法,并传递命令行参数。 "README.txt"文件通常是项目说明文档,可能包含了如何构建、运行和测试项目的指南。内容可能包括以下...