1. HBaseServer创建后有几个重要的角色.
1.1 Listener deamon线程,负责接收HMaster,HRegionServer,HBase Client的http请求.
1.2 Responder demon线程,负责将处理完的请求,发送回调用者.
1.3 Connection listener接收到的每个Socket请求都会创建一个Connection 实例.
1.4 Call 每一个客户端的发送的请求由Connection读取到有效数据后都会生成一个Call实例
1.5 LinkedBlockingQueue<Call> callQueue 每个由新生成的call都会放入到callQueue这个队列里.
1.6 Handler 从callQueue中取出call,并对call进行反射的调用,生成的结果value,交由responder处理.
1.7 LinkedList<Call> Connection.responseQueue ,用来存放已经由handler处理过的属于同一个Connection的call.
2. Listener
2.1 Listener首先创建一个ServerSocketChannel,将这个socket绑定到配置(HMaster和HRegionServer)的端口上,当然还有一个backlogLength conf.getInt("ipc.server.listen.queue.size", 128),设定成no-blocking,在selector上注册OP_ACCEPT事件.
2.2 每个新连接socket都会设置tcpNoDelay,tcpKeepAlive属性(这些属性对不同的环境应该有不同的设置,有待测试),并且在selector中注册OP_READ事件,,然后创建Connection对象.connection中持有已经连接的socket对象.
2.3 对于可读的socket,调用connection(selector的SelectionKey 的attachment存放着connection对象)的readAndProcess()方法.
2.4 问题
a. 每个客户端的请求,Listener都会创建一个Connection对象,如果这个Connection对象在创建后,请求端突然不再发送数据了,那么这个connection就会一直占用资源,且释放不了.所以Listener的run中每次轮询最后都会判断并处理一下所有的Connection List表.
这里有3个条件:
1. thresholdIdleConnections(conf.getInt("ipc.client.idlethreshold", 4000)),总的连接数大于这个值.
2. cleanupInterval(10000 10秒)如果不是强制清除所有的connection那么两次清除操作的时间间隔要大于这个值.
3. 调用Connection.timedOut(long currentTime)来看这个连接的是不是可以关闭.
b. 每个Connection可能是长连接(HRegionServer保存的到HMaster的连接),而且每个connection对应的数据可能不会通过一次channel.read(..)能全部处理完,也可以是客户端定时发送请求(HRegionServer发送心跳给HMaster),那么剩余的数据到达时,Listener怎么知道,这就是所有的accept的socket都注册到selector的原因.
3. Connection
3.1 Listener对有数据可读的connection调用readAndProcess()方法.readAndProcess会先读取version,然后与本机的version值进行比较,如果读不到version,那么这个connection将会被关闭(在Listener中),然后读取头UserGroupInformation ticket(这个信息只读取一次),最后读取Data数据,data数据分为两部分(1.id HBaseClient发送一个请求都会先把客户端生成的id传过来(这个id还要HBaseServer传回给HBaseClient),2.ByteBuffer类型的数据,HBaseClient在发送请求时,每个完整的请求的byte大小放在数据的前部,所以HBaseServer得到整个数据的大小值后,是可以判断出从socket中取多少数据是一个请求的分界.这个数据包括方法的名称,参数类型,参数的值),由id与数据生成一个Call对象,并把call放入到callQueue 这个阻塞队列中.
这里每次channel.read()最少要可以读到4字节的数据,不然connetion会被关闭掉.
3.2 timedOut(long currentTime)方法用来测试connetion是否可以被close掉.
(rpcCount == 0)&&(currentTime - lastContact > maxIdleTime)为判断的条件.也就是说如果maxIdleTime值设置的足够大,那么connetion不会被关闭,除非被强制关闭.
rpcCount值表示接收的数据与要发送的处于平衡状态,rpcCount值是一个volatile值,如果有数据读入那么rpcCount+1,如果Responder把一个call的数据都写出,那么rpcCount-1,只是代码中incRpcCount();的位置不能保证每次call对象的生成,如果客户端的请求数据量大,那么使得rpcCount被增加了超过1次,那么这个Connection永远不能被Listner关闭.
4. Handler
4.1 Handler是真正的请求的处理者,它阻塞于callQueue.take(),callQueue是一个阻塞形的queue,它的大小由handler的个数handlerCount(HBaseServer创建时指定)*100来确定,如果callQueue队列满了,Listener将会阻塞于Connetion的处理.再也不能accept新的请求.
4.2 多个handler来并发的处理callQueue中的Call对象,每个Handler处理call时都会对call进行捆绑.
CurCall.set(call);
UserGroupInformation.setCurrentUser(call.connection.ticket);
处理的结果以ByteBuffer的格式由call对象保存在response中,
4.3 最后交由Responder来处理call.
5. Responder
5.1 responder的run方法是用来处理因为call的回复数据量太大,而不能在一次channelWrite(channel, call.response)操作完成的call.
5.2 handler处理后的call调用Responder.doRespond(Call call)方法,将call加入到call.connection.responseQueue中,如果发现call.connection.responseQueue中的大小只有1时,会直接将call中的内容回复给客户端(HBaseClient),如果call中的数据量比较大,那么channelWrite(channel, call.response)一次写不完response中的内容,就应该在writeSelector上注册这个channel的OP_WRITE事件.
call.connection.responseQueue.addFirst(call);
...
incPending();//这相当于是一把锁,改变这个值后thread下次轮循会wait()
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();//这里要把writeSelector唤醒,因为writeSelector可能正在blocking select(PURGE_INTERVAL)方法上.
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
//注册OP_WRITE事件.并把call当作SelectionKey的attachment.
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();// 这里要解锁.
}
这里只有call为responseQueue唯一个值时,才会在没写完时注册OP_WRITE事件.
5.3 对于run中运行的writeSelector.select(PURGE_INTERVAL);会将可以写的channel对应的call来处理.因为没有发送完的call始终在connection.responseQueue的first位置,所以结果的回复对于HBaseClient来说是完整的.对每个call的处理流程为doAsyncWrite(SelectionKey key)
a. 锁定call.connection.responseQueue
b. processResponse(final LinkedList<Call> responseQueue,boolean inHandler),取responseQueue中的第一个值.然后调用channelWrite(channel, call.response),
b.1 如果response中没有数据了,且responseQueue中也没有数据了,那么算是做完了,call对应的key重新设置key.interestOps(0);
b.2 如果response中没有数据了,但是responseQueue中还有call,那么call对应的SeletorKey还是要监听OP_WRITE,但是这时当前call只是一个路径,通过SeletorKey找到call.connection.responseQueue,而且当前的call也已经不在队列中了,但是call还活着.直到responseQueue中的处理全部处理完,然后又有一个call被接收处理,并且没有一次处理完时,register注册并关联新的call后,之前的call才会被丢弃.
5.4 run 的一次循环中只处理每一个SocketChannel的第一个Call对象.
5.5 run中now > lastPurgeTime + PURGE_INTERVAL(900000 15分钟) 时,responder会检查所有的注册的call,并把从接收时间算起doPurge(Call call, long now) call.connection.responseQueue队列中有now > nextCall.timestamp + PURGE_INTERVAL的call请求,就把那个connetion关闭掉.
分享到:
相关推荐
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
hbase-hbck2-1.1.0-SNAPSHOT.jar
HBCK是HBase1.x中的命令,到了HBase2.x中,HBCK命令不适用,且它的写功能(-fix)已删除;...其GitHub地址为:https://github.com/apache/hbase-operator-tools.git 附件资源是已经编译好的hbase2.4.4版本的hbck
赠送jar包:hbase-server-1.4.3.jar; 赠送原API文档:hbase-server-1.4.3-javadoc.jar; 赠送源代码:hbase-server-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-server-1.4.3.pom; 包含翻译后的API文档:...
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
HBase(hbase-2.4.9-bin.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System...
HBCK2 jar包是这个工具的可执行文件,通常在HBase的lib目录下可以找到,名为`hbase-hbck2-x.x.x.jar`,其中`x.x.x`表示具体的HBase版本号。这个jar包包含了所有执行HBCK2命令所需的功能和类。你可以通过Hadoop的`...
《深入解析YCSB-HBase14-Binding 0.17.0》 YCSB(Yahoo! Cloud Serving Benchmark)是一种广泛使用的云数据库基准测试工具,它为各种分布式存储系统提供了标准化的性能评估框架。YCSB-HBase14-Binding 0.17.0是针对...
赠送jar包:flink-hbase_2.11-1.10.0.jar; 赠送原API文档:flink-hbase_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-hbase_2.11-1.10.0-sources.jar; 赠送Maven依赖信息文件:flink-hbase_2.11-1.10.0.pom; ...
赠送jar包:hbase-server-1.1.3.jar; 赠送原API文档:hbase-server-1.1.3-javadoc.jar; 赠送源代码:hbase-server-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-server-1.1.3.pom; 包含翻译后的API文档:...
本文将深入探讨这两个技术及其结合体`phoenix-hbase-2.2-5.1.2-bin.tar.gz`的详细内容。 首先,HBase(Hadoop Database)是Apache软件基金会的一个开源项目,它构建于Hadoop之上,是一款面向列的分布式数据库。...
Hbase修复工具 示例情景: Q:缺失hbase.version文件 A:加上选项 -fixVersionFile 解决 Q:如果一个region即不在META表中,又不在hdfs上面,但是在regionserver的online region集合中 A:加上选项 -...
hbase-client-2.1.0-cdh6.3.0.jar
赠送jar包:hbase-hadoop2-compat-1.1.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.1.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-server-1.2.12.jar; 赠送原API文档:hbase-server-1.2.12-javadoc.jar; 赠送源代码:hbase-server-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-server-1.2.12.pom; 包含翻译后的API文档...
赠送jar包:hbase-server-1.2.12.jar; 赠送原API文档:hbase-server-1.2.12-javadoc.jar; 赠送源代码:hbase-server-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-server-1.2.12.pom; 包含翻译后的API文档...
赠送jar包:hbase-hadoop2-compat-1.4.3.jar; 赠送原API文档:hbase-hadoop2-compat-1.4.3-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.4.3-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-hadoop2-compat-1.2.12.jar; 赠送原API文档:hbase-hadoop2-compat-1.2.12-javadoc.jar; 赠送源代码:hbase-hadoop2-compat-1.2.12-sources.jar; 赠送Maven依赖信息文件:hbase-hadoop2-compat-...
赠送jar包:hbase-server-1.1.3.jar; 赠送原API文档:hbase-server-1.1.3-javadoc.jar; 赠送源代码:hbase-server-1.1.3-sources.jar; 赠送Maven依赖信息文件:hbase-server-1.1.3.pom; 包含翻译后的API文档:...
`hbase-1.2.0-cdh5.14.2.tar.gz` 是针对Cloudera Distribution Including Apache Hadoop (CDH) 5.14.2的一个特定版本的HBase打包文件。CDH是一个流行的Hadoop发行版,包含了多个大数据组件,如HDFS、MapReduce、YARN...