hadoop mr 输出需要导入hbase的话最好先输出成HFile格式, 再导入到HBase,因为HFile是HBase的内部存储格式, 所以导入效率很高,下面是一个示例
1. 创建HBase表t1
- hbase(main):157:0* create 't1','f1'
- 0 row(s) in 1.3280 seconds
- hbase(main):158:0> scan 't1'
- ROW COLUMN+CELL
- 0 row(s) in 1.2770 seconds
2.写MR作业
HBaseHFileMapper.java
- package com.test.hfile;
- import java.io.IOException;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
- private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
- @Override
- protected void map(LongWritable key, Text value,
- org.apache.hadoop.mapreduce.Mapper.Context context)
- throws IOException, InterruptedException {
- immutableBytesWritable.set(Bytes.toBytes(key.get()));
- context.write(immutableBytesWritable, value);
- }
- }
HBaseHFileReducer.java
- package com.test.hfile;
- import java.io.IOException;
- import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
- protected void reduce(ImmutableBytesWritable key, Iterable<Text> values,
- Context context)
- throws IOException, InterruptedException {
- String value="";
- while(values.iterator().hasNext())
- {
- value = values.iterator().next().toString();
- if(value != null && !"".equals(value))
- {
- KeyValue kv = createKeyValue(value.toString());
- if(kv!=null)
- context.write(key, kv);
- }
- }
-
}
// str格式为row:family:qualifier:value 简单模拟下 - private KeyValue createKeyValue(String str)
- {
- String[] strstrs = str.split(":");
- if(strs.length<4)
- return null;
- String row=strs[0];
- String family=strs[1];
- String qualifier=strs[2];
- String value=strs[3];
- return new KeyValue(Bytes.toBytes(row),Bytes.toBytes(family),Bytes.toBytes(qualifier),System.currentTimeMillis(), Bytes.toBytes(value));
- }
- }
HbaseHFileDriver.java
- package com.test.hfile;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- public class HbaseHFileDriver {
- public static void main(String[] args) throws IOException,
- InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- Job job = new Job(conf, "testhbasehfile");
- job.setJarByClass(HbaseHFileDriver.class);
- job.setMapperClass(com.test.hfile.HBaseHFileMapper.class);
- job.setReducerClass(com.test.hfile.HBaseHFileReducer.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(Text.class);
- // 偷懒, 直接写死在程序里了,实际应用中不能这样, 应从命令行获取
- FileInputFormat.addInputPath(job, new Path("/home/yinjie/input"));
- FileOutputFormat.setOutputPath(job, new Path("/home/yinjie/output"));
- Configuration HBASE_CONFIG = new Configuration();
- HBASE_CONFIG.set("hbase.zookeeper.quorum", "localhost");
- HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
- HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG);
- String tableName = "t1";
- HTable htable = new HTable(cfg, tableName);
- HFileOutputFormat.configureIncrementalLoad(job, htable);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
/home/yinjie/input目录下有一个hbasedata.txt文件,内容为
- [root@localhost input]# cat hbasedata.txt
- r1:f1:c1:value1
- r2:f1:c2:value2
- r3:f1:c3:value3
将作业打包,我的到处路径为/home/yinjie/job/hbasetest.jar
提交作业到hadoop运行:
- [root@localhost job]# hadoop jar /home/yinjie/job/hbasetest.jar com.test.hfile.HbaseHFileDriver -libjars /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar
作业运行完毕后查看下输出目录:
- [root@localhost input]# hadoop fs -ls /home/yinjie/output
- Found 2 items
- drwxr-xr-x - root supergroup 0 2011-08-28 21:02 /home/yinjie/output/_logs
- drwxr-xr-x - root supergroup 0 2011-08-28 21:03 /home/yinjie/output/f1
OK, 已经生成以列族f1命名的文件夹了。
接下去使用Bulk Load将数据导入到HBbase
- [root@localhost job]# hadoop jar /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar completebulkload /home/yinjie/output t1
导入完毕,查询hbase表t1进行验证
- hbase(main):166:0> scan 't1'
- ROW COLUMN+CELL
- r1 column=f1:c1, timestamp=1314591150788, value=value1
- r2 column=f1:c2, timestamp=1314591150814, value=value2
- r3 column=f1:c3, timestamp=1314591150815, value=value3
- 3 row(s) in 0.0210 seconds
数据已经导入!
出自:http://yaoyinjie.blog.51cto.com/3189782/652244
相关推荐
2. **使用HFileOutputFormat**:HBase提供了HFileOutputFormat类,可以将数据直接写入HFile格式,跳过HBase的内部流程,从而提高数据导入效率。这种方法适用于离线大批量数据导入场景。 3. **并行导入**:利用...
而HBase是构建在Hadoop文件系统(HDFS)之上,提供高可靠、高性能、列式存储、支持多版本、实时读写的分布式数据库,适用于大数据实时查询场景。 Java在大数据生态中扮演着连接不同组件的重要角色,它提供了丰富的...
4. **HFileOutputFormat**:使用HFileOutputFormat替换默认的MapReduce输出格式。这个输出格式会将Reducer的输出(即Put对象)写入到临时目录,形成HFile格式的文件。 5. **设置配置**:配置MapReduce作业时,需要...
4. **HFileOutputFormat**:如果需要将MapReduce的结果写回HBase,可以使用HFileOutputFormat。这个格式会将输出结果先写入HDFS上的HFile,然后通过HBase的bulk load功能加载到表中,以提高写入效率。 5. **Java...
本文将深入探讨如何使用Java API实现HBase的Bulk Load,以及与之相关的`mirrorm5y`工具类。 首先,我们需要理解HBase的基本架构。HBase是一个分布式的、面向列的NoSQL数据库,基于Google的Bigtable设计。它存储大量...
hbase批量加载 从RCFile进行HBase批量加载的临时代码 这将使用LoadIncrementalFiles从HFileOutputFormat2中Mapreduce写入的数据中加载HBase表。
1. 优化HFileOutputFormat:在HFileOutputFormat3中引入HBASE-12596特性,确保在生成HFile时,会将数据副本写入对应RegionServer的Datanode。通过修改代码,获取Region所在机器信息并在写入数据时创建副本,逐步提高...
这通常通过`HFileOutputFormat`的`completeBulkLoad()`方法实现,它会将HFile从临时位置移动到正确的位置,并触发一个合并操作,以便HBase可以立即看到新的数据。 5. **执行加载**:最后,通过调用HBase的`Admin`...