HBase Version: hbase-0.90.3-cdh3u1
org.apache.hadoop.hbase.client.HTablePool
用起来不是很方便, 所以重写了一个HTablePool, 使其更贴合自己的业务逻辑. 欢迎讨论.
主要是对源代码下面4点进行改进和设置:
1. 为不同的table建立的poolSize不一样, 目前HTablePool为所有的table建立的maxSize一致.
private final int maxSize;
2. 从HTablePool中getTable时只初始化了一个HTable, 而且在这个时候才初始化HTable的Queue.
这个点不是很好, 也是我想修改源码的出发点.
/**
 * Get a reference to the specified table from the pool.<p>
 *
 * Create a new one if one is not available.
 * @param tableName table name
 * @return a reference to the specified table
 * @throws RuntimeException if there is a problem instantiating the HTable
 */
public HTableInterface getTable(String tableName) {
  LinkedList<HTableInterface> queue = tables.get(tableName);
  if(queue == null) {
    // First request for this table: register an empty queue so later
    // returns to the pool have somewhere to go.
    // NOTE(review): the return value of putIfAbsent is ignored, so under a
    // race two threads may build two queues and only one lands in the map;
    // harmless here only because the local queue is not used afterwards.
    queue = new LinkedList<HTableInterface>();
    tables.putIfAbsent(tableName, queue);
    // The pool is lazily populated: the first caller always pays the full
    // cost of constructing a brand-new HTable. This laziness is the point
    // the surrounding article criticizes.
    return createHTable(tableName);
  }
  HTableInterface table;
  // LinkedList is not thread-safe; every access is guarded by the
  // queue's own monitor.
  synchronized(queue) {
    table = queue.poll();
  }
  if(table == null) {
    // Pool exhausted: create a fresh HTable rather than blocking.
    return createHTable(tableName);
  }
  return table;
}
3. 应该有一个createTablePool的方法, 便于用户自己创建HTablePool.此方法可以与closeTablePool相互呼应.创建后面再关闭.
/**
 * Releases every pooled HTable belonging to the given table, then drops
 * this pool's HBase connection.
 *
 * @param tableName name of the table whose pool should be shut down
 */
public void closeTablePool(final String tableName) {
  Queue<HTableInterface> queue = tables.get(tableName);
  // Fix: guard against a pool that was never created. The original code
  // synchronized on the queue unconditionally and threw an NPE here.
  if (queue == null) {
    return;
  }
  synchronized (queue) {
    // Drain and release every idle instance.
    HTableInterface table = queue.poll();
    while (table != null) {
      this.tableFactory.releaseHTableInterface(table);
      table = queue.poll();
    }
  }
  // NOTE(review): this deletes the shared connection for this Configuration
  // even though pools for OTHER tables may still be using it. Safe only
  // when the whole pool is being shut down; kept for compatibility with
  // the original behavior.
  HConnectionManager.deleteConnection(this.config, true);
}
4. 由于HTable的Put可以使用优化, 让多个Put一起提交flushCommits(). 循环pool中的HTable, 调用flushCommits().
贴上修改后的源码:
package org.apache.hadoop.hbase.client; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; /** * A rewrite pool of HTable instances.<p> * * Each HTablePool acts as a pool for all tables. To use, instantiate an * HTablePool and use {@link #getTable(String)} to get an HTable from the pool. * Once you are done with it, return it to the pool with {@link #putTable(HTableInterface)}. * * <p>A pool can be created with a <i>maxSize</i> which defines the most HTable * references that will ever be retained for each table. Otherwise the default * is {@link Integer#MAX_VALUE}. * * <p>Pool will manage its own cluster to the cluster. See {@link HConnectionManager}. * * @author greatwqs * @update 2012-08-25 */ public class MyHTablePool { public final static int DEFAULT_POOL_SIZE = 4; /** * ConcurrentMap<String, LinkedList<HTableInterface>> * String tableName * LinkedList<HTableInterface> the HTable pool contains HTableInterface * LinkedList you can create HTable pool different size you want. */ private final ConcurrentMap<String, LinkedList<HTableInterface>> tables = new ConcurrentHashMap<String, LinkedList<HTableInterface>>(); /*** * Configuration for hbase-site.xml */ private final Configuration config; /** * HTableInterfaceFactory that createHTableInterface and releaseHTableInterface */ private final HTableInterfaceFactory tableFactory; /** * Default Constructor. */ public MyHTablePool() { this(HBaseConfiguration.create()); } /** * Constructor to set maximum versions and use the specified configuration. 
* @param config configuration */ public MyHTablePool(final Configuration config) { this(config, null); } public MyHTablePool(final Configuration config, final HTableInterfaceFactory tableFactory) { // Make a new configuration instance so I can safely cleanup when // done with the pool. this.config = config == null ? new Configuration() : new Configuration( config); this.tableFactory = tableFactory == null ? new HTableFactory() : tableFactory; } /** * Create all the HTable instances , belonging to the given table. * <p> * Note: this is a 'create' of the given table pool. * @param tableName * @param maxSize * @param isAutoFlush */ public void createHTablePool(final String tableName, final int maxSize, boolean isAutoFlush) { LinkedList<HTableInterface> queue = tables.get(tableName); if (queue == null) { queue = new LinkedList<HTableInterface>(); tables.putIfAbsent(tableName, queue); } synchronized (queue) { int addHTableSize = maxSize - queue.size(); if(addHTableSize <= 0){ return; } for(int i=0; i<addHTableSize; i++){ HTable table = (HTable)createHTable(tableName); if(table != null){ table.setAutoFlush(isAutoFlush); queue.add(table); } } } } /** * Create all the HTable instances , belonging to the given tables. * <p> * Note: this is a 'create' of the given table pool. * @param tableNameArray * @param maxSize * @param isAutoFlush default false * usage example: * false: when {@link Put} use. use buffere put. call flushCommits after a time. * you can design a thread(such as 3MS run a time)to loop all pool table, and call flushCommits. * the performance well. * true: when {@link Scan} and {@link Delete} use. */ public void createHTablePool(final String[] tableNameArray, final int maxSize, boolean isAutoFlush) { for(String tableName : tableNameArray){ createHTablePool(tableName,maxSize,isAutoFlush); } } /** * Create all the HTable instances , belonging to the given tables. * <p> * Note: this is a 'create' of the given table pool. 
* @param tableName * @param maxSize */ public void createHTablePool(final String[] tableNameArray, final int maxSize) { createHTablePool(tableNameArray,maxSize,false); } /** * Get a reference to the specified table from the pool.<p> * * @param tableName table name * @return a reference to the specified table * @throws RuntimeException if there is a problem instantiating the HTable */ public HTableInterface getHTable(String tableName) { LinkedList<HTableInterface> queue = tables.get(tableName); if (queue == null) { throw new RuntimeException("There is no pool for the HTable"); } HTableInterface table; synchronized (queue) { table = queue.poll(); } return table; } /** * Get a reference to the specified table from the pool.<p> * * Create a new one if one is not available. * @param tableName table name * @return a reference to the specified table * @throws RuntimeException if there is a problem instantiating the HTable */ public HTableInterface getHTable(byte[] tableName) { return getHTable(Bytes.toString(tableName)); } /** * Puts the specified HTable back into the pool. * <p> * * If the HTable not belong to HTablePool before, do not use this method. * * @param table table */ public void putHTableBack(HTableInterface table) { LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName())); synchronized (queue) { queue.add(table); } } protected HTableInterface createHTable(String tableName) { return this.tableFactory.createHTableInterface(config, Bytes .toBytes(tableName)); } /** * Closes all the HTable instances , belonging to the given table, in the table pool. * <p> * Note: this is a 'shutdown' of the given table pool and different from * {@link #putTable(HTableInterface)}, that is used to return the table * instance to the pool for future re-use. 
* * @param tableName */ public void closeHTablePool(final String tableName) { Queue<HTableInterface> queue = tables.get(tableName); synchronized (queue) { HTableInterface table = queue.poll(); while (table != null) { this.tableFactory.releaseHTableInterface(table); table = queue.poll(); } } HConnectionManager.deleteConnection(this.config, true); } /** * See {@link #closeTablePool(String)}. * * @param tableName */ public void closeHTablePool(final byte[] tableName) { closeHTablePool(Bytes.toString(tableName)); } /** * See {@link #closeTablePool(String)}. * * @param tableName */ public void closeHTablePool() { for(String tabName:tables.keySet()){ closeHTablePool(tabName); } } /** * getCurrentPoolSize * @param tableName * @return */ public int getCurrentPoolSize(String tableName) { Queue<HTableInterface> queue = tables.get(tableName); synchronized (queue) { return queue.size(); } } }
org.apache.hadoop.hbase.client.MyHTablePoolTest 测试实例
package org.apache.hadoop.hbase.client; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; /** * @author greatwqs * @update 2012-08-25 */ public class MyHTablePoolTest { /** * test method * @param args * @throws Exception */ public static void main(String[] args) throws Exception{ // 1. my config file String configFile = "conf/hbase-site.xml"; Configuration config = new Configuration(); config.addResource(new Path(configFile)); // 2. init HTablePool MyHTablePool myPool = new MyHTablePool(config); // 3. create HTablePool for a table myPool.createHTablePool("DCP_DataCenter_Base", MyHTablePool.DEFAULT_POOL_SIZE, false); // 4. get already exist HTable from HTablePool HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base"); if(table != null){ System.out.println("get HTable from HTablePool Success!"); } // 5. get all data from HTable, and print to console. Scan scan = new Scan(); ResultScanner rs = table.getScanner(scan); try { for (Result result : rs) { KeyValue[] kv = result.raw(); byte[] key = kv[0].getRow(); System.out.println("RowKey: " + new String(key)); for (int i = 0; i < kv.length; i++) { System.out.println("ColumnFamily: " + new String(kv[i].getFamily())); System.out.println("Qualifier: "+ new String(kv[i].getQualifier())); System.out.println("Timestamp: "+ String.valueOf(kv[i].getTimestamp())); System.out.println("Value: " + new String(kv[i].getValue())); } System.out.println(); } } catch (Exception e) { e.printStackTrace(); } finally { rs.close(); } // 6. after use HTable end, then put the HTable back to HTablePool. myPool.putHTableBack(table); // 7. close HTablePool myPool.closeHTablePool(); } }
相关推荐
### HBase权威指南知识点概述 #### 一、引言与背景 - **大数据时代的来临**:随着互联网技术的发展,人类社会产生了前所未有的数据量。这些数据不仅数量巨大,而且种类繁多,传统的数据库系统难以应对这样的挑战。 ...
分布式存储系统:HBase:HBase架构与原理.docx
分布式存储系统:HBase:HBase安装与配置.docx
分布式存储系统:HBase:HBase数据读写API教程.docx
分布式存储系统:HBase:HBase集群运维与监控.docx
分布式存储系统:HBase:HBase安全与权限管理.docx
分布式存储系统:HBase:HBase数据模型与表设计.docx
分布式存储系统:HBase:HBase未来发展趋势与挑战.docx
分布式存储系统:HBase:HBase基本操作与Shell命令.docx
通过《HBase:权威指南》,读者不仅可以了解到HBase的基础知识,还能深入理解其在高可用性、可伸缩性和性能优化等方面的应用策略。随着大数据时代的持续发展,HBase作为一款强大的数据存储工具,其重要性日益凸显,...
分布式存储系统:HBase:HBase数据压缩与编码技术教程.docx
分布式存储系统:HBase:HBase在大数据场景下的应用案例.docx
《HBase:The Definition Guide》是一本全面深入探讨HBase的权威指南,旨在为读者提供HBase的详尽理解。HBase,作为Apache Hadoop生态系统中的一个分布式、面向列的数据库,源自Google的BigTable设计,被Facebook等...
《HBase:权威指南》是一本深度探讨分布式列式数据库HBase的专业书籍,它为读者提供了全面、深入的HBase知识。HBase是基于Apache Hadoop的开源项目,旨在为大规模数据集提供低延迟的随机读写能力。本书是HBase开发者...
分布式存储系统:HBase:分布式存储系统概论.docx