一、背景说明
HBase是一个分布式的、面向列的开源NoSQL数据库,不同于传统关系型数据库,它在大数据量级下的性能表现堪称卓越。最近项目也在探索往Hbase方向迁移,故首先整理了一份Hbase入库效率方面的数据。
Hbase入库手段有三种,但针对项目实际情况,我采用了其中两种(JavaAPI和MapReduce)来进行入库操作,并进行比较。
二、测试环境
三台主机:一台master:192.168.13.74,两台slave(192.168.13.75/192.168.13.76)
Hadoop:Hadoop 2.6.0-cdh5.4.0
Hbase:HBase 1.0.0-cdh5.4.0
三、JavaAPI方式进行入库操作
1、新建java测试工程,新建测试类
2、导入相关jar包
3、新建测试类,通过HBase的API初始化连接
public static Configuration configuration; private static Admin admin = null; private static Random random = null;//生成主键使用 private static Connection connection = null; static { try { configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum", "192.168.13.74"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); random = new Random(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
4、增删改查操作
/** * 创建表 * * @param tableName */ public static void createTable(String tableName) { System.out.println("start create table ......"); TableName tn = TableName.valueOf(tableName); try { if (admin.tableExists(tn)) { admin.disableTable(tn); admin.deleteTable(tn); System.out.println(tableName + " is exist,detele...."); } HTableDescriptor hTableDescriptor = new HTableDescriptor(tn); hTableDescriptor.addFamily(new HColumnDescriptor("column1")); hTableDescriptor.addFamily(new HColumnDescriptor("column2")); hTableDescriptor.addFamily(new HColumnDescriptor("column3")); admin.createTable(hTableDescriptor); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end create table ......"); } /** * 插入数据 * * @param tableName */ public static void insertData(String tableName) { // System.out.println("start insert data ......"); Table table = null; TableName tn = TableName.valueOf(tableName); try { table = connection.getTable(tn); // System.out.println("init insert data ......"); Put put = new Put(String.valueOf(random.nextLong()).getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值 put.addColumn("column1".getBytes(), null, "ddd".getBytes());// 本行数据的第一列 put.addColumn("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列 put.addColumn("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列 // System.out.println("insert data ......"); table.put(put); // System.out.println("insert data over......"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } // System.out.println("end insert data ......"); } /** * 删除一张表 * * @param tableName */ public static void dropTable(String tableName) { try { TableName tn = TableName.valueOf(tableName); admin.disableTable(tn); admin.deleteTable(tn); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 根据 rowkey删除一条记录 * * @param tablename * @param rowkey */ public static void deleteRow(String tablename, String rowkey) { Table table = null; TableName tn = TableName.valueOf(tablename); try { table = connection.getTable(tn); List list = new ArrayList(); Delete d1 = new Delete(rowkey.getBytes()); list.add(d1); table.delete(list); System.out.println("删除行成功!"); } catch (IOException e) { e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 组合条件删除 * * @param tablename * @param rowkey */ public static void deleteByCondition(String tablename, String rowkey) { // 目前还没有发现有效的API能够实现 根据非rowkey的条件删除 这个功能能,还有清空表全部数据的API操作 } /** * 查询所有数据 * * @param tableName */ public static void QueryAll(String tableName) { Table table = null; TableName tn = TableName.valueOf(tableName); try { table = connection.getTable(tn); ResultScanner rs = table.getScanner(new Scan()); for (Result r : rs) { System.out.println("获得到rowkey:" + new String(r.getRow())); for (Cell cell : r.rawCells()) { System.out.println("列:" + new String(cell.getFamilyArray()) + "====值:" + new String(cell.getValueArray())); } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 单条件查询,根据rowkey查询唯一一条记录 * * @param tableName */ public static void QueryByCondition1(String tableName) { Table table = null; TableName tn = TableName.valueOf(tableName); try { table = connection.getTable(tn); Get scan = new Get("112233bbbcccc".getBytes());// 根据rowkey查询 Result r = table.get(scan); System.out.println("获得到rowkey:" + new String(r.getRow())); for (Cell cell : r.rawCells()) { System.out.println("列:" + new String(cell.getFamilyArray()) + "====值:" + new String(cell.getValueArray())); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 单条件按查询,查询多条记录 * * @param tableName */ public static void QueryByCondition2(String tableName) { Table table = null; TableName tn = TableName.valueOf(tableName); try { table = connection.getTable(tn); Filter filter = new SingleColumnValueFilter( Bytes.toBytes("column1"), null, CompareOp.EQUAL, Bytes.toBytes("ddd")); // 当列column1的值为ddd时进行查询 Scan s = new Scan(); s.setFilter(filter); ResultScanner rs = table.getScanner(s); for (Result r : rs) { System.out.println("获得到rowkey:" + new String(r.getRow())); for (Cell cell : r.rawCells()) { System.out.println("列:" + new String(cell.getFamilyArray()) + "====值:" + new String(cell.getValueArray())); } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 组合条件查询 * * @param tableName */ public static void QueryByCondition3(String tableName) { Table table = null; TableName tn = TableName.valueOf(tableName); try { table = connection.getTable(tn); List<Filter> filters = new ArrayList<Filter>(); Filter filter1 = new SingleColumnValueFilter( Bytes.toBytes("column1"), null, CompareOp.EQUAL, Bytes.toBytes("aaa")); filters.add(filter1); Filter filter2 = new SingleColumnValueFilter( Bytes.toBytes("column2"), null, CompareOp.EQUAL, Bytes.toBytes("bbb")); filters.add(filter2); Filter filter3 = new SingleColumnValueFilter( Bytes.toBytes("column3"), null, CompareOp.EQUAL, Bytes.toBytes("ccc")); filters.add(filter3); FilterList filterList1 = new FilterList(filters); Scan scan = new Scan(); scan.setFilter(filterList1); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { System.out.println("获得到rowkey:" + new String(r.getRow())); for (Cell cell : r.rawCells()) { System.out.println("列:" + new String(cell.getFamilyArray()) + "====值:" + new String(cell.getValueArray())); } } rs.close(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
其中,组合条件的删除操作,暂时没有相关API支持
5、以上是基于单线程的操作,通过Thread可以实现多线程并发操作
public static class ImportThread extends Thread { public void HandleThread() { // this.TableName = "T_TEST_1"; } // public void run() { try { InsertProcess("test"); } catch (IOException e) { e.printStackTrace(); } finally { System.gc(); } } } /* * 多线程环境下线程插入函数 */ public static void InsertProcess(String tableName) throws IOException { // System.out.println("start insert data ......"); Table table = null; TableName tn = TableName.valueOf(tableName); int count = 15000; long start = System.currentTimeMillis(); try { table = connection.getTable(tn); List<Put> list = new ArrayList<Put>(); Put put = null; for(int i=0;i<count;i++) { // System.out.println("init insert data ......"); put = new Put(String.valueOf(random.nextLong()).getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值 put.addColumn("column1".getBytes(), null, "ddd".getBytes());// 本行数据的第一列 put.addColumn("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列 put.addColumn("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列 // System.out.println("insert data ......"); list.add(put); } table.put(list); long stop = System.currentTimeMillis(); System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s"); // System.out.println("insert data over......"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { if (table != null) table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /* * Mutil thread insert test */ public static void MultThreadInsert() throws InterruptedException { System.out.println("---------开始MultThreadInsert测试----------"); long start = System.currentTimeMillis(); int threadNumber = 5; Thread[] threads = new Thread[threadNumber]; for (int i = 0; i < threads.length; i++) { threads[i] = new ImportThread(); threads[i].start(); } for (int j = 0; j < threads.length; j++) { (threads[j]).join(); } long stop = System.currentTimeMillis(); System.out.println("MultThreadInsert:" + threadNumber * 10000 + "共耗时:" + (stop - start) * 1.0 / 1000 + "s"); System.out.println("---------结束MultThreadInsert测试----------"); }
6、基于以上程序,我们可以针对不同数量级和并发任务数的组合,来进行相关测试工作,测试结果如下:
从测试结果可以看出,JavaAPI方式调用的情况下,单线程入库速度为2000条/s~7000条/s之间,而在多线程并发状态下,最高速度能达到10900条/s,稍优于Mysql单节点的入库速度。但小数量级的入库速度,要慢于Mysql。波动幅度比较大。
注:在windows上用eclipse远程访问HDFS时,需要配置hosts文件,把HDFS所有主机的主机名与IP对应关系配置好,否则集群在通信时找不到主机:
192.168.13.74 traceMaster
192.168.13.75 traceSlave1
192.168.13.76 traceSlave2
下一节,我们再来尝试用MapReduce的方式来入库,看看效率是否能进一步提升
相关推荐
HTable.put() 方法是一种常用的入库方式,可以将数据直接写入到 Hbase 中。这种方式的优点是简单易用,缺点是数据量较大时,写入速度较慢。 多线程入库 多线程入库是指同时使用多个线程将数据写入到 Hbase 中。...
类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了研究,借助分布式计算框架Mapreduce在Hbase上构建了二级索引,就可以对表进行有针对性的定位和高效率的查找,同时也减轻zookeeper服务对资源调度的压力...
本研究文档题为《基于HBase的遥感数据分布式存储与查询方法研究》,它针对遥感数据处理中的关键问题,如单节点故障、扩展性低和处理效率低,提出了一种新的数据分布式存储与查询方案。以下是对文档内容的详细解析和...
Phoenix是一个基于HBase的SQL引擎,支持SQL查询、salted tables和secondary indexes。 三、架构设计 基于HBase实时数仓架构主要包括三层:数据仓库、数据处理和数据服务。 1. 数据仓库:包括模型建设、数据校验和...
本文介绍了MyHBase数据库的设计与实现,它是基于HBase的新SQL数据库存储引擎。HBase是一个开源的非关系型数据库,建立在Hadoop之上,是Apache Software Foundation的一部分,HBase使用了Hadoop的HDFS作为其文件存储...
hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架。 针对HBase各版本API(1.x~2.x)间的差异,在其上剥离出了一层统一的抽象。并提供了以类SQL的方式来读写HBase表中的数据。对...
基于HBase的分布式空间数据库技术是针对大型地理信息系统(GIS)中海量空间数据存储与高效查询的问题提出的一种解决方案。随着地理信息系统技术的迅速发展,数据的时间和空间精度不断提高,同时,GIS系统面临着数据...
本篇论文介绍了一种名为HOS的分布式存储系统的设计与实现,该系统基于HBase构建。随着大数据时代的到来,数据量的增长速度前所未有,为了有效存储和管理这些数据,迫切需要创新的存储方案。HBase作为一个分布式、列...
【描述】:本演讲主要介绍了上海久耶供应链在大数据平台中基于HBase实现的实时数仓的实践与探索,涵盖了从第一代离线数仓到第二代实时数仓的转变,以及业务场景、开发流程、集群调优监控等方面的内容,并分享了两个...
"基于HBase的大数据解决方案" HBase是一个基于分布式文件系统的NoSQL数据库,专门为大数据应用设计,具有高可扩展性、可靠性、性能强等特点。在大数据领域,HBase是一个非常重要的解决方案,广泛应用于电商、新闻、...
文章接着提出了一个基于HBase的矢量空间数据存储模型,该模型包括数据准备、数据存储和数据检索三个部分。在数据准备阶段,需要对原始的空间矢量数据进行预处理,包括数据格式转换、坐标变换等,以满足HBase存储模型...
本文将深入探讨一个基于Hbase的海量视频存储简单模拟项目,旨在利用Hadoop和Hbase这两个强大的开源工具来解决这个问题。 首先,我们要理解Hadoop和Hbase的角色。Hadoop是分布式计算框架,其核心组件包括HDFS...
Nutch抓取指定网址数据,存储在HBase数据库中,存储过程由zookeeper管理。脚本调用索引器部件将数据索引化,经过索引化的数据被前端检索查询,最后前端展示查询结果,用户点击结果列表查看目标资料。
6. 基于HBase的实时数仓架构设计:文中介绍了数据仓库模型的三个层次,第一层为基础表,第二层为事实表和维度表,第三层为领域表。此外,还涉及了数据校验环节,即数据量的比对工作,确保数据的准确性和完整性。 7....
Lily HBase Indexer 是一款灵活的、可扩展的、高容错的、事务性的,并且近实时的处理 HBase 列索引数据的分布式服务软件。它是 NGDATA 公司开发的 Lily 系统的一部分,已开放 源代码。Lily HBase Indexer 使用 ...