`

[Binospace] 深入分析HBase RPC(Protobuf)实现机制

阅读更多

背景

在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。

HBase的RPC Protocol

 在HMaster、RegionServer内部,实现了rpc 多个protocol来完成管理和应用逻辑,具体如下protocol如下:

 
HMaster支持的Rpc协议:
MasterMonitorProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase集群监控的目的。

 

MasterAdminProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase表格的管理。例如TableSchema的更改,Table-Region的迁移、合并、下线(Offline)、上线(Online)以及负载平衡,以及Table的删除、快照等相关功能。
RegionServerStatusProtoco,RegionServer与Master之间的通信,Master是RpcServer端,负责提供RegionServer向HMaster状态汇报的服务。
 
RegionServer支持的Rpc协议:
ClientProtocol,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现用户的读写请求。例如get、multiGet、mutate、scan、bulkLoadHFile、执行Coprocessor等。
AdminProtocols,Client与RegionServer之间的通信,RegionServer是RpcServer端,主要实现Region、服务、文件的管理。例如storefile信息、Region的操作、WAL操作、Server的开关等。
 
(备注:以上提到的Client可以是用户Api、也可以是RegionServer或者HMaster)

hbase-protorpc_1

 

 HBase-RPC实现机制分析

RpcServer配置三个队列:

1)普通队列callQueue,绝大部分Call请求存在该队列中:callQueue上maxQueueLength为${ipc.server.max.callqueue.length},默认是${hbase.master.handler.count}*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER,目前0.95.1中,每个Handler上CallQueue的最大个数默认值(DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)为10。

2)优先级队列: PriorityQueue。如果设置priorityHandlerCount的个数,会创建与callQueue相当容量的queue存储Call,该优先级队列对应的Handler的个数由rpcServer实例化时传入。

3)拷贝队列:replicationQueue。由于RpcServer由HMaster和RegionServer共用,该功能仅为RegionServer提供,queue的大小为${ipc.server.max.callqueue.size}指定,默认为1024*1024*1024,handler的个数为hbase.regionserver.replication.handler.count。

RpcServer由三个模块组成:

Listener ===Queue=== Responder

 hbase_rpc_95

这里以HBaseAdmin.listTables为例,分析一个Rpc请求的函数调用过程:

1) RpcClient创建一个BlockingRpcChannel。

2)以channel为参数创建执行RPC请求需要的stub,此时的stub已经被封装在具体Service下,stub下定义了可执行的rpc接口。

3)stub调用对应的接口,实际内部channel调用callBlockingMethod方法。

RpcClient内实现了protobuf提供的BlockingRpcChannel接口方法callBlockingMethod,

?View Code JAVA
 
  @Override
public Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType)
throws ServiceException {
return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,
this.isa, this.rpcTimeout);
}

通过以上的实现细节,最终转换成rpcClient的调用,使用MethodDescriptor封装了不同rpc函数,使用Message基类可以接收基于Message的不同的Request和Response对象。

4)RpcClient创建Call对象,查找或者创建合适的Connection,并唤醒Connection。

5)Connection等待Call的Response,同时rpcClient调用函数中,会使用connection.writeRequest(Call call)将请求写入到RpcServer网络流中。

6)等待Call的Response,然后层层返回给更上层接口,从而完成此次RPC调用。

RPCServer收到的Rpc报文的内部组织如下:

Magic

(4Byte)

Version

(1Byte)

AuthMethod

(1Byte)

Connection

HeaderLength

(4Byte)

ConnectionHeader

Request

“HBas”

         
 

验证RpcServer的CURRENT_VERSION

与RPC报文一致

目前支持三类:

AuthMethod.SIMPLE

AuthMethod.KERBEROS

AuthMethod.DIGEST

 

RPC.proto定义
RPCProtos.ConnectionHeader
message ConnectionHeader {
optional UserInformation

userInfo = 1;
optional string

serviceName = 2;
// Cell block codec we will use

sending over optional cell

blocks.  Server throws

exception
// if cannot deal.
optional string

cellBlockCodecClass = 3

[default = "org.apache.

hadoop.hbase.codec.

KeyValueCodec"];
// Compressor we will use

if cell block is compressed. 

Server will throw exception

if not supported.
// Class must implement

hadoop’s

CompressionCodec

Interface
optional string

cellBlockCompressorClass = 4;
}
序列化之后的数据

 

整个Request存储是经过编码之后的byte数组,包括如下几个部分:

RequestHeaderLength(RawVarint32)

RequestHeader

ParamSize(RawVarint32)

Param

CellScanner

 

RPC.proto定义:
message RequestHeader {
// Monotonically increasing callId to keep track of RPC requests and their response
optional uint32 callId = 1;
optional RPCTInfo traceInfo = 2;
optional string methodName = 3;
// If true, then a pb Message param follows.
optional bool requestParam = 4;
// If present, then an encoded data block follows.
optional CellBlockMeta cellBlockMeta = 5;
// TODO: Have client specify priority
}
序列化之后的数据
并从Header中确认是否存在Param和CellScanner,如果确认存在的情况下,会继续访问。

 

Protobuf的基本类型Message,
Request的Param继承了Message,
这个需要获取的Method类型决定。

 

从功能上讲,RpcServer上包含了三个模块,

1)Listener。包含了多个Reader线程,通过Selector获取ServerSocketChannel接收来自RpcClient发送来的Connection,并从中重构Call实例,添加到CallQueue队列中。

 ”IPC Server listener on 60021″ daemon prio=10 tid=0x00007f7210a97800 nid=0x14c6 runnable [0x00007f720e8d0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c43cae68> (a sun.nio.ch.Util$2)
- locked <0x00000000c43cae50> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4322ca8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java:646)

2)Handler。负责执行Call,调用Service的方法,然后返回Pair<Message,CellScanner>

“IPC Server handler 0 on 60021″ daemon prio=10 tid=0x00007f7210eab000 nid=0x14c7 waiting on condition [0x00007f720e7cf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00000000c43cad90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1804)

3) Responder。负责把Call的结果返回给RpcClient。

 ”IPC Server Responder” daemon prio=10 tid=0x00007f7210a97000 nid=0x14c5 runnable [0x00007f720e9d1000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c4407078> (a sun.nio.ch.Util$2)
- locked <0x00000000c4407060> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4345b68> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcServer.java:833)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.java:816)

RpcClient为Rpc请求建立Connection,通过Connection将Call发送RpcServer,然后RpcClient等待结果的返回。

 思考

1)为什么HBase新版本使用了Protobuf,并实现RPC接口?

HBase是Hadoop生态系统内重要的分布式数据库,Hadoop2.0广泛采用Protobuf作为中间数据组织方式,整个系统内Wire-Compatible的统一需求。

2)HBase内部实现的Rpc框架对于服务性能的影响?

目前使用Protobuf作为用户请求和内部数据交换的数据格式,采用更为紧缩编码格式,能够提高传输数据的效率。但是,有些优化仍然可以在该框架内探索:

实现多个Request复用Connection(把多个短连接合并成一个长连接);

在RpcServer内创建多个CallQueue,分别处理不同的Service,分离管理逻辑与应用逻辑的队列,保证互不干扰;

Responder单线程的模式,是否高并发应用的瓶颈所在?

是否可以分离Read/Write请求占用的队列,以及处理的handler,从而使得读写性能能够更加平衡?

针对读写应用的特点,在RpcServer层次内对应用进行分级,建立不同优先级的CallQueue,按照Hadoop-FairScheduler的模式,然后配置中心调度(类似OMega或者Sparrow轻量化调度方案),保证实时应用的低延迟和非实时应用的高吞吐。优先级更好的Call会优先被调度给Handler,而非实时应用可以实现多个Call的合并操作,从而提高吞吐。

3)Protobuf内置编码与传统压缩技术是否可以配合使用?

使用tcpdump获取了一段HMaster得到的RegionServer上报来的信息:

tcpdump_rpc_0

 

以上的信息几乎是明文出现在tcp-ip连接中,因此,是否在Protobuf-RPC数据格式采取一定的压缩策略,会给scan、multiGet等数据交互较为密集的应用提供一种优化的思路。

参考文献:

[1] HBase Rpc Protocols:  http://blog.zahoor.in/2012/08/protocol-buffers-in-hbase/

[2] HBase project 0.95.1
本系列文章属于Binos_ICTBinospace个人技术博客原创,原文链接为http://www.binospace.com/index.php/in-depth-analysis-hbase-rpc-0-95-version-implementation-mechanism/,未经允许,不得转载。

文章的脚注信息由WordPress的wp-posturl插件自动生成

 

分享到:
评论

相关推荐

    hbase分页查询实现.pdf

    HBase分页查询实现 HBase作为一个NoSQL数据库,具有高性能、高可扩展性和高可靠性等特点,但是在查询方面却存在一些限制,例如不支持分页查询。这就使得开发者需要自己实现分页查询功能。本文将讲解如何使用Java...

    HBase源码分析

    HBase是Apache Software Foundation下的一个开源的非关系型分布式数据库(NoSQL)...通过深入分析HBase的源码,我们可以更好地理解其背后的设计思想和实现细节,这对于在实际工作中进行性能调优和问题诊断非常有帮助。

    hbase分页查询实现[归类].pdf

    HBase分页查询实现 HBase是一种基于分布式的NoSQL数据库,它提供了高效的数据存储和检索能力。然而,HBase本身不支持分页查询,...本文提供了一个简单的示例代码,旨在帮助开发者更好地理解HBase分页查询的实现机制。

    藏经阁-HBase Coprocessor-22.pdf

    HBase Coprocessor 的实现机制: 1.使用 RegionObserver 接口可以实现二级索引的创建和维护。 2.使用 Endpoint 接口可以实现数据的聚合计算和排序。 3.使用 Protobuf 协议可以实现数据交换和 RPC 通信。 HBase ...

    HBaseCoprocessor的实现与应用.pdf

    - **Endpoint**:动态 RPC 插件接口,实现代码部署在服务器端,通过 HBase RPC 调用触发。 #### 二、Endpoint服务端实现 Endpoint 作为一种特殊的 Coprocessor,允许在服务器端直接处理请求,无需将所有数据返回给...

    protobuf-3.1.0.dist-info

    离线安装TensorFlow0.12cpu版本时,其它要求包都能容易找到,就这个找不到,想办法在以前的电脑上找到的,用的时候直接解压到python包的安装路径Lib\site-packages下,就可以了,其它作用没试。

    基于Java实现的Apache HBase分布式数据库设计源码分析

    该优化项目为Apache HBase分布式数据库设计,采用Java作为主要开发语言,辅以Ruby、Shell、Python等工具。项目源码包含5301个文件,其中...项目旨在深入分析HBase的内部实现机制,为分布式数据库的研究和应用提供参考。

    深入学习hbase原理资料整理

    《深入学习HBase原理》 HBase,全称为Hadoop Database,是一款高度可扩展的、高性能的、面向列的分布式数据库。它源自Google的Bigtable论文,并在其基础上为Hadoop生态系统提供了一种强大的非结构化数据存储解决...

    Canal支持protobuf2.5

    【标题】:“Canal支持protobuf2.5”指的是MySQL数据变更日志订阅与推送工具Canal在某次更新中增加了对Google Protocol Buffers(protobuf)2.5版本的支持。在此之前,Canal仅兼容protobuf的2.4版本。此次升级可能...

    2-6+HBase+Coprocessor.pdf

    - Endpoint:作为动态RPC插件的接口,其服务端实现可以被HBase RPC调用触发,提供自定义功能。 【Endpoint服务端实现】 Endpoint在服务端的实现涉及到RPC通信,因此客户端和服务端需定义一致的接口。HBase使用...

    HBase_SI_--_实现HBase_ACID的理论

    根据给定文件的信息,本文将深入探讨"HBase_SI"这一理论框架,该理论旨在实现HBase中的ACID特性。文章将从多个角度分析HBase_SI的设计理念、技术细节及其应用场景。 ### HBase_SI:实现HBase_ACID特性的理论 #### ...

    [HBase]源码级强力分析hadoop的RPC机制

    可以参考:http://baike.baidu.com/view/32726.htm)机制分析的博客一直耽搁了下来。昨天晚上胡老大和我抱怨说:最近乱的很。呵呵,老是往武汉跑,能不乱嘛。不过差不多腾讯面试的事就该告一段落了。五一期间,...

    HBase实现批量存取

    HBase是一种分布式、高性能、基于列族的NoSQL数据库,由Apache基金会开发并...通过Eclipse中的实践项目,我们可以深入学习HBase的API用法,了解其在大数据场景下的优势,为实际业务提供强有力的数据存储和处理支持。

    Elasticsearch-HBase-sep机制

    总结,`SEP`机制通过`hbase-indexer`组件实现了HBase与Elasticsearch之间的数据实时同步,让用户能够在保持HBase强大存储能力的同时,享受到Elasticsearch的搜索与分析优势。在实际部署和使用过程中,需根据业务需求...

    protobuf-wireshark plugins

    协议分析,可以分析protobuf消息。动态配置文件。 把文件放在(wireshark安装目录)\plugins目录下面。 把配置文件和proto文件放在(wireshark安装目录)\protobuf目录下面 插件基于wireshark 1.8.6 和protobuf ...

    10.hbase的整体工作机制--集群角色功能介绍--存储机制.mp4

    10.hbase的整体工作机制--集群角色功能介绍--存储机制.mp4

    HBase性能深度分析

    本文将深入探讨HBase的性能测试细节,重点剖析数据插入性能,并通过实证分析揭示其背后的机制与优化策略。 #### 数据插入性能测试设计 在评估HBase的实时数据插入性能时,测试场景设计至关重要。以随机值的Rowkey...

    HBase分布式事务与SQL实现

    在深入了解HBase分布式事务与SQL实现前,我们首先需要明确几个核心概念和组件,这些包括HBase基础架构、Google Percolator模型、TiDB分布式数据库,以及它们之间的关系。 HBase是一款分布式的、开源的NoSQL数据库,...

Global site tag (gtag.js) - Google Analytics