`
lingqi1818
  • 浏览: 254573 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hbase海量数据的全量导入方法

阅读更多
最近有个需求要对mysql的全量数据迁移到hbase,虽然hbase的设计非常利于高效的读取,但是它的compaction实现对海量数据写入造成非常大的影响,数据到一定量之后,就开始抽风。
分析hbase的实现,不管其运行的机制,其最终存储结构为分布式文件系统中的hfile格式。
刚好hbase的源代码中提供一个HFileOutputFormat类,分析其源代码可以看到:
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.mortbay.log.Log;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a
 * time.  Multiple column families requires coordinating keys cross family.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created hfiles.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file 
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
    // Invented config.  Add to hbase-*.xml if other than default compression.
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        if (wl == null || ((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
          // Get a new writer.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) throw new IOException("One family only");
            // If wl == null, first file in family.  Ensure family dir exits.
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          Log.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transition.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer. Close current if there is one.
       * @param writer
       * @param familydir
       * @return A new HFile.Writer.
       * @throws IOException
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir)
      throws IOException {
        close(writer);
        return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          StoreFile.appendMetadata(w, System.currentTimeMillis(), true);
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
          close(e.getValue().writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it. 
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }
}


可以看到,它的工作流程就是首先根据你的配置文件初始化,然后写成hfile的格式。
这里我做了个偷懒的demo:
HFileOutputFormat hf = new HFileOutputFormat();
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.addResource(new Path("/home/performance/softs/hadoop/conf/core-site.xml"));
        conf.set("mapred.output.dir", "/tmp");
        conf.set("hfile.compression", Compression.Algorithm.LZO.getName());
        TaskAttemptContext context = new TaskAttemptContext(conf, new TaskAttemptID());
        RecordWriter writer = hf.getRecordWriter(context);
        KeyValue kv = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:action"),
                                   System.currentTimeMillis(), Bytes.toBytes("test"));
        KeyValue kv1 = new KeyValue(Bytes.toBytes("1111111111111"), Bytes.toBytes("offer:id"),
                                    System.currentTimeMillis(), Bytes.toBytes("123"));
        KeyValue kv3 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:action"),
                                    System.currentTimeMillis(), Bytes.toBytes("test"));
        KeyValue kv4 = new KeyValue(Bytes.toBytes("1111111111112"), Bytes.toBytes("offer:id"),
                                    System.currentTimeMillis(), Bytes.toBytes("123"));
        writer.write(null, kv);
        writer.write(null, kv1);
        writer.write(null, kv3);
        writer.write(null, kv4);
        writer.close(context);

执行然之后,会在hdfs的/tmp目录下生成一份文件。注意批量写数据的时候一定要保证key的有序性
这个时候,hbase自己提供的一个基于jruby的loadtable.rb脚本就可以发挥作用了。
它的格式是loadtable.rb 你希望的表明 hdfs路径:
hbase org.jruby.Main loadtable.rb offer hdfs://user/root/importoffer/_temporary/_attempt__0000_r_000000_0/
执行完之后:
运行./hbase shell
>list
就会显示刚才导入的offer表了。
分享到:
评论
3 楼 Angel_Night 2012-03-26  
解决了我的难题啊

十分感谢
2 楼 依然仰望天空 2011-11-04  
很不错,学习
1 楼 pengpeng 2011-02-18  
牛 ~~    学习了

相关推荐

    基于HBase的海量数据查询

    本文当是一个基于HBase的海量数据的实实时查询系统的原理分析。详细的介绍了大数据查询的原理。

    HBase海量数据存储实战视频教程

    从HBase的集群搭建、HBaseshell操作、java编程、架构、原理、涉及的数据结构,并且结合陌陌海量消息存储案例来讲解实战HBase 课程亮点 1,知识体系完备,从小白到大神各阶段读者均能学有所获。 2,生动形象,化繁为...

    elasticsearch+hbase海量数据查询

    elasticsearch+hbase海量数据查询,支持千万数据秒回查询 一、ElasticSearch和Hbase ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch...

    HBase存储海量图片

    海量图片的存储是通过HBase实现的,HBase是一种面向列的NoSQL数据库,特别适合存储海量数据。 一、直接上传本地栅格数据将导致的问题 栅格数据的特点是每层的图片个数都为上层数量的四倍。在第20层时,仅仅第20层...

    Hive数据导入HBase的方法.docx

    Hive 数据导入 HBase 的方法 Hive 是一个基于 Hadoop 的数据仓库工具,而 HBase 是一个基于 Hadoop 的 NoSQL 数据库。它们都是大数据处理的重要组件。在数据处理过程中,经常需要将数据从 Hive 导入到 HBase 中。...

    基于Apache HBase的CSV数据批量导入与操作工具.zip

    本项目是一个基于Apache HBase的工具,旨在从CSV文件中批量导入数据到HBase数据库,并提供基本的数据操作示例。HBase是一个分布式、可扩展的大数据存储系统,适用于处理海量数据。本项目利用HBase的强大功能,实现了...

    java解决hive快速导数据到Hbase代码

    总之,Java在Hive和HBase的数据交互中起到桥梁作用,通过精心设计的数据处理流程和合理的利用HBase的Bulk Load特性,可以高效地将Hive中的大量数据导入到HBase,满足实时查询的需求。在大数据场景下,这种方案具有很...

    hbase备份和数据恢复

    在大数据领域,HBase是一个基于Hadoop的分布式数据库,它为海量结构化和半结构化数据提供了高可靠性、高性能的存储方案。HBase备份和数据恢复是系统运维中至关重要的一环,确保了业务连续性和数据安全性。同时,...

    将hdfs上的文件导入hbase的源代码

    7. **监控和验证**:导入完成后,通过HBase的监控工具或自定义脚本检查导入结果,确保数据正确无误地导入到HBase。 在提供的"ImpDataToHbase"源代码中,我们可以看到这些步骤的具体实现。源代码可能包括了数据...

    hbase数据可视化系统

    《HBase数据可视化系统构建详解》 在大数据领域,HBase作为一款分布式列式数据库,因其高并发、低...在实际应用中,可以根据需求进一步扩展功能,例如支持更复杂的查询条件、数据导出导入等,以满足不同场景的需求。

    基于MapReduce和HBase的海量网络数据处理.pdf

    基于MapReduce和HBase的海量网络数据处理 大数据时代,网络数据的处理和分析变得越来越重要。传统的网络数据处理模式已经无法满足大数据量的需求,需要寻求更高效的网络数据计算模式和数据存储模式。基于MapReduce...

    MySQL数据库同redis以及hbase高速全量,增量同步工具.zip

    MySQL数据库同redis以及hbase高速全量,增量同步工具我的巴士实现MySQL数据库到Redis,以及HBASE的全量,以及增量同步支持通过正则表达式指定需要导出的db以及表总线程序无状态,每一行都有自己的位置点,位置点信息...

    基于Hadoop和HBase的大规模海量数据去重.zip

    在大数据处理领域,基于Hadoop和HBase的大规模海量数据去重是一个常见的需求。Hadoop是Apache开源项目,提供了一个分布式文件系统(HDFS)和MapReduce计算框架,旨在处理和存储海量数据。HBase是建立在Hadoop之上的...

    基于HBase的海量数据分布式序列存储策略优化.pdf

    本文提出并实现了一套基于HBase的海量数据分布式序列存储方案,旨在优化存储性能,并提高系统的扩展性。 要理解本文提出的优化策略,首先需要了解HBase的基本概念和数据模型。HBase的表可以看作一个多维映射表,...

    spark读取hbase数据,并使用spark sql保存到mysql

    接下来,创建一个 DataFrame 代表 MySQL 表结构,然后使用 `saveAsTable` 方法将数据写入: ```scala import org.apache.spark.sql.jdbc.JdbcDialects$ val jdbcUrl = s"$url?useSSL=false&serverTimezone=UTC" ...

    基于HBase的海量GIS数据分布式处理实践.pdf

    文章通过在HBase集群环境下使用真实GIS数据验证了上述方法,实验结果显示,提出的系统在海量数据存储和检索方面性能卓越,能够实现地理信息数据的高效存储和实时高速检索。 文章中的关键词包括大数据、HBase、栅格...

    关系型数据库的数据导入Hbase

    本篇文章将详细介绍如何将关系型数据库的数据导入到Hbase中,包括离线和实时两种方式。 1. 离线数据导入: 离线数据导入通常在系统低峰期进行,适用于大量数据迁移。常见的工具包括Apache Nifi、Sqoop和Hadoop ...

Global site tag (gtag.js) - Google Analytics