
InputFormat Deep Dive (9): FileInputFormat Implementation Classes: SequenceFileInputFormat

1. SequenceFileInputFormat and SequenceFileRecordReader
/** An {@link InputFormat} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  @Override
  public RecordReader<K, V> createRecordReader(InputSplit split,
                                               TaskAttemptContext context
                                               ) throws IOException {
    return new SequenceFileRecordReader<K,V>();
  }

  @Override
  protected long getFormatMinSplitSize() {
    return SequenceFile.SYNC_INTERVAL;
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job
                                        )throws IOException {

    List<FileStatus> files = super.listStatus(job);
    int len = files.size();
    for(int i=0; i < len; ++i) {
      FileStatus file = files.get(i);
      if (file.isDirectory()) {     // it's a MapFile
        Path p = file.getPath();
        FileSystem fs = p.getFileSystem(job.getConfiguration());
        // use the data file
        files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
      }
    }
    return files;
  }
}
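
The listStatus() override above makes sense once you know that a MapFile is really a directory containing two SequenceFiles, "data" and "index": the format silently substitutes the data file, so MapFiles can be consumed as ordinary SequenceFiles. A minimal sketch of creating one (not from the original article; the output path is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Writing /tmp/demo.map produces a directory holding "data" and "index".
    try (MapFile.Writer writer = new MapFile.Writer(conf, new Path("/tmp/demo.map"),
        MapFile.Writer.keyClass(IntWritable.class),
        MapFile.Writer.valueClass(Text.class))) {
      writer.append(new IntWritable(1), new Text("one"));  // keys must arrive in sorted order
      writer.append(new IntWritable(2), new Text("two"));
    }
  }
}
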
/** A {@link RecordReader} for {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileRecordReader<K, V> extends RecordReader<K, V> {
  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  private K key = null;
  private V value = null;
  protected Configuration conf;

  @Override
  public void initialize(InputSplit split, 
                         TaskAttemptContext context
                         ) throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) split;
    conf = context.getConfiguration();    
    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = fileSplit.getStart() + fileSplit.getLength();

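    // Advance to the first sync marker at or after the split's start offset;
    // bytes before it belong to the record reader of the previous split.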
    if (fileSplit.getStart() > in.getPosition()) {
      in.sync(fileSplit.getStart());                  // sync to start
    }

    this.start = in.getPosition();
    more = start < end;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!more) {
      return false;
    }
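    // Stop once the read position has passed 'end' and a sync marker has been
    // crossed: from there on, records belong to the next split.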
    long pos = in.getPosition();
    key = (K) in.next(key);
    if (key == null || (pos >= end && in.syncSeen())) {
      more = false;
      key = null;
      value = null;
    } else {
      value = (V) in.getCurrentValue(value);
    }
    return more;
  }

  @Override
  public K getCurrentKey() {
    return key;
  }
  
  @Override
  public V getCurrentValue() {
    return value;
  }
  
  /**
   * Return the progress within the input split
   * @return 0.0 to 1.0 of the input byte range
   */
  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
    }
  }
  
  public synchronized void close() throws IOException { in.close(); }
  
}
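
Two details worth noting: getFormatMinSplitSize() raises the minimum split size to SequenceFile.SYNC_INTERVAL so splits can always be aligned to sync markers, and initialize() advances the reader to the first sync point inside the split before reading. Wiring the format into a job takes one line; below is a minimal driver sketch (everything named here is a placeholder, and the LongWritable/Text record types are an assumption about the file's contents):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SeqFileDriver {
  // Trivial pass-through mapper; adjust the key/value types to your data.
  public static class EchoMapper
      extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
        throws java.io.IOException, InterruptedException {
      ctx.write(key, value);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "seqfile-demo");
    job.setJarByClass(SeqFileDriver.class);
    job.setInputFormatClass(SequenceFileInputFormat.class); // keys/values keep their stored Writable types
    job.setMapperClass(EchoMapper.class);
    job.setNumReduceTasks(0);                               // map-only pass-through
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
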

2. SequenceFileAsBinaryInputFormat and SequenceFileAsBinaryRecordReader
/**
 * InputFormat reading keys, values from SequenceFiles in binary (raw)
 * format.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat
    extends SequenceFileInputFormat<BytesWritable,BytesWritable> {

  public SequenceFileAsBinaryInputFormat() {
    super();
  }

  public RecordReader<BytesWritable,BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException {
    return new SequenceFileAsBinaryRecordReader();
  }

  /**
   * Read records from a SequenceFile as binary (raw) bytes.
   */
  public static class SequenceFileAsBinaryRecordReader
      extends RecordReader<BytesWritable,BytesWritable> {
    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean done = false;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    private SequenceFile.ValueBytes vbytes;
    private BytesWritable key = null;
    private BytesWritable value = null;

    public void initialize(InputSplit split, TaskAttemptContext context) 
        throws IOException, InterruptedException {
      Path path = ((FileSplit)split).getPath();
      Configuration conf = context.getConfiguration();
      FileSystem fs = path.getFileSystem(conf);
      this.in = new SequenceFile.Reader(fs, path, conf);
      this.end = ((FileSplit)split).getStart() + split.getLength();
      if (((FileSplit)split).getStart() > in.getPosition()) {
        in.sync(((FileSplit)split).getStart());    // sync to start
      }
      this.start = in.getPosition();
      vbytes = in.createValueBytes();
      done = start >= end;
    }
    
    @Override
    public BytesWritable getCurrentKey() 
        throws IOException, InterruptedException {
      return key;
    }
    
    @Override
    public BytesWritable getCurrentValue() 
        throws IOException, InterruptedException {
      return value;
    }

    /**
     * Retrieve the name of the key class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getKeyClassName
     */
    public String getKeyClassName() {
      return in.getKeyClassName();
    }

    /**
     * Retrieve the name of the value class for this SequenceFile.
     * @see org.apache.hadoop.io.SequenceFile.Reader#getValueClassName
     */
    public String getValueClassName() {
      return in.getValueClassName();
    }

    /**
     * Read raw bytes from a SequenceFile.
     */
    public synchronized boolean nextKeyValue()
        throws IOException, InterruptedException {
      if (done) {
        return false;
      }
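      // Same boundary rule as SequenceFileRecordReader: stop after the read
      // position passes 'end' and a sync marker has been crossed.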
      long pos = in.getPosition();
      boolean eof = -1 == in.nextRawKey(buffer);
      if (!eof) {
        if (key == null) {
          key = new BytesWritable();
        }
        if (value == null) {
          value = new BytesWritable();
        }
        key.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
        in.nextRawValue(vbytes);
        vbytes.writeUncompressedBytes(buffer);
        value.set(buffer.getData(), 0, buffer.getLength());
        buffer.reset();
      }
      return !(done = (eof || (pos >= end && in.syncSeen())));
    }

    public void close() throws IOException {
      in.close();
    }

    /**
     * Return the progress within the input split
     * @return 0.0 to 1.0 of the input byte range
     */
    public float getProgress() throws IOException, InterruptedException {
      if (end == start) {
        return 0.0f;
      } else {
        return Math.min(1.0f, (float)((in.getPosition() - start) /
                                      (double)(end - start)));
      }
    }
  }
}
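
Because nextKeyValue() copies the still-serialized key and value bytes into BytesWritable, the mapper receives each record raw, with no deserialization cost; that suits pass-through or custom re-serialization jobs. A sketch under that assumption (the class name is illustrative); remember that only the first getLength() bytes of a BytesWritable's backing array are valid:

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative pass-through mapper for SequenceFileAsBinaryInputFormat.
public class RawBytesMapper
    extends Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
  @Override
  protected void map(BytesWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {
    // key/value hold the exact serialized bytes of the SequenceFile record;
    // use getLength(), not getBytes().length, for the valid byte count.
    context.write(key, value);
  }
}
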

3. SequenceFileAsTextInputFormat and SequenceFileAsTextRecordReader
/**
 * This class is similar to SequenceFileInputFormat, except it generates
 * SequenceFileAsTextRecordReader which converts the input keys and values
 * to their String forms by calling toString() method. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextInputFormat
  extends SequenceFileInputFormat<Text, Text> {

  public SequenceFileAsTextInputFormat() {
    super();
  }

  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    context.setStatus(split.toString());
    return new SequenceFileAsTextRecordReader();
  }
}

/**
 * This class converts the input keys and values to their String forms by
 * calling the toString() method. It is to SequenceFileAsTextInputFormat
 * what LineRecordReader is to TextInputFormat.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextRecordReader
  extends RecordReader<Text, Text> {
  
  private final SequenceFileRecordReader<WritableComparable<?>, Writable>
    sequenceFileRecordReader;

  private Text key;
  private Text value;

  public SequenceFileAsTextRecordReader()
    throws IOException {
    sequenceFileRecordReader =
      new SequenceFileRecordReader<WritableComparable<?>, Writable>();
  }

  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    sequenceFileRecordReader.initialize(split, context);
  }

  @Override
  public Text getCurrentKey() 
      throws IOException, InterruptedException {
    return key;
  }
  
  @Override
  public Text getCurrentValue() 
      throws IOException, InterruptedException {
    return value;
  }
  
  /** Read key/value pair in a line. */
  public synchronized boolean nextKeyValue() 
      throws IOException, InterruptedException {
    if (!sequenceFileRecordReader.nextKeyValue()) {
      return false;
    }
    if (key == null) {
      key = new Text(); 
    }
    if (value == null) {
      value = new Text(); 
    }
    key.set(sequenceFileRecordReader.getCurrentKey().toString());
    value.set(sequenceFileRecordReader.getCurrentValue().toString());
    return true;
  }
  
  public float getProgress() throws IOException,  InterruptedException {
    return sequenceFileRecordReader.getProgress();
  }
  
  public synchronized void close() throws IOException {
    sequenceFileRecordReader.close();
  }
}
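
SequenceFileAsTextRecordReader simply delegates to a SequenceFileRecordReader and converts each key and value to Text via toString(), so a job using SequenceFileAsTextInputFormat always maps over <Text, Text> regardless of the file's original Writable types. To try it, write a small SequenceFile with non-Text types and set job.setInputFormatClass(SequenceFileAsTextInputFormat.class); the mapper will then see keys like "1" and values like "alpha". A sketch of producing such a test file (path and contents are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class WriteDemoSeqFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // IntWritable keys will surface as Text ("1", "2") when read back
    // through SequenceFileAsTextInputFormat.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(new Path("/tmp/demo.seq")),  // hypothetical path
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class))) {
      writer.append(new IntWritable(1), new Text("alpha"));
      writer.append(new IntWritable(2), new Text("beta"));
    }
  }
}
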