`
paddy.w
  • 浏览: 505003 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBase Coprocessor示例

 
阅读更多
        HBase的coprocessor分为两类,Observer和EndPoint。Observer相当于触发器,代码部署在服务端,相当于对API调用的代理。介绍这方面的文章不少,在此不赘述。这里想说一下EndPoint的使用。

        EndPoint相当于存储过程。0.94.x之前使用EndPoint需要实现CoprocessorProtocol接口,而0.96.x的EndPoint改为用protobufs作为RPC的协议。在此用一个具体的例子说明一下新版的EndPoint该怎么使用。

        例如:统计一张表的行数。
        首先首先编写protobuf文件并编译。
option java_package = "linecounter";
option java_outer_classname = "LineCounterServer";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for=SPEED;

message CountRequest {
    required string askWord = 1;
}

message CountResponse {
    required int64 retWord = 1;
}

service LineCounter {
    rpc countLine(CountRequest)
        returns (CountResponse);
}


        编译后会生成LineCounterServer.java
        CountRequest是发送给服务端的消息,这里定义字符串askWord来存放具体消息内容。CounterResponse是返回的结果,统计的是行数,所以用long类型存放。LineCounter中定义一个方法countLine,传递请求,返回响应。具体说明请参见protobuf。
        实现EndPoint
public class LineCounterEndPoint extends LineCounterServer.LineCounter implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;

    @Override
    public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment)
            this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
        else throw new CoprocessorException("Must be loaded on a table region!!");
    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {

    }

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void countLine(RpcController controller, LineCounterServer.CountRequest request, RpcCallback<LineCounterServer.CountResponse> done) {
        RegionScanner scanner = null;
        LineCounterServer.CountResponse.Builder respBuilder = LineCounterServer.CountResponse.newBuilder();
        if (!"count".equals(request.getAskWord())) {
            respBuilder.setRetWord(23333);
        } else {
            long count = 0;
            try {
                Scan scan = new Scan();
                scan.setMaxVersions(1);
                scanner = env.getRegion().getScanner(scan);
                List<Cell> list = new ArrayList<>();
                while (scanner.next(list))
                    count += 1;
                respBuilder.setRetWord(count);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (scanner != null)
                    try {
                        scanner.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }
        done.run(respBuilder.build());
    }
}

        LineCounterEndPoint需要继承抽象类LineCounter并实现Coprocessor和CoprocessorService接口。LineCounter在刚才生成的java文件里。
        start和stop方法分别负责endpoint执行前的初始化和结束后的清理工作。start方法的参数是一个接口,需要根据实际环境将其转成需要的类型。
        主要需要实现的是countLine方法,这也刚才在protobuf中定义的方法。为了测试效果,这里对请求做了一个区分:如果收到的请求信息不是“count”,那么返回23333;否则统计region的记录行数并返回。
        实现Client端
public class LineCounterClient {

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk_host1:2181,zk_host2:2181,zk_host3:2181");
        conf.set("hbase.master", "host_master:60000");
        HTable table = new HTable(conf, "count_test");
        final LineCounterServer.CountRequest req = LineCounterServer.CountRequest.newBuilder().setAskWord("count").build();
        Map<byte[], Long> tmpRet = table.coprocessorService(LineCounterServer.LineCounter.class, null, null, new Batch.Call<LineCounterServer.LineCounter, Long>() {
            @Override
            public Long call(LineCounterServer.LineCounter instance) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<LineCounterServer.CountResponse> rpc = new BlockingRpcCallback<>();
                instance.countLine(controller, req, rpc);
                LineCounterServer.CountResponse resp = rpc.get();
                return resp.getRetWord();
            }
        });
        long ret = 0;
        for (long l : tmpRet.values())
            ret += l;
        System.out.println("lines: " + ret);
    }
}

        首先设置zookeeper和master的地址和接口信息。然后构造请求即CountRequest,先将请求信息设置为“count”。调用HTable的coprocessorService方法
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)

        该方法有四个参数,第1个参数是protobuf生成的LineCounter类对象。第2个和第3个参数分别为起始和结束rowkey,这里的意思是范围内rowkey所在的region都会调用endpoint,这里设为null表明所有的region都会调用。第4个参数为接口,需要重写call方法。
        方法的返回值是Map类型,Map的size与参与计算的region个数一致。所以最后需要做的一步是讲返回结果进行累加,得到最后的结果。
        此程序返回5782,是表count_test的行数。若请求消息设置为“hello”,程序返回23333。

        coprocessorService还有一个五参数方法,第五个参数是一个CallBack接口,还可以如此实现:
public class LineCounterClient {

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk_host1:2181,zk_host2:2181,zk_host3:2181");
        conf.set("hbase.master", "host_master:60000");
        HTable table = new HTable(conf, "count_test");
        final LineCounterServer.CountRequest req = LineCounterServer.CountRequest.newBuilder().setAskWord("count").build();
        final AtomicLong ret = new AtomicLong();
        table.coprocessorService(LineCounterServer.LineCounter.class, null, null, new Batch.Call<LineCounterServer.LineCounter, Long>() {
            @Override
            public Long call(LineCounterServer.LineCounter instance) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<LineCounterServer.CountResponse> rpc = new BlockingRpcCallback<>();
                instance.countLine(controller, req, rpc);
                LineCounterServer.CountResponse resp = rpc.get();
                return resp.getRetWord();
            }
        }, new Batch.Callback<Long>() {
            @Override
            public void update(byte[] region, byte[] row, Long result) {
                ret.getAndAdd(result);
                System.out.println(Bytes.toString(row)+": "+result);
            }
        });
        System.out.println("lines: " + ret.get());
    }
}

        每调用一次call方法之后会调用一次update方法,因此在外部定义一个变量ret存放结果,每次调用update时更新ret的值即可。
分享到:
评论

相关推荐

    HBaseCoprocessor的实现与应用.pdf

    ### HBase Coprocessor 的实现与应用 #### 一、Coprocessor简介 HBase Coprocessor 是一种灵活且强大的机制,它允许用户在 HBase 上执行自定义逻辑,从而扩展 HBase 的功能。Coprocessor 的灵感源自 BigTable 的协...

    hbase_coprocessor_hbase_coprocessorjava_源码

    在源码中,可能会包含一些示例或者工具类,如`CoprocessorExample`,展示了如何创建和注册Coprocessor,以及如何在Observer中实现业务逻辑。此外,可能还有一些实用工具类,如`CoprocessorClassLoader`用于动态加载...

    使用Hbase协作器(Coprocessor)同步数据到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)

    在提供的压缩包文件"**Hbase-Observer-ElasticSearch**"中,可能包含了实现这个功能的源代码示例。这些代码可以帮助开发者理解如何集成HBase和Elasticsearch,以及如何编写和配置Coprocessor。通过学习和参考这些...

    hbase-coprocessor-example

    hbase-协处理器-示例此示例演示如何使用 HBase 协处理器和 Algebird monoid 实现分组聚合。 我们在这里使用的 HBase 版本是 0.94.18,与 AWS EMR 上可用的版本完全相同。在本地 HBase 应用程序中创建演示表下载并...

    HBASE具体操作指令

    示例:hbase&gt; status、hbase&gt; status 'simple'、hbase&gt; status 'summary'、hbase&gt; status 'detailed' 2. version:显示当前HBase版本。 示例:hbase&gt; version 3. whoami:显示当前HBase用户。 示例:hbase&gt; ...

    HBase开启审计日志

    &lt;name&gt;hbase.coprocessor.region.classes &lt;value&gt;org.apache.hadoop.hbase.security.token.TokenProvider,org.apache.hadoop &lt;name&gt;hbase.coprocessor.master.classes &lt;value&gt;org.apache.hadoop.hbase....

    HBase-coprocessor

    协处理器通过宏和概括的方式,我们可以将协处理器定义为一个框架,该框架提供了一种在HBase中执行自定义代码的简便方法。 代表协处理器的最常用类比是“触发器/存储过程”和AOP。 协处理器可以开发为: 观察者协处理...

    hbaseSecondaryIndex:hbase二级索引实现

    hbaseSecondaryIndex是一个hbase二级索引的实现,基于solr+hbase coprocessor框架为hbase提供索引表支持。 此工程可打包为一个hbase的插件,通用方便。 这篇文档的目的是为了向您介绍hbaseSecondaryIndex的基本使用...

    hbase.tar.gz 已经配置完成拿来即用

    四、HBase操作示例 1. 创建表:`create 'testTable', 'cf'`,创建一个名为`testTable`的表,列族为`cf`。 2. 插入数据:`put 'testTable', 'row1', 'cf:q1', 'value1'`,向`row1`行的`q1`列插入`value1`。 3. 查询...

    hbase-0.98.6-cdh5.3.6.zip

    这个压缩包"**hbase-0.98.6-cdh5.3.6**"可能包含HBase的配置文件、JAR包、示例代码、文档等资源,用于在CDH环境中部署和运行HBase服务。在部署前,你需要根据具体环境配置HBase的相关参数,如HDFS地址、Zookeeper...

    实训5:HBase安装与环境配置.docx

    `hbase.coprocessor.master.classes`和`hbase.coprocessor.region.classes`设置了AccessController,这是实现权限控制的关键类。 完成这些配置后,编辑`regionservers`文件,列出所有将作为HBase RegionServer运行...

    test_coprocessor.zip

    可能的问题包括:未正确继承HBase的协处理器基类,如`org.apache.hadoop.hbase.coprocessor.BaseRegionObserver`或`org.apache.hadoop.hbase.coprocessor.BaseMasterObserver`;未覆盖关键的回调方法,导致协处理器...

    BigDataHBaseESDemo-src_20200708_hbase_elasticsearch_

    【标题】"BigDataHBaseESDemo-src_20200708_hbase_elasticsearch_" 提供了一个关于如何将HBase与Elasticsearch集成,实现大数据文章检索的示例项目。在这个项目中,HBase作为基础数据存储,Elasticsearch则用于提供...

    hbase第04天

    最后,"coprocessor"这个词暗示了课程可能涉及到了HBase的协处理器机制。协处理器是HBase提供的一种扩展框架,允许用户在服务器端编写自定义逻辑,比如实现细粒度的访问控制、数据校验或计算。这使得HBase能够根据...

    hbase reginobserver

    RegionObserver是HBase的协处理器(Coprocessor)框架的一部分,它允许用户在特定操作(如put、get、delete、scan等)执行前或执行后进行干预。这种机制为开发人员提供了透明地增强表行为的能力,例如实现数据过滤...

    1-7+HBase+for+Solr+介绍.zip

    4. **数据同步机制**:可能涵盖HBase和Solr之间的数据同步策略,如使用HBase的Coprocessor、Flume、NIFI或者定制化的数据管道。 5. **性能调优**:针对HBase和Solr的联合使用进行性能优化,包括索引策略、硬件配置...

    cdap-hbase-increments:HBase 的高效无读增量

    这个小项目提供了一个示例,说明如何在任何 HBase 应用程序中使用无读增量:不需要 CDAP。 在这种情况下,您将错过对 CDAP 数据集附带的一些非常酷的支持,这在您的用例中可能会也可能不会很好。 目前该项目配置为...

Global site tag (gtag.js) - Google Analytics