Combiner是在Map端被执行,共有两个时机会被触发:
① 从环形缓冲器溢写分区文件的时候
② 合并溢写分区文件的时候
1. 初始化combinerRunner和combineCollector
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output = new NewOutputCollector(taskContext, job, umbilical, reporter);
if(job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { // 如果有reduce task,才会有Combiner task output = new NewOutputCollector(taskContext, job, umbilical, reporter); }
┟ collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
┟ 初始化combinerRunner和combineCollector
combinerRunner = CombinerRunner.create(job, getTaskID(), combineInputCounter, reporter, null); if (combinerRunner != null) { combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter); } else { combineCollector = null; }
2. Combiner调用点1:磁盘溢写(spill)
磁盘溢写会触发Combiner,有两个地方会触发溢写操作:
- 输出Key-value到缓冲器
- 关闭map函数输出流,执行flush方法时
2.1 输出Key-Value到缓冲器
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ mapper.run(mapperContext);
┟ map(context.getCurrentKey(), context.getCurrentValue(), context); // map函数
┟ context.write((KEYOUT) key, (VALUEOUT) value); // map函数输出值
┟ NewOutputCollector.write()
┟ MapOutputBuffer.collect()
┟ startSpill();
if (kvstart == kvend && kvsoftlimit) { LOG.info("Spilling map output: record full = " + kvsoftlimit); startSpill(); // 缓冲器达到spill条件,溢写到磁盘 }
┟ spillReady.signal(); // 唤起spill线程
SpillThread.run()
┟ sortAndSpill();
public void run() { try { ... while (kvstart == kvend) { spillReady.await(); // 等待被唤醒 } ... sortAndSpill(); ...
┟ combinerRunner.combine(kvIter, combineCollector); // 运行combiner
int spstart = spindex; // spstart为kvoffet数组start, spindex为kvoffset数组end while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition // 如果start == end,说明该分区只有一条记录,则不进行combiner操作;否则执行combiner if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); }
2.2 map输出流flush方法
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ sortAndSpill(); 运行combiner,同上
3. Combiner调用点2:map端分区文件合并
MapTask.run()
┟ runNewMapper(job, split, umbilical, reporter);
┟ output.close(mapperContext); // 关闭map输出流
┟ NewOutputCollector.close();
┟ collector.flush();
┟ MapOutputBuffer.flush()
┟ mergeParts();
// minSpillsForCombine 在MapOutputBuffer构造函数内被初始化, // numSpills 为mapTask已经溢写到磁盘spill文件数量 if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); <- 执行combiner }
--end
相关推荐
Hadoop Combiner是MapReduce编程模型中的一个重要组件,它可以减少发送到Reducer的数据量,从而提高网络效率和Reduce端的效率。下面是Hadoop Combiner的使用方法详解: Combiner的优点 1. 减少发送到Reducer的数据...
以下是关于Hadoop的一些核心知识点,以及可能在这些实例中涉及的内容。 1. **Hadoop架构**:Hadoop主要由两个核心组件构成,HDFS(Hadoop Distributed File System)和MapReduce。HDFS负责数据的分布式存储,而...
远程调用执行Hadoop Map/Reduce主要涉及以下几个方面: 1. **Hadoop客户端**:开发人员通常使用Hadoop的Java API来编写MapReduce程序。客户端包含了提交作业到集群、监控作业状态和获取结果的功能。例如,`Job`类...
web工程调用hadoop集群的实例,包括一个wordcount例子。 输入输入和输出路径点击提交即可提交任务到hadoop集群,同时含有map和reduce过程的监控。 注意点:要把hadoop相关包放入WEB_INF/lib下面;
Java Web程序调用Hadoop 2.6是一个关键的技术整合,它允许Web应用程序与Hadoop分布式文件系统(HDFS)和MapReduce框架交互,以处理大规模数据。在本示例中,我们将深入探讨如何实现这一集成,以及涉及的关键概念和...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是这个框架的一个稳定版本,它包含了多个改进和优化,以提高性能和稳定性。在这个版本中,Winutils.exe和hadoop.dll是两...
Hadoop API 主要由多个包组成,每个包都提供了特定的功能。 org.apache.hadoop.conf 包定义了系统参数的配置文件处理 API,用于管理 Hadoop 集群的配置文件。org.apache.hadoop.fs 包定义了抽象的文件系统 API,...
基于Hadoop的Java调用Matlab混合编程的车牌识别技术是现代交通管理领域的一个重要应用,它结合了Hadoop的大数据处理能力以及Matlab强大的数值计算和图像处理能力,为车牌识别的准确性和实时性提供了新的解决方案。...
Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在普通硬件上高效处理大量数据。在Windows环境下,Hadoop的使用与Linux有所不同,因为它的设计最初是针对Linux操作系统的。"winutils"和"hadoop.dll...
本文将详细探讨与"Hadoop.dll"和"winutils.exe"相关的知识点,以及它们在Hadoop-2.7.1版本中的作用。 Hadoop.dll是Hadoop在Windows操作系统上的一个关键组件,它是Apache Hadoop对Windows平台的适配部分。由于...
Windows安装调用hadoop所需的dll,合集包含版本有2.6;2.6.6;2.6.4;2.7.1;2.8.0-RC3;2.8.1 将对应的版本文件夹内的所有文件都复制到hadoop安装路径下的bin目录下就下
1. **Hadoop.dll**:这是一个动态链接库(DLL)文件,主要用于Windows平台下的Hadoop本地系统调用。DLL文件是程序的一部分,其中包含了可供多个程序同时使用的代码和数据。在Hadoop中,它可能包含了与Hadoop通信、...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。标题"hadop2.7.x_winutils_exe&&hadoop_dll"暗示我们关注的是Hadoop 2.7.x版本在Windows环境下的两个关键组件:`winutils.exe`和`...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。Hadoop 2.7.3是Hadoop发展中的一个重要版本,它包含了众多的优化和改进,旨在提高性能、稳定性和易用性。在这个版本中,`hadoop.dll`...
在下载这两个文件时,有几点需要注意: 1. 确保你从可信赖的源获取文件,因为这些文件可能会包含恶意软件,对你的系统造成风险。 2. 下载的版本应与你的Hadoop版本兼容,不兼容的版本可能导致各种问题,如运行错误、...
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进 Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不...