`
ganliang13
  • 浏览: 253301 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBase 之HFileOutputFormat

阅读更多

 hadoop mr 输出需要导入hbase的话最好先输出成HFile格式, 再导入到HBase,因为HFile是HBase的内部存储格式, 所以导入效率很高,下面是一个示例
1. 创建HBase表t1

  1. hbase(main):157:0* create 't1','f1' 
  2. 0 row(s) in 1.3280 seconds 
  3.  
  4. hbase(main):158:0> scan 't1' 
  5. ROW                   COLUMN+CELL                                                
  6. 0 row(s) in 1.2770 seconds 

 

2.写MR作业
HBaseHFileMapper.java

 

  1. package com.test.hfile; 
  2. import java.io.IOException; 
  3. import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
  4. import org.apache.hadoop.hbase.util.Bytes; 
  5. import org.apache.hadoop.io.LongWritable; 
  6. import org.apache.hadoop.io.Text; 
  7. import org.apache.hadoop.mapreduce.Mapper; 
  8.  
  9. public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> { 
  10.     private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); 
  11.     @Override 
  12.     protected void map(LongWritable key, Text value, 
  13.             org.apache.hadoop.mapreduce.Mapper.Context context) 
  14.             throws IOException, InterruptedException { 
  15.         immutableBytesWritable.set(Bytes.toBytes(key.get())); 
  16.         context.write(immutableBytesWritable, value); 
  17.     } 

 

HBaseHFileReducer.java

 

  1. package com.test.hfile; 
  2. import java.io.IOException; 
  3. import org.apache.hadoop.hbase.KeyValue; 
  4. import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
  5. import org.apache.hadoop.hbase.util.Bytes; 
  6. import org.apache.hadoop.io.Text; 
  7. import org.apache.hadoop.mapreduce.Reducer; 
  8.  
  9. public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {     
  10.     protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, 
  11.             Context context) 
  12.             throws IOException, InterruptedException { 
  13.         String value=""
  14.         while(values.iterator().hasNext()) 
  15.         { 
  16.             value = values.iterator().next().toString(); 
  17.             if(value != null && !"".equals(value)) 
  18.             { 
  19.                 KeyValue kv = createKeyValue(value.toString()); 
  20.                 if(kv!=null) 
  21.                     context.write(key, kv); 
  22.             } 
  23.         } 
  24.     } 
    // str格式为
    row:family:qualifier:value 简单模拟下
  25.     private KeyValue createKeyValue(String str) 
  26.     { 
  27.         String[] strstrs = str.split(":"); 
  28.         if(strs.length<4
  29.             return null; 
  30.         String row=strs[0]; 
  31.         String family=strs[1]; 
  32.         String qualifier=strs[2]; 
  33.         String value=strs[3]; 
  34.         return new KeyValue(Bytes.toBytes(row),Bytes.toBytes(family),Bytes.toBytes(qualifier),System.currentTimeMillis(), Bytes.toBytes(value)); 
  35.     } 

 

HbaseHFileDriver.java

 

  1. package com.test.hfile; 
  2. import java.io.IOException; 
  3. import org.apache.hadoop.conf.Configuration; 
  4. import org.apache.hadoop.fs.Path; 
  5. import org.apache.hadoop.hbase.HBaseConfiguration; 
  6. import org.apache.hadoop.hbase.client.HTable; 
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
  8. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; 
  9. import org.apache.hadoop.io.Text; 
  10. import org.apache.hadoop.mapreduce.Job; 
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
  13. import org.apache.hadoop.util.GenericOptionsParser; 
  14.  
  15. public class HbaseHFileDriver { 
  16.     public static void main(String[] args) throws IOException, 
  17.             InterruptedException, ClassNotFoundException { 
  18.          
  19.         Configuration conf = new Configuration(); 
  20.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
  21.  
  22.         Job job = new Job(conf, "testhbasehfile"); 
  23.         job.setJarByClass(HbaseHFileDriver.class); 
  24.  
  25.         job.setMapperClass(com.test.hfile.HBaseHFileMapper.class); 
  26.         job.setReducerClass(com.test.hfile.HBaseHFileReducer.class); 
  27.  
  28.         job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
  29.         job.setMapOutputValueClass(Text.class); 

  30.   // 偷懒, 直接写死在程序里了,实际应用中不能这样, 应从命令行获取
  31.         FileInputFormat.addInputPath(job, new Path("/home/yinjie/input")); 
  32.         FileOutputFormat.setOutputPath(job, new Path("/home/yinjie/output")); 
  33.  
  34.         Configuration HBASE_CONFIG = new Configuration(); 
  35.         HBASE_CONFIG.set("hbase.zookeeper.quorum", "localhost"); 
  36.         HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181"); 
  37.         HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG); 
  38.         String tableName = "t1"
  39.         HTable htable = new HTable(cfg, tableName); 
  40.         HFileOutputFormat.configureIncrementalLoad(job, htable); 
  41.  
  42.         System.exit(job.waitForCompletion(true) ? 0 : 1); 
  43.     } 

 

/home/yinjie/input目录下有一个hbasedata.txt文件,内容为

 

  1. [root@localhost input]# cat hbasedata.txt  
  2. r1:f1:c1:value1 
  3. r2:f1:c2:value2 
  4. r3:f1:c3:value3 

 

将作业打包,我的到处路径为/home/yinjie/job/hbasetest.jar
提交作业到hadoop运行:

 

  1. [root@localhost job]# hadoop jar /home/yinjie/job/hbasetest.jar com.test.hfile.HbaseHFileDriver -libjars /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar 

 

作业运行完毕后查看下输出目录:

 

  1. [root@localhost input]# hadoop fs -ls /home/yinjie/output 
  2. Found 2 items 
  3. drwxr-xr-x   - root supergroup          0 2011-08-28 21:02 /home/yinjie/output/_logs 
  4. drwxr-xr-x   - root supergroup          0 2011-08-28 21:03 /home/yinjie/output/f1 

 

OK, 已经生成以列族f1命名的文件夹了。
接下去使用Bulk Load将数据导入到HBbase

 

  1. [root@localhost job]# hadoop jar /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar completebulkload /home/yinjie/output t1 

 

导入完毕,查询hbase表t1进行验证

 

  1. hbase(main):166:0> scan 't1' 
  2. ROW                              COLUMN+CELL                                                                                  
  3.  r1                              column=f1:c1, timestamp=1314591150788value=value1                                          
  4.  r2                              column=f1:c2, timestamp=1314591150814value=value2                                          
  5.  r3                              column=f1:c3, timestamp=1314591150815value=value3                                          
  6. 3 row(s) in 0.0210 seconds 

数据已经导入!

 

出自:http://yaoyinjie.blog.51cto.com/3189782/652244

分享到:
评论

相关推荐

    hbase海量数据的全量导入方法

    2. **使用HFileOutputFormat**:HBase提供了HFileOutputFormat类,可以将数据直接写入HFile格式,跳过HBase的内部流程,从而提高数据导入效率。这种方法适用于离线大批量数据导入场景。 3. **并行导入**:利用...

    java解决hive快速导数据到Hbase代码

    而HBase是构建在Hadoop文件系统(HDFS)之上,提供高可靠、高性能、列式存储、支持多版本、实时读写的分布式数据库,适用于大数据实时查询场景。 Java在大数据生态中扮演着连接不同组件的重要角色,它提供了丰富的...

    hadoop mr file2hfile2hbase

    4. **HFileOutputFormat**:使用HFileOutputFormat替换默认的MapReduce输出格式。这个输出格式会将Reducer的输出(即Put对象)写入到临时目录,形成HFile格式的文件。 5. **设置配置**:配置MapReduce作业时,需要...

    Hbase:HBase MapReduce投影

    4. **HFileOutputFormat**:如果需要将MapReduce的结果写回HBase,可以使用HFileOutputFormat。这个格式会将输出结果先写入HDFS上的HFile,然后通过HBase的bulk load功能加载到表中,以提高写入效率。 5. **Java...

    jobs_hbase_mirrorm5y_BulkLoadjava_

    本文将深入探讨如何使用Java API实现HBase的Bulk Load,以及与之相关的`mirrorm5y`工具类。 首先,我们需要理解HBase的基本架构。HBase是一个分布式的、面向列的NoSQL数据库,基于Google的Bigtable设计。它存储大量...

    hbase-bulkload

    hbase批量加载 从RCFile进行HBase批量加载的临时代码 这将使用LoadIncrementalFiles从HFileOutputFormat2中Mapreduce写入的数据中加载HBase表。

    Kylin在贝壳的性能挑战和HBase优化实践

    1. 优化HFileOutputFormat:在HFileOutputFormat3中引入HBASE-12596特性,确保在生成HFile时,会将数据副本写入对应RegionServer的Datanode。通过修改代码,获取Region所在机器信息并在写入数据时创建副本,逐步提高...

    HBaseBulkLoad:使用 MapReduce 作业从文本文件加载 HBase

    这通常通过`HFileOutputFormat`的`completeBulkLoad()`方法实现,它会将HFile从临时位置移动到正确的位置,并触发一个合并操作,以便HBase可以立即看到新的数据。 5. **执行加载**:最后,通过调用HBase的`Admin`...

Global site tag (gtag.js) - Google Analytics