
HBase: HTablePool Refactoring and Optimization


HBase Version: hbase-0.90.3-cdh3u1

org.apache.hadoop.hbase.client.HTablePool

The stock pool is not very convenient to use, so I rewrote parts of HTablePool to better fit my own business logic. Discussion is welcome.

The rewrite mainly improves the following four aspects of the original source code:

1.  Allow a different pool size for each table. The stock HTablePool currently uses a single maxSize for all tables:

private final int maxSize;

2.  In the stock pool, getTable only creates a single HTable on demand, and the per-table queue is only initialized at that moment.

    This lazy behavior is not ideal, and it is the main reason I wanted to modify the source.

  /**
   * Get a reference to the specified table from the pool.<p>
   *
   * Create a new one if one is not available.
   * @param tableName table name
   * @return a reference to the specified table
   * @throws RuntimeException if there is a problem instantiating the HTable
   */
  public HTableInterface getTable(String tableName) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if(queue == null) {
      queue = new LinkedList<HTableInterface>();
      tables.putIfAbsent(tableName, queue);
      return createHTable(tableName);
    }
    HTableInterface table;
    synchronized(queue) {
      table = queue.poll();
    }
    if(table == null) {
      return createHTable(tableName);
    }
    return table;
  }

 3.   There should be a createTablePool method so that users can pre-create an HTablePool themselves. It pairs naturally with closeTablePool: create first, close later.

  public void closeTablePool(final String tableName)  {
    Queue<HTableInterface> queue = tables.get(tableName);
    synchronized (queue) {
      HTableInterface table = queue.poll();
      while (table != null) {
        this.tableFactory.releaseHTableInterface(table);
        table = queue.poll();
      }
    }
    HConnectionManager.deleteConnection(this.config, true);
  }

 4.  HTable Puts can be optimized by buffering multiple Puts and committing them together with flushCommits(). A background thread can loop over the pooled HTable instances and call flushCommits() on each (a sketch of such a flusher follows below).
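A minimal sketch of such a flusher thread, assuming the MyHTablePool class presented below; the PoolFlusher class name, its start() helper, and the 3 ms interval are hypothetical choices for illustration, not part of the original code. It borrows every idle HTable from the pool, calls flushCommits() on it, and returns it.

package org.apache.hadoop.hbase.client;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical periodic flusher for MyHTablePool (illustration only).
 */
public class PoolFlusher implements Runnable {

	private final MyHTablePool pool;
	private final String[] tableNames;

	public PoolFlusher(MyHTablePool pool, String[] tableNames) {
		this.pool = pool;
		this.tableNames = tableNames;
	}

	public void run() {
		for (String tableName : tableNames) {
			// Borrow every HTable currently idle in the pool for this table.
			List<HTableInterface> borrowed = new ArrayList<HTableInterface>();
			HTableInterface table;
			while ((table = pool.getHTable(tableName)) != null) {
				borrowed.add(table);
			}
			for (HTableInterface t : borrowed) {
				try {
					t.flushCommits();      // push the buffered Puts to the region servers
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					pool.putHTableBack(t); // always return the instance to the pool
				}
			}
		}
	}

	/** Example wiring: flush all pooled tables every 3 milliseconds. */
	public static ScheduledExecutorService start(MyHTablePool pool, String[] tableNames) {
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		scheduler.scheduleWithFixedDelay(new PoolFlusher(pool, tableNames), 3, 3, TimeUnit.MILLISECONDS);
		return scheduler;
	}
}

Note that while the flusher temporarily holds the borrowed instances, getHTable may return null to other callers, so they should be prepared to handle that case.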

 

The modified source code follows:

package org.apache.hadoop.hbase.client;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * A rewritten pool of HTable instances.<p>
 *
 * To use, instantiate a MyHTablePool, create the per-table pools with
 * {@link #createHTablePool(String, int, boolean)}, and call {@link #getHTable(String)}
 * to borrow an HTable. Once you are done with it, return it to the pool with
 * {@link #putHTableBack(HTableInterface)}.
 *
 * <p>Each table's pool is sized individually when it is created;
 * {@link #DEFAULT_POOL_SIZE} is a convenient default.
 *
 * <p>The pool manages its own connections to the cluster. See {@link HConnectionManager}.
 *
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePool {
	
	public final static int DEFAULT_POOL_SIZE   = 4;
	
	/**
	 * Map from table name to the LinkedList of pooled HTableInterface instances
	 * for that table. Each table may be given a pool of a different size.
	 */
    private final ConcurrentMap<String, LinkedList<HTableInterface>> tables 
                    = new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
    /**
     * Configuration for the cluster, typically loaded from hbase-site.xml.
     */
	private final Configuration config;
	/**
	 * HTableInterfaceFactory used to create and release HTableInterface instances.
	 */
	private final HTableInterfaceFactory tableFactory;

	/**
	 * Default Constructor. 
	 */
	public MyHTablePool() {
		this(HBaseConfiguration.create());
	}

	/**
	 * Constructor that uses the specified configuration.
	 * @param config configuration
	 */
	public MyHTablePool(final Configuration config) {
		this(config, null);
	}

	public MyHTablePool(final Configuration config, final HTableInterfaceFactory tableFactory) {
		// Make a new configuration instance so I can safely cleanup when
		// done with the pool.
		this.config = config == null ? new Configuration() : new Configuration(
				config);
		this.tableFactory = tableFactory == null ? new HTableFactory()
				: tableFactory;
	}
	
	/**
	 * Creates the pool of HTable instances for the given table.
	 * <p>
	 * Note: this is a 'create' of the given table pool.
	 * @param tableName name of the table to pool
	 * @param maxSize number of HTable instances to keep in the pool
	 * @param isAutoFlush whether the pooled HTable instances auto-flush their Puts
	 */
	public void createHTablePool(final String tableName, final int maxSize, boolean isAutoFlush) {
		LinkedList<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			queue = new LinkedList<HTableInterface>();
			// putIfAbsent may lose a race with another thread; if so, reuse the queue it installed.
			LinkedList<HTableInterface> existing = tables.putIfAbsent(tableName, queue);
			if (existing != null) {
				queue = existing;
			}
		}
		synchronized (queue) {
			int addHTableSize = maxSize - queue.size();
			if(addHTableSize <= 0){
				return;
			}
			for(int i=0; i<addHTableSize; i++){
				HTable table = (HTable)createHTable(tableName);
				if(table != null){
					table.setAutoFlush(isAutoFlush);
					queue.add(table);
				}
			}
		}
	}
	
	/**
	 * Creates the pools of HTable instances for the given tables.
	 * <p>
	 * Note: this is a 'create' of the given table pools.
	 * @param tableNameArray names of the tables to pool
	 * @param maxSize number of HTable instances to keep per table
	 * @param isAutoFlush default false.
	 * Usage:
	 * false: for {@link Put}-heavy workloads. Puts are buffered client-side; call flushCommits
	 *        periodically (for example from a thread that loops over all pooled tables every 3 ms)
	 *        for better write performance.
	 * true: for {@link Scan} and {@link Delete} workloads.
	 */
	public void createHTablePool(final String[] tableNameArray, final int maxSize, boolean isAutoFlush) {
		for(String tableName : tableNameArray){
			createHTablePool(tableName,maxSize,isAutoFlush);
		}
	}
	
	/**
	 * Creates the pools of HTable instances for the given tables, with auto-flush disabled.
	 * <p>
	 * Note: this is a 'create' of the given table pools.
	 * @param tableNameArray names of the tables to pool
	 * @param maxSize number of HTable instances to keep per table
	 */
	public void createHTablePool(final String[] tableNameArray, final int maxSize) {
		createHTablePool(tableNameArray,maxSize,false);
	}

	/**
	 * Get a reference to the specified table from the pool.<p>
	 *
	 * @param tableName table name
	 * @return a pooled reference to the specified table, or null if the pool for it is currently empty
	 * @throws RuntimeException if no pool has been created for the table
	 */
	public HTableInterface getHTable(String tableName) {
		LinkedList<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			throw new RuntimeException("There is no pool for the HTable");
		}
		HTableInterface table;
		synchronized (queue) {
			table = queue.poll();
		}
		return table;
	}

	/**
	 * Get a reference to the specified table from the pool.<p>
	 *
	 * @param tableName table name
	 * @return a pooled reference to the specified table, or null if the pool for it is currently empty
	 * @throws RuntimeException if no pool has been created for the table
	 */
	public HTableInterface getHTable(byte[] tableName) {
		return getHTable(Bytes.toString(tableName));
	}
	
	/**
	 * Puts the specified HTable back into the pool.
	 * <p>
	 * Only use this method for HTable instances that were originally obtained from this pool.
	 *
	 * @param table table
	 */
	public void putHTableBack(HTableInterface table) {
		LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
		if (queue == null) {
			throw new RuntimeException("There is no pool for the HTable");
		}
		synchronized (queue) {
			queue.add(table);
		}
	}

	protected HTableInterface createHTable(String tableName) {
		return this.tableFactory.createHTableInterface(config, Bytes
				.toBytes(tableName));
	}

	/**
	 * Closes all the HTable instances, belonging to the given table, in the table pool.
	 * <p>
	 * Note: this is a 'shutdown' of the given table pool and different from
	 * {@link #putHTableBack(HTableInterface)}, which returns a table instance
	 * to the pool for future re-use.
	 *
	 * @param tableName
	 */
	public void closeHTablePool(final String tableName) {
		Queue<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			return;
		}
		synchronized (queue) {
			HTableInterface table = queue.poll();
			while (table != null) {
				this.tableFactory.releaseHTableInterface(table);
				table = queue.poll();
			}
		}
		HConnectionManager.deleteConnection(this.config, true);
	}

	/**
	 * See {@link #closeHTablePool(String)}.
	 *
	 * @param tableName
	 */
	public void closeHTablePool(final byte[] tableName) {
		closeHTablePool(Bytes.toString(tableName));
	}
	
	/**
	 * Closes the pools of all tables. See {@link #closeHTablePool(String)}.
	 */
	public void closeHTablePool() {
		for(String tabName:tables.keySet()){
			closeHTablePool(tabName);
		}
	}

	/**
	 * Returns the number of HTable instances currently idle in the pool for the given table.
	 * @param tableName table name
	 * @return the current pool size, or 0 if no pool exists for the table
	 */
	public int getCurrentPoolSize(String tableName) {
		Queue<HTableInterface> queue = tables.get(tableName);
		if (queue == null) {
			return 0;
		}
		synchronized (queue) {
			return queue.size();
		}
	}
	
} 

 

Test example: org.apache.hadoop.hbase.client.MyHTablePoolTest

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

/**
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePoolTest {
	
	/**
	 * test method
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		// 1. my config file 
		String configFile = "conf/hbase-site.xml";
		Configuration config = new Configuration();
		config.addResource(new Path(configFile));
		// 2. init HTablePool
		MyHTablePool myPool = new MyHTablePool(config);
		// 3. create HTablePool for a table
		myPool.createHTablePool("DCP_DataCenter_Base", MyHTablePool.DEFAULT_POOL_SIZE, false);
		// 4. get already exist HTable from HTablePool
		HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base");
		if (table == null) {
			System.out.println("No HTable available in the pool!");
			return;
		}
		System.out.println("get HTable from HTablePool Success!");
		// 5. get all data from HTable, and print to console.
		Scan scan = new Scan();
		ResultScanner rs = table.getScanner(scan);
		try {
			for (Result result : rs) {
				KeyValue[] kv = result.raw();
				byte[] key = kv[0].getRow();
				System.out.println("RowKey: " + new String(key));
				for (int i = 0; i < kv.length; i++) {
					System.out.println("ColumnFamily: "	+ new String(kv[i].getFamily()));
					System.out.println("Qualifier: "+ new String(kv[i].getQualifier()));
					System.out.println("Timestamp: "+ String.valueOf(kv[i].getTimestamp()));
					System.out.println("Value: " + new String(kv[i].getValue()));
				}
				System.out.println();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			rs.close();
		}
		// 6. after use HTable end, then put the HTable back to HTablePool.
		myPool.putHTableBack(table);
		// 7. close HTablePool
		myPool.closeHTablePool();
	}
}
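The test above only exercises the read path. A write-path sketch for the buffered-Put usage from point 4 could look like the following; it is an illustration only, the row key, column family, qualifier, and value are placeholders, and the explicit flushCommits() here stands in for a periodic flusher thread.

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical write-path example: buffered Puts with an explicit flush (illustration only).
 */
public class MyHTablePoolPutExample {

	public static void main(String[] args) throws Exception {
		Configuration config = new Configuration();
		config.addResource(new Path("conf/hbase-site.xml"));

		MyHTablePool myPool = new MyHTablePool(config);
		// isAutoFlush = false: Puts are buffered client-side until flushCommits() is called.
		myPool.createHTablePool("DCP_DataCenter_Base", MyHTablePool.DEFAULT_POOL_SIZE, false);

		HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base");
		try {
			Put put = new Put(Bytes.toBytes("rowkey-001"));           // placeholder row key
			put.add(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"),  // placeholder family/qualifier
					Bytes.toBytes("value"));
			table.put(put);       // buffered, not yet sent to the region servers
			table.flushCommits(); // normally done by a periodic flusher thread instead
		} finally {
			myPool.putHTableBack(table);
		}
		myPool.closeHTablePool();
	}
}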

 

Comments
#6 greatwqs 2013-06-26
In the newer hbase-0.92.1-cdh4.1.3, HTablePool has changed significantly;
it can be used directly now.
http://blog.csdn.net/mrtitan/article/details/8892815
http://helpbs.iteye.com/blog/1492054
#5 greatwqs 2013-06-05
leibnitz wrote:
Which HBase version are you using?
I'm looking at 0.94; apart from point 1, the rest doesn't seem very practical to me.

hbase-0.90.3-cdh3u1. When I read the source back then, the built-in HTablePool felt half-baked, so I rewrote it.
Concretely, it does the following:
each table's pool is initialized with its own number of HTable instances, and Puts use periodic flushCommits.
#4 leibnitz 2013-06-04
Which HBase version are you using?
I'm looking at 0.94; apart from point 1, the rest doesn't seem very practical to me.
#3 greatwqs 2013-03-26
asd51731 wrote:
When I test with your code, I get "ERROR client.HConnectionManager: Connection not found in the list, can't delete it (connection key=HConnectionKey{properties={hbase.zookeeper.property.clientPort=2181, hbase.client.pause=1000, zookeeper.znode.parent=/hbase, hbase.client.retries.number=10, hbase.zookeeper.quorum=192.168.0.130}, username='zxx'}). May be the key was modified?". Did you run into the same problem?

My HBase tests work fine: it connects to HBase and reads and writes data. Your connection configuration is probably not set up correctly.
Try reading data in the simplest possible way first, without my code.
#2 asd51731 2013-03-25
When I test with your code, I get "ERROR client.HConnectionManager: Connection not found in the list, can't delete it (connection key=HConnectionKey{properties={hbase.zookeeper.property.clientPort=2181, hbase.client.pause=1000, zookeeper.znode.parent=/hbase, hbase.client.retries.number=10, hbase.zookeeper.quorum=192.168.0.130}, username='zxx'}). May be the key was modified?". Did you run into the same problem?
#1 greatwqs 2013-03-03
Test note:
// 3. create HTablePool for a table
myPool.createHTablePool("XXXX_Table", MyHTablePool.DEFAULT_POOL_SIZE, true);

When set to true, there needs to be a thread that periodically flushes to HBase;
only then is the data actually written to HBase.

