HBase 以客户端角色来调用服务器端的RPC实现
HBase 以客户端角色来调用服务器端的RPC实现
1. HBase的客户端RPC实际上是在Hadoop客户端RPC做的修改,具体的类为:
HBase org.apache.hadoop.hbase.ipc.HBaseRPC
Hadoop org.apache.hadoop.ipc.RPC
2. 大致流程
2.1 客户端(这里可以是HBase的HTable调用HMaster或者HRegionServer,HRegionServer调用HMaster,DFSClient 调用NameNode)在创建或者初始化时,通过java的Proxy.newProxyInstance(..)方法创建一些接口的代理.例如HTable 通过HConnectionManager创建HMasterInterface的代理.
2.2 代理的实现是通过创建Socket连接到服务器,将调用的方法标识及调用的方法的参数传递给服务器,服务器处理后返回结果,发送请求时参数要写入到DataOutput中,对于服务器的响应创建实例时,要用DataInput中读取数据,这就是为什么接口的参数大部分都实现了org.apache.hadoop.io.Writable接口,对于List,byte[]这些参数类型,是通过org.apache.hadoop.hbase.io.HbaseObjectWritable进行了封装.
3. 以HTable 通过HMasterInterface与HMaster通信为例.
3.1 HTable-->HConnectionManager.getConnection(conf)取得了org.apache.hadoop.hbase.client.HConnection接口的实例,实际上是由org.apache.hadoop.hbase.client.HConnectionManager.TableServers来实现的.HConnection接口定义了public HMasterInterface getMaster()的方法,来看下TableServers是怎么实现的.
3.2
masterLocation = zk.readMasterAddressOrThrow();//这里是通过ZooKeeper来取得HMaster注册的ip地址和端口
HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
masterLocation.getInetSocketAddress(), this.conf);//这里由HBaseRPC来生成代理
3.3 HBaseRPC 最后调用的方法是
public static VersionedProtocol getProxy(Class<?> protocol,
long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
Configuration conf, SocketFactory factory)
...
VersionedProtocol proxy =
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
//这里的factory是由conf中的参数"hadoop.rpc.socket.factory.class.default"指定的,默认(core-default.xml)是org.apache.hadoop.net.StandardSocketFactory,创建socket时是SocketChannel.open().socket()创建的NIO的socket.
//由org.apache.hadoop.hbase.ipc.HBaseRPC.Invoker实现的InvocationHandler接口
3.4 Invoker.invoke(Object proxy, Method method, Object[] args)方法中的实现为
HbaseObjectWritable value = (HbaseObjectWritable)
client.call(new Invocation(method, args), address, ticket);
//HBaseRPC.Invocation封装了Method的调用的参数,可以看readFields(DataInput in)和write(DataOutput out)的实现看是怎么样把参数写入读出的.
//client为Invoker创建时由HBaseRPC.ClientCache.getClient(conf, factory)创建.实现类为org.apache.hadoop.hbase.ipc.HBaseClient,每个SocketFactory对应一个HBaseClient对象.
3.5 真正的调用在HBaseClient中的call方法(有些异常的处理忽略了)
public Writable call(Writable param, InetSocketAddress addr,
UserGroupInformation ticket)
throws IOException {
Call call = new Call(param);//创建调用的对象,call中的方法都是synchronized,因为call会在不同的线程中调用
Connection connection = getConnection(addr, ticket, call);//取与addr的连接,这个连接可能是已经创建好的,也可能是新创建的.Connection是一个extends Thread线程
connection.sendParam(call); // send the parameter
synchronized (call) {//要先获得call的锁
while (!call.done) {//如果call调用没有完成,就一直循环下去
try {
call.wait(); // wait for the result,当前的调用线程挂起
} catch (InterruptedException ignored) {..}
}
...
return call.value;//服务器成功返回信息.
}
}
3.6 getConnection操作
private Connection getConnection(InetSocketAddress addr,
UserGroupInformation ticket,
Call call)
throws IOException {
Connection connection;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, ticket);//通过addr及ticket生成一个key.
do {
synchronized (connections) {//连接池
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);//创建新的连接
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));//如果call调用不能加入到connetion中,就一直循环,这里的加入操作只是加入到connection的calls(Hashtable<Integer, Call>)中,然后调用notify()方法唤醒因为没有
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
//it will take longer to establish a connection and that will slow the
//entire system down.
connection.setupIOstreams();//这里是初始化的操作
return connection;
}
3.7 org.apache.hadoop.hbase.ipc.HBaseClient.Connection.setupIOstreams()方法
protected synchronized void setupIOstreams() throws IOException {
if (socket != null || shouldCloseConnection.get()) {//如果socket不为空说明连接已经建立过了
return;
}
short ioFailures = 0;
short timeoutFailures = 0;
try {
while (true) {//这里一真循环真到连接建立起,如果网络不好,应该会等较长时间
try {
this.socket = socketFactory.createSocket();//这里从创建新的socket然后设置socket的连接属性
this.socket.setTcpNoDelay(tcpNoDelay);//这些属性都是从conf文件中初始化的,具体的作用不清楚
this.socket.setKeepAlive(tcpKeepAlive);
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);//这里设置每个socket的connect返回时间间隔最大不超过20秒,
//如果超出20秒,方法会抛出SocketTimeoutException,
this.socket.setSoTimeout(pingInterval);
break;
} catch (SocketTimeoutException toe) {
handleConnectionFailure(timeoutFailures++, maxRetries, toe);
//这里每次超时都会累加超时次数,直到达到maxRetries=conf.getInt("hbase.ipc.client.connect.max.retries", 0),重新抛出异常
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(NetUtils.getInputStream(socket))));//读入流
this.out = new DataOutputStream
(new BufferedOutputStream(NetUtils.getOutputStream(socket)));//写入流
writeHeader();//头信息
// start the receiver thread after the socket connection has been set up
start();//当前线程开启
} catch (IOException e) {
..
}
}
3.8 Connetion的run方法
public void run() {
try {
while (waitForWork()) {//wait here for work - read or close connection,如果还有提交的call没有回复,或者闲置的时间没有超出限制,那么就一直循环处理.
receiveResponse();
}
} catch (Throwable t) {
..
}
close();//从connection连接池中删除自己,关闭自己的in,out流.
}
3.9 判断是否退出的条件
private synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {//如果calls池中没有还没有返回的提交信息.
long timeout = maxIdleTime-
(System.currentTimeMillis()-lastActivity.get());
if (timeout>0) {//如果闲置时间还没到,那么挂起
try {
wait(timeout);
} catch (InterruptedException ignored) {}
}
}
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {//如果提交信息还有没有返回的.
return true;
} else if (shouldCloseConnection.get()) {
return false;
} else if (calls.isEmpty()) { // idle connection closed or stopped 所有的提交信息都返回了
markClosed(null);
return false;
} else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
new InterruptedException()));
return false;
}
}
3.10 处理服务器的回复信息
private void receiveResponse() { //因为这个方法是在run中顺序执行的,所以不用对in加锁.
...
try {
int id = in.readInt(); // try to read an id 每个call的标识
Call call = calls.get(id);
boolean isError = in.readBoolean(); // read if error 调用成功或者失败的标识,这里是客户端和服务器端约定好的
if (isError) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException( WritableUtils.readString(in),
WritableUtils.readString(in)));
calls.remove(id);//都要从calls队列中去掉call.
} else {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value,这里从流中创建返回的对象
call.setValue(value);//call的setValue会调用notify()方法,这样就唤醒了客户端调用call(..)方法挂起的线程.
calls.remove(id);
}
} catch (IOException e) {
markClosed(e);
}
}
3.11 call方法中与入call对象调用的Connetion sendParam(call)方法
protected void sendParam(Call call) {
..
DataOutputBuffer d=null;
try {
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC 对于out的操作是排它的如果写入的数据量非常大那么会不会导致长时间的等待,阻塞其它的线程.
//for serializing the
//data to be written
d = new DataOutputBuffer();
d.writeInt(call.id);//先写入call的id标识,以便从服务器的返回信息中读回id
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();//这里调用了flush操作.确保了out的顺序写入
}
} catch(IOException e) {
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
4. 总结
4.1 HBaseClient 与 SocketFactory实例是一一对应的.默认实现StandardSocketFactory的equals只要类型一致,那么就是相同的,也就是说一个jvm中只会有一个HBaseClient实例.
4.2 HBaseClient中创建的Connection 与address和用户的ticket(这里不清楚)一一对应(org.apache.hadoop.hbase.ipc.HBaseClient.ConnectionId),每个Connection只会对应一个 Socket,每个Connection在最大闲置时间内可以接受多个Call对象.
4.3 Connection中的in与out是由NetUtils.getInputStream(socket)和NetUtils.getOutputStream(socket)来封装的,使用了Selector.select()方法,这里不怎么清楚.
4.4 每个调用RPC的线程都会等待call的结果,输出到out是线程安全的,
2010.10.15
4.5 对HBaseClient的调用会是一个阻塞的过程.
4.6 Connection 创建时会初始化socket,socket的创建由SocketFactory创建,默认的socket它是blocking状态的,在NetUtils.connect(this.socket, remoteId.getAddress(), 20000);连接到服务器端时, 底层先设置SocketChannel为no-blocking,然后由Seletor来设置超时时间,如果超时时间到,connect没有返回,则抛出SocketTimeoutException异常.
4.7 socket connect返回后,会设置this.socket.setSoTimeout(pingInterval);值,这样read操作会最多阻塞pingInterval时间,如果超时,会抛出SocketTimeoutException 异常.
4.8 socket的read和write操作都是由NetUtils来提供的,底层都是由Seletor来支持针对时间的等待.
4.9 对于read的超时,通过PingInputStream的包装,每次都会发送一个Ping操作,sendPing();
4.10 每个Connetion创建之初都会writeHeader().对应的HBaseServer创建的Connection都会读header.
4.11 每个Connetion 只要calls(Hashtable<Integer, Call>)中有发送的call,除了发生异常,是不会被关闭的,除非HMasterServer关闭请求的连接.
5. 参考链接
5.1 Hadoop源代码分析(七) http://caibinbupt.iteye.com/blog/281281
5.2 Improve the Scalability and Robustness of IPC https://issues.apache.org/jira/browse/HADOOP-2864
5.3 HBase源码阅读-4-HMaster与HRegionServer的RPC http://run-xiao.iteye.com/blog/756455
分享到:
相关推荐
6. RPC框架:用于处理客户端请求,实现分布式通信。 通过深入学习HBase 0.94的源代码,开发者可以了解到如何处理分布式一致性、数据分区、数据复制等复杂问题,这对于构建大规模分布式系统非常有价值。同时,了解源...
4. RPC机制:理解HBase如何通过HBaseRpcController和RpcServer实现客户端与服务器之间的通信。 5. 并发控制:学习RegionSplitPolicy、RegionSplitter等类,理解HBase如何处理并发请求和Region分裂。 6. 客户端API:...
5. **RPC机制**:HBase使用Hadoop的RPC框架进行客户端与服务器之间的通信,源代码中的`HBaseRpcController`和`RpcExecutor`是实现这一功能的核心组件。 **测试用例与代码分析:** 在"**HBase-Research**"中提供的...
在Hadoop的RPC实现方法中,Client类、Server类、RPC类以及HDFS通信协议组是核心。这些组件共同协作实现远程过程调用,使得HDFS中的各个组件能够相互交流和协作。通过这些组件,HDFS能够处理客户端请求,并在集群内部...
Thrift提供了编译器,可以将这些.thrift文件转换为多种编程语言的源代码,如C++、Java、Python、PHP、Go等。生成的代码包括了客户端和服务器端的实现,使得开发者只需关注业务逻辑,而无需处理底层的网络通信细节。 ...
源代码即可应用。 我们用几个月的时间验证了 Themis 的正确性,并优化算法以获得更好的性能。 执行 Themis 包含三个组件:时间戳服务器、客户端库、themis 协处理器。 时间戳服务器 Themis内部使用HBase的KeyValue的...
项目的源代码展示了如何创建协议接口、实现协议类、构建服务器端、启动服务以及编写客户端代码来调用远程方法。 在实际开发中,了解并掌握Hadoop的RPC协议有助于优化分布式应用的性能,解决如网络延迟、安全认证等...
以C++为例,首先使用Thrift编译器生成C++代码,然后通过`make`命令编译生成的源代码,最后运行`./CppServer`和`./CppClient`启动服务端和客户端。同样地,对于Java示例,通过`ant`工具编译生成的Java代码,之后运行`...
解压缩下载的`thrift-0.8.0`文件,通常这是一个tar.gz或zip文件,解压后会得到一个包含源代码的目录。 3. **编译环境准备** 在编译Thrift之前,确保你的系统已经安装了必要的依赖库,这通常包括C++编译器(如GCC...
这个“thrift包及其源码”包含的是thrift 0.9.1版本的二进制库(libthrift-0.9.1.jar)和源代码(thrift-src.rar)。通过这个压缩包,我们可以深入了解Thrift的工作原理,并且可以根据需求进行定制化开发。 1. **...
- **Hadoop底层IPC原理和RPC**:探讨Hadoop内部通信机制,包括RPC(远程过程调用)的实现细节。 - **Hadoop底层googleProtoBuf的协议分析**:分析Google Protobuf在Hadoop中的应用情况。 #### 四、分布式数据库...
【描述】提到的"zk的的前台页面源码"可能指的是ZooKeeper提供的监控界面的源代码。这个界面用于查看ZooKeeper集群的状态,包括节点状态、会话信息、数据树等。"页面美化"意味着该源码可能包含了用户界面的改进,使得...
7. **Google ProtoBuf源代码分析**:了解Google的ProtoBuf是如何实现高效的序列化和反序列化,有助于提高Hadoop应用程序的性能。 综上所述,Hadoop不仅是一个强大的数据处理框架,而且其内部机制和技术也值得深入...
在“NehaDawale-Sem-I-Distributed-System-main”这个文件夹中,可能包含了实现分布式系统项目的源代码、文档、测试案例等内容。通过深入研究这些文件,可以进一步了解分布式系统的设计和实现细节,包括具体编程语言...