`

【原创】HBase 0.98 coprocessor Endpoint实现行数统计

 
阅读更多

当我们对HBase表中的数据进行一些简单的行数统计或者聚合计算时,如果使用MapReduce或Native API将数据传到客户端进行计算,就会有较大延迟和大量网络IO开销。如果能把这些计算放在Server端,就可以减少网络IO开销,从而获得很好的性能提升。HBase的协处理器可以很好的实现上述想法。

HBase coprocessor 分为两大类,分别是:

1、Observer:类似于观察者模式,提供了Get、Put、Delete、Scan等一些钩子方法。RegionObserver具体又可以分为:RegionObserver、WALObserver和MasterObserver

2、Endpoint:通过RPC调用实现。

下面介绍使用Endpoint实现行数统计。

开发环境:

Hadoop 2.6.0

HBase 0.98.4

实现代码:

1、定义RPC通讯协议(ExampleProtos.proto)

option java_package = "com.iss.gbg.protobuf.proto.generated";
option java_outer_classname = "ExampleProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
}

协议的内容不再解析,想了解各行代理什么意思,请查看Protocol Buffers的相关内容。

使用Protocol Buffers的编译器生成Java类

服务器端代码实现:

public class RowCountEndpoint extends ExampleProtos.RowCountService
    implements Coprocessor, CoprocessorService {
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /**
   * Just returns a reference to this object, which implements the RowCounterService interface.
   */
  @Override
  public Service getService() {
    return this;
  }

  /**
   * Returns a count of the rows in the region where this coprocessor is loaded.
   */
  @Override
  public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
                          RpcCallback<ExampleProtos.CountResponse> done) {
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      byte[] lastRow = null;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          byte[] currentRow = CellUtil.cloneRow(kv);
          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
            lastRow = currentRow;
            count++;
          }
        }
        results.clear();
      } while (hasMore);

      response = ExampleProtos.CountResponse.newBuilder()
          .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {}
      }
    }
    done.run(response);
  }


  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
   * on a table region, so always expects this to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   * {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment)env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
}

 客户端调用代码:

public class RowCountClient {
	private static final Logger LOG = LoggerFactory.getLogger(RowCountClient.class);
	public static void main(String[] args) {
		HTableInterface htable = null;
		try {
			htable = HBaseServer.getTable("test_crawler_data");
			LOG.info("htable 获取成功!");
			final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
			
			Map<byte[], Long> results = htable.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() {
				/* (non-Javadoc)
				 * @see org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call(java.lang.Object)
				 */
				@Override
				public Long call(RowCountService counter) throws IOException {
					ServerRpcController controller = new ServerRpcController();
					BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<ExampleProtos.CountResponse>();
					counter.getRowCount(controller, request, rpcCallback);
					ExampleProtos.CountResponse response = rpcCallback.get();
					if(controller.failedOnException()) {
						throw controller.getFailedOn();
					}
					return (null != response && response.hasCount())? response.getCount() : 0 ;
					
				}
			});
			if(null != results && !results.isEmpty()) {
				long sum = 0;
				for(Entry<byte[], Long> entry : results.entrySet()) {
					sum += entry.getValue().longValue();
				}
				System.out.println("sum=" + sum);
			}
		} catch (IOException e) {
			
			e.printStackTrace();
		} catch (ServiceException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (Throwable e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} finally {
			if(null != htable) {
				try {
					htable.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 部署:

1、将代码打包发布到HBase的lib上目录,重启HBase即可。

2、给指定表添加endpoint

 

alter 'test_crawler_data','coprocessor'=>'|com.iss.gbg.protobuf.hbase.RowCountEndpoint|1001|'

 

查看test_crawler_data的描述如下:

hbase(main):022:0> describe 'test_crawler_data'
DESCRIPTION                                                                                                       ENABLED                                                      
 'test_crawler_data', {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.example.RowCou true                                                         
 ntEndpoint|1001|'}, {NAME => 'extdata', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE =                                                              
 > '0', COMPRESSION => 'NONE', VERSIONS => '1', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FAL                                                              
 SE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}, {NAME => 'srcdata', DATA_BLOCK_ENCODING                                                               
 => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', MIN_VERSIONS                                                               
 => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE                                                               
 => 'true'}                                                                                                     

 

调用方法:

执行客户端程序即可返回指表的行数。

 

转载请注明出处:http://dujian-gu.iteye.com/blog/2225032

 

 

 

分享到:
评论

相关推荐

    Java通过api 操作hbase 0.98

    在集群中创建java项目调用api来操作hbase,主要涉及对hbase的创建表格,删除表格,插入数据,删除数据...2.获取jar包到项目的lib目录下(这边试用的事hbase 0.98 lib目录下的所有jar包) 3.编写java程序 4.编写ant脚本

    HBase源代码 hbase-0.98.23

    总之,通过对hbase-0.98.23源代码的深入学习,我们可以了解到HBase如何实现数据的分布式存储、查询、分区管理以及容错机制。这对于理解HBase的工作原理,优化系统性能,甚至进行二次开发具有极大的价值。然而,HBase...

    HBase中Coprocessor的介绍以及实际业务场景中的使用.pdf

    讲师:陈杨——快手大数据高级研发工程师 ...内容概要:(1)讲解hbase coprocessor的原理以及使用场景,(2) coprocessor整个流程实战,包括开发,加载,运行以及管理(3)结合1,2分析coprocessor在rsgroup中的具体使用

    Hadoop2.4、Hbase0.98、Hive集群安装配置手册

    Hadoop2.4、Hbase0.98、Hive集群安装配置手册

    藏经阁-HBase Coprocessor-22.pdf

    2.高性能:HBase Coprocessor 可以提高 HBase 的性能,例如使用 Endpoint 机制可以实现高效的数据处理。 3.灵活的扩展:HBase Coprocessor 可以根据需要实现自定义的数据处理逻辑,例如二级索引、数据聚合等。 ...

    HBase-coprocessor.pptx

    Endpoint使得用户可以实现自定义的RPC服务,例如统计表的行数。Endpoint的实现涉及protobuf文件的定义,Java类的生成以及服务接口的实现。 Coprocessor 实现: Observer的实现通常涉及到对特定操作(如preGetOp、...

    hbase-0.98.6.1-src.zip

    这个“hbase-0.98.6.1-src.zip”压缩包包含了HBase 0.98.6.1版本的源代码,是研究和学习HBase内部工作原理以及进行定制开发的理想资源。 源码分析: 1. **目录结构**: HBase的源码通常包括多个模块,如`hbase-...

    HBaseCoprocessor的实现与应用.pdf

    ### HBase Coprocessor 的实现与应用 #### 一、Coprocessor简介 HBase Coprocessor 是一种灵活且强大的机制,它允许用户在 HBase 上执行自定义逻辑,从而扩展 HBase 的功能。Coprocessor 的灵感源自 BigTable 的协...

    hbase-0.98.12.1-hadoop2-bin.tar.gz

    本文将围绕HBase 0.98.12.1在Hadoop 2上的实现进行深入探讨。 一、HBase概述 HBase是NoSQL数据库的一种,支持行式存储,采用键值对模型,且具有时间戳特性。其设计目标是处理海量数据,支持快速随机读取,同时具备...

    在集群中java 通过调用API操作HBase 0.98

    本篇文章将深入探讨如何在集群环境中使用Java API来操作HBase 0.98版本,主要包括创建表格、删除表格等基本操作。 首先,为了在Java中调用HBase的API,我们需要添加HBase的依赖库到项目中。对于HBase 0.98,你需要...

    hbase-0.98

    标题中的"hbase-0.98"指的是HBase的0.98版本,这是一个相对稳定的版本,意味着它已经经过了多次迭代和修复,对系统的稳定性和性能进行了优化,适合在生产环境中使用。用户被鼓励下载并部署这个版本,因为它在功能...

    hbase_coprocessor_hbase_coprocessorjava_源码

    在HBase中,Coprocessor机制是一个强大的特性,它允许用户在HBase服务器端自定义扩展功能,如数据过滤、统计计算、访问控制等。这个压缩包“hbase_coprocessor_hbase_coprocessorjava_源码”显然包含了用Java API...

    hbase-0.98.12.1-hadoop1-bin.tar.gz

    《HBase 0.98.12.1与Hadoop1集成详解》 HBase,作为Apache软件基金会的一个开源项目,是构建在Hadoop分布式文件系统(HDFS)之上的一种分布式、列式存储的数据库,特别适合处理海量半结构化数据。本文将围绕"Hbase-...

    2-6+HBase+Coprocessor.pdf

    总的来说,HBase Coprocessor通过Endpoint机制实现了服务端的扩展,允许在RegionServer上执行用户定义的计算逻辑,优化了数据处理流程,降低了延迟,增强了系统的整体处理能力。这对于大数据场景下的实时分析和复杂...

    HBaseCoprocessor的实现与应用.zip

    1. **HBase Coprocessor概述**:Coprocessor是HBase内置的一种机制,它允许用户在RegionServer端和客户端实现自己的逻辑,如数据过滤、数据校验、审计等,从而实现了对数据操作的细粒度控制。Coprocessor的设计理念...

    hbase-0.98.17-hadoop2-bin.tar.gz

    《HBase 0.98.17-hadoop2 全面解读》 HBase,全称为Apache HBase,是一款开源的、分布式的、版本化的非关系型数据库(NoSQL数据库),它构建在Hadoop文件系统(HDFS)之上,为大数据提供了高效、可靠的数据存储解决...

    HBase Coprocessor 优化与实验

    Coprocessor有两种主要的实现方式:Observer和Endpoint。Observer类似于触发器,用于监听特定事件并做出响应;而Endpoint则类似于存储过程,能够执行更复杂的业务逻辑。 #### Coprocessor 实现类型详解 - **...

    hbase-solr-coprocessor:通过solr实现hbase二级索引,主要通过hbase的coprocessor的Observer实现

    主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为SolrIndexCoprocessorObserver,该类继承BaseRegionObserver,并实现postPut和postDelete方法,以实现...

    apache-phoenix-4.8.1-HBase-0.98-bin.tar

    3. **连接器与 JDBC 驱动**: Phoenix 实现了 JDBC 接口,允许任何支持 JDBC 的应用程序(如 Java 应用、BI 工具等)与 HBase 进行交互。这使得 Phoenix 成为连接 HBase 和各种数据访问工具的桥梁。 4. **索引**: ...

Global site tag (gtag.js) - Google Analytics