`
justinyao
  • 浏览: 40481 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

sqoop扩展

阅读更多
sqoop是一款数据互导工具,利用它可以在关系数据库与hbase/hive/hdfs进行数据互导,过程中会生成一个mapreduce任务,也就说是sqoop是基于hadoop的.淘宝也有自己的数据互导工具,叫datax,它跟sqoop实现原理不同,没有基于hadoop.
 
sqoop目前还没有很好的支持不同的导数据业务,比如数据库字段名称不能与java关键字同名、row-key不支持整型导入等,所以还需要对sqoop进行扩展,使其能很好地支持我们的业务。今天实现了row-key支持整形导入
 
继承PutTransformer类
public class ToIntPutTransformer extends PutTransformer {

  public static final Log LOG = LogFactory.getLog(
      ToIntPutTransformer.class.getName());

  // A mapping from field name -> bytes for that field name.
  // Used to cache serialization work done for fields names.
  private Map<String, byte[]> serializedFieldNames;

  public ToIntPutTransformer() {
    serializedFieldNames = new TreeMap<String, byte[]>();
  }

  /**
   * Return the serialized bytes for a field name, using
   * the cache if it's already in there.
   */
  private byte [] getFieldNameBytes(String fieldName) {
    byte [] cachedName = serializedFieldNames.get(fieldName);
    if (null != cachedName) {
      // Cache hit. We're done.
      return cachedName;
    }

    // Do the serialization and memoize the result.
    byte [] nameBytes = Bytes.toBytes(fieldName);
    serializedFieldNames.put(fieldName, nameBytes);
    return nameBytes;
  }

  @Override
  /** {@inheritDoc} */
  public List<Put> getPutCommand(Map<String, Object> fields)
      throws IOException {

    String rowKeyCol = getRowKeyColumn();
    String colFamily = getColumnFamily();
    byte [] colFamilyBytes = Bytes.toBytes(colFamily);

    Object rowKey = fields.get(rowKeyCol);
    if (null == rowKey) {
      // If the row-key column is null, we don't insert this row.
      LOG.warn("Could not insert row with null value for row-key column: "
          + rowKeyCol);
      return null;
    }

    //rowKey转换为整形,指定的rowKey对应的数据库字段要是整型,否则报错
    Put put = new Put(Bytes.toBytes(Integer.parseInt(rowKey.toString())));

    for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
      String colName = fieldEntry.getKey();
      if (!colName.equals(rowKeyCol)) {
        // This is a regular field, not the row key.
        // Add it if it's not null.
        Object val = fieldEntry.getValue();
        if (null != val) {
          put.add(colFamilyBytes, getFieldNameBytes(colName),
              Bytes.toBytes(val.toString()));
        }
      }
    }

    return Collections.singletonList(put);
  }
 编译打包,放到lib目录下.
配置conf/sqoop-site.xml文件,增加
<property>
    <name>sqoop.hbase.insert.put.transformer.class</name>
    <value>org.apache.sqoop.hbase.ToIntPutTransformer</value>
</property>
 完成.
分享到:
评论

相关推荐

    Sqoop集群搭建.

    4. 良好的扩展性和可靠性。 Sqoop 集群搭建是指在 Hadoop 集群环境中安装和配置 Sqoop,以实现数据的高效转换。Sqoop 的主要应用场景是数据转换,包括从结构化数据存储到 Hadoop 集群的数据转换,以及从 Hadoop ...

    sqoop2的安装包

    它是Apache Sqoop项目的第二代版本,旨在提供更高级的功能和更好的可扩展性,以支持大数据环境中的复杂数据导入导出任务。在这个“sqoop2的安装包”中,包含的文件是`sqoop-1.99.7-bin-hadoop200`,这表明我们处理的...

    Sqoop-linux.zip

    Hadoop 2.0 引入了 YARN(Yet Another Resource Negotiator),以改进集群资源管理,增强了可扩展性和容错性。此版本的 Sqoop 可能不包含某些较新的特性或优化。 - **sqoop-1.4.7.bin__hadoop-2.6.0**: 相对更新的 ...

    sqoop-1.4.6.tar.gz

    6. **连接器支持**:Sqoop 支持多种常见的 RDBMS,如 Oracle、MySQL、PostgreSQL 等,并且可以通过扩展开发新的连接器来支持更多数据库。 7. **安全性**:在导入导出过程中,Sqoop 支持 Kerberos 认证,确保数据...

    sqoop-1.4.4-cdh5.0.6.tar

    - 增强的连接器:提供了更多的数据库连接器,扩展了对不同数据库系统的支持。 - 集成了 Avro 和 Parquet:支持将数据导出为 Avro 和 Parquet 格式,这两种格式在大数据领域具有高效的存储和查询能力。 3. **CDH ...

    sqoop需要导入的包

    其次,`json-to-sqoop.jar` 是一个扩展 Sqoop 功能的第三方库,它可能包含用于将 JSON 数据转换为 Sqoop 可以理解的格式的功能。JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,常用于 Web 应用...

    sqoop-1.4.7.bin__hadoop-2.6.0.tar

    在 Hadoop 生态系统中,Sqoop 提供了一种高效、可扩展的方式,用于将大量结构化数据导入到 Hadoop 分布式文件系统(HDFS)中,或者将数据导出回传统的关系型数据库。这使得 Hadoop 能够处理来自企业级数据库的数据,...

    sqoop.1.99.5

    Sqoop是Apache Hadoop生态中的一个工具,主要用于在...总的来说,Sqoop 1.99.5作为版本升级,旨在提供更好的性能、扩展性和用户体验。为了充分利用这些改进,建议对新特性进行深入研究,并根据实际情况调整工作流程。

    Atlas2.3.0依赖: org.restlet/sqoop-1.4.6.2.3.99.0-195

    Sqoop负责数据的导入和导出,Restlet则提供了一种灵活的方式来暴露和交互这些数据,而Servlet扩展确保了这些服务能够适应企业级的部署环境。这样的组合使得Apache Atlas能够在一个统一的框架下,实现数据的全生命...

    sqoop2-1.99.5-cdh5.6.0.tar.gz

    Sqoop2相对于初代的Sqoop,在功能和架构上进行了扩展和优化,提供了更丰富的功能和更好的可扩展性。以下是一些关于Sqoop2的关键知识点: 1. **服务化架构**:与 Sqoop1 不同,Sqoop2 引入了服务化概念,它包含一个...

    sqoop 使用手册

    四、Sqoop 的扩展和集成 1. Sqoop2:提供了更强大的管理和监控功能,支持多用户同时操作。 2. Sqoop-Connector:允许与其他数据存储系统(如 NoSQL 数据库)进行交互。 3. 集成工具:可以与 Hive、Pig、HBase 等 ...

    sqoop-1.4.6

    此外,Sqoop支持多种数据库类型,并且可以通过连接管理器扩展以支持更多。它还允许用户通过命令行参数或配置文件进行复杂的数据导入导出设置,比如分片导入(split-by)、并行度控制(mappers数量)以及错误处理策略...

    sqoop-1.3.0-cdh3u4.tar.gz

    6. **src** 目录:源代码目录,对于开发者来说,可以查看和理解 Sqoop 的工作原理,甚至进行定制和扩展。 7. **README.txt** 或类似文件:提供关于 Sqoop 安装、配置和使用的基本指导。 使用 Sqoop 的主要知识点有...

    sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz

    同时, Sqoop 具有良好的可扩展性,可以通过编写自定义的Java插件来实现特定的数据转换和处理逻辑。 总之,Sqoop 1.4.6 是大数据环境中连接Hadoop与传统数据库的重要桥梁,它的功能强大且易于使用。通过熟练掌握 ...

    sqoop1.99.0

    1.99.0 版本可能增加了新的连接器,扩展了支持的数据库类型。 6. **Sqoop 整合与 MapReduce** - Sqoop 利用 MapReduce 框架实现大规模并行数据导入导出,通过配置可以调整并行度,以提高数据传输效率。 7. **...

    sqoop-1.4.6.bin__hadoop-0.23.tar.gz

    YARN 是 Hadoop 2.x 的核心组件,负责集群资源管理和任务调度,提高了系统的灵活性和可扩展性。 解压 sqoop-1.4.6.bin__hadoop-0.23.tar.gz 文件后,你会得到 Sqoop 的可执行文件、配置文件、文档和其他相关资源。...

    sqoop导入数据到hdfs路径

    #### 四、高级用法与扩展 - **增量导入**:Sqoop支持增量导入功能,可以在首次全量导入后仅导入自上次导入以来新增或修改的数据。 - **并行处理**:通过增加`--num-mappers`参数的值来提高数据导入的速度。 - **...

    java大数据案例_7Flume、Kafka、Sqoop、Lucene

    ### Java大数据案例详解:Flume、Kafka、Sqoop、Lucene #### 一、Flume日志收集至HDFS 在大数据处理流程中,数据的收集与传输是非常关键的一环。Apache Flume是一个分布式的、可靠的、高可用的系统,用于有效地...

    java连接sqoop源码-sqoop-example:sqoop-示例

    我只是扩展了 JobStorage 类并使用 slick db 与 MySql 数据库交互来存储 Sqoop 通常会存储在嵌入式 HSQLDb 中的作业。 通过使用共享元存储,我们可以拥有 Sqoop2,但通过使用 Sqoop 1,我们可以获得以下好处: Sqoop...

    Oracle与HDFS的桥梁_Sqoop

    总的来说,Sqoop 是大数据环境中不可或缺的工具,它使得 RDBMS 数据可以无缝地融入到 Hadoop 生态系统中,极大地扩展了大数据分析的可能性。通过熟练掌握 Sqoop 的使用,用户可以高效地管理、导入和导出大规模数据,...

Global site tag (gtag.js) - Google Analytics