
InputFormat Is Awesome (6): org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>

 

A RecordReader that reads records from a SQL table. Emits LongWritables containing the record number as key and DBWritables as value.


package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DBRecordReader<T extends DBWritable> extends
    RecordReader<LongWritable, T> {

  private static final Log LOG = LogFactory.getLog(DBRecordReader.class);

  private ResultSet results = null;

  private Class<T> inputClass;

  private Configuration conf;

  private DBInputFormat.DBInputSplit split;

  private long pos = 0;
  
  private LongWritable key = null;
  
  private T value = null;

  private Connection connection;

  protected PreparedStatement statement;

  private DBConfiguration dbConf;

  private String conditions;

  private String [] fieldNames;

  private String tableName;

  /**
   * @param split The InputSplit to read data for
   * @param inputClass the DBWritable value class to instantiate
   * @param conf the job configuration
   * @param conn an open JDBC connection; this reader closes it in close()
   * @param dbConfig the database configuration for the job
   * @param cond the WHERE-clause conditions, or null for none
   * @param fields the column names to select
   * @param table the name of the table to read
   * @throws SQLException 
   */
  public DBRecordReader(DBInputFormat.DBInputSplit split, 
      Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig,
      String cond, String [] fields, String table)
      throws SQLException {
    this.inputClass = inputClass;
    this.split = split;
    this.conf = conf;
    this.connection = conn;
    this.dbConf = dbConfig;
    this.conditions = cond;
    this.fieldNames = fields;
    this.tableName = table;
  }

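  /**
   * Prepares the given SQL query with a forward-only, read-only result set
   * and executes it; the statement is kept in a field so that close() can
   * release it together with the ResultSet.
   */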
  protected ResultSet executeQuery(String query) throws SQLException {
    this.statement = connection.prepareStatement(query,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    return statement.executeQuery();
  }

  /** Returns the query for selecting the records, 
   * subclasses can override this for custom behaviour.*/
  protected String getSelectQuery() {
    StringBuilder query = new StringBuilder();

    // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits.
    if(dbConf.getInputQuery() == null) {
      query.append("SELECT ");
  
      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length - 1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); // the alias is required by HSQLDB
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      //PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }
        
    try {
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore: DBInputSplit.getLength() declares IOException but never throws it.
    }

    return query.toString();
  }
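
  // For illustration only (hypothetical values): with fieldNames = {"id", "name"},
  // tableName = "employees", no conditions, an input ORDER BY of "id", and a
  // split with start = 2000 and length = 1000, getSelectQuery() builds:
  //
  //   SELECT id, name FROM employees AS employees ORDER BY id LIMIT 1000 OFFSET 2000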

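  // DBInputFormat hands this reader a connection opened with auto-commit
  // disabled, which is why close() commits before closing it.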
  /** {@inheritDoc} */
  public void close() throws IOException {
    try {
      if (null != results) {
        results.close();
      }
      if (null != statement) {
        statement.close();
      }
      if (null != connection) {
        connection.commit();
        connection.close();
      }
    } catch (SQLException e) {
      throw new IOException(e.getMessage());
    }
  }

  public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException {
    // do nothing: the SELECT is executed lazily on the first call to nextKeyValue()
  }

  /** {@inheritDoc} */
  public LongWritable getCurrentKey() {
    return key;  
  }

  /** {@inheritDoc} */
  public T getCurrentValue() {
    return value;
  }

  /**
   * @deprecated 
   */
  @Deprecated
  public long getPos() throws IOException {
    return pos;
  }

 
  /** {@inheritDoc} */
  public float getProgress() throws IOException {
    return pos / (float)split.getLength();
  }

  /** {@inheritDoc} */
  public boolean nextKeyValue() throws IOException {
    try {
      if (key == null) {
        key = new LongWritable();
      }
      if (value == null) {
        value = createValue();
      }
      if (null == this.results) {
        // First time into this method, run the query.
        this.results = executeQuery(getSelectQuery());
      }
      if (!results.next()) {
        return false;
      }

      // Set the key field value as the output key value
      key.set(pos + split.getStart());

      value.readFields(results);

      pos++;
    } catch (SQLException e) {
      throw new IOException("SQLException in nextKeyValue", e);
    }
    return true;
  }

  // ... remaining methods elided ...

}
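
DBRecordReader is not normally instantiated by user code: DBInputFormat creates one per split and passes it the table name, field list, conditions, and split boundaries seen in the constructor above. As a minimal client-side sketch (the EmployeeRecord class, the employees table and its columns, and the JDBC settings are illustrative assumptions, and a Hadoop 2.x-style Job API is assumed):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class EmployeeRecord implements Writable, DBWritable {
  private long id;
  private String name;

  // DBWritable read path: invoked by DBRecordReader.nextKeyValue()
  // through value.readFields(results)
  public void readFields(ResultSet rs) throws SQLException {
    id = rs.getLong("id");
    name = rs.getString("name");
  }

  // DBWritable write path: used by DBOutputFormat, unused when only reading
  public void write(PreparedStatement ps) throws SQLException {
    ps.setLong(1, id);
    ps.setString(2, name);
  }

  // Writable methods so the record can be serialized between tasks
  public void readFields(DataInput in) throws IOException {
    id = in.readLong();
    name = in.readUTF();
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(id);
    out.writeUTF(name);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Driver class, URL, and credentials are placeholders
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/demo", "user", "password");
    Job job = Job.getInstance(conf, "db-read");
    job.setInputFormatClass(DBInputFormat.class);
    // These arguments become the tableName, conditions, orderBy and
    // fieldNames used by DBRecordReader.getSelectQuery()
    DBInputFormat.setInput(job, EmployeeRecord.class,
        "employees",   // table to read
        null,          // WHERE conditions (none)
        "id",          // ORDER BY column
        "id", "name"); // columns to select
    // ... configure a mapper and output, then job.waitForCompletion(true)
  }
}

With this setup, each map task receives (record number, EmployeeRecord) pairs produced by nextKeyValue() above; ordering by a stable column keeps the LIMIT/OFFSET pagination consistent across splits.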
