
How Map-Reduce job output formats are implemented in the Hadoop framework

   Posted: 2010-04-10   Last modified: 2010-04-10

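      Hadoop is not merely a distributed file system for storage; it is a framework designed to run distributed applications on large clusters built from commodity hardware. Hadoop consists of two parts: a distributed file system, HDFS (Hadoop Distributed File System), and a Map-Reduce implementation.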


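    Nutch is a good starting point for studying Hadoop. I will skip the distributed file system here and look at how the input and output of a MapReduce Job are configured. When you create a new Job, the input and output paths are usually set like this: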

FileInputFormat.addInputPath(job, in);

FileOutputFormat.setOutputPath(job, out);










Interface: org.apache.hadoop.mapred.OutputFormat<K, V>

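public interface OutputFormat<K, V> {

// Returns the writer that persists the job's output records.
RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException;

// Validates the output specification before the job runs.
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

}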


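 checkOutputSpecs: checks whether the job's output path already exists and throws an exception (IOException) if it does. The version I am using, 0.19.2, has no option to overwrite existing output; later versions may support it.

 getRecordWriter: returns the RecordWriter that writes the output <key, value> pairs to the output path.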
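
To make the two-method contract concrete, here is a minimal sketch that uses only the JDK, not Hadoop. The names SimpleOutputFormat, SimpleRecordWriter and TabSeparatedOutputFormat are hypothetical stand-ins invented for this illustration, and java.nio.file replaces Hadoop's FileSystem and JobConf. It only shows the division of labour: validate the output location first, then hand out a writer that receives the records.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for Hadoop's RecordWriter: receives records, then is closed. */
interface SimpleRecordWriter<K, V> {
    void write(K key, V value) throws IOException;
    void close() throws IOException;
}

/** Hypothetical stand-in for Hadoop's OutputFormat: validate first, then hand out writers. */
interface SimpleOutputFormat<K, V> {
    void checkOutputSpecs(Path outDir) throws IOException;
    SimpleRecordWriter<K, V> getRecordWriter(Path outDir, String name) throws IOException;
}

/** Writes each record as "key<TAB>value" on its own line. */
class TabSeparatedOutputFormat implements SimpleOutputFormat<String, String> {

    @Override
    public void checkOutputSpecs(Path outDir) throws IOException {
        // Refuse to reuse an existing directory so earlier results are never overwritten.
        if (Files.exists(outDir)) {
            throw new IOException("Output directory " + outDir + " already exists");
        }
    }

    @Override
    public SimpleRecordWriter<String, String> getRecordWriter(Path outDir, String name)
            throws IOException {
        Files.createDirectories(outDir);
        final BufferedWriter out =
            Files.newBufferedWriter(outDir.resolve(name), StandardCharsets.UTF_8);
        return new SimpleRecordWriter<String, String>() {
            @Override
            public void write(String key, String value) throws IOException {
                out.write(key + "\t" + value + "\n");
            }

            @Override
            public void close() throws IOException {
                out.close();
            }
        };
    }
}

class OutputFormatDemo {
    public static void main(String[] args) throws IOException {
        Path outDir = Files.createTempDirectory("job").resolve("out");
        TabSeparatedOutputFormat format = new TabSeparatedOutputFormat();
        format.checkOutputSpecs(outDir); // passes: the directory does not exist yet
        SimpleRecordWriter<String, String> writer = format.getRecordWriter(outDir, "part-00000");
        writer.write("hadoop", "1");
        writer.write("nutch", "2");
        writer.close();
        System.out.println(Files.readAllLines(outDir.resolve("part-00000")));
    }
}
```

Calling checkOutputSpecs a second time on the same directory throws, which is the same "fail instead of overwrite" behaviour described above.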




Base class FileOutputFormat: org.apache.hadoop.mapred.FileOutputFormat

public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {

public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException;

public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException,
InvalidJobConfException, IOException {

    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);
      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
      }
    }
}

}
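
The order of the checks matters: "is a directory set?", then normalize, then "does it already exist?". Because this version cannot overwrite, a rerun only passes once the old output has been removed. The sketch below reproduces that sequence with the JDK alone. OutputDirCheck and its two methods are hypothetical names, IllegalStateException stands in for InvalidJobConfException, and a plain IOException stands in for Hadoop's FileAlreadyExistsException. On a real cluster the cleanup step would presumably be FileSystem.delete(path, true) rather than the java.nio code shown here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OutputDirCheck {

    /**
     * Applies the checks in the same order as the listing above.
     * Returns the normalized directory, or null for a map-only job without an output directory.
     */
    static Path check(Path outDir, int numReduceTasks) throws IOException {
        if (outDir == null && numReduceTasks != 0) {
            throw new IllegalStateException("Output directory not set.");
        }
        if (outDir == null) {
            return null; // nothing to validate when there are no reducers and no directory
        }
        Path normalized = outDir.toAbsolutePath().normalize();
        if (Files.exists(normalized)) {
            throw new IOException("Output directory " + normalized + " already exists");
        }
        return normalized;
    }

    /** Recursively deletes a stale output directory so that a rerun passes the check. */
    static void deleteIfExists(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order lists children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("job").resolve("out");
        Files.createDirectories(out.resolve("leftover")); // simulate output of a previous run
        try {
            check(out, 1);
        } catch (IOException e) {
            System.out.println("first attempt rejected: " + e.getMessage());
        }
        deleteIfExists(out);
        System.out.println("after cleanup: " + check(out, 1));
    }
}
```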




Subclass MapFileOutputFormat: org.apache.hadoop.mapred.MapFileOutputFormat

public class MapFileOutputFormat
extends FileOutputFormat<WritableComparable, Writable> {

  public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                      String name, Progressable progress)
    throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

      // find the right codec, falling back to DefaultCodec
      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
          DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }
    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
      new MapFile.Writer(job, fs, file.toString(),
                         job.getOutputKeyClass().asSubclass(WritableComparable.class),
                         job.getOutputValueClass().asSubclass(Writable.class),
                         compressionType, codec,
                         progress);

    return new RecordWriter<WritableComparable, Writable>() {

        public void write(WritableComparable key, Writable value)
          throws IOException {

          out.append(key, value);
        }

        public void close(Reporter reporter) throws IOException { out.close();}
      };
  }

}





A custom implementation: org.apache.nutch.parse.ParseOutputFormat

public class ParseOutputFormat implements OutputFormat<Text, Parse> {

public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
      throw new IOException("Segment already parsed!");
}

public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
 throws IOException {

    // ... (filter and scoring setup omitted)
    final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
    Path out = FileOutputFormat.getOutputPath(job);

    Path text = new Path(new Path(out, ParseText.DIR_NAME), name);          // first output path
    Path data = new Path(new Path(out, ParseData.DIR_NAME), name);          // second output path
    Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);  // third output path
    final MapFile.Writer textOut =
      new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
          CompressionType.RECORD, progress);
    final MapFile.Writer dataOut =
      new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
          compType, progress);
    final SequenceFile.Writer crawlOut =
      SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
          compType, progress);
    return new RecordWriter<Text, Parse>() {

        public void write(Text key, Parse parse) throws IOException {
          // Abridged: the code that builds d, newUrl, newDatum, adjust,
          // parseData and datum is omitted; only the append calls are shown.
          // ...
          crawlOut.append(key, d);
          // ...
          crawlOut.append(new Text(newUrl), newDatum);
          // ...
          crawlOut.append(key, adjust);
          // ...
          dataOut.append(key, parseData);
          // ...
          crawlOut.append(key, datum);
        }

        public void close(Reporter reporter) throws IOException {
          textOut.close();
          dataOut.close();
          crawlOut.close();
        }
      };
}

}
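
The interesting point in ParseOutputFormat is that a single RecordWriter fans each record out to several files under sibling subdirectories of one output directory. Below is a rough JDK-only sketch of that pattern, reduced to two outputs. FanOutWriter and ParsedPage are hypothetical names, the subdirectory names "text" and "data" are placeholders rather than Nutch's real directory constants, and plain text files stand in for MapFile and SequenceFile.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical record with a text part and a metadata part, standing in for Nutch's Parse. */
class ParsedPage {
    final String text;
    final String title;

    ParsedPage(String text, String title) {
        this.text = text;
        this.title = title;
    }
}

/** Routes each record to one file per subdirectory of a shared output directory. */
class FanOutWriter implements AutoCloseable {
    private final BufferedWriter textOut;
    private final BufferedWriter dataOut;

    FanOutWriter(Path outDir, String name) throws IOException {
        // Same layout idea as the listing: <out>/<kind>/<name>
        textOut = open(outDir.resolve("text").resolve(name));
        dataOut = open(outDir.resolve("data").resolve(name));
    }

    private static BufferedWriter open(Path file) throws IOException {
        Files.createDirectories(file.getParent());
        return Files.newBufferedWriter(file);
    }

    void write(String url, ParsedPage page) throws IOException {
        // One incoming record produces one line in each output.
        textOut.write(url + "\t" + page.text + "\n");
        dataOut.write(url + "\t" + page.title + "\n");
    }

    @Override
    public void close() throws IOException {
        // Close both writers even if closing the first one fails.
        try {
            textOut.close();
        } finally {
            dataOut.close();
        }
    }
}

class FanOutDemo {
    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("segment");
        try (FanOutWriter writer = new FanOutWriter(out, "part-00000")) {
            writer.write("http://example.com/", new ParsedPage("hello world", "Example"));
        }
        System.out.println(Files.readAllLines(out.resolve("text").resolve("part-00000")));
        System.out.println(Files.readAllLines(out.resolve("data").resolve("part-00000")));
    }
}
```

Since every writer must be closed for its output to be complete, close() has to cover all of them, which is why the sketch chains the close calls with try/finally.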



