前言:研究的HBase版本是0.94.12,贴出的源代码可能是经过我删减或者加工过的(简化篇幅,更易读)
以Scan查询为例介绍数据查询时HBase client端的实现
public static void main(String[] args) { scan("product", "f", "", 2); //从product表查询2条记录 } public static void scan(String tableName, String fml, String startRow, int limit) { HConnection conn = null; HTableInterface table = null; ResultScanner rs = null; try { Configuration conf = HBaseConfiguration.create(); //步骤1、加载配置文件 conn = HConnectionManager.createConnection(conf);//步骤2、创建连接对象 table = conn.getTable(tableName);//步骤3、获取表对象 PageFilter filter = new PageFilter(limit);//步骤4、创建过滤器(如果需要) Scan scan = new Scan(startRow.getBytes(), filter);//步骤5、创建scan rs = table.getScanner(scan);//步骤6、获取scanner for (Result r : rs) { //步骤7、迭代结果集 NavigableMap<byte[], byte[]> map = r.getFamilyMap(fml.getBytes()); StringBuffer sb = new StringBuffer("id:"+new String(r.getRow())+" "); for(byte[] key : map.keySet()) { sb.append(new String(key)+":"+new String(map.get(key))+" "); } System.out.println(sb); } } catch (Exception e) { e.printStackTrace(); } finally { try { if ( rs != null ) rs.close(); //步骤8、关闭scanner if ( table != null ) table.close(); //步骤9、关闭table if ( conn != null ) conn.close(); //步骤10、关闭连接 } catch (Exception e) { e.printStackTrace(); } } } |
以下将详细解析以上9步的执行过程:
步骤1、加载hbase配置文件
代码:Configuration conf = HBaseConfiguration.create();
说明:该过程的核心方法是HBaseConfiguration类的addHbaseResources()方法
public static Configuration addHbaseResources(Configuration conf) { // hbase-default.xml是hbase的默认配置文件,如果有自定义的配置项则需要配置到hbase-site.xml文件,hbase-site.xml的配置会覆盖hbase-default.xml的配置 conf.addResource("hbase-default.xml"); conf.addResource("hbase-site.xml");
checkDefaultsVersion(conf); checkForClusterFreeMemoryLimit(conf); return conf; } //检查集群的memstore和blockcache的内存总大小,限制在hbase总内存的80%以内 // hbase.regionserver.global.memstore.upperLimit:控制memstore的内存比例,默认值0.4,影响写性能 // hfile.block.cache.size:控制blockcache的内存比例,默认值0.25,影响读性能 //在性能测试中发现HBase的以上两项默认配置基本上兼顾了读写,如果对某一方面没有太多的要求可以采用默认值 //该段源码里涉及的常量等被我替换成具体的数字并做了代码微调,以便更易读 private static void checkForClusterFreeMemoryLimit(Configuration conf) { float globalMemstoreLimit = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4f); int gml = (int)(globalMemstoreLimit * 100); float blockCacheUpperLimit = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.25f); int bcul = (int)(blockCacheUpperLimit * 100); if (100 - (gml + bcul) < 20) { throw new RuntimeException(…+"The combined value cannot exceed 0.8 " +…); } } |
步骤2、创建连接对象
代码:HConnection conn = HConnectionManager.createConnection(conf);
说明:该步骤只是创建HConnectionImplementation的实例,并不会产生与Master、HRegionServer、zookeeper的连接,在调用getTable、getScanner等需要与hbase交互的方法时才会真正建立连接,这些连接以及region信息均会被共享,通常一个HBase集群的一个client端实例只需创建一个HConnection对象。
public static HConnection createConnection(Configuration conf) throws ZooKeeperConnectionException { return new HConnectionImplementation(conf, false, null); } //只是对一些类属性赋值,不连接HRegionServer public HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws ZooKeeperConnectionException { this.conf = conf; this.batchPool = pool; … } |
可以通过HConnectionManager.getConnection(conf)方法获取一个受管理的连接,如果缓存中没有连接,则会自动创建一个连接,源码如下:
public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException { HConnectionKey connectionKey = new HConnectionKey(conf); synchronized (HBASE_INSTANCES) { // connectionKey的hash值由hbase.zookeeper.quorum、zookeeper.znode.parent、 hbase.zookeeper.property.clientPort等配置属性决定,因此虽然HConnectionKey每次均是new一个新的对象,但是实际上如果Configuration对象中涉及hash计算的配置项的值相同则会定位到同一个connection对象,由此可见如果每次调用该方法时均是传入具有相同配置项的Configuration对象则还不如直接使用createConnection方法创建一个连接,然后一直使用。 HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey); if (connection == null) { connection = new HConnectionImplementation(conf, true, null); HBASE_INSTANCES.put(connectionKey, connection); } else if (connection.isClosed()) { HConnectionManager.deleteConnection(connectionKey, true); connection = new HConnectionImplementation(conf, true, null); HBASE_INSTANCES.put(connectionKey, connection); } connection.incCount(); return connection; } } |
步骤3、获取表对象
代码:table = conn.getTable(tableName);
说明:每次getTable均会创建一个HTable实例,HTable实例会共享HConnection连接对象,创建HTable并不消耗资源(第一次会比较耗时,如果没有没有建立zookeeper链接则需要建立,并会缓存所有的表涉及的region信息,这些信息是所有htable实例共享的)。
//该方法实现了创建HTable对象的核心逻辑 private void finishSetup() throws IOException { //定位表涉及的region,会一次性缓存表相关的所有region信息,在下次创建HTable时就直接从缓存中获取 this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); //默认的client端写缓存是2M大小,通常超过2M发起一次批量提交,可适当调大该值,改进写性能,通过性能测试看,改为6M最佳(没有严格意义上的最佳,姑且根据性能测试的场景得出该最佳值),后续会有HBase性能测试报告放出 this.writeBufferSize = this.configuration.getLong("hbase.client.write.buffer", 2097152); //scanner的cache大小,控制scanner一次从服务端获取的数据条数,可适当调大,减少与服务端交互的次数,提高批量读性能 this.scannerCaching = this.configuration.getInt("hbase.client.scanner.caching", 1); //限制keyvalue的大小,默认不限制 this.maxKeyValueSize = this.configuration.getInt("hbase.client.keyvalue.maxsize", -1); … }
//定位region private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache, boolean retry){ //建立zookeeper连接(如果没有连接),从zookeeper中获取master和-ROOT- 表的region信息,并且实例化HBaseClient对象 ensureZookeeperTrackers(); if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { //从zookeeper中获取-ROOT-表信息 ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername.getHostname(), servername.getPort()); } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { //根据-ROOT-表定位.META.表,定位后缓存到本地 return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row, useCache, metaRegionLock, retry); } else { //根据.META.表定位业务表,在定位过程中会缓存该表涉及的所有region信息 return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row, useCache, userRegionLock, retry); } }
|
步骤4、创建过滤器(如果需要)
代码:PageFilter filter = new PageFilter(limit);
说明:Hbase默认提供了很多过滤器,通过这些过滤器可以实现分页、条件查询等功能,在后续的博客中会对该章节的内容进行详细的讲解。
步骤5、创建scan对象
代码:Scan scan = new Scan(startRow.getBytes(), filter);
//查询所有数据 public Scan() {} //查询startRow及之后的数据 public Scan(byte [] startRow) //查询startRow及之后的数据,并采用filter进行过滤 public Scan(byte [] startRow, Filter filter) //查询startRow和stopRow之间的数据 public Scan(byte [] startRow, byte [] stopRow) |
步骤6、获取scanner和步骤7、迭代结果集
代码:rs = table.getScanner(scan);for (Result r : rs){}
说明:该过程会创建一个ClientScanner类(ResultScanner的实现类)的实例,体现该类核心属性和方法的类图如下:
caching:执行next()方法时缓存服务端数据的大小,先填满cache(如果数据足够多),再从cache中一条一条的取出,如果一次scan的数据只涉及一个region,则该值决定了一次从服务端批量读取数据的条数。可由hbase.client.scanner.caching配置项配置,默认值为1,可以通过Scan的setCaching(int)方法制定每次查询的批量数,可适当调大该值,减少与服务端交互的次数,具体多大合适,或许没有一个明确的值,但是在设置该值时有两个关键点需要明确:
- 调大后会消耗更多的client和server端的内存;
- 调大后可能会导致从server端读取时一次操作占用过大的网络带宽,出现波峰现象,影响到网络中的其它应用。
cache:缓存从服务端批量加载的数据,在调用next方法迭代结果集时从cache中获取。
以下重点讲解ClientScanner的核心方法nextScanner()和next(),这些方法中涉及的RPC调用内容,在后续的博客中会系统介绍。
//获取下一个region的scanner,从server端读取数据前均需先执行该方法,在该方法中会通过ScannerCallable进行RPC调用打开server端的scanner,打开成功后会获取到一个scannerId,client端在scan过程中每次均需把该id传给server端,client端通过该id判断是否需要open scanner以及close scanner,server端通过该id找到对应的RegionScanner查询数据以及close。一次scan,一个region只打开一个RegionScanner。 private boolean nextScanner(int nbRows, final boolean done) throws IOException { if (this.callable != null) { // nextScanner被非首次调用时(即从不同的region获取数据时) this.callable.setClose(); callable.withRetries();//RPC调用,关闭server端的RegionScanner this.callable = null; } byte [] localStartKey; if (this.currentRegion != null) { //nextScanner被非首次调用时 byte [] endKey = this.currentRegion.getEndKey(); //上次scan的region是最后一个region或者已经到了end rowKey等时终止查询 if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(endKey) || done) { close(); return false; } //把上次查询region的endKey作为下一个region的startKey,因为scan的数据是一个连续的区间,因此可如此实现 localStartKey = endKey; } else { localStartKey = this.scan.getStartRow(); //从第一个region查询数据时 }
try { callable = getScannerCallable(localStartKey, nbRows); callable.withRetries(); //RPC调用,执行HRegionServer的openScanner方法 this.currentRegion = callable.getHRegionInfo(); } catch (IOException e) { } return true; }
//返回下一条记录 public Result next() throws IOException { if (cache.size() == 0) { Result [] values = null; int countdown = this.caching; do { values = callable.withRetries();//RPC调用,执行HRegionServer的next()方法 if (values != null && values.length > 0) { for (Result rs : values) { cache.add(rs); //cache可存储多region的数据 countdown--; //如果该region的数据没有达到caching值则会大于0,在while条件中使用 } } //如果没有查询到数据则values为null,这时nextScanner(int nbRows, final boolean done)将返回false,停止查询 } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); }
if (cache.size() > 0) { return cache.poll(); } } |
步骤8、关闭RegionScanner
该步骤会释放RegionServer端的scanner资源。
步骤9、关闭table
代码:if ( table != null ) table.close();
//HTable的close实现 public void close() throws IOException { if (this.closed) { return; } flushCommits(); //如果writeBuffer没有数据则不会真正提交 if (cleanupPoolOnClose) {//通过不同的HTable构造函数创建的htable实例该值可能不同,参考后面的源码 this.pool.shutdown(); } if (cleanupConnectionOnClose) {//同cleanupPoolOnClose if (this.connection != null) { this.connection.close(); } } this.closed = true; } //通过本博客获取HTable的方式采用的是该构造函数:conn.getTable(tableName) public HTable(final byte[] tableName, final HConnection connection, final ExecutorService pool) { this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false; }
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool){ this.cleanupPoolOnClose = false; this.cleanupConnectionOnClose = true; }
public HTable(Configuration conf, final byte [] tableName) { this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true; } |
步骤9、关闭连接
代码:if ( conn != null ) conn.close();
说明:如果一个HBase client程序只访问一个hbase集群,则通常只需要一个HConnection实例,因此无需close。
相关推荐
下面是一段简单的Java代码示例,演示如何从HBase中读取数据: ```java import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client....
HBase,作为Apache软件基金会的一个开源项目,是构建在Hadoop分布式文件系统(HDFS)之上的分布式、面向列的数据库,专为大数据设计,支持海量数据的实时读写。在最新的版本HBase 2.3.2中,其客户端接口提供了丰富的...
HbaseClient是Apache HBase的核心组件之一,它是客户端与HBase分布式数据库进行交互的桥梁。本文将深入探讨HbaseClient的工作原理、主要功能以及使用技巧,帮助读者更好地理解和掌握HBase的数据操作。 首先,Hbase...
本示例将详细介绍如何使用 Spark 从 HBase 中读取数据,并通过 Spark SQL 将其存储到 MySQL 数据库中。 首先,让我们了解 Spark 与 HBase 的交互。Spark 提供了 `spark-hbase-connector` 库,允许我们方便地连接到 ...
本主题将详细探讨如何利用Java通过Thrift-0.9.1版本来读取HBase表数据。 HBase是一个基于Google Bigtable设计的开源NoSQL数据库,它构建在Hadoop之上,提供高可靠性、高性能、分布式的行存储。HBase支持实时读写,...
HBase是构建在Hadoop文件系统(HDFS)之上的开源NoSQL数据库,它为非结构化和半结构化数据提供了高并发、低延迟的随机读写能力。HBase基于Google的Bigtable设计,适用于大数据分析和实时查询。 Java API是与HBase...
- **数据的标志**: 在HBase中,写入或删除操作实际上是在数据上添加不同的标志,而不是立即物理删除数据。真正的数据删除操作会在`Compaction`(合并操作)期间发生。 #### 二、Client API介绍 **2.1 配置** `...
**读取(Read)**: 读取数据时,我们需要指定行键(Row Key)和要查询的列族及列。以下是一个简单的读取示例: ```java import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.util.Bytes; ...
文章主要关注的是HBase数据读取流程中的Client-Server交互逻辑,这是理解HBase工作原理的关键部分。首先,客户端通过配置文件中的Zookeeper地址建立连接,Zookeeper在这里扮演着服务发现的角色。客户端会获取到`/...
Java API包括HBaseAdmin类用于管理表和列族,HTableInterface和它的实现HTable用于与表进行交互,Put和Get类分别用于写入和读取数据。例如,通过HTable实例,我们可以创建Put对象,设置行键、列族、列和值,然后调用...
5. **执行MapReduce作业**:编写一个MapReduce作业,该作业使用HFileOutputFormat作为输出格式,将上一步骤中写入HDFS的数据转化为HBase可读的HFile格式。 6. **加载HFiles**:最后,通过HBase的Admin API或者HBase...
首先,hbase-client-2.2.4.jar是HBase客户端的核心库,它提供了与HBase服务器交互的API,包括数据的读写、扫描、行键操作等。这个版本的HBase客户端已经对HBase 2.2.4进行了优化,确保了与服务端的兼容性和性能。 ...
数据迁移的核心在于编写Java代码,将MySQL中的数据读取并写入到HBase中。这里我们使用JDBC连接MySQL,使用HBase Java API操作HBase。以下是一个简单的示例: ```java import org.apache.hadoop.conf.Configuration;...
Hadoop是一个开源框架,主要用于处理和存储大量数据,而HBase是建立在Hadoop之上的非关系型数据库,提供高可靠性、高性能、可伸缩的数据存储。 HBase是一个基于列族的分布式数据库,设计灵感来源于Google的Bigtable...
HBase是Apache软件基金会开发的一个开源、分布式、版本化、基于列族的NoSQL数据库,它构建在Hadoop文件系统(HDFS)之上,专为处理海量数据而设计。HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,...
在读取数据时,`CellUtil`类提供了便利的方法来访问Cell的不同部分,如行键、列族、限定符和值。 总的来说,通过Java API与HBase交互涉及到配置连接、管理表结构和批量处理数据。理解这些基本操作对于高效地使用...
SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则是一个基于Java的现代Web框架,提供了...
HBase 是一个分布式的、基于列族的开源数据库,它运行在 Apache Hadoop 文件系统(HDFS)之上。HBase 提供了对大规模数据集的实时读写访问,是大数据领域的重要组件。它设计用于处理PB级别的数据,适合于拥有数十亿...
HBase是一个分布式、面向列的NoSQL数据库,它构建于Hadoop之上,提供实时访问大量数据的能力。Scala是一种强大的函数式编程语言,与Java虚拟机(JVM)兼容,因此非常适合编写HBase的客户端程序。 首先,确保你的...
HBase 的读写过程可以分为两部分:写数据流程和读数据流程。 写数据流程 写数据流程主要包括以下步骤: 1. 客户端将数据写入到 RegionServer 的 HLog 中。 2. RegionServer 将数据写入到 MemStore 中。 3. 当 ...