`
chaijuntao
  • 浏览: 24793 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于HBase的入库方案效率对比验证(一)

 
阅读更多

一、背景说明

HBase是一个分布式的、面向列的开源NoSQL数据库,不同于传统关系型数据库,它在大数据量级下的性能表现堪称卓越。最近项目也在探索往Hbase方向迁移,故首先整理了一份Hbase入库效率方面的数据。

Hbase入库手段有三种,但针对项目实际情况,我采用了其中两种(JavaAPI和MapReduce)来进行入库操作,并进行比较。

 

二、测试环境

三台主机:一台master:192.168.13.74,两台slave(192.168.13.75/192.168.13.76)

Hadoop:Hadoop 2.6.0-cdh5.4.0

Hbase:HBase 1.0.0-cdh5.4.0

 

三、JavaAPI方式进行入库操作

1、新建java测试工程,新建测试类

2、导入相关jar包

 

3、新建测试类,通过HBase的API初始化连接

    public static Configuration configuration;
    private static Admin admin = null;
    private static Random random = null;//生成主键使用
    private static Connection connection = null;
    static {
        try {
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "192.168.13.74");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            random = new Random();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

 

4、增删改查操作

/**
     * 创建表
     * 
     * @param tableName
     */
    public static void createTable(String tableName) {
        System.out.println("start create table ......");
        TableName tn = TableName.valueOf(tableName);
        try {
            if (admin.tableExists(tn)) {
                admin.disableTable(tn);
                admin.deleteTable(tn);
                System.out.println(tableName + " is exist,detele....");
            }
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tn);
            hTableDescriptor.addFamily(new HColumnDescriptor("column1"));
            hTableDescriptor.addFamily(new HColumnDescriptor("column2"));
            hTableDescriptor.addFamily(new HColumnDescriptor("column3"));
            admin.createTable(hTableDescriptor);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("end create table ......");
    }

    /**
     * 插入数据
     * 
     * @param tableName
     */
    public static void insertData(String tableName) {
        // System.out.println("start insert data ......");
        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            table = connection.getTable(tn);
            // System.out.println("init insert data ......");
            Put put = new Put(String.valueOf(random.nextLong()).getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
            put.addColumn("column1".getBytes(), null, "ddd".getBytes());// 本行数据的第一列
            put.addColumn("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列
            put.addColumn("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列
            // System.out.println("insert data ......");
            table.put(put);
            // System.out.println("insert data over......");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // System.out.println("end insert data ......");
    }

    /**
     * 删除一张表
     * 
     * @param tableName
     */
    public static void dropTable(String tableName) {
        try {
            TableName tn = TableName.valueOf(tableName);
            admin.disableTable(tn);
            admin.deleteTable(tn);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    /**
     * 根据 rowkey删除一条记录
     * 
     * @param tablename
     * @param rowkey
     */
    public static void deleteRow(String tablename, String rowkey) {
        Table table = null;
        TableName tn = TableName.valueOf(tablename);
        try {
            table = connection.getTable(tn);
            List list = new ArrayList();
            Delete d1 = new Delete(rowkey.getBytes());
            list.add(d1);
            table.delete(list);
            System.out.println("删除行成功!");

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    /**
     * 组合条件删除
     * 
     * @param tablename
     * @param rowkey
     */
    public static void deleteByCondition(String tablename, String rowkey) {
        // 目前还没有发现有效的API能够实现 根据非rowkey的条件删除 这个功能能,还有清空表全部数据的API操作

    }

    /**
     * 查询所有数据
     * 
     * @param tableName
     */
    public static void QueryAll(String tableName) {
        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            table = connection.getTable(tn);
            ResultScanner rs = table.getScanner(new Scan());
            for (Result r : rs) {
                System.out.println("获得到rowkey:" + new String(r.getRow()));
                for (Cell cell : r.rawCells()) {
                    System.out.println("列:" + new String(cell.getFamilyArray())
                            + "====值:" + new String(cell.getValueArray()));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    /**
     * 单条件查询,根据rowkey查询唯一一条记录
     * 
     * @param tableName
     */
    public static void QueryByCondition1(String tableName) {

        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            table = connection.getTable(tn);
            Get scan = new Get("112233bbbcccc".getBytes());// 根据rowkey查询
            Result r = table.get(scan);
            System.out.println("获得到rowkey:" + new String(r.getRow()));
            for (Cell cell : r.rawCells()) {
                System.out.println("列:" + new String(cell.getFamilyArray())
                        + "====值:" + new String(cell.getValueArray()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    /**
     * 单条件按查询,查询多条记录
     * 
     * @param tableName
     */
    public static void QueryByCondition2(String tableName) {

        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            table = connection.getTable(tn);
            Filter filter = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("ddd")); // 当列column1的值为ddd时进行查询
            Scan s = new Scan();
            s.setFilter(filter);
            ResultScanner rs = table.getScanner(s);
            for (Result r : rs) {
                System.out.println("获得到rowkey:" + new String(r.getRow()));
                for (Cell cell : r.rawCells()) {
                    System.out.println("列:" + new String(cell.getFamilyArray())
                            + "====值:" + new String(cell.getValueArray()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    /**
     * 组合条件查询
     * 
     * @param tableName
     */
    public static void QueryByCondition3(String tableName) {

        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        try {
            table = connection.getTable(tn);
            List<Filter> filters = new ArrayList<Filter>();

            Filter filter1 = new SingleColumnValueFilter(
                    Bytes.toBytes("column1"), null, CompareOp.EQUAL,
                    Bytes.toBytes("aaa"));
            filters.add(filter1);

            Filter filter2 = new SingleColumnValueFilter(
                    Bytes.toBytes("column2"), null, CompareOp.EQUAL,
                    Bytes.toBytes("bbb"));
            filters.add(filter2);

            Filter filter3 = new SingleColumnValueFilter(
                    Bytes.toBytes("column3"), null, CompareOp.EQUAL,
                    Bytes.toBytes("ccc"));
            filters.add(filter3);

            FilterList filterList1 = new FilterList(filters);

            Scan scan = new Scan();
            scan.setFilter(filterList1);
            ResultScanner rs = table.getScanner(scan);
            for (Result r : rs) {
                System.out.println("获得到rowkey:" + new String(r.getRow()));
                for (Cell cell : r.rawCells()) {
                    System.out.println("列:" + new String(cell.getFamilyArray())
                            + "====值:" + new String(cell.getValueArray()));
                }
            }
            rs.close();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

 其中,组合条件的删除操作,暂时没有相关API支持

 

5、以上是基于单线程的操作,通过Thread可以实现多线程并发操作

public static class ImportThread extends Thread {
        public void HandleThread() {
            // this.TableName = "T_TEST_1";

        }

        //
        public void run() {
            try {
                InsertProcess("test");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                System.gc();
            }
        }
    }

    /*
     * 多线程环境下线程插入函数
     */
    public static void InsertProcess(String tableName) throws IOException {
        // System.out.println("start insert data ......");
        Table table = null;
        TableName tn = TableName.valueOf(tableName);
        int count = 15000;
        long start = System.currentTimeMillis();
        try {
            table = connection.getTable(tn);
            List<Put> list = new ArrayList<Put>();
            Put put = null;
            for(int i=0;i<count;i++) {
                // System.out.println("init insert data ......");
                put = new Put(String.valueOf(random.nextLong()).getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
                put.addColumn("column1".getBytes(), null, "ddd".getBytes());// 本行数据的第一列
                put.addColumn("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列
                put.addColumn("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列
                // System.out.println("insert data ......");
                list.add(put);
            }
            table.put(list);
            long stop = System.currentTimeMillis();
            System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");
            // System.out.println("insert data over......");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                if (table != null)
                    table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    /*
     * Mutil thread insert test
     */
    public static void MultThreadInsert() throws InterruptedException {
        System.out.println("---------开始MultThreadInsert测试----------");
        long start = System.currentTimeMillis();
        int threadNumber = 5;
        Thread[] threads = new Thread[threadNumber];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new ImportThread();
            threads[i].start();
        }
        for (int j = 0; j < threads.length; j++) {
            (threads[j]).join();
        }
        long stop = System.currentTimeMillis();

        System.out.println("MultThreadInsert:" + threadNumber * 10000 + "共耗时:"
                + (stop - start) * 1.0 / 1000 + "s");
        System.out.println("---------结束MultThreadInsert测试----------");
    }

 

6、基于以上程序,我们可以针对不同数量级和并发任务数的组合,来进行相关测试工作,测试结果如下:

 

从测试结果可以看出,JavaAPI方式调用的情况下,单线程入库速度为2000条/s~7000条/s之间,而在多线程并发状态下,最高速度能达到10900条/s,稍优于Mysql单节点的入库速度。但小数量级的入库速度,要慢于Mysql。波动幅度比较大。

 

注:在windows上用eclipse远程访问HDFS时,需要配置hosts文件,把HDFS所有主机的主机名与IP对应关系配置好,否则集群在通信时找不到主机:

192.168.13.74 traceMaster
192.168.13.75 traceSlave1
192.168.13.76 traceSlave2

 

 

下一节,我们再来尝试用MapReduce的方式来入库,看看效率是否能进一步提升

  • 大小: 22.4 KB
  • 大小: 9.7 KB
分享到:
评论

相关推荐

    Hbase几种入库方式

    HTable.put() 方法是一种常用的入库方式,可以将数据直接写入到 Hbase 中。这种方式的优点是简单易用,缺点是数据量较大时,写入速度较慢。 多线程入库 多线程入库是指同时使用多个线程将数据写入到 Hbase 中。...

    基于Hbase的大数据查询优化

    类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了研究,借助分布式计算框架Mapreduce在Hbase上构建了二级索引,就可以对表进行有针对性的定位和高效率的查找,同时也减轻zookeeper服务对资源调度的压力...

    基于HBase的遥感数据分布式存储与查询方法研究.pdf

    本研究文档题为《基于HBase的遥感数据分布式存储与查询方法研究》,它针对遥感数据处理中的关键问题,如单节点故障、扩展性低和处理效率低,提出了一种新的数据分布式存储与查询方案。以下是对文档内容的详细解析和...

    1-9+基于+HBase+实时数仓探索实践.pdf

    Phoenix是一个基于HBase的SQL引擎,支持SQL查询、salted tables和secondary indexes。 三、架构设计 基于HBase实时数仓架构主要包括三层:数据仓库、数据处理和数据服务。 1. 数据仓库:包括模型建设、数据校验和...

    MyHBase_一种基于Hbase的NewSQL数据库的设计与实现

    本文介绍了MyHBase数据库的设计与实现,它是基于HBase的新SQL数据库存储引擎。HBase是一个开源的非关系型数据库,建立在Hadoop之上,是Apache Software Foundation的一部分,HBase使用了Hadoop的HDFS作为其文件存储...

    hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架

    hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架。 针对HBase各版本API(1.x~2.x)间的差异,在其上剥离出了一层统一的抽象。并提供了以类SQL的方式来读写HBase表中的数据。对...

    基于HBase的分布式空间数据库技术

    基于HBase的分布式空间数据库技术是针对大型地理信息系统(GIS)中海量空间数据存储与高效查询的问题提出的一种解决方案。随着地理信息系统技术的迅速发展,数据的时间和空间精度不断提高,同时,GIS系统面临着数据...

    HOS:一种基于HBase的分布式存储系统设计与实现.pdf

    本篇论文介绍了一种名为HOS的分布式存储系统的设计与实现,该系统基于HBase构建。随着大数据时代的到来,数据量的增长速度前所未有,为了有效存储和管理这些数据,迫切需要创新的存储方案。HBase作为一个分布式、列...

    中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx

    【描述】:本演讲主要介绍了上海久耶供应链在大数据平台中基于HBase实现的实时数仓的实践与探索,涵盖了从第一代离线数仓到第二代实时数仓的转变,以及业务场景、开发流程、集群调优监控等方面的内容,并分享了两个...

    基于HBase的大数据解决方案.pdf

    "基于HBase的大数据解决方案" HBase是一个基于分布式文件系统的NoSQL数据库,专门为大数据应用设计,具有高可扩展性、可靠性、性能强等特点。在大数据领域,HBase是一个非常重要的解决方案,广泛应用于电商、新闻、...

    基于HBase的矢量空间数据分布式存储研究.pdf

    文章接着提出了一个基于HBase的矢量空间数据存储模型,该模型包括数据准备、数据存储和数据检索三个部分。在数据准备阶段,需要对原始的空间矢量数据进行预处理,包括数据格式转换、坐标变换等,以满足HBase存储模型...

    基于Hbase的海量视频存储简单模拟

    本文将深入探讨一个基于Hbase的海量视频存储简单模拟项目,旨在利用Hadoop和Hbase这两个强大的开源工具来解决这个问题。 首先,我们要理解Hadoop和Hbase的角色。Hadoop是分布式计算框架,其核心组件包括HDFS...

    基于hbase+solr的搜索引擎毕业论文

    Nutch抓取指定网址数据,存储在HBase数据库中,存储过程由zookeeper管理。脚本调用索引器部件将数据索引化,经过索引化的数据被前端检索查询,最后前端展示查询结果,用户点击结果列表查看目标资料。

    基于HBase实时数仓探索实践.pdf

    6. 基于HBase的实时数仓架构设计:文中介绍了数据仓库模型的三个层次,第一层为基础表,第二层为事实表和维度表,第三层为领域表。此外,还涉及了数据校验环节,即数据量的比对工作,确保数据的准确性和完整性。 7....

    Hbase 二级索引方案

    Lily HBase Indexer 是一款灵活的、可扩展的、高容错的、事务性的,并且近实时的处理 HBase 列索引数据的分布式服务软件。它是 NGDATA 公司开发的 Lily 系统的一部分,已开放 源代码。Lily HBase Indexer 使用 ...

Global site tag (gtag.js) - Google Analytics