`
刘小小尘
  • 浏览: 67572 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

存入hbase的方法

 
阅读更多

1、通过mapreduce的方式存入hbase,只有map,其实reduce阶段也是一样的

代码如下:

import java.io.IOException; 

import org.apache.commons.logging.Log; 

import org.apache.commons.logging.LogFactory; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.conf.Configured; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

import org.apache.hadoop.hbase.util.Bytes; 

import org.apache.hadoop.io.LongWritable; 

import org.apache.hadoop.io.NullWritable; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Job; 

import org.apache.hadoop.mapreduce.Mapper; 

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 

import org.apache.hadoop.util.GenericOptionsParser; 

import org.apache.hadoop.util.Tool; 

import org.apache.hadoop.util.ToolRunner; 

public class HBaseImport extends Configured implements Tool{ 

static final Log LOG = LogFactory.getLog(HBaseImport.class); 

public static final String JOBNAME = "MRImport "; 

public static class Map extends Mapper<LongWritable , Text, NullWritable, NullWritable>{ 

Configuration configuration = null; 

HTable xTable = null; 

private boolean wal = true; 

static long count = 0; 

@Override 

protected void cleanup(Context context) throws IOException, 

InterruptedException { 

// TODO Auto-generated method stub 

super.cleanup(context); 

xTable.flushCommits(); 

xTable.close(); 

} 

@Override 

protected void map(LongWritable key, Text value, Context context) 

throws IOException, InterruptedException { 

String all[] = value.toString().split("/t"); 

If(all.length==2){ 

put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1])); 

} 

if (!wal) { 

put.setWriteToWAL(false); 

} 

xTable.put(put); 

if ((++count % 100)==0) { 

context.setStatus(count +" DOCUMENTS done!"); 

context.progress(); 

System.out.println(count +" DOCUMENTS done!"); 

} 

} 

@Override 

protected void setup(Context context) throws IOException, 

InterruptedException { 

// TODO Auto-generated method stub 

super.setup(context); 

configuration = context.getConfiguration(); 

xTable = new HTable(configuration,"testKang"); 

xTable.setAutoFlush(false); 

xTable.setWriteBufferSize(12*1024*1024); 

wal = true; 

} 

} 

@Override 

public int run(String[] args) throws Exception { 

String input = args[0]; 

Configuration conf = HBaseConfiguration.create(getConf()); 

conf.set("hbase.master", "m0:60000"); 

Job job = new Job(conf,JOBNAME); 

job.setJarByClass(HBaseImport.class); 

job.setMapperClass(Map.class); 

job.setNumReduceTasks(0); 

job.setInputFormatClass(TextInputFormat.class); 

TextInputFormat.setInputPaths(job, input); 

job.setOutputFormatClass(NullOutputFormat.class); 

return job.waitForCompletion(true)?0:1; 

} 

public static void main(String[] args) throws IOException { 

Configuration conf = new Configuration(); 

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 

int res = 1; 

try { 

res = ToolRunner.run(conf, new HBaseImport (), otherArgs); 

} catch (Exception e) { 

e.printStackTrace(); 

} 

System.exit(res); 

} 

} 

2、通过Java程序入库
Java多线程读取本地磁盘上的文件,以HTable.put(put)的方式完成数据写入

import java.io.BufferedReader; 

import java.io.File; 

import java.io.FileReader; 

import java.io.IOException; 

import java.util.ArrayList; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.client.HTable; 

import org.apache.hadoop.hbase.client.Put; 

public class InsertContactJava { 

public static long startTime; 

public static long rowkey = 0; //起始rowkey 

public static final int lineCount = 100000; //每次提交时录入的行数 

public static String tableName = "usercontact_kang"; //录入目的表名 

public static int countLie = 8; //表的列数 

public static void main(String[] args) throws IOException { 

startTime = System.currentTimeMillis() / 1000; 

System.out.println("start time = " + startTime); 

Thread t1 = new Thread() { 

@Override 

public void run() { 

try { 

insert_one("/run/jar/123"); 

//loadByLieWithVector("/run/jar/123"); 

//loadByLieWithArrayList("/run/jar/123"); 

} catch (IOException e) { 

e.printStackTrace(); 

} 

} 

}; 

t1.start(); 

} 

public static void insert_one(String path) throws IOException { 

Configuration conf = HBaseConfiguration.create(); 

HTable table = new HTable(conf, tableName); 

File f = new File(path); 

ArrayList<Put> list = new ArrayList<Put>(); 

BufferedReader br = new BufferedReader(new FileReader(f)); 

String tmp = br.readLine(); 

int count = 0; 

while (tmp != null) { 

if (list.size() > 10000) { 

table.put(list); 

table.flushCommits(); 

list.clear(); 

} else { 

String arr_value[] = tmp.toString().split("/t", 10); 

String first[] = arr_value[0].split("~", 5); 

String second[] = arr_value[1].split("~", 5); 

String rowname = getIncreasRowKey(); 

String firstaccount = first[0]; 

String firstprotocolid = first[1]; 

String firstdomain = first[2]; 

String inserttime = Utils.getToday("yyyyMMdd"); 

String secondaccount = second[0]; 

String secondprotocolid = second[1]; 

String seconddomain = second[2]; 

String timescount = Integer.valueOf(arr_value[2]).toString(); 

Put p = new Put(rowname.getBytes()); 

p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(), 

firstaccount.getBytes()); 

p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(), 

firstdomain.getBytes()); 

p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(), 

firstprotocolid.getBytes()); 

p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(), 

inserttime.getBytes()); 

p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(), 

secondaccount.getBytes()); 

p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(), 

seconddomain.getBytes()); 

p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(), 

secondprotocolid.getBytes()); 

p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(), 

timescount.getBytes()); 

list.add(p); 

} 

tmp = br.readLine(); 

count++; 

} 

if (list.size() > 0) { 

table.put(list); 

table.flushCommits(); 

} 

table.close(); 

System.out.println("total = " + count); 

long endTime = System.currentTimeMillis() / 1000; 

long costTime = endTime - startTime; 

System.out.println("end time = " + endTime); 

System.out.println(path + ": cost time = " + costTime); 

} 

两种方式的优劣比较

MapReduce方式:

开始会很快,但是由于mr和hbase竞争资源,到一个特定的时间点会变很慢

Java程序方式:

多客户端,多线程同时入库,目前看来是最好的方式,client和regionserver分开,硬盘读写分开,瓶颈只在网络和内存上。咨询了一些牛人,大多推荐这种方式,并且一定要多客户端,多线程。


分享到:
评论

相关推荐

    大数据清洗,存入Hbase.zip

    大数据清洗,存入HbasePyspark_ETL大数据清洗,存入Hbase

    spark streamming消费kafka数据存入hbase示例代码

    然后,通过 `put` 方法指定行键、列族和值,将数据插入到指定的表和列中。 此外,Maven 是一个 Java 项目管理工具,用于构建、依赖管理和项目信息管理。在这个示例中,Maven 用于管理项目的依赖关系,如 Spark、...

    spark读取hbase数据,并使用spark sql保存到mysql

    使用spark读取hbase中的数据,并插入到mysql中

    mapreduce方式入库hbase hive hdfs

    mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载

    SparkStreaming_HBase:将从Kafka收集过来的数据保存到HBase中

    SparkStreaming_HBase将从Kafka收集过来的数据保存到HBase中数据来源:日志生成器。 编写一个python工程,用于产生行为日志,每运行一次,产生所设定的数量数据,使用Linux的定时器,每隔60s执行一次,行为日志保存...

    hbase-1.3.1-src.tar.gz

    HBase,全称为Apache HBase,是一个基于Google Bigtable理念设计的开源分布式数据库。这个数据库系统专门为处理海量数据而设计...对于开发者来说,了解和掌握HBase的原理和使用方法,是迈进大数据领域的关键步骤之一。

    大数据技术应用(一) 应用Flume+HBase采集和存储日志数据

    5. **数据分析**: 一旦数据存入HBase,就可以利用HBase的查询能力进行实时分析,或者与其他大数据处理框架如Hadoop MapReduce、Spark相结合,进行更复杂的数据挖掘和分析。 在实际项目中,Flume和HBase的结合使用能...

    HBase-简介-(来自京东)

    7. 与Hadoop无缝集成:分析结果可以直接写入HBase,存入HBase的数据也可直接用于Hadoop分析。 【HBase 与 RDBMS 对比】 与传统的关系型数据库(RDBMS)相比,HBase在数据类型、数据操作、存储模式和可伸缩性等方面...

    HBase完整学习笔记

    HBase的写入流程如下:首先,更新数据会被记录在预写日志(WAL)中,然后存入内存的memstore。当memstore达到最大值,数据会以HFile形式写入HDFS。WAL在服务器崩溃时用于恢复未写入磁盘的数据,确保数据的一致性。 ...

    hbase-0.90.2中创建表、插入数据,更新数据,删除数据

    假设有一个不知道是干什么表:) 表里需要存入人员和其相对应的部门信息 HBaseAdmin admin = new HBaseAdmin(configuration); List&lt;Put&gt; putuser = new ArrayList();

    MapReduce输出至hbase共16页.pdf.zip

    当MapReduce需要将处理结果存入HBase时,这种结合便能发挥出强大的效能。 一、MapReduce简介 MapReduce是由Google提出的分布式计算模型,主要由Map(映射)和Reduce(规约)两部分组成。Map阶段将输入数据拆分成...

    HADOOP+HBASE+HIVE整合工程和文档

    1. **数据采集与预处理**:首先,数据可能来自各种源头,如日志文件、传感器数据等,需要通过Flume、Sqoop等工具进行采集和预处理,然后存入HDFS。 2. **HBase配置与数据加载**:设置HBase的集群环境,包括Master、...

    冠字号查询系统数据

    2. **数据预处理**:清洗、格式化收集到的数据,准备存入HBase。 3. **数据导入**:使用HBase的批处理工具(如Hadoop MapReduce)将数据批量导入到HBase表中。 4. **查询服务**:构建RESTful API或Web应用,为用户...

    Spark-hbase实战.zip

    本实战案例将展示如何利用Spark接收来自Kafka的数据,解析后存入HBase实例,而不是采用传统的Kafka API和HBase API,而是通过Hadoop的方式来实现这一过程。 首先,我们需要理解Spark的核心组件。Spark主要由Spark ...

    毕业设计 基于Hbase的Bigtable系统的研究与实践

    主要是自己大学时候的毕业设计,关于Hbase下用聚类算法写的一个搜索工具,实现了将文本存入数据库,然后进行搜索的算法。其中包括了word毕业设计文档,还有答辩的ppt,还有在linux平台下的java源码,希望对这方面有...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    项目流程大致如下:日志数据由各个服务发送到Kafka主题,Spark Streaming消费这些数据,进行实时处理,如统计访问频率、异常检测等,然后将结果存入HBase。用户可以实时查询HBase,获取分析结果,以便于监控系统状态...

    4_HBase.docx

    - **写入操作**:数据写入时先存入MemStore,当达到一定条件时触发刷盘操作,数据写入StoreFile。 - **读取操作**:客户端首先从Zookeeper中找到-.META.表的位置,然后根据表名和RowKey定位到具体的RegionServer。 ...

Global site tag (gtag.js) - Google Analytics