Coprocessors
之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服务端执行,他有点儿想我们熟悉的存储过程,传一些参数进去,然后进行我们事先定义好的操作,我们常常用它来做一些比如二次索引啊,统计函数什么的,它也和自定义filter一样,需要事先定好,然后在hbase-env.sh中的HBASE_CLASSPATH中指明,就像我的上一篇中的写的那样。
Coprocessors分两种,observer和endpoint。
(1)observer就像触发器一样,当某个事件发生的时候,它就出发。
已经有一些内置的接口让我们去实现,RegionObserver、MasterObserver、WALObserver,看名字就大概知道他们是干嘛的。
(2)endpoint可以认为是自定义函数,可以把这个理解为关系数据库的存储过程。
所有的Coprocessor都是实现自Coprocessor 接口,它分SYSTEM和USER,前者的优先级比后者的优先级高,先执行。
它有两个方法,start和stop方法,两个方法都有一个相同的上下文对象CoprocessorEnvironment。
void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;
这是CoprocessorEnvironment的方法。
Working with Tables
对表进行操作的时候,必须先调用getTable方法活得HTable,不可以自己定义一个HTable,目前貌似没有禁止,但是将来会禁止。
并且在对表操作的时候,不能对行加锁。
Coprocessor Loading
Coprocessor加载需要在配置文件里面全局加载,比如在hbase-site.xml中设置。
<property>
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>
</property>
<property>
<name>hbase.coprocessor.master.classes</name>
<value>coprocessor.MasterObserverExample</value>
</property>
<property>
<name>hbase.coprocessor.wal.classes</name>
<value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value>
</property>
我们自定义的时间可以注册到三个配置项上,分别是hbase.coprocessor.region.classes,hbase.coprocessor.master.classes,
hbase.coprocessor.wal.classes上,他们分别负责region,master,wal,注册到region的要特别注意小心,因为它是针对所有表的。
<property>
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.RegionObserverExample</value></property>
注册到这三个触发器上,可以监控到几乎所有我们的操作上面,非常恐怖。。可以说是想要什么就有什么,详细的代码大家自己去摸索。
EndPoint的可以用来定义聚合函数,我们可以调用CoprocessorProtocol中的方法来实现我们的需求。
调用coprocessorProxy() 传一个单独的row key,这是在单独一个region上操作的。
要在所有region上面操作,我们要调用coprocessorExec()方法 传一个开始row key 和结束row key。
Demo
说了那么多废话,我都不好意思再说了,来个例子吧,统计行数的。
public interface RowCountProtocol extends CoprocessorProtocol { long getRowCount() throws IOException; long getRowCount(Filter filter) throws IOException; long getKeyValueCount() throws IOException;
}
public class RowCountEndpoint extends BaseEndpointCoprocessor implements
RowCountProtocol { private long getCount(Filter filter, boolean countKeyValues)
throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(1); if (filter != null) {
scan.setFilter(filter);
}
RegionCoprocessorEnvironment environment = (RegionCoprocessorEnvironment) getEnvironment(); // use an internal scanner to perform scanning.
InternalScanner scanner = environment.getRegion().getScanner(scan); int result = 0; try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean done = false; do {
curVals.clear();
done = scanner.next(curVals);
result += countKeyValues ? curVals.size() : 1;
} while (done);
} finally {
scanner.close();
} return result;
}
@Override public long getRowCount() throws IOException { return getRowCount(new FirstKeyOnlyFilter());
}
@Override public long getRowCount(Filter filter) throws IOException { return getCount(filter, false);
}
@Override public long getKeyValueCount() throws IOException { return getCount(null, true);
}
}
写完之后,注册一下吧。
<property>
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.RowCountEndpoint</value></property>
JAVA 客户端调用
在服务端定义之后,我们怎么在客户端用java代码调用呢,看下面的例子你就明白啦!
public class EndPointExample { public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable"); try {
Map<byte[], Long> results = table.coprocessorExec(
RowCountProtocol.class, null, null, new Batch.Call<RowCountProtocol, Long>() {
@Override public Long call(RowCountProtocol counter) throws IOException { return counter.getRowCount();
}
}); long total = 0; for (Map.Entry<byte[], Long> entry : results.entrySet()) {
total += entry.getValue().longValue();
System.out.println("Region: " + Bytes.toString(entry.getKey()) + ", Count: " + entry.getValue());
}
System.out.println("Total Count: " + total);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
通过table的coprocessorExec方法调用,然后调用RowCountProtocol接口的getRowCount()方法。
然后遍历每个Region返回的结果,合起来就是最终的结果,打印结果如下。
Region:
testtable,,1303417572005.51f9e2251c29ccb2...cbcb0c66858f.,
Count: 2Region:
testtable,row3,1303417572005.7f3df4dcba3f...dbc99fce5d87.,
Count: 3Total Count: 5
在上面的例子当中,我们是用Batch.Call()方法来调用接口当中的方法,我们可以用另外一个方法来简化上述代码,来看例子。
Batch.Call call =Batch.forMethod(RowCountProtocol.class,"getKeyValueCount");
Map<byte[], Long> results = table.coprocessorExec(RowCountProtocol.class, null, null, call);
采用Batch.Call方法调用同时调用多个方法
Map<byte[], Pair<Long, Long>> results =table.coprocessorExec(
RowCountProtocol.class,null, null,new Batch.Call<RowCountProtocol, Pair<Long, Long>>()
{ public Pair<Long, Long> call(RowCountProtocol counter) throws IOException { return new Pair(counter.getRowCount(),counter.getKeyValueCount());
}
});long totalRows = 0;long totalKeyValues = 0;for (Map.Entry<byte[], Pair<Long, Long>> entry :results.entrySet()) {
totalRows +=
entry.getValue().getFirst().longValue();
totalKeyValues +=entry.getValue().getSecond().longValue();
System.out.println("Region: " +Bytes.toString(entry.getKey()) +", Count: " + entry.getValue());
}
System.out.println("Total Row Count: " + totalRows);
System.out.println("Total KeyValue Count: " +totalKeyValues);
调用coprocessorProxy()在单个region上执行
RowCountProtocol protocol = table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("row4"));long rowsInRegion = protocol.getRowCount();
System.out.println("Region Row Count: " +rowsInRegion);
上面这个例子是查找row4行所在region的数据条数,这个可以帮助我们统计每个region上面的数据分布。
相关推荐
### HBase学习利器:HBase实战 #### 一、HBase简介与背景 HBase是Apache Hadoop生态系统中的一个分布式、可扩展的列族数据库,它提供了类似Bigtable的能力,能够在大规模数据集上进行随机读写操作。HBase是基于...
学习HBase,可以从《HBase权威指南》和《HBase实战》这两本书入手,它们分别深入探讨了HBase的设计原理、使用方法以及最佳实践,是理解并掌握HBase的重要资源。通过阅读这些书籍,你可以全面了解HBase的核心特性和...
总的来说,这个压缩包的源码学习可以帮助我们掌握如何利用Java API实现和部署HBase Coprocessor,从而在HBase中实现自定义的功能扩展。通过理解和实践,我们可以更好地优化HBase的数据处理流程,提升系统的性能和...
7. **实时查询**:讲述HBase如何与HBase Coprocessors、Secondary Indexes等技术配合,实现高效的实时查询。 8. **备份与恢复**:讲解HBase的数据备份策略,包括快照、导出导入等方法,以及灾难恢复方案。 9. **...
《HBase权威指南》是一本深入探讨分布式大数据存储系统HBase的专业书籍,其源代码的提供为读者提供了更直观的学习材料。HBase是基于Apache Hadoop的非关系型数据库(NoSQL),它在大规模数据存储方面表现卓越,尤其...
MapReduce可以用于批量处理HBase数据,而HBase的 Coprocessors 和 Filters 功能则允许在数据存储和检索时进行定制化的逻辑处理。书中会介绍如何利用这些功能进行复杂的数据处理和分析。 最后,你还将了解到如何使用...
对于初学者,建议先了解HBase的基本概念,然后动手实践设置HBase环境,创建表,插入数据,进行查询和删除操作,进一步探索HBase的高级特性,如 Coprocessors、Compaction、Splitting等。 总之,"hbase-2.4.1-bin....
总之,《HBase不睡觉书》作为一本高清完整版的最新教材,不仅覆盖了HBase的基础知识,还深入探讨了其高级特性和实践应用,是学习和进阶HBase技术的理想读物。无论是开发人员还是运维人员,都能从中获取丰富的知识,...
HBase还支持Secondary Index,虽然不如传统关系型数据库那样完善,但可以通过 Coprocessors 或者索引服务如 phoenix 来实现。 在实际应用中,HBase常用于日志分析、物联网(IoT)数据存储、实时监控系统、社交网络...
《HBase不睡觉书》是一本专注于介绍Apache HBase的大数据存储系统的技术书籍。HBase是构建在Hadoop分布式...总的来说,《HBase不睡觉书》是一本全面且实用的HBase学习指南,值得所有关注大数据存储和处理的读者珍藏。
这个“hbase-0.98.6.1-src.zip”压缩包包含了HBase 0.98.6.1版本的源代码,是研究和学习HBase内部工作原理以及进行定制开发的理想资源。 源码分析: 1. **目录结构**: HBase的源码通常包括多个模块,如`hbase-...
总的来说,《HBase权威指南》是学习和掌握HBase的理想资源,无论你是初学者还是有经验的开发人员,都能从中受益。通过阅读本书,你将能够充分利用HBase的强大功能,解决大数据存储和处理中的挑战,为企业的数据分析...
在Java编程环境下,读者将学习如何安装配置HBase,创建表,插入、查询和更新数据,以及进行复杂的扫描操作。 首先,HBase的数据模型基于行、列族、列和时间戳,这种设计允许高效地存储和检索大规模稀疏数据集。在表...
CDH5.14.2中的HBase 1.2.0还支持 Coprocessors 和 Filter,这是两个强大的扩展机制。Coprocessors允许用户在RegionServer端编写自定义逻辑,提高数据处理的灵活性和效率;而Filter则可以在数据读取时进行过滤,减少...