`

HBase RPC通信功能实现

阅读更多

Table of Contents

 

RPC通信功能实现

HBase的RPC通信功能主要基于Protobuf和NIO这两个组件来实现,在通信管道上选择的是protobuf对外声明的BlockingRpcChannel(阻塞式),其callBlockingMethod方法决定了客户端与服务端的交互行为,比如采用什么样的方法进行通信以及通信报文的格式规则都是通过该方法来描述的。

HBase对外声明了BlockingRpcChannelImplementation实现类用于实现BlockingRpcChannel接口的业务逻辑,其在通信方式的选择上采用的是Socket通信,在通信的服务端通过RpcServer来构建ServerSocket,而在客户端使用RpcClient来构建与服务端通信的Socket,按照功能职责的不同,RpcServer可划分成3大组件,其中:

  • Listener负责监听客户端的连接请求

    在Listener的内部主要封装着一个ServerSocketChannel以及多个Reader线程,其中ServerSocketChannel主要负责接收客户端的连接请求,请求被响应前,会暂存于等待队列中,等待队列的长度通过hbase.ipc.server.listen.queue.size参数来设置(默认为128)。针对每个已建立的连接,系统还会实时检测其空闲时间,如果空闲时间超过2秒(即2秒内客户端没有再次通过该连接来发送请求,并且之前的请求操作已经处理完毕),系统会将该连接进行关闭,时间阀值是通过hbase.ipc.client.connection.maxidletime参数来控制的。

    当请求信息到达后,其会派遣合适的Reader进行读取(基于轮训的方式来使每个Reader的负载能够均衡),Reader线程的数量是通过hbase.ipc.server.read.threadpool.size参数来指定的,默认为10个,线程启动后会进入阻塞状态直至客户端请求操作的到来。客户端向服务端发送的通信报文是按照一定格式进行组织的,如图所示:

    1. 当客户端与服务端进行初次握手时,其会向服务端发送RPCHeader报文,以便服务端能够对客户端的连接请求做校验处理。

      如果校验结果满足以下规则,说明该请求操作是合法的:

      (1)前4个字节信息为HBas;

      (2)第5个字节(VERSION信息)的值为0;

      (3)在没有启用security的情况下(hbase.security.authentication属性值不为kerberos),第6个字节的值为80。

    2. 接着,客户端会向服务端发送ConnectionHeader报文,通过它来封装客户端所请求的服务。

      ConnectionHeader是通过使用protobuf来完成序列化处理的,其protocol声明如下:

      message ConnectionHeader {
          optional UserInformation user_info = 1;
          optional string service_name = 2;
          optional string cell_block_codec_class = 3;
          optional string cell_block_compressor_class = 4;
      }
      					

      服务端收到该请求消息之后,可通过其service_name属性来判断客户端所要访问的服务名称,从而定位到具体的服务。

    3. 确定了具体的服务之后,客户端便可持续向服务端发送Request报文,通过它来定位将要执行服务的哪一个方法。

      方法名称是通过RequestHeader来封装的,其属于Request报文的一部分,如图所示:

      RequestHeader同样是采用protobuf进行序列化处理,其protocol声明如下:

      message RequestHeader {
          optional uint32 call_id = 1;
          optional RPCTInfo trace_info = 2;
          optional string method_name = 3;
          optional bool request_param = 4;
          optional CellBlockMeta cell_block_meta = 5;
          optional uint32 priority = 6;
      }
      					

      其method_name属性用于定位将要执行的方法名称,方法参数是通过Param报文来封装的,除此之外,客户端还可向服务端传递一些KeyValue数据(比如Replication功能会用到这些数据),这些数据会序列化到CellBlock报文里。Reader线程读取到这些信息后开始构造CallRunner对象,并将其赋予空闲的Handler进行处理。

  • Handler负责处理客户端的请求操作

    从服务端的角度观察,客户端的所有请求都可封装成CallRunner对象,如果把Reader看做是CallRunner的生产者,那么Handler便是消费者。为了加快服务端的响应效率,RpcServer是允许同时存在多个消费者的,以此来并发消费所有的CallRunner产品。然而CallRunner产品在所有的消费者之间应当如何做到合理分配?这主要是通过RpcScheduler来调度的。HBase对外声明了两种RpcScheduler的功能实现类,其中HMaster使用的是FifoRpcScheduler,而HRegionServer使用的SimpleRpcScheduler。

    1. FifoRpcScheduler

      基于线程池来消费所有的CallRunner产品,CallRunner的消费顺序采用FIFO原则(按照产出的先后顺序依次进行消费),针对每个CallRunner产品,系统都会开启一个Handler线程负责对其进行消费处理,线程池所能允许的最大并发数是由具体的服务来对外进行声明的,如HMaster默认情况下允许25个并发Handler(通过hbase.master.handler.count参数进行设置)。

    2. SimpleRpcScheduler

      采用该策略进行调度处理以后,系统会根据不同的请求类型将所有的CallRunner产品划分成3组:

      (1)如果其封装的请求是基于meta表格的操作,将其划分到priorityExecutor组里;

      (2)如果其封装的请求是基于用户表格的操作,将其划分到callExecutor组里;

      (3)如果其封装的是replication请求,将其划分到replicationExecutor组里。

      然后为每一个产品组分配数量不等的Handler,让Handler只消费指定组中的产品。不同的产品组所分配的Handler数量同样是由具体的服务来对外声明的,拿HRegionServer举例:

      分配给priorityExecutor组的Handler数量通过hbase.regionserver.metahandler.count参数来指定,默认为10个;

      分配给callExecutor组的Handler数量通过hbase.regionserver.handler.count参数来指定,默认为30个;

      分配给replicationExecutor组的Handler数量通过hbase.regionserver.replication.handler.count参数来指定,默认为3个。

      每一个产品组还可细分成多个产品队列,默认情况下每个产品组只包含一个产品队列。这样产品组中的所有Handler都会去竞争该队列中的资源,为了防止竞争惨烈的情况发生,可将每一个产品组划分成多个产品队列,让每个Handler只去抢占指定队列中的资源。在HRegionServer中,可通过如下方法来计算callExecutor组可以划分成多少个产品队列:

      Math.max(1,hbase.regionserver.handler.count*hbase.ipc.server.callqueue.handler.factor)

      其中hbase.ipc.server.callqueue.handler.factor属性值默认为0,即在默认情况下只将该产品组划分成一个产品队列。

      单个产品队列的容量并不是按需使用无限增长的,HBase对其长度及空间大小都做了相应的阀值控制,其中:

      hbase.ipc.server.max.callqueue.length用于限制产品队列的长度(默认为handler数乘以10)

      hbase.ipc.server.max.callqueue.size用于限制产品队列的空间大小(默认为1G)

      成功将CallRunner产品分配给Handler之后,该Handler开始对其进行消费处理,消费过程主要是通过调用RpcServer的call方法来执行指定服务的相应方法,并通过Responder将方法的执行结果返回给客户端。

    3. Responder负责将服务端的处理结果返回给客户端

      服务端返回给客户端的通信报文是按照如下格式进行组织的:

      其中ResponseHeader是采用protobuf进行序列化的,其protocol声明如下:

      message ResponseHeader {
          optional uint32 call_id = 1;
          optional ExceptionResponse exception = 2;
          optional CellBlockMeta cell_block_meta = 3;
      }
      					

      其内部主要封装了服务端的执行异常信息,以及CellBlock的元数据信息;Result用于封装执行方法的返回结果,其序列化方法需要根据具体的返回值类型来做决定;CellBlock用于封装服务端所返回的KeyValue数据(如scan操作的查询结果)。

客户端发送请求消息之后,会进入循环等待状态,直至服务端返回执行结果,如果等待时间超过10秒,则系统会认为该请求失败,将开启重试或关闭连接(如果hbase.ipc.client.connect.max.retries参数值为0)。

配置参数

  • 服务端相关配置如下:

    1. hbase.ipc.server.listen.queue.size

      存放连接请求的等待队列长度,默认与ipc.server.listen.queue.size参数值相同,为128个。

    2. hbase.ipc.server.tcpnodelay

      是否在TCP通信过程中启用Nagle算法,默认不启用。

    3. hbase.ipc.server.tcpkeepalive

      是否启用TCP的keepalive机制,通过心跳包来判断连接是否断开,默认启用。

    4. hbase.ipc.server.read.threadpool.size

      Reader线程数,默认为10个。

    5. hbase.ipc.server.max.callqueue.size

      单个消费队列所允许的存储空间上限(默认为1GB),超过该上限客户端会抛出以下异常:

      Call queue is full, is ipc.server.max.callqueue.size too small?

    6. hbase.ipc.server.max.callqueue.length

      单个消费队列的长度限制,默认值为10倍的Handler数。

    7. hbase.ipc.server.callqueue.handler.factor

      该参数用于决定消费队列的个数。

    8. hbase.ipc.server.callqueue.read.share

      读Handler数占总Handler数的比例。

  • 客户端相关配置如下:

    1. hbase.ipc.ping.interval

      客户端与服务端的心跳时间间隔,以及Socket的默认读写超时时间(HBase的其他一些参数会覆盖该值,如hbase.rpc.timeout)。

    2. hbase.client.rpc.codec

      CellBlock报文内容的编码/解码器,默认与hbase.client.default.rpc.codec的参数值相同,为org.apache.hadoop.hbase.codec.KeyValueCodec。

      如果将hbase.client.default.rpc.codec设置成空字符串,并且不对hbase.client.rpc.codec参数进行设置,则在rpc通信过程中将不在使用CellBlock报文对KeyValue进行序列化,而是将其序列化到protobuf的message里(Param或Result)。

    3. hbase.client.rpc.compressor

      CellBlock报文内容的压缩/解压缩算法,默认不采用压缩。

    4. hbase.ipc.socket.timeout

      客户端尝试与服务端建立连接的超时时间,默认与ipc.socket.timeout相同为20秒。

    5. hbase.rpc.timeout

      客户端对RegionServer的rpc请求超时时间。

    6. hbase.client.pause

      Socket连接失败后,会休眠一段时间,然后在重新连接,该参数用于指定休眠多久,默认为0.1秒。

    7. hbase.ipc.client.connect.max.retries

      当客户端与服务端的连接出现错误时,通过该参数来指定重试次数,默认为0(不重试)。

    8. hbase.ipc.client.connection.maxidletime

      客户端与服务端的连接空闲时间超过该数值时(即指定时间范围内,客户端没有收到服务端的响应信息),系统会将该连接关闭,默认的响应超时时间为10秒。

    9. hadoop.rpc.socket.factory.class.default

      SocketFactory实现类,默认为org.apache.hadoop.net.StandardSocketFactory,其createSocket方法会创建SocketChannel用于NIO通信。

调用方法

  1. 在客户端可通过如下代码来构建所需服务,拿ClientService举例:

    RpcClient rpcClient = new RpcClient(conf, clusterId); 1
    BlockingRpcChannel channel = 
        rpcClient.createBlockingRpcChannel(serverName 2, user 3, rpcTimeout 4);
    ClientService.BlockingInterface stub = ClientService.newBlockingStub(channel);
    					

    1

    首先构建RpcClient,其中clusterId属性可从Zookeeper的/hbase/hbaseid节点中读取;

    2

    serverName用于定位服务端地址,可通过ServerName.valueOf("${host},${port},${startCode}")方法来构建,如ServerName.valueOf("localhost,60020,1422961250317");

    3

    user为客户端的请求用户,可通过User.getCurrent()来获取;

    4

    rpcTimeout为rpc请求的超时时间。

  2. 在服务端可通过如下代码来发布服务,同样拿ClientService举例:

    List<BlockingServiceAndInterface> services = 
        new ArrayList<BlockingServiceAndInterface>();
    services.add(new BlockingServiceAndInterface(
        ClientService.newReflectiveBlockingService(regionServer), 1
        ClientService.BlockingInterface.class));
    RpcServer rpcServer = new RpcServer(serverInstance 2, name 3, services 4, 
        isa 5, conf, scheduler 6);
    rpcServer.start();
    					

    1

    构造ClientService实例,通过其newReflectiveBlockingService方法,方法参数为HRegionServer实例,其实现了ClientService.BlockingInterface接口;

    2

    serverInstance为服务进程实例,这里为HRegionServer;

    3

    name为服务进程名称;

    4

    services为服务进程中包含的服务列表;

    5

    isa为服务的通信地址;

    6

    scheduler为rpc请求调度器,目前有两种实现:FifoRpcScheduler和SimpleRpcScheduler。

 

 

 

http://blog.csdn.net/javaman_chen/article/details/47039517

分享到:
评论

相关推荐

    HBaseCoprocessor的实现与应用.pdf

    - **Endpoint**:动态 RPC 插件接口,实现代码部署在服务器端,通过 HBase RPC 调用触发。 #### 二、Endpoint服务端实现 Endpoint 作为一种特殊的 Coprocessor,允许在服务器端直接处理请求,无需将所有数据返回给...

    HBase源码分析

    HBase为了实现以上提到的RPC通信机制,提供了丰富的API和内部实现。例如,HBaseClient类就是客户端操作的核心类,通过它可以直接获取到MasterServer的HMasterInterface实例,进而调用其提供的各种接口方法来执行集群...

    藏经阁-HBase Coprocessor-22.pdf

    3.使用 Protobuf 协议可以实现数据交换和 RPC 通信。 HBase Coprocessor 的优点: 1.提高性能:HBase Coprocessor 可以提高 HBase 的性能,例如使用 Endpoint 机制可以实现高效的数据处理。 2.灵活的扩展:HBase ...

    hbase-0.98.1源码包

    4. RPC机制:理解HBase如何通过HBaseRpcController和RpcServer实现客户端与服务器之间的通信。 5. 并发控制:学习RegionSplitPolicy、RegionSplitter等类,理解HBase如何处理并发请求和Region分裂。 6. 客户端API:...

    hadoop2.73-eclipse开发hbase所需要的所有jar包

    8. HBase Protobuf:protobuf序列化库,用于HBase的RPC通信。 开发者通常需要将这些JAR文件添加到Eclipse项目的构建路径中,以便编译和运行HBase程序。正确配置后,可以使用HBase的API创建表、插入数据、执行扫描和...

    thrift1 查询hbase

    1. **Thrift接口**:Thrift提供了一种序列化和RPC(远程过程调用)机制,允许开发者定义服务接口,并在多种语言之间实现这些接口。Thrift1是早期版本,虽然现在已更新到Thrift2,但对某些场景仍然适用。它通过生成...

    hbase-client

    HBase客户端是连接HBase服务器并与之交互的重要工具,它使得开发者能够在各种编程语言环境下与HBase进行通信,执行数据的读写操作。本文将深入探讨HBase客户端的功能、使用方法以及相关知识点。 一、HBase客户端...

    java开发hbase1.2.6需要的jar包整合

    `hbase-protocol-1.2.6.jar`定义了HBase的RPC协议,用于客户端与服务器之间的通信。 为了处理Zookeeper,你需要`zookeeper-3.4.6.jar`,这是一个分布式协调服务,HBase使用它来管理集群的状态信息和元数据。`...

    hbase- java开发连接工具类

    3. **RPC机制**:HBase使用远程过程调用(RPC)与RegionServer进行通信,处理数据请求。这个JAR包包含了相关的RPC实现。 4. **行键(RowKey)索引**:HBase是一种列族式数据库,行键是其主要的索引方式。`hbase-...

    HBase技术介绍.docx

    - **Client**: 使用HBase RPC机制与HMaster和HRegionServer通信。 - **HMaster**: 负责整个集群的管理和协调工作,包括Region分配、负载均衡等。 - **HRegionServer**: 负责处理客户端的数据读写请求,每个Region ...

    php-hbase-thrift

    Thrift的核心在于它的序列化机制和RPC(远程过程调用)框架,使得开发者可以轻松地在各种编程语言间构建和消费服务。 在PHP访问HBase时,由于HBase本身是用Java实现的,因此需要一个中间层来桥接PHP和HBase。这就是...

    2-6+HBase+Coprocessor.pdf

    Endpoint在服务端的实现涉及到RPC通信,因此客户端和服务端需定义一致的接口。HBase使用Protobuf协议进行数据交换。例如,定义了一个`AggregateRequest`消息,包含了列解释器类名、扫描器配置(Scan),以及特定于...

    hbase安装与hbase架构说明

    在HBase的架构中,Client是用户与系统交互的接口,它通过远程过程调用(RPC)机制与HMaster和HRegionServer通信。对于数据读写操作,Client直接与HRegionServer交互,而对于表管理和元数据操作,Client则与HMaster...

    最近很火的大数据Hadoop之Hbase0.99.2最新版源码

    此外,HBase的RPC通信框架也是其性能的关键。0.99.2版本的`HBaseRpcController`和`RpcServer`实现了异步调用和请求调度,极大地提升了系统吞吐量。同时,HBase还支持多种数据压缩算法,如Snappy和LZO,通过`...

    企业中应用HBase

    - **解决方案**:为了实现这一目标,HBase采用了一种新的RPC引擎,即Protobuf RPC引擎来替代原有的Writable RPC引擎。这种改变不仅提高了RPC请求/响应消息的效率,还通过使用Protobuf格式实现了更灵活的数据序列化。...

    hbase原理和设计

    1. **客户端(Client)**:使用HBase的RPC机制与HMaster和HRegionServer进行通信,实现数据读写等操作。 2. **ZooKeeper**:作为HMaster选举和服务状态监控的核心组件,同时也是Region寻址的入口。 3. **HMaster**:...

    hbase java api 所需最精简 jar

    - **hbase-protocol.jar**:提供了HBase的RPC协议,客户端和服务端通信需要。 - **zookeeper.jar**:HBase依赖Zookeeper进行集群协调,所以需要Zookeeper的JAR。 - **slf4j-api.jar**和**slf4j-log4j12.jar**:...

    大数据HBASE考题材料

    HRegionServer与HMaster及客户端之间的通信采用RPC协议,即远程过程调用协议,这是一种用于不同计算机系统间的进程间通信的方式。 8. **HFile中的KeyValue结构** 在HFile数据格式中,KeyValue数据结构的Value...

    hbase 源码包

    4. **RPC通信**:`org.apache.hadoop.hbase.ipc`包下的RpcServer实现了客户端与服务器之间的远程过程调用,处理客户端请求。 5. **版本控制与并发控制**:每个Cell都有时间戳,用于版本控制;`org.apache.hadoop....

Global site tag (gtag.js) - Google Analytics