`

InputFormat牛逼(5)org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>

 
阅读更多
@Public
@Stable

A InputFormat that reads input data from an SQL table.

DBInputFormat emits LongWritables containing the record number as key and DBWritables as value. The SQL query, and input class can be using one of the two setInput methods.

InputFormat核心方法有两个:getSplits和createRecordReader
核心方法一
  /** {@inheritDoc} */
  public List<InputSplit> getSplits(JobContext job) throws IOException {

    ResultSet results = null;  
    Statement statement = null;
    try {
      statement = connection.createStatement();

      results = statement.executeQuery(getCountQuery());
      results.next();

      long count = results.getLong(1);
      int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
      long chunkSize = (count / chunks);

      results.close();
      statement.close();

      List<InputSplit> splits = new ArrayList<InputSplit>();

      // Split the rows into n-number of chunks and adjust the last chunk
      // accordingly
      for (int i = 0; i < chunks; i++) {
        DBInputSplit split;

        if ((i + 1) == chunks)
          split = new DBInputSplit(i * chunkSize, count);
        else
          split = new DBInputSplit(i * chunkSize, (i * chunkSize)
              + chunkSize);

        splits.add(split);
      }

      connection.commit();
      return splits;
    } catch (SQLException e) {
      throw new IOException("Got SQLException", e);
    } finally {
      try {
        if (results != null) { results.close(); }
      } catch (SQLException e1) {}
      try {
        if (statement != null) { statement.close(); }
      } catch (SQLException e1) {}

      closeConnection();
    }
  }



核心方法二
public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {  

    return createDBRecordReader((DBInputSplit) split, context.getConfiguration());
  }

protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
      Configuration conf) throws IOException {

    @SuppressWarnings("unchecked")
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
    try {
      // use database product name to determine appropriate record reader.
      if (dbProductName.startsWith("ORACLE")) {
        // use Oracle-specific db reader.
        return new OracleDBRecordReader<T>(split, inputClass,
            conf, getConnection(), getDBConf(), conditions, fieldNames,
            tableName);
      } else if (dbProductName.startsWith("MYSQL")) {
        // use MySQL-specific db reader.
        return new MySQLDBRecordReader<T>(split, inputClass,
            conf, getConnection(), getDBConf(), conditions, fieldNames,
            tableName);
      } else {
        // Generic reader.
        return new DBRecordReader<T>(split, inputClass,
            conf, getConnection(), getDBConf(), conditions, fieldNames,
            tableName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }
分享到:
评论

相关推荐

    hadoop2lib.tar.gz

    Hadoop2lib还可能包含Hadoop MapReduce库,这是实现MapReduce任务的关键,它提供了编写和执行MapReduce作业所需的类和接口。此外,YARN(Yet Another Resource Negotiator)作为Hadoop 2.x的新特性,是资源管理和...

    CustomInputFormatCollection:Hadoop Mapreduce InputFormat 集合

    Hadoop 代码使用方式 job.setInputFormatClass(SmallFileCombineTextInputFormat.class);...org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10

    hadoop api.doc

    5. **org.apache.hadoop.ipc**: 这个包提供了进程间通信(IPC)的基础工具,使得客户端和服务端能通过网络进行异步通信。`Protocol`接口定义了服务端提供的服务,而`RPC`类实现了RPC调用的逻辑。 6. **org.apache....

    hadoop-3.1.3-src.tar.gz

    - **核心类库**:如`org.apache.hadoop.fs.FileSystem`、`org.apache.hadoop.mapreduce.Job`等,提供了与HDFS交互和MapReduce编程的基本接口。 4. **开发与调试** - **Hadoop API**:学习如何使用Hadoop API开发...

    hive inputformat

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class CustomSpaceDelimitedInputFormat extends FileInputFormat&lt;LongWritable, Text&gt; { @Override public RecordReader&lt;LongWritable,...

    Hadoop.MapReduce.分析

    - **新代码**:主要位于`org.apache.hadoop.mapreduce.*`,包含36,915行代码,这部分代码进行了重构,提高了代码质量和可维护性。 - 辅助类:分别位于`org.apache.hadoop.util.*`(148行)和`org.apache.hadoop.file...

    MapReduce之wordcount范例代码

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper&lt;LongWritable, Text, Text, IntWritable&gt;{ private final ...

    自定义MapReduce的InputFormat

    1. **创建一个新的类**:首先,你需要创建一个继承自`org.apache.hadoop.mapreduce.InputFormat`的类。这个类将覆盖父类的方法来实现自定义的输入处理逻辑。 2. **实现`getSplits()`方法**:此方法用于将输入数据...

    hadoop2.7.2 之 snappy压缩支持包.zip

    &lt;value&gt;org.apache.hadoop.io.compress.SnappyCodec&lt;/value&gt; &lt;/property&gt; &lt;/configuration&gt; ``` 同时,如果你想要在MapReduce作业中指定特定的输入或输出压缩格式,可以在作业配置中进行设置,如下所示: ```...

    hadoop源码分析-mapreduce部分.doc

    在源码层面,org.apache.hadoop.mapreduce包包含了关键的接口和类。Writeable、Counter和ID相关类处理计数和标识,Context类提供Mapper和Reducer所需的上下文信息,Mapper、Reducer和Job类定义了MapReduce的基本操作...

    hadoop eclipse mapreduce 下开发所有需要用到的 JAR 包

    在Hadoop生态系统中,Eclipse是一个常用的集成开发环境(IDE),用于编写MapReduce程序。MapReduce是一种编程模型,用于大规模数据集的并行处理。它将大数据任务分解为两个主要阶段:映射(Map)和化简(Reduce)。...

    Hadoop 0.20.2 API

    5. **org.apache.hadoop.conf**: 提供配置管理功能,如`Configuration`类,它允许用户设置和获取Hadoop系统的配置参数。 6. **org.apache.hadoop.util**: 提供了一些通用工具和实用程序,比如`GenericOptionsParser...

    Hadoop集群扩容操作手册

    -inputformat org.apache.hadoop.mapred.TextInputFormat \ -outputformat org.apache.hadoop.mapred.TextOutputFormat 在本示例中,我们使用了BZip2Codec对数据进行压缩。该方法可以大大减少存储空间。

    MapReduce源码分析

    相反,OutputFormat接口定义了如何将Reduce任务的输出写回存储系统,`org.apache.hadoop.mapreduce.lib.output.FileOutputFormat`则是常见的输出格式。 **Job与TaskTracker** 在MapReduce框架中,JobTracker是任务...

    Hadoop MapReduce Cookbook 源码

    《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...

    Hadoop CountWord 例子

    import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper&lt;LongWritable, Text, Text, IntWritable&gt; { private final static IntWritable one = new IntWritable(1); private ...

    hadoop-mapreduce

    通常会有自定义的Mapper和Reducer类,它们继承自Hadoop提供的基类,如`org.apache.hadoop.mapreduce.Mapper`和`org.apache.hadoop.mapreduce.Reducer`。此外,可能还有自定义的Driver类,它负责配置和启动MapReduce...

    wonderdog:批量加载以进行弹性搜索

    您可以在自己的Hadoop MapReduce作业中使用的 ,可从轻松使用这些InputFormat和OutputFormat类 从 LOAD和STORE到ElasticSearch的 一些用于与ElasticSearch进行交互的 &lt; project&gt; ... &lt; dependencies&gt; &lt; ...

    中的接口).docx

    在Hadoop 2.x版本之后,MapReduce的API经历了一次重大的更新,产生了两个主要的版本:旧版的`org.apache.hadoop.mapred`和新版的`org.apache.hadoop.mapreduce`。本文将主要探讨旧版的`org.apache.hadoop.mapred`...

    Java-API-Operate-Hadoop.rar_hadoop_hadoop api

    它的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,而Java API是开发者与Hadoop交互的主要方式。本文将深入探讨Java如何操作Hadoop,以及在"Java-API-Operate-Hadoop.rar"压缩包中提供的资源。 ...

Global site tag (gtag.js) - Google Analytics