OutputFormat主要用于描述数据输出的格式,它能够将用户提供的key/value对写入特定格式的文件中。
与InputFormat相似,OutputFormat也是一个接口,旧版API有两个方法:
RecordWriter<K,V> getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable progress) throws IOException;
void checkOutputSpecs(FileSystem ignored,JobConf job) throws IOException;
新版API增加:OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException;
其中,getRecordWriter方法返回一个RecordWriter类的对象,它有如下两个方法:
void write(K key, V value);
void close(TaskAttemptContext context);
前者负责将Reduce输出的Key/Value写成特定的格式,后者负责对输出做最后的确认并关闭输出。
再其中,checkOutputSpecs是在JobClient提交Job之前被调用的,即在InputFomat进行输入数据划分之前,用于检测Job的输出路径,如果存在则抛出异常,防止之前的数据被覆盖,若不存在,然后该方法又会重新创建这个Output路径,这样一来,就能确保Job结束后,Output路径下的东西就是且仅是该Job输出的。。关于InputFomat以及数据划分,可以参考http://924389979.iteye.com/blog/2059267。
hadoop自带很多OutputFormat实现,具体如下:
最后,OutputCommitter用于控制Job的输出环境,它有下面几个方法:
void setupJob(JobContext jobContext);
Job开始被执行之前,框架会调用OutputCommitter.setupJob()为Job创建一个临时输出路径 ${mapred.out.dir}/_tempory;
void commitJob(JobContext jobContext);
如果Job成功完成,框架会调用OutputCommitter.commitJob()提交Job的输出,删除临时路径并在${mapred.out.dir}下创建空文件_SUCCESS;
void abortJob(JobContext jobContext, JobStatus.State state);
如果Job失败,框架会调用OutputCommitter.abortJob()撤销Job的输出,删除临时路径;
void setupTask(TaskAttemptContext taskContext);
任务初始化,需要时创建 side-effect file;
boolean needsTaskCommit(TaskAttemptContext taskContext);
判断是否需要提交,若存在 side-effect file,返回true;
void commitTask(TaskAttemptContext taskContext);
提交结果,将 side-effect file 移动到 ${mapred.out.dir} 目录下;
void abortTask(TaskAttemptContext taskContext);
任务运行失败,则删除 side-effect file ;
对应于Job下的每一个Task,同样牵涉创建、提交和撤销三个动作,分别由OutputCommitter.setupTask()、OutputCommitter.commitTask()、OutputCommitter.abortTask()来完成。而一个Task可能没有输出,从而也就不需要提交,这个可以通过OutputCommitter.needsTaskCommit()来判断;
关于 side-effect file 的处理:
side-effect file 并不是任务的最终输出文件,该文件用于执行推测式任务。为防止一个结点上的任务执行速度过慢而拖后腿,hadoop会在另一个结点上启动一个相同的任务,即推测式任务。又为防止这两个任务同时向一个输出文件中写入数据时发生冲突,FileOutputFormat会为每个Task的数据创建一个 side-effect file,并将临时数据写入该文件,待Task完成后,移动到最终输出目录下。
细节部分:
首先,一个Job被提交到JobTracker后会生成若干的Map和Reduce任务,这些任务会被分派到TaskTracker上。对于每一个Task,TaskTracker会使用一个子JVM来执行它们。那么对于Task的setup/commit/abort这些操作,自然应该在执行Task的子JVM里面去完成。
重点说一下任务执行失败时的情况,首先OutputCommitter.abortTask()会被调用。这个调用很特殊,它不大可能在执行任务的子JVM里面完成。因为执行任务的子JVM里面跑的是用户提供的Map/Reduce代码,Hadoop框架是无法保证这些代码的稳定性的,所以任务的失败往往伴随着子JVM的异常退出(这也就是为什么要用子JVM来执行Map和Reduce任务的原因,否则异常退出的可能就是整个框架了)。于是,对于失败的任务,JobTracker除了要考虑它的重试之外,还要为其生成一个cleanup任务。这个cleanup任务像普通的Map和Reduce任务一样,会被分派到TaskTracker上去执行(不一定分派到之前执行该任务失败的那个TaskTracker上,因为输出是在HDFS上,是全局的)。而它的执行逻辑主要就是调用OutputCommitter.abortTask();
相关推荐
MapReduce编程模型3.1 MapReduce编程模型概述3.1.1 MapReduce编程接口体系结构3.1.2 新旧MapReduce API比较3.2 MapReduce API基本概念3.2.1 序列化3.2.2 Reporter参数3.2.3 回调机制3.3 Java API解析3.3.1 ...
1. **MapReduce编程模型**:MapReduce的核心是Map和Reduce两个函数。Map负责将输入数据拆分成键值对,进行局部处理;Reduce则将Map阶段的结果进行聚合,生成最终结果。中间结果通过 Shuffle 和 Sort 阶段进行排序和...
在这个"Day04"的学习资料中,我们将深入探讨MapReduce编程模型,并通过具体的案例来理解其工作原理。 MapReduce 分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始数据被分割成多个块,每个块由一个Map任务...
6. **实战编程**:MapReduce编程涉及定义Mapper和Reducer类,以及配置输入输出格式。例如,自定义InputFormat处理非标准输入,OutputFormat定义结果存储方式。此外,还需关注作业提交、监控和调试技巧。 7. **...
2. **MapReduce编程模型**:介绍Map和Reduce函数的编写,以及Combiner和Partitioner的使用,它们分别用于局部聚合和分区优化。 3. **数据输入与输出**:探讨InputFormat和OutputFormat接口,理解如何自定义输入输出...
4. **hadoop-mapreduce-client-core**: 包含MapReduce编程模型的核心类和接口,如Mapper、Reducer、Partitioner、InputFormat和OutputFormat等。 5. **hadoop-mapreduce-client-app**: 提供了JobTracker的替代,即...
总的来说,"0324大数据代码与数据_JAVA大数据_文本分析_运用MapReduce做数据分析_"项目涵盖了大数据处理的关键技术,包括Java编程、MapReduce模型、文本分析以及可能的数据存储和可视化。通过学习和实践这个项目,...
MapReduce是一种编程模型,主要用于处理大规模数据集的并行运算。该模型由Google提出,并被广泛应用于分布式计算领域。Hadoop作为开源框架之一,实现了MapReduce的核心思想,为大数据处理提供了强大的支持。 在...
在MapReduce中,输入和输出数据的格式由InputFormat和OutputFormat接口定义。InputFormat用于定义输入数据的读取方式和格式,而OutputFormat用于定义输出数据的格式。在Hadoop中,一个常见的InputFormat实现是...
它提供了一种简单有效的编程模型,允许开发者编写并行处理大规模数据的应用程序。MapReduce的核心概念是将复杂的计算任务分解成较小的子任务,并在分布式环境中并行执行。 **1.1 编程模式** MapReduce的基本编程...
"simple-cdmh-mapreduce" 是一个与MapReduce编程模型相关的项目,可能是一个简化版的CDMH(可能是某个公司或组织的缩写)MapReduce实现。MapReduce是Google提出的一种分布式计算模型,常用于大数据处理,它将大规模...
Hadoop MapReduce 是一种分布式编程模型,旨在处理大规模数据集(TB级别甚至PB级别)的计算任务。它通过将计算任务分解为多个子任务并在多台机器上并行执行这些任务来提高效率。Hadoop MapReduce 由两个主要组件构成...
Writable 是 Hadoop 自定义的序列化接口,实现该类的接口可以用作 MapReduce 过程中的 value 数据使用。 MapReduce 是一种高效的计算模型,通过将数据交给不同的机器去处理,数据划分,结果归约,实现了并行计算,...
MapReduce编程过程是一个继承类与实现接口的过程,这些类与接口来自于Hadoop的Map-Reduce框架,由框架控制其执行流程。编程过程的三个阶段是输入阶段、计算阶段和输出阶段。 在输入阶段,InputFormat文件分割,读取...
MapReduce是一种编程模型,用于处理大规模数据集,它是Hadoop项目的核心组成部分。MapReduceV2是其更新的版本,其设计思想和架构主要是为了解决大数据环境下的存储和计算问题。MapReduceV2的工作机制可以分为map阶段...
本指南分为五个主要章节,涵盖了从Hadoop的基本概念到MapReduce编程模型,再到HDFS和I/O处理,最后深入到MapReduce的实际开发。 第一章“初始Hadoop”为读者提供了Hadoop的入门介绍。这一章将讲解Hadoop的起源、...
Hadoop包含多个相互关联的子项目,最核心的两个部分是Hadoop分布式文件系统(HDFS)和MapReduce编程模型。 Hadoop的核心概念包括集群管理和数据处理。集群是由多个计算机节点组成的硬件集合,这些计算机协同工作,...
MapReduce是一种编程模型,用于大规模数据集的并行处理。它将大数据任务分解为两个主要阶段:映射(Map)和化简(Reduce)。Eclipse与Hadoop的集成使得开发者可以在本地环境中编写、测试和调试MapReduce作业,然后再...
3. **编程模型**:深入讲解MapReduce的编程模型,包括自定义Mapper和Reducer类,以及它们的生命周期方法,如setup、map、cleanup和reduce。 4. **数据类型和I/O**:讨论Hadoop中的基本数据类型,如Writable接口,...
MapReduce是一种编程模型,用于大规模数据集的并行计算。它由两个主要阶段组成:Map阶段和Reduce阶段,中间通过Shuffle和Sort过程连接。MapReduce设计的目标是简化分布式计算,使得程序员可以专注于业务逻辑,而无需...