问题导读
1.你认为hbase入库有几种方式?
2.能否想到其中一种的实现方式?
3.根据想的内容,对比下面,说出自己的想法?
1. 预先生成HFile入库
这个地址有详细的说明http://www.aboutyun.com/thread-8603-1-1.html
2. 通过MapReduce入库
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool{
static final Log LOG = LogFactory.getLog(HBaseImport.class);
public static final String JOBNAME = "MRImport ";
public static class Map extends Mapper{
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
xTable.flushCommits();
xTable.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String all[] = value.toString().split("/t");
If(all.length==2){
put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1]));
}
if (!wal) {
put.setWriteToWAL(false);
}
xTable.put(put);
if ((++count % 100)==0) {
context.setStatus(count +" DOCUMENTS done!");
context.progress();
System.out.println(count +" DOCUMENTS done!");
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.setup(context);
configuration = context.getConfiguration();
xTable = new HTable(configuration,"testKang");
xTable.setAutoFlush(false);
xTable.setWriteBufferSize(12*1024*1024);
wal = true;
}
}
@Override
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.master", "m0:60000");
Job job = new Job(conf,JOBNAME);
job.setJarByClass(HBaseImport.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int res = 1;
try {
res = ToolRunner.run(conf, new HBaseImport (), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool{
static final Log LOG = LogFactory.getLog(HBaseImport.class);
public static final String JOBNAME = "MRImport ";
public static class Map extends Mapper{
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
xTable.flushCommits();
xTable.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String all[] = value.toString().split("/t");
If(all.length==2){
put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1]));
}
if (!wal) {
put.setWriteToWAL(false);
}
xTable.put(put);
if ((++count % 100)==0) {
context.setStatus(count +" DOCUMENTS done!");
context.progress();
System.out.println(count +" DOCUMENTS done!");
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.setup(context);
configuration = context.getConfiguration();
xTable = new HTable(configuration,"testKang");
xTable.setAutoFlush(false);
xTable.setWriteBufferSize(12*1024*1024);
wal = true;
}
}
@Override
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.master", "m0:60000");
Job job = new Job(conf,JOBNAME);
job.setJarByClass(HBaseImport.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int res = 1;
try {
res = ToolRunner.run(conf, new HBaseImport (), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
3. 通过Java程序入库
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class InsertContactJava {
public static long startTime;
public static long rowkey = 0; //起始rowkey
public static final int lineCount = 100000; //每次提交时录入的行数
public static String tableName = "usercontact_kang"; //录入目的表名
public static int countLie = 8; //表的列数
public static void main(String[] args) throws IOException {
startTime = System.currentTimeMillis() / 1000;
System.out.println("start time = " + startTime);
Thread t1 = new Thread() {
@Override
public void run() {
try {
insert_one("/run/jar/123");
//loadByLieWithVector("/run/jar/123");
//loadByLieWithArrayList("/run/jar/123");
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.start();
}
public static void insert_one(String path) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, tableName);
File f = new File(path);
ArrayList list = new ArrayList();
BufferedReader br = new BufferedReader(new FileReader(f));
String tmp = br.readLine();
int count = 0;
while (tmp != null) {
if (list.size() > 10000) {
table.put(list);
table.flushCommits();
list.clear();
} else {
String arr_value[] = tmp.toString().split("/t", 10);
String first[] = arr_value[0].split("~", 5);
String second[] = arr_value[1].split("~", 5);
String rowname = getIncreasRowKey();
String firstaccount = first[0];
String firstprotocolid = first[1];
String firstdomain = first[2];
String inserttime = Utils.getToday("yyyyMMdd");
String secondaccount = second[0];
String secondprotocolid = second[1];
String seconddomain = second[2];
String timescount = Integer.valueOf(arr_value[2]).toString();
Put p = new Put(rowname.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(),
firstaccount.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(),
firstdomain.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(),
firstprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(),
inserttime.getBytes());
p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(),
secondaccount.getBytes());
p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(),
seconddomain.getBytes());
p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(),
secondprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(),
timescount.getBytes());
list.add(p);
}
tmp = br.readLine();
count++;
}
if (list.size() > 0) {
table.put(list);
table.flushCommits();
}
table.close();
System.out.println("total = " + count);
long endTime = System.currentTimeMillis() / 1000;
long costTime = endTime - startTime;
System.out.println("end time = " + endTime);
System.out.println(path + ": cost time = " + costTime);
}
4. 入库方式比较
Ø 生成HFile方式:
生成HFile的过程比较慢,生成HFile后写入hbase非常快,基本上就是hdfs上的mv过程.对于生成HFile方式入库的时候有一个改进的方案,就是先对数据排序,然后生成HFile。
HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作,最慢的时候这种操作会耗时1小时。
Ø MapReduce方式:
开始会很快,但是由于mr和hbase竞争资源,到一个特定的时间点会变很慢
Ø Java程序方式:
多客户端,多线程同时入库,目前看来是最好的方式,client和regionserver分开,硬盘读写分开,瓶颈只在网络和内存上
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class InsertContactJava {
public static long startTime;
public static long rowkey = 0; //起始rowkey
public static final int lineCount = 100000; //每次提交时录入的行数
public static String tableName = "usercontact_kang"; //录入目的表名
public static int countLie = 8; //表的列数
public static void main(String[] args) throws IOException {
startTime = System.currentTimeMillis() / 1000;
System.out.println("start time = " + startTime);
Thread t1 = new Thread() {
@Override
public void run() {
try {
insert_one("/run/jar/123");
//loadByLieWithVector("/run/jar/123");
//loadByLieWithArrayList("/run/jar/123");
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.start();
}
public static void insert_one(String path) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, tableName);
File f = new File(path);
ArrayList list = new ArrayList();
BufferedReader br = new BufferedReader(new FileReader(f));
String tmp = br.readLine();
int count = 0;
while (tmp != null) {
if (list.size() > 10000) {
table.put(list);
table.flushCommits();
list.clear();
} else {
String arr_value[] = tmp.toString().split("/t", 10);
String first[] = arr_value[0].split("~", 5);
String second[] = arr_value[1].split("~", 5);
String rowname = getIncreasRowKey();
String firstaccount = first[0];
String firstprotocolid = first[1];
String firstdomain = first[2];
String inserttime = Utils.getToday("yyyyMMdd");
String secondaccount = second[0];
String secondprotocolid = second[1];
String seconddomain = second[2];
String timescount = Integer.valueOf(arr_value[2]).toString();
Put p = new Put(rowname.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(),
firstaccount.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(),
firstdomain.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(),
firstprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(),
inserttime.getBytes());
p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(),
secondaccount.getBytes());
p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(),
seconddomain.getBytes());
p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(),
secondprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(),
timescount.getBytes());
list.add(p);
}
tmp = br.readLine();
count++;
}
if (list.size() > 0) {
table.put(list);
table.flushCommits();
}
table.close();
System.out.println("total = " + count);
long endTime = System.currentTimeMillis() / 1000;
long costTime = endTime - startTime;
System.out.println("end time = " + endTime);
System.out.println(path + ": cost time = " + costTime);
}
4. 入库方式比较
Ø 生成HFile方式:
生成HFile的过程比较慢,生成HFile后写入hbase非常快,基本上就是hdfs上的mv过程.对于生成HFile方式入库的时候有一个改进的方案,就是先对数据排序,然后生成HFile。
HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作,最慢的时候这种操作会耗时1小时。
Ø MapReduce方式:
开始会很快,但是由于mr和hbase竞争资源,到一个特定的时间点会变很慢
Ø Java程序方式:
多客户端,多线程同时入库,目前看来是最好的方式,client和regionserver分开,硬盘读写分开,瓶颈只在网络和内存上
http://blog.sina.com.cn/s/blog_68674da70102vaoi.html
相关推荐
Hbase 入库方式比较 Hbase 入库是指将数据从外部数据源加载到 Hbase 中的过程。Hbase 提供了多种入库方式,每种方式都有其特点和优缺点。本文将对 Hbase 的几种入库方式进行比较,帮助读者选择合适的入库方式。 1....
标题中的“MR程序Bulkload数据到hbase”指的是使用MapReduce(MR)程序批量加载(Bulkload)数据到HBase数据库的过程。MapReduce是Apache Hadoop框架中的一个关键组件,用于处理和生成大规模数据集。而HBase是一个...
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
在大数据领域,HBase是一个基于Hadoop的分布式数据库,它为海量结构化和半结构化数据提供了高可靠性、高性能的存储方案。HBase备份和数据恢复是系统运维中至关重要的一环,确保了业务连续性和数据安全性。同时,...
HBase 和 Hadoop 数据块损坏处理 HBase 和 Hadoop 数据块损坏是非常常见的问题,可能会导致数据丢失、集群崩溃等严重后果。因此,了解如何处理 HBase 和 Hadoop 数据块损坏是非常重要的。本文将介绍 HBase 和 ...
《基于Django、LayUI和HBase的文献数据挖掘系统》 在信息化时代,数据挖掘已经成为科研和商业领域的重要工具,它可以帮助我们从海量的数据中发现有价值的信息和模式。本项目“基于Django LayUI HBase的文献数据挖掘...
总之,Java在Hive和HBase的数据交互中起到桥梁作用,通过精心设计的数据处理流程和合理的利用HBase的Bulk Load特性,可以高效地将Hive中的大量数据导入到HBase,满足实时查询的需求。在大数据场景下,这种方案具有很...
在Java编程环境中,操作HBase并将其数据写入HDFS(Hadoop Distributed File System)是一项常见的任务,特别是在大数据处理和分析的场景下。本篇将详细介绍如何使用Java API实现这一功能,以及涉及到的关键技术和...
《HBase数据可视化系统构建详解》 在大数据领域,HBase作为一款分布式列式数据库,因其高并发、低延迟和大规模存储的特点,被广泛应用在实时数据处理和分析中。然而,对于非技术人员来说,直接操作HBase命令行进行...
手把手视频详细讲解项目开发全过程,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 ...2. HBase批量装载——Bulk load 3. HBase的协处理器(Coprocessor) 4. HBase事务 5. HBase数据结构
Python基于Django LayUI HBase的文献数据挖掘系统Python基于Django LayUI HBase的文献数据挖掘系统Python基于Django LayUI HBase的文献数据挖掘系统Python基于Django LayUI HBase的文献数据挖掘系统Python基于Django...
HBase是一种分布式、高性能、基于列族的NoSQL数据库,主要设计用于处理大规模数据存储和检索。在HBase中,数据被组织成表格形式,由行键(Row Key)、列族(Column Family)、列(Qualifier)和时间戳(Timestamp)...
本文当是一个基于HBase的海量数据的实实时查询系统的原理分析。详细的介绍了大数据查询的原理。
Kettle是一款开源的数据集成工具,主要用于ETL(Extract-Transform-Load)过程,能够实现数据的抽取、清洗、转换和加载等功能。Kettle支持多种数据源,并且可以通过图形界面设计数据流,非常适合大规模数据处理场景。 ...
在大数据处理领域,HBase作为一个分布式、高性能的列式存储系统,被广泛应用于处理大规模结构化数据。本文将深入探讨如何使用代码实现将CSV(逗号分隔值)数据存储到HBase中,帮助你更好地理解和掌握HBase的用法。 ...
HBase是一种开源的非关系型分布式数据库(NoSQL),它运行在Hadoop文件系统之上,为大数据存储和处理提供了一种可伸缩的、分布式的方式。HBase的数据模型是其核心特性之一,它具有一些独特之处,这使得它在处理大量...
### HBase海量数据全量导入方法详解 在大数据领域,HBase作为一款分布式、...通过深入理解HBase的数据结构和Hadoop生态的集成方式,我们可以更好地设计和实施大数据导入方案,充分发挥HBase在大数据处理领域的优势。
实验结果表明,相较于其他几种存储方案,基于HBase的方案能够有效地提升数据的写入和查询速度,并且具有更好的扩展性。通过对比分析,该方案在处理海量遥感数据时表现出显著的优势。 7. 关键词解析: - 遥感数据...