1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题?
我们先看下 HBase 的写流程:
通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应),而HBase支持 bulk load 的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到Hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
目前此种方式仅仅适用于只有一个列族的情况,在新版 HBase 中,单列族的限制会消除。
2、bulkload 流程与实践
bulkload 方式需要两个Job配合完成:(1)第一个Job还是运行原来业务处理逻辑,处理的结果不直接调用HTableOutputFormat写入到HBase,而是先写入到HDFS上的一个中间目录下(如 middata)
(2)第二个Job以第一个Job的输出(middata)做为输入,然后将其格式化HBase的底层存储文件HFile
(3)调用BulkLoad将第二个Job生成的HFile导入到对应的HBase表中
下面给出相应的范例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GeneratePutHFileAndBulkLoadToHBase {
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
private Text wordText= new Text();
private IntWritable one= new IntWritable( 1 );
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line=value.toString();
String[] wordArray=line.split( " " );
for (String word:wordArray)
{
wordText.set(word);
context.write(wordText, one);
}
}
}
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private IntWritable result= new IntWritable();
protected void reduce(Text key, Iterable<IntWritable> valueList,
Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum= 0 ;
for (IntWritable value:valueList)
{
sum+=value.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String wordCountStr=value.toString();
String[] wordCountArray=wordCountStr.split( "\t" );
String word=wordCountArray[ 0 ];
int count=Integer.valueOf(wordCountArray[ 1 ]);
//创建HBase中的RowKey
byte [] rowKey=Bytes.toBytes(word);
ImmutableBytesWritable rowKeyWritable= new ImmutableBytesWritable(rowKey);
byte [] family=Bytes.toBytes( "cf" );
byte [] qualifier=Bytes.toBytes( "count" );
byte [] hbaseValue=Bytes.toBytes(count);
// Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式
// KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);
Put put= new Put(rowKey);
put.add(family, qualifier, hbaseValue);
context.write(rowKeyWritable, put);
}
}
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration hadoopConfiguration= new Configuration();
String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();
//第一个Job就是普通MR,输出到指定的目录
Job job= new Job(hadoopConfiguration, "wordCountJob" );
job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase. class );
job.setMapperClass(WordCountMapper. class );
job.setReducerClass(WordCountReducer. class );
job.setOutputKeyClass(Text. class );
job.setOutputValueClass(IntWritable. class );
FileInputFormat.setInputPaths(job, new Path(dfsArgs[ 0 ]));
FileOutputFormat.setOutputPath(job, new Path(dfsArgs[ 1 ]));
//提交第一个Job
int wordCountJobResult=job.waitForCompletion( true )? 0 : 1 ;
//第二个Job以第一个Job的输出做为输入,只需要编写Mapper类,在Mapper类中对一个job的输出进行分析,并转换为HBase需要的KeyValue的方式。
Job convertWordCountJobOutputToHFileJob= new Job(hadoopConfiguration, "wordCount_bulkload" );
convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase. class );
convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper. class );
//ReducerClass 无需指定,框架会自行根据 MapOutputValueClass 来决定是使用 KeyValueSortReducer 还是 PutSortReducer
//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);
convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable. class );
convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put. class );
//以第一个Job的输出做为第二个Job的输入
FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[ 1 ]));
FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[ 2 ]));
//创建HBase的配置对象
Configuration hbaseConfiguration=HBaseConfiguration.create();
//创建目标表对象
HTable wordCountTable = new HTable(hbaseConfiguration, "word_count" );
HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);
//提交第二个job
int convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion( true )? 0 : 1 ;
//当第二个job结束之后,调用BulkLoad方式来将MR结果批量入库
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);
//第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表
loader.doBulkLoad( new Path(dfsArgs[ 2 ]), wordCountTable);
//最后调用System.exit进行退出
System.exit(convertWordCountJobOutputToHFileJobResult);
}
} |
比如原始的输入数据的目录为:/rawdata/test/wordcount/20131212
中间结果数据保存的目录为:/middata/test/wordcount/20131212最终生成的HFile保存的目录为:/resultdata/test/wordcount/20131212
运行上面的Job的方式如下:
hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212
3、说明与注意事项:
(1)HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作。
(2)最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
否则报这样的错误:
1
2
3
|
java.lang.IllegalArgumentException: Can't read partitions file ... Caused by: java.io.IOException: wrong key class : org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
|
1
2
3
4
5
6
7
|
if (KeyValue. class .equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer. class );
} else if (Put. class .equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer. class );
} else {
LOG.warn( "Unknown map output value type:" + job.getMapOutputValueClass());
} |
(5) MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。
(6)最后一个 Reduce 没有 setNumReduceTasks 是因为,该设置由框架根据region个数自动配置的。
(7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道configureIncrementalLoad方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public class HFileOutput {
//job 配置
public static Job configureJob(Configuration conf) throws IOException {
Job job = new Job(configuration, "countUnite1" );
job.setJarByClass(HFileOutput. class );
//job.setNumReduceTasks(2);
//job.setOutputKeyClass(ImmutableBytesWritable.class);
//job.setOutputValueClass(KeyValue.class);
//job.setOutputFormatClass(HFileOutputFormat.class);
Scan scan = new Scan();
scan.setCaching( 10 );
scan.addFamily(INPUT_FAMILY);
TableMapReduceUtil.initTableMapperJob(inputTable, scan,
HFileOutputMapper. class , ImmutableBytesWritable. class , LongWritable. class , job);
//这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class 和PutSortReducer.class
job.setReducerClass(HFileOutputRedcuer. class );
//job.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.configureIncrementalLoad(job, new HTable(
configuration, outputTable));
HFileOutputFormat.setOutputPath(job, new Path());
//FileOutputFormat.setOutputPath(job, new Path()); //等同上句
return job;
}
public static class HFileOutputMapper extends
TableMapper<ImmutableBytesWritable, LongWritable> {
public void map(ImmutableBytesWritable key, Result values,
Context context) throws IOException, InterruptedException {
//mapper逻辑部分
context.write( new ImmutableBytesWritable(Bytes()), LongWritable());
}
}
public static class HFileOutputRedcuer extends
Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
//reducer逻辑部分
KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[ 1 ].getBytes(),
Bytes.toBytes(count));
context.write(key, kv);
}
}
} |
4、Refer:
1、Hbase几种数据入库(load)方式比较
http://blog.csdn.net/kirayuan/article/details/6371635
2、MapReduce生成HFile入库到HBase及源码分析
http://blog.pureisle.net/archives/1950.html
3、MapReduce生成HFile入库到HBase
相关推荐
标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...
本文将详细讨论如何使用Java编程语言实现从Hive到HBase的快速数据导入方案。 首先,Hive是一个基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,适合大规模数据的离线...
在大数据存储和管理领域,HBase是使用HDFS作为其底层存储系统的NoSQL数据库,广泛应用于需要快速随机访问、海量数据存储的场景中。然而,随着数据量和访问量的增加,如何对HBase的写性能进行优化成为一个重要的议题...
Spark 通过对 HBase 的 bulkLoad 实现快速写入,可以减少写入时间,提高数据写入效率。 HBase 的批量写入 HBase 的批量写入是指使用 bulkLoad 将大量数据写入到 HBase 中。这种方法可以减少写入时间,提高数据...
在大数据处理领域,Apache HBase是一个分布式的、版本化的NoSQL数据库,它构建于Hadoop之上,特别适合处理海量结构化数据。这篇博客“Hbase调用Java API实现批量导入操作”聚焦于如何利用Java编程语言高效地向HBase...
本文将深入探讨HBase性能优化的各种策略,旨在帮助你提升数据操作的效率。 一、硬件优化 1. **磁盘选择**:HBase对I/O性能要求较高,因此推荐使用SSD硬盘,以减少读写延迟。 2. **内存配置**:合理分配HBase的堆...
Hbase 入库是指将数据从外部数据源加载到 Hbase 中的过程。Hbase 提供了多种入库方式,每种方式都有其特点和优缺点。本文将对 Hbase 的几种入库方式进行比较,帮助读者选择合适的入库方式。 1. 预先生成 HFile 入库...
3. 查询功能实现:根据RowKey查询数据是HBase的基本操作,通过输入RowKey,后台执行get操作获取对应行数据,并展示在页面上。 4. 表管理:支持HBase的建表和删除操作,这需要调用HBase的Admin API,完成表的创建、...
手把手视频详细讲解项目开发全过程,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 ...2. HBase批量装载——Bulk load 3. HBase的协处理器(Coprocessor) 4. HBase事务 5. HBase数据结构
在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...
HBase是构建在Hadoop之上的NoSQL数据库,它以行键、列族、列和时间戳的形式存储数据,提供实时读写操作。HBase的数据模型非常适合处理大规模稀疏数据集。 在将CSV数据导入HBase之前,我们通常需要进行预处理,这...
HBase是Apache软件基金会旗下的一个开源分布式NoSQL数据库,它使用了Google的Bigtable模型,并运行在Hadoop文件系统(HDFS)之上。HBase具有良好的扩展性、水平扩展和高性能等特点,特别适合处理大量的稀疏数据。...
合理地进行优化配置,需要深入理解HBase的工作机制和数据存储模型,同时结合实际应用场景和业务需求,对各个参数进行微调,以达到最优的读取性能。在生产环境中,这种优化往往需要反复测试和调整,以实现系统性能的...
Hbase有着先天的优势和先天的劣势,而劣势就是其较差的数据定位能力,也就是数据查询能力。因为面向列的特点,Hbase只能单单地以rowkey为主键作查询,而无法对表进行多维查询和join操作,并且查询通常都是全表扫描,耗费...
HBase则是在Hadoop之上的NoSQL数据库,它提供实时读写、高并发能力,适用于结构化和半结构化的数据存储。 在搭建Hadoop 2.7.1集群时,你需要按照以下步骤操作: 1. 安装Java环境:Hadoop和HBase都需要Java运行环境...
在HBase性能优化的过程中,表设计和...综上所述,HBase的性能优化是一个涉及表设计、RowKey策略、内存管理、读写优化等多个层面的综合过程,需要根据业务特性和硬件资源灵活调整,以实现最佳的数据存储和处理性能。
此外,HBase还提供了批量操作的工具,如HBase的`BulkLoad`功能,它可以将预先格式化的数据文件直接加载到HFile中,进一步提升写入速度。这个过程通常包括数据预处理、生成SequenceFile、上传到HDFS以及执行`...
这种方法适用于数据量大的情况(大于 4TB),通过 Hive 将数据转换为 HFile,然后使用 bulkload 将数据导入到 HBase 中。 首先,需要将 Hive 数据转换为 HFile: CREATE TABLE hbase_hfile_table(key int, name ...