`

HBase Coprocessor

 
阅读更多

HBase的coprocessor分为两类,Observer和EndPoint。Observer相当于触发器,代码部署在服务端,相当于对API调用的代理。 EndPoint相当于存储过程。0.94.x之前使用EndPoint需要实现CoprocessorProtocol接口,而0.96.x的EndPoint改为用protobufs作为RPC的协议。

 

EndPoint协处理器 

Endpoint 是一个 Server 端 Service 的具体实现。它的实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和 HBase 的运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。

例如:统计一张表的行数

首先首先编写protobuf文件并编译,protobuf的安装请见protobuf安装与使用;

 

option java_package = "com.cobub.protobuf";
option java_outer_classname = "ExampleProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

 

 

protoc --java_out=$PROJECT_HOME/src/main/java  exampleProtos.proto

 

用protoc编译后,可以看到在工程的 src/main/java/com/cobub/protobuf 目录下生成了一个名为 ExampleProtos.java 的文件。这个 Java 文件就是 RPC 的 Java 代码,在后续的 Server 端代码和 Client 端代码中都要用到这个 Java 文件。

 

endpoint 服务端:

首先 Endpoint 协处理器是一个 Protobuf Service 的实现,因此需要它必须继承某个 Protobuf Service。其次,作为一个 HBase 的协处理器,Endpoint 还必须实现 HBase 定义的协处理器协议,用 Java 的接口来定义。具体来说就是CoprocessorService 和 Coprocessor,这些 HBase 接口负责将协处理器和 HBase 的 RegionServer 等实例联系起来,以便协同工作。

public class RowCountEndpoint extends ExampleProtos.RowCountService
    implements Coprocessor, CoprocessorService {
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /**
   * Just returns a reference to this object, which implements the RowCounterService interface.
   */
  @Override
  public Service getService() {
    return this;
  }

  /**
   * Returns a count of the rows in the region where this coprocessor is loaded.
   */
  @Override
  public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
                          RpcCallback<ExampleProtos.CountResponse> done) {
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      byte[] lastRow = null;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          byte[] currentRow = CellUtil.cloneRow(kv);
          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
            lastRow = currentRow;
            count++;
          }
        }
        results.clear();
      } while (hasMore);

      response = ExampleProtos.CountResponse.newBuilder()
          .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {}
      }
    }
    done.run(response);
  }

  /**
   * Returns a count of all KeyValues in the region where this coprocessor is loaded.
   */
  @Override
  public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
                               RpcCallback<ExampleProtos.CountResponse> done) {
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(new Scan());
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          count++;
        }
        results.clear();
      } while (hasMore);

      response = ExampleProtos.CountResponse.newBuilder()
          .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {}
      }
    }
    done.run(response);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
   * on a table region, so always expects this to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   * {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment)env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
} 

编译服务端代码,并打包存放到hdfs(方便集群所有节点共享)

动态加载coprocessor

alter 'test:test', METHOD => 'table_att', 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint|1001|arg1=1,arg2=2'

 

 

协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。

CoprocessorEnviorment 保存了协处理器的运行环境,每个协处理器都是在一个 RegionServer 进程内运行,并隶属于某个 Region。通过该变量,可以获取 Region 的实例等 HBase 的运行时环境对象 

 

endpoint客户端代码

public class RowEndPointClient {

    private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
    private static final byte[] TEST_COLUMN = Bytes.toBytes("col1");

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master001:2181,slave001:2181,slave003:2181");
        // conf.set("hbase.master", "host_master:60000");
        HTable table = new HTable(conf, "test:test");
        // insert some test rows
        for (int i = 10; i < 15; i++) {
            byte[] iBytes = Bytes.toBytes(i);
            Put p = new Put(iBytes);
            p.addColumn(TEST_FAMILY, TEST_COLUMN, iBytes);
            table.put(p);
        }

        final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
        Map<byte[], Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
                null, null,
                new Batch.Call<ExampleProtos.RowCountService, Long>() {
                    public Long call(ExampleProtos.RowCountService counter) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
                                new BlockingRpcCallback<ExampleProtos.CountResponse>();
                        counter.getRowCount(controller, request, rpcCallback);
                        ExampleProtos.CountResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return (response != null && response.hasCount()) ? response.getCount() : 0;
                    }
                });
        // should be one region with results
        Iterator<Long> iter = results.values().iterator();
        long count = 0;
        int num = 0;
        while (iter.hasNext()) {
            num++;
            count += iter.next();
        }
        System.out.println("count=" + count + ",num=" + num);
    }
}

 

Observer 协处理器

 

参考:

HBase 官网上的编程手册,对 HBase 的编程 API 有全面的介绍;

网络文章 HBase Coprocessor 示例,是一篇非常好的 Coprocessor 实例讲解;

HBase 协处理器编程入门,详细介绍了Coprocessor服务端的使用;

 

 

 

 

分享到:
评论

相关推荐

    HBaseCoprocessor的实现与应用.pdf

    ### HBase Coprocessor 的实现与应用 #### 一、Coprocessor简介 HBase Coprocessor 是一种灵活且强大的机制,它允许用户在 HBase 上执行自定义逻辑,从而扩展 HBase 的功能。Coprocessor 的灵感源自 BigTable 的协...

    HBase Coprocessor 优化与实验

    ### HBase Coprocessor 优化与实验 #### HBase及Coprocessor概述 HBase是一种非关系型、面向列的分布式数据库系统,它基于Hadoop之上构建,旨在为大规模数据提供高可靠、高性能的支持。HBase的核心优势在于其能够...

    藏经阁-HBase Coprocessor-22.pdf

    HBase Coprocessor 是什么? HBase Coprocessor 是一种基于 HBase 的 coprocessor 机制,可以实现对 HBase 的扩展和自定义。它提供了一个灵活的方式来实现数据的处理和分析,例如二级索引、聚合计算、数据排序等。 ...

    HBase中Coprocessor的介绍以及实际业务场景中的使用.pdf

    讲师:陈杨——快手大数据高级研发工程师 ...内容概要:(1)讲解hbase coprocessor的原理以及使用场景,(2) coprocessor整个流程实战,包括开发,加载,运行以及管理(3)结合1,2分析coprocessor在rsgroup中的具体使用

    HBaseCoprocessor的实现与应用.zip

    通过阅读“HBaseCoprocessor的实现与应用.pdf”,你可以更深入地了解如何利用Coprocessor来定制HBase的行为,提升系统效率,以及在实际项目中遇到的问题和解决方案。这份资料对于理解和掌握HBase的高级特性具有很高...

    HBase-coprocessor.pptx

    《深入理解HBase Coprocessor:扩展与应用》 HBase Coprocessor 是一个强大的功能,它为HBase提供了一种扩展机制,使得用户可以在不修改核心代码的情况下,实现对HBase的操作进行自定义处理。Coprocessor 分为...

    hbase_coprocessor_hbase_coprocessorjava_源码

    在HBase中,Coprocessor机制是一个强大的特性,它允许用户在HBase服务器端自定义扩展功能,如数据过滤、统计计算、访问控制等。这个压缩包“hbase_coprocessor_hbase_coprocessorjava_源码”显然包含了用Java API...

    2-6+HBase+Coprocessor.pdf

    【HBase Coprocessor简介】 HBase Coprocessor机制源于Google Bigtable的协处理器概念,旨在增强HBase的功能,提供更高效的数据处理能力。Coprocessor允许在HBase的RegionServer上运行用户自定义的代码,实现了数据...

    使用Hbase协作器(Coprocessor)同步数据到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)

    2. **配置Coprocessor**:在HBase的表或列族配置中,添加Coprocessor的类路径和优先级。这样,每当有数据写入时,Coprocessor就会被触发执行。 3. **连接Elasticsearch**:在Coprocessor中,使用Elasticsearch的...

    Hbase coprecessor

    ### HBase Coprocessor:深度解析与应用案例 #### 概览 HBase Coprocessor 是 HBase 的一个核心特性,允许用户在 RegionServer 上执行自定义代码,从而实现数据处理逻辑靠近数据存储的位置。这一特性极大地提高了...

    hbase-solr-coprocessor:通过solr实现hbase二级索引,主要通过hbase的coprocessor的Observer实现

    hbase-solr-coprocessor 测试代码,目的是借助solr实现hbase二级索引,以使hbase支持高效的多条件查询。主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为...

    hbase社区2018精选资料

    另外,HBaseCoprocessor的实现与应用也是重要的知识点,Coprocessor提供了在服务器端扩展HBase功能的能力。 运维指南部分讲解了HBase2.0的新特性,比如In-MemoryCompaction,这是一个内存压缩技术,有助于提升数据...

    关系型数据库的数据导入Hbase

    - Hbase Coprocessor:Hbase支持在RegionServer端运行用户自定义的代码,即Coprocessor。可以编写Coprocessor监听WAL(Write-Ahead Log)事件,实时将关系库的变化写入Hbase。 - Phoenix:Phoenix是建立在Hbase之上...

    HBase 实践 如何破解 HBase+ElasticSearch 组合使用遇到的难题.docx

    - **利用 HBase Coprocessor 触发器**:应用只与 HBase 交互,Coprocessor 处理数据写入和查询加速。这种方式简化了应用接口,但开发和维护 Coprocessor 需要深度技术积累。 综合考虑,选择合适的方案需要根据具体...

    hbase to elasticsearch

    Hbase本身只有一级索引rowkey,现在通过Hbase coprocessor协处理器把Hbase的数据索引存储到Elasticsearch,从而建立二级索引;ppt中讲述了一些注意事项,挺有用的,希望能有所帮忙!

    2018 Apache HBase 技术实战专刊

    同时,还有关于HBaseCoprocessor的实现与应用的介绍,该组件允许在服务器端以分布式方式执行用户自定义代码,进而扩展HBase的功能。在平台篇中,则介绍了HBase平台实践和应用,包括平台建设方面的内容。 运维指南则...

    hbase原理和设计

    3. **HBase Coprocessor**:通过Coprocessor在服务器端执行复杂的查询逻辑,减少网络传输开销。 4. **HBase集成其他索引系统**:例如使用Apache Phoenix提供的SQL层或Apache Solr作为索引引擎。 #### 五、RowKey...

    hbase与hive数据同步共4页.pdf.zip

    2. **HBase Coprocessor**: HBase支持coprocessor机制,可以在RegionServer上执行用户定义的逻辑,实现数据写入HBase的同时,触发数据同步到Hive的操作。这种方法可以减少网络传输,提高效率。 3. **使用Apache ...

    Hbase实战

    3. HBase Coprocessor:允许用户自定义处理逻辑,实现细粒度的数据过滤和计算。 4. HBase监控:通过Hadoop Metrics2和JMX进行系统监控,了解集群健康状况。 六、HBase实战应用 HBase常用于互联网日志分析、推荐系统...

    阿里HBase的数据管道设施实践与演进_阿里巴巴.zip

    3. 数据加载:处理后的数据通过HBase的批量加载接口(如HBase Coprocessor或MapReduce)写入HBase,保证写入性能和数据的一致性。 4. 数据查询:HBase提供了丰富的查询API,包括Java API、HBase Shell以及HBase与...

Global site tag (gtag.js) - Google Analytics