HBase Version: hbase-0.90.3-cdh3u1
org.apache.hadoop.hbase.client.HTablePool
is not very convenient to use, so I rewrote parts of it to better fit my own business logic. Discussion is welcome.
The rewrite improves on four points in the original source:
1. Allow a different pool size for each table; the stock HTablePool keeps a single maxSize for all tables:
private final int maxSize;
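One way to realize point 1 is to replace the single maxSize field with a per-table size map. This is a minimal, standalone sketch; the class and method names here are illustrative, not part of the rewritten pool below:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: per-table pool sizes instead of one global maxSize.
// Table names and the DEFAULT_POOL_SIZE fallback are illustrative.
public class PerTablePoolSizes {
    public static final int DEFAULT_POOL_SIZE = 4;

    private final ConcurrentMap<String, Integer> maxSizes =
        new ConcurrentHashMap<String, Integer>();

    public void setMaxSize(String tableName, int maxSize) {
        maxSizes.put(tableName, maxSize);
    }

    // Falls back to the default when a table was never configured.
    public int maxSizeFor(String tableName) {
        Integer size = maxSizes.get(tableName);
        return size == null ? DEFAULT_POOL_SIZE : size;
    }
}
```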
2. HTablePool.getTable only initializes a single HTable, and the table's queue is created lazily at that point.
This is not ideal, and it was my main motivation for modifying the source:
/**
 * Get a reference to the specified table from the pool.<p>
 *
 * Create a new one if one is not available.
 * @param tableName table name
 * @return a reference to the specified table
 * @throws RuntimeException if there is a problem instantiating the HTable
 */
public HTableInterface getTable(String tableName) {
  LinkedList<HTableInterface> queue = tables.get(tableName);
  if (queue == null) {
    queue = new LinkedList<HTableInterface>();
    tables.putIfAbsent(tableName, queue);
    return createHTable(tableName);
  }
  HTableInterface table;
  synchronized (queue) {
    table = queue.poll();
  }
  if (table == null) {
    return createHTable(tableName);
  }
  return table;
}
3. There should be a createTablePool method so users can create an HTablePool themselves. It would mirror the existing closeTablePool: create the pool first, close it later.
public void closeTablePool(final String tableName) {
  Queue<HTableInterface> queue = tables.get(tableName);
  synchronized (queue) {
    HTableInterface table = queue.poll();
    while (table != null) {
      this.tableFactory.releaseHTableInterface(table);
      table = queue.poll();
    }
  }
  HConnectionManager.deleteConnection(this.config, true);
}
4. HTable Put can be optimized by batching: submit multiple Puts together with flushCommits(). Loop over the pooled HTables and call flushCommits() on each.
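The flush loop in point 4 can be sketched as a small background flusher. This is an HBase-agnostic sketch: the Flushable interface stands in for HTable.flushCommits(), and the class name and interval are illustrative:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of point 4: a background thread walks every pooled table and
// flushes its write buffer. Flushable stands in for HTable#flushCommits().
public class PoolFlusher implements Runnable {
    public interface Flushable { void flush(); }

    private final List<Flushable> pooled = new CopyOnWriteArrayList<Flushable>();
    private final long intervalMs;
    private volatile boolean running = true;

    public PoolFlusher(long intervalMs) { this.intervalMs = intervalMs; }

    public void register(Flushable f) { pooled.add(f); }
    public void stop() { running = false; }

    // One pass over the pool; the thread body repeats this every intervalMs.
    public void flushAll() {
        for (Flushable f : pooled) {
            f.flush(); // maps to table.flushCommits() in the real pool
        }
    }

    @Override
    public void run() {
        while (running) {
            flushAll();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Run it as `new Thread(flusher).start()` at startup and call `stop()` on shutdown; the interval trades write latency against batching.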
The modified source:
package org.apache.hadoop.hbase.client;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A rewritten pool of HTable instances.<p>
 *
 * Each MyHTablePool acts as a pool for all tables. To use, instantiate a
 * MyHTablePool, create per-table pools with
 * {@link #createHTablePool(String, int, boolean)}, and take an HTable with
 * {@link #getHTable(String)}. Once you are done with it, return it to the
 * pool with {@link #putHTableBack(HTableInterface)}.
 *
 * <p>Each per-table pool is created with a <i>maxSize</i> that defines the
 * most HTable references that will ever be retained for that table.
 *
 * <p>The pool manages its own connections to the cluster. See
 * {@link HConnectionManager}.
 *
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePool {

  public final static int DEFAULT_POOL_SIZE = 4;

  /**
   * Map of table name to the LinkedList of pooled HTableInterface instances
   * for that table. Each table's pool can be created with a different size.
   */
  private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
      new ConcurrentHashMap<String, LinkedList<HTableInterface>>();

  /** Configuration from hbase-site.xml. */
  private final Configuration config;

  /** Factory that creates and releases HTableInterface instances. */
  private final HTableInterfaceFactory tableFactory;

  /** Default constructor. */
  public MyHTablePool() {
    this(HBaseConfiguration.create());
  }

  /**
   * Constructor using the specified configuration.
   * @param config configuration
   */
  public MyHTablePool(final Configuration config) {
    this(config, null);
  }

  public MyHTablePool(final Configuration config,
      final HTableInterfaceFactory tableFactory) {
    // Make a new configuration instance so I can safely cleanup when
    // done with the pool.
    this.config = config == null ? new Configuration()
        : new Configuration(config);
    this.tableFactory = tableFactory == null ? new HTableFactory()
        : tableFactory;
  }

  /**
   * Create all the HTable instances belonging to the given table.
   * <p>
   * Note: this is a 'create' of the given table pool.
   * @param tableName
   * @param maxSize
   * @param isAutoFlush
   */
  public void createHTablePool(final String tableName, final int maxSize,
      boolean isAutoFlush) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      tables.putIfAbsent(tableName, new LinkedList<HTableInterface>());
      // Re-read so concurrent callers all see the queue that won putIfAbsent.
      queue = tables.get(tableName);
    }
    synchronized (queue) {
      int addHTableSize = maxSize - queue.size();
      if (addHTableSize <= 0) {
        return;
      }
      for (int i = 0; i < addHTableSize; i++) {
        HTable table = (HTable) createHTable(tableName);
        if (table != null) {
          table.setAutoFlush(isAutoFlush);
          queue.add(table);
        }
      }
    }
  }

  /**
   * Create the HTable pools for all the given tables.
   * <p>
   * Note: this is a 'create' of the given table pools.
   * @param tableNameArray
   * @param maxSize
   * @param isAutoFlush
   *   false: for {@link Put}-heavy use with buffered writes; call
   *     flushCommits() periodically (e.g. from a background thread that
   *     loops over all pooled tables every few milliseconds) for good
   *     write performance.
   *   true: for {@link Scan} and {@link Delete} use.
   */
  public void createHTablePool(final String[] tableNameArray,
      final int maxSize, boolean isAutoFlush) {
    for (String tableName : tableNameArray) {
      createHTablePool(tableName, maxSize, isAutoFlush);
    }
  }

  /**
   * Create the HTable pools for all the given tables, with autoFlush off.
   * @param tableNameArray
   * @param maxSize
   */
  public void createHTablePool(final String[] tableNameArray,
      final int maxSize) {
    createHTablePool(tableNameArray, maxSize, false);
  }

  /**
   * Get a reference to the specified table from the pool.<p>
   *
   * @param tableName table name
   * @return a pooled HTable, or null if the pool for this table is
   *   currently empty
   * @throws RuntimeException if no pool was created for the table
   */
  public HTableInterface getHTable(String tableName) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      throw new RuntimeException("There is no pool for the HTable "
          + tableName);
    }
    HTableInterface table;
    synchronized (queue) {
      table = queue.poll();
    }
    return table;
  }

  /**
   * See {@link #getHTable(String)}.
   * @param tableName table name
   */
  public HTableInterface getHTable(byte[] tableName) {
    return getHTable(Bytes.toString(tableName));
  }

  /**
   * Puts the specified HTable back into the pool.
   * <p>
   * Do not use this method for an HTable that did not come from this pool.
   * @param table table
   */
  public void putHTableBack(HTableInterface table) {
    LinkedList<HTableInterface> queue =
        tables.get(Bytes.toString(table.getTableName()));
    if (queue == null) {
      throw new RuntimeException("There is no pool for the HTable");
    }
    synchronized (queue) {
      queue.add(table);
    }
  }

  protected HTableInterface createHTable(String tableName) {
    return this.tableFactory.createHTableInterface(config,
        Bytes.toBytes(tableName));
  }

  /**
   * Closes all the HTable instances belonging to the given table in the
   * table pool.
   * <p>
   * Note: this is a 'shutdown' of the given table pool, unlike
   * {@link #putHTableBack(HTableInterface)}, which returns a table
   * instance to the pool for future re-use. It also drops the cluster
   * connection for this pool's Configuration, so only call it when you
   * are done with the cluster.
   * @param tableName
   */
  public void closeHTablePool(final String tableName) {
    Queue<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      return;
    }
    synchronized (queue) {
      HTableInterface table = queue.poll();
      while (table != null) {
        this.tableFactory.releaseHTableInterface(table);
        table = queue.poll();
      }
    }
    HConnectionManager.deleteConnection(this.config, true);
  }

  /**
   * See {@link #closeHTablePool(String)}.
   * @param tableName
   */
  public void closeHTablePool(final byte[] tableName) {
    closeHTablePool(Bytes.toString(tableName));
  }

  /**
   * Closes every table pool. See {@link #closeHTablePool(String)}.
   */
  public void closeHTablePool() {
    for (String tabName : tables.keySet()) {
      closeHTablePool(tabName);
    }
  }

  /**
   * Returns the number of HTable instances currently available in the
   * pool for the given table.
   * @param tableName
   */
  public int getCurrentPoolSize(String tableName) {
    Queue<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      return 0;
    }
    synchronized (queue) {
      return queue.size();
    }
  }
}
Test example: org.apache.hadoop.hbase.client.MyHTablePoolTest
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

/**
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePoolTest {

  /**
   * test method
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    // 1. my config file
    String configFile = "conf/hbase-site.xml";
    Configuration config = new Configuration();
    config.addResource(new Path(configFile));

    // 2. init HTablePool
    MyHTablePool myPool = new MyHTablePool(config);

    // 3. create HTablePool for a table
    myPool.createHTablePool("DCP_DataCenter_Base",
        MyHTablePool.DEFAULT_POOL_SIZE, false);

    // 4. get an already-created HTable from the HTablePool
    HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base");
    if (table != null) {
      System.out.println("get HTable from HTablePool Success!");
    }

    // 5. get all data from the HTable, and print it to the console.
    Scan scan = new Scan();
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result result : rs) {
        KeyValue[] kv = result.raw();
        byte[] key = kv[0].getRow();
        System.out.println("RowKey: " + new String(key));
        for (int i = 0; i < kv.length; i++) {
          System.out.println("ColumnFamily: " + new String(kv[i].getFamily()));
          System.out.println("Qualifier: " + new String(kv[i].getQualifier()));
          System.out.println("Timestamp: " + String.valueOf(kv[i].getTimestamp()));
          System.out.println("Value: " + new String(kv[i].getValue()));
        }
        System.out.println();
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      rs.close();
    }

    // 6. when done with the HTable, put it back into the HTablePool.
    myPool.putHTableBack(table);

    // 7. close the HTablePool
    myPool.closeHTablePool();
  }
}