`

解析hadoop框架下的Map-Reduce job的输出格式的实现

阅读更多

      Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。

 

    研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径:

FileInputFormat.addInputPath(job, in);

FileOutputFormat.setOutputPath(job, out);

 

从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。

设置输入、输出格式:

job.setInputFormat(SequenceFileInputFormat.class);

job.setOutputFormat(MapFileOutputFormat.class);

 

输出格式

看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。

 

接口 :org.apache.hadoop.mapred.OutputFormat<K , V >

public interface OutputFormat<K, V> {

RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress)
throws IOException;

void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

}

 checkOutputSpecs :检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。

 getRecordWriter     :把输出键值对 output <key, value> 写入到输出路径中。

 

mapred下面的实现有三个,如下图:

 

基类FileOutputFormat :org.apache.hadoop.mapred.FileOutputFormat

public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {

public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress) 
throws IOException;

public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, 
InvalidJobConfException, IOException {

    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);
      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir +  " already exists");
      }
    }
  }

 这是个抽象类,实现了检查输入路径是否存在的方法,具体输出方式写成抽象方法预留给了子类。

 子类见下图:

 

子类MapFileOutputFormat :org.apache.hadoop.mapred.MapFileOutputFormat

public class MapFileOutputFormat 
extends FileOutputFormat<WritableComparable, Writable> {

  public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                      String name, Progressable progress)
    throws IOException {
    // get the path of the temporary output file 
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    
    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

      // find the right codec
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
	  DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    
    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
      new MapFile.Writer(job, fs, file.toString(),
                         job.getOutputKeyClass().asSubclass(WritableComparable.class),
                         job.getOutputValueClass().asSubclass(Writable.class),
                         compressionType, codec,
                         progress);

    return new RecordWriter<WritableComparable, Writable>() {

        public void write(WritableComparable key, Writable value)
          throws IOException {

          out.append(key, value);
        }

        public void close(Reporter reporter) throws IOException { out.close();}
      };
  }
}

   关键点在于获取分布式文件输出句柄MapFile.Writer,完成输出任务后会关闭输出。每个实现都有特定用途,都需要弄清楚,在这里就不再一一介绍了。

 

    上面是hadoop自己的实现,在具体的编程过程中,我们肯定会有自己的实现去定义输出格式。上面也讲到了job只能设置输出路径,不能添加多个输出路径,那么有什么解决措施呢?来看看nutch中的精彩实现,会给我们启示:

 

自己的实现: org.apache.nutch.parse.ParseOutputFormat

public class ParseOutputFormat implements OutputFormat<Text, Parse> {

//这里不是检查输出路径,是检查数据路径下的子路径,改变了接口中的定义
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
      throw new IOException("Segment already parsed!");
  }

//下面获取了三个输入句柄,分别向三个路径中输出键值对
public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
 throws IOException {

   ......
    Path text = new Path(new Path(out, ParseText.DIR_NAME), name); // 一个输出路径
    Path data = new Path(new Path(out, ParseData.DIR_NAME), name); //两个输出路径
 Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//三个输出路径
     
//一个写入
    final MapFile.Writer textOut =
      new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
          CompressionType.RECORD, progress);
    
//第二个写入
    final MapFile.Writer dataOut =
      new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
          compType, progress);
    
//第三个写入
    final SequenceFile.Writer crawlOut =
      SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
          compType, progress);
    
    return new RecordWriter<Text, Parse>() {

        public void write(Text key, Parse parse)throws IOException {

......
              crawlOut.append(key, d);
.......
             crawlOut.append(new Text(newUrl), newDatum);
......
             crawlOut.append(key, adjust);
......
              dataOut.append(key, parseData);
......
              crawlOut.append(key, datum);

          }
        }
//关闭三个句柄
 public void close(Reporter reporter) throws IOException {
          textOut.close();
          dataOut.close();
          crawlOut.close();
        }
      };
 }
}

   ParseOutputFormat实现了OutputFormat接口,改变了job中设置的输出路径,并且把不同的内容输出到不同的路径,从而达到了多个输出(并且根据逻辑划分)。这个我觉得值得借鉴。

 

   关于输入以及输入输出的各个实现都有什么用处,以后有机会再来写写。本人现在还是一知半解,见笑了。

  • 大小: 2.9 KB
  • 大小: 4.5 KB
分享到:
评论

相关推荐

    Hadoop学习总结之四:Map-Reduce过程解析

    ### Hadoop MapReduce任务提交与执行流程解析 #### 一、客户端提交任务 在Hadoop MapReduce框架中,客户端的任务提交是整个MapReduce作业启动的关键步骤。这一过程主要由`JobClient`类中的`runJob(JobConf job)`...

    好用的hadoop-eclipse-plugin-2.6.4.jar

    另外,它还支持直接在Eclipse中启动和停止Job,以及调试Map和Reduce函数,这对于调试和优化代码来说是非常重要的。 Hadoop-Eclipse-Plugin-2.6.4版本是针对Hadoop 2.x系列的,因此,它支持YARN资源管理器,这使得在...

    Hadoop-eclipse-plugin-2.7.6下载与说明

    3. **创建MapReduce项目**:在Eclipse中,你可以通过"File" -&gt; "New" -&gt; "Project" -&gt; "Map/Reduce Project"创建一个新的Hadoop项目。选择合适的Hadoop版本(这里是2.7.6),然后为项目命名。 4. **编写MapReduce...

    eclipse运行mr插件hadoop-eclipse-plugin-2.6.0.jar

    本文将深入探讨如何使用Eclipse IDE结合hadoop-eclipse-plugin-2.6.0.jar插件,实现在Windows环境下进行远程连接到Hadoop集群,尤其适用于64位操作系统。 首先,我们要理解Hadoop的核心概念。Hadoop是由Apache基金...

    Hadoop MapReduce实现tfidf源码

    你需要导入Hadoop的相关库,创建Mapper和Reducer类,并实现它们的map()、reduce()方法。此外,还需要配置Job参数,如输入路径、输出路径、Mapper和Reducer类等,并提交Job到Hadoop集群执行。 压缩包中的"tfidf"文件...

    hadoop2x-eclipse-plugin

    1. 设置Hadoop安装路径:进入Eclipse的"Window" -&gt; "Preferences" -&gt; "Hadoop Map/Reduce",在"Local Hadoop Installation"中指定你的Hadoop安装目录。 2. 配置集群信息:如果你的Hadoop集群不是本地模式,需要在...

    hadoop-map-reduce-demo

    《Hadoop MapReduce实战指南——基于&lt;hadoop-map-reduce-demo&gt;项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...

    map-reduce详细

    ### Map-Reduce 实现细节与问题解决 #### 客户端操作流程 Map-Reduce 的启动过程始于客户端向系统提交任务。此过程的核心是通过 `JobClient` 类的 `runJob` 静态方法来实现。具体步骤如下: 1. **JobClient 对象...

    map-reduce.rar

    Hadoop是Apache软件基金会开发的一个开源框架,它实现了MapReduce模型,使得在大规模集群上进行数据处理变得更加简单。 在描述中提到的“hadoop测试代码”意味着这个压缩包可能包含了用Hadoop实现的MapReduce程序或...

    hadoop-training-map-reduce-example-4

    标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...

    基于Eclipse的hadoop-eclipse-plugin-2.0.0插件

    1. **创建MapReduce项目**:安装插件后,Eclipse会新增一个"New" -&gt; "Other" -&gt; "Hadoop Map/Reduce Project"选项,点击即可创建一个MapReduce项目,提供了模板化的Job类和Mapper/Reducer类。 2. **编辑HDFS文件**...

    大数据云计算技术 优酷网Hadoop及Mapreduce入门教程(共35页).pptx

    Hadoop Map-reduce Job Scheduler Resources Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 ...

    hadoop-eclipse-plugin-2.6.1.jar

    当你准备好测试或运行作业时,只需在Eclipse的"Run Configurations"中选择"Map/Reduce Job",设置好输入和输出路径,以及必要的参数,然后点击"Run",作业就会被提交到集群执行。 值得注意的是,hadoop-eclipse-...

    hadoop-common-2.6.0-bin-master.zip.rar

    本文将深入探讨Hadoop Common 2.6.0版本,特别是在MapReduce框架下进行Driver测试的关键知识点。这个压缩包“hadoop-common-2.6.0-bin-master.zip.rar”包含了运行和测试Hadoop MapReduce项目所需的核心组件。 首先...

    map-reduce详细.pdf

    在Hadoop MapReduce中,`JobConf`对象是任务配置的核心,它包含了关于输入格式、输出格式、Mapper、Reducer类以及各种配置参数。`InputFormat`和`OutputFormat`定义了如何读取和写入数据,而`Mapper`和`Reducer`则是...

    hadoop-eclipse-plugin-2.6.0.jar

    完成代码编写后,可以通过Eclipse的“Run As”菜单选择“Hadoop Job”,指定输入数据的位置和输出结果的目录。插件会自动将程序打包成JAR文件,并提交到Hadoop集群执行。在运行过程中,Eclipse会显示作业的进度和...

    hadoop之map/reduce

    在Hadoop生态系统中,MapReduce是一种分布式编程模型,主要用于处理和生成大数据集。它通过将大规模数据分割成小块,然后在多台机器上并行处理这些数据块,最后将结果汇总,从而实现高效的批量数据处理。MapReduce的...

    hadoop-3.1.3-src.tar.gz

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在廉价硬件上处理大量数据,实现了大数据的存储和处理。`hadoop-3.1.3-src.tar.gz` 是Hadoop 3.1.3版本的源代码压缩包,包含了所有相关的Java源代码和...

Global site tag (gtag.js) - Google Analytics