如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~
常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的。
理想的方式应该是怎样?
拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个Endpoint,然后让hbase加载起来,然后我们远程调用即可。
什么是Endpoint?
先弄清楚什么是hbase coprocessor
hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是EndPoint,类似于关系数据库的存储过程。
观察者这里就多做介绍了,这里介绍Endpoint。
EndPoint是动态RPC插件的接口,它的实现代码被部署在服务器端(regionServer),从而能够通过HBase RPC调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个EndPoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。
怎么实现一个EndPoint
1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
2. 实现终端接口,继承抽象类BaseEndpointCoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) 。rigons区域:HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable),这里的region是通过一个row来标示的,就是说,改row落到那个region,RPC就发给哪个region,对于start-end的,[start,end)范围内的region都会受到RPC调用。
1
2
3
|
public interface CounterProtocol extends CoprocessorProtocol {
public long count( byte [] start, byte [] end) throws IOException;
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class CounterEndPoint extends BaseEndpointCoprocessor implements CounterProtocol {
@Override
public long count( byte [] start, byte []end) throws IOException {
// aggregate at each region
Scan scan = new Scan();
long numRow = 0 ;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
.getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean hasMore = false ;
do {
curVals.clear();
hasMore = scanner.next(curVals);
if (Bytes.compareTo(curVals.get( 0 ).getRow(), start)< 0 ) {
continue ;
}
if (Bytes.compareTo(curVals.get( 0 ).getRow(), end)>= 0 ) {
break ;
}
numRow++;
} while (hasMore);
} finally {
scanner.close();
}
return numRow;
}
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class CounterEndPointDemo {
public static void main(String[] args) throws IOException, Throwable {
final String startRow = args[ 0 ];
final String endRow = args[ 1 ];
@SuppressWarnings ( "resource" )
HTableInterface table = new HTable(HBaseConfiguration.create(), "tc" );
Map< byte [], Long> results;
// scan: for all regions
results = table.coprocessorExec(CounterProtocol. class , startRow.getBytes(),
endRow.getBytes(), new Batch.Call<CounterProtocol, Long>() {
public Long call(CounterProtocol instance) throws IOException {
return instance.count(startRow.getBytes(), endRow.getBytes());
}
});
long total = 0 ;
for (Map.Entry< byte [], Long> e : results.entrySet()) {
System.out.println(e.getValue());
total += e.getValue();
}
System.out.println( "total:" + total);
}
} |
整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!
另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.Writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.HbaseObjectWritable
怎么部署?
1. 通过hbase-site.xml增加
1
2
3
4
|
< property >
< name >hbase.coprocessor.region.classes</ name >
< value >xxxx.CounterEndPoint </ value >
</ property >
|
- 如果要配置多个,就用逗号(,)分割。
- 包含此类的jar必须位于hbase的classpath
- 这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。
2. 通过shell方式
增加:
1
2
3
4
5
6
|
hbase(main):005:0> alter 't1' , METHOD => 'table_att' ,
Updating all regions with the new schema... 1 /1 regions updated.
Done. 0 row(s) in 1.0730 seconds
|
coprocessor格式为:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+
- 其中FilePath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jar
- ClassNameEndPoint实现类的全名
- Priority为,整数,框架会根据这个数据决定多个cp的执行顺序
- Arguments,传给cp的参数
- 如果hbase的classpath包含改类,FilePath可以留空
卸载:
- 先describe “tableName‘,查看你要卸载的cp的编号
- 然后alter 't1', METHOD => 'table_att_unset', NAME=> 'coprocessor$3',coprocessor$3可变。
应用场景
这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:
- 节省网络带宽
- 减少RPC调用(scan的调用随着CacheSzie的变小而线性增加),减轻hbase压力
- 可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。
其他应用场景?
- 一个保存着用户信息的表,可以统计每个用户信息(counter job)
- 统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/HBASE-1512
- 批量删除记录,批量删除某个时间戳的记录
参考:
1. http://blogs.apache.org/hbase/entry/coprocessor_introduction
2. https://issues.apache.org/jira/browse/HBASE-1512
相关推荐
HBase RowKey 设计与协处理器运用 HBase 是一个基于 HDFS 的分布式、面向列的 NoSQL 数据库,具有高性能、可靠性和扩展性等特点。本文将详细介绍 HBase 的 RowKey 设计和协处理器运用。 HBase 的介绍 HBase 是一...
Hbase协处理器详解,进阶篇
该项目展示了如何使用 HBase 区域观察者协处理器将流行的开源 Drool 规则引擎与 HBase 集成,以并行应用和执行规则。 这里,Drool 规则引擎位于 HBase 的数据路径上,并且能够为 HBase 表的每个请求触发(如果满足...
本资料“Hadoop_Learning:MapReduce,HBase,协处理器的学习与实现”将深入探讨这三个核心组件,并通过JavaScript的应用来增强理解。 MapReduce是Hadoop的核心计算模型,它将大型数据集分解为小块,然后在分布式...
"基于协处理器的HBase内存索引机制的研究" 本文研究了基于协处理器的HBase内存索引机制,以提高HBase的多条件查询速度。在大数据时代,传统的关系型数据库难以处理无规范模式的数据集,于是NoSQL数据库如HBase、...
本文将深入探讨如何利用协处理器(Coprocessor)机制在HBase中实现分类的二级索引设计,以提升查询效率并优化数据管理。 一、HBase与二级索引 HBase是建立在Hadoop之上的键值存储系统,它以行键、列族、列限定符和...
协处理器是一种嵌入在HBase服务器端的自定义代码,它可以在数据读写过程中执行用户定义的操作,为HBase提供了扩展功能的平台。利用协处理器,我们可以为那些频繁查询的字段创建映射到行键的二级索引。这样,在查询时...
在“观察者类型协处理器ObserverProcessorLog.zip”这个资源中,我们可以推断它与HBase的观察者模式协处理器有关,这涉及到HBase的扩展性和自定义行为。 观察者模式(Observer Pattern)是设计模式的一种,它定义了...
### Hadoop之Hbase从入门到精通 #### HBase技术介绍与概述 HBase是一种分布式、高可靠性且高性能的列式存储系统,它基于Hadoop生态体系构建,并且能够支持大规模的数据存储需求。HBase的设计灵感来源于Google的...
在HBase数据库系统中,协处理器(Coprorocessor)是一种强大的特性,它允许开发者在数据存储和处理层面上进行自定义扩展。本项目"test_coprocessor.zip"通过对比错误实现与正确实现的方式,深入浅出地揭示了如何有效...
HDFS+MapReduce+Hive+HBase十分钟快速入门.pdf
协处理器通过宏和概括的方式,我们可以将协处理器定义为一个框架,该框架提供了一种在HBase中执行自定义代码的简便方法。 代表协处理器的最常用类比是“触发器/存储过程”和AOP。 协处理器可以开发为: 观察者协处理...
《Hadoop之HBase从入门到精通》是一个深入学习Hadoop和HBase的全面指南,旨在帮助初学者和有经验的开发者快速掌握这两个强大的大数据处理工具。Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在廉价...
《大数据云计算技术系列:Hadoop之Hbase从入门到精通》 HBase,全称Hadoop Database,是一款基于Hadoop生态系统的分布式列式存储系统,旨在处理海量结构化数据。它借鉴了Google Bigtable的设计思想,但开源并适应了...
HDFS+MapReduce+Hive+HBase十分钟快速入门,包括这几个部分的简单使用
Hadoop之HBase从入门到精通 本文将详细介绍HBase技术,从基础概念到高级应用,旨在帮助读者快速掌握HBase技术。 一、HBase技术介绍 HBase是Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储...
课时10:Observer协处理器实战之Master级别原理剖析 课时11:Observer协处理器实战之Region级别原理剖析 课时12:Observer协处理器实战之表复制应用实战 课时13:Endpoint协处理器实战之原理剖析 课时14:Endpoint...
### HBase入门与使用 HBase作为Apache Hadoop生态系统中的一个关键组件,提供了一种分布式、版本化的非关系型数据库,特别适用于大规模数据处理。它借鉴了Google Bigtable的设计理念,能够实现在廉价硬件上存储PB...