上篇说了半天,却回避了一个重要的问题:为什么要用异步呢,它有什么样的好处?坦率的说,我对这点的认识不是太深刻(套句俗语,只可意会,不可言传)。还是举个例子吧:
比如Client向Server发送一个request,Server收到后需要100ms的处理时间,为了方便起见,我们忽略掉网络的延迟,并且,我们认为Server端的处理能力是无穷大的。在这个use case下,如果采用同步机制,即Client发送request -> 等待结果 -> 继续发送,那么,一个线程一秒钟之内只能够发送10个request,如果希望达到10000 request/s的发送压力,那么Client端就需要创建1000个线程,而这么多线程的context switch就成为client的负担了。而采用异步机制,就不存在这个问题了。Client将request发送出去后,立即发送下一个request,理论上,它能够达到网卡发送数据的极限。当然,同时需要有机制不断的接收来自Server端的response。
以上的例子其实就是这篇的主题,异步的消息机制,基本的流程是这样的:
如果仔细琢磨的话,会发现这个流程中有两个很重要的问题需要解决:
1. 当client接收到response后,怎样确认它到底是之前哪个request的response呢?
2. 如果发送一个request后,这个request对应的response由于种种原因(比如server端出问题了)一直没有返回。client怎么能够发现类似这样长时间没有收到response的request呢?
对于第一个问题,一般会尝试给每个request分配一个独一无二的ID,返回的Response会同时携带这个ID,这样就能够将request和response对应上了。
对于第二个问题,需要有一个timeout机制,对于每一个request都有一个定时器,如果到指定时间仍然没有返回结果,那么会触发timeout操作。多说一句,timeout机制其实对于涉及网络的同步机制也是非常有必要的,因为有可能client与server之间的链接坏了,在极端情况下,client会被一直阻塞住。
纸上谈兵了这么久,还是看一个实际的例子。我在这里用Hadoop的RPC代码举例。这里需要事先说明的是,Hadoop的RPC对外的接口其实是同步的,但是,RPC的内部实现其实是异步消息机制。多说无益,直接看代码吧(讨论的所有代码都在org.apache.hadoop.ipc.Client.java 里):
- public Writable call(Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- //具体的代码一会再看...
- }
这就是Client.java对外提供的接口。一共有两个参数,param是希望发送的request,remoteId是指远程server对应的Id。函数的返回就是response(也是继承自writable)。所以说,这是一个同步调用,一旦call函数返回,那么response也就拿到了。
call函数的具体实现一会再看,先介绍Client中两个重要的内部类:
- private class Call {
- int id; // call id
- Writable param; // parameter
- Writable value; // value, null if error
- IOException error; // exception, null if value
- boolean done; // true when call is done
- protected Call(Writable param) {
- this.param = param;
- synchronized (Client.this) {
- this.id = counter++;
- }
- }
- protected synchronized void callComplete() {
- this.done = true;
- notify(); // notify caller
- }
- //.........
- public synchronized void setValue(Writable value) {
- this.value = value;
- callComplete();
- }
- public synchronized Writable getValue() {
- return value;
- }
- }
call这个类对应的就是一次异步请求。它的几个成员变量:
id: 这个就是之前提过的,对于每一个request都需要分配一个唯一标示符,这样接收到response后才能知道到底对应哪个request;
param: 需要发送到server的request;
value: 从server发送过来的response;
error: 可能发生的异常(比如网络读写错误,server挂了,等等);
done: 表示这个call是否成功完成了,即是否接收到了response;
- private class Connection extends Thread {
- private InetSocketAddress server; // server ip:port
- // .........
- private Socket socket = null; // connected socket
- private DataInputStream in;
- private DataOutputStream out;
- //............
- // currently active calls
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
- // .......
- private synchronized boolean addCall(Call call) {
- if (shouldCloseConnection.get())
- return false;
- calls.put(call.id, call);
- notify();
- return true;
- }
- private void receiveResponse() {
- if (shouldCloseConnection.get()) {
- return;
- }
- touch();
- try {
- int id = in.readInt(); // try to read an id
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " got value #" + id);
- Call call = calls.get(id);
- int state = in.readInt(); // read call status
- if (state == Status.SUCCESS.state) {
- Writable value = ReflectionUtils.newInstance(valueClass, conf);
- value.readFields(in); // read value
- call.setValue(value);
- calls.remove(id);
- } else if (state == Status.ERROR.state) {
- call.setException(new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in)));
- calls.remove(id);
- } else if (state == Status.FATAL.state) {
- // Close the connection
- markClosed(new RemoteException(WritableUtils.readString(in),
- WritableUtils.readString(in)));
- }
- } catch (IOException e) {
- markClosed(e);
- }
- }
- public void run() {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": starting, having connections "
- + connections.size());
- try {
- while (waitForWork()) {//wait here for work - read or close connection
- receiveResponse();
- }
- } catch (Throwable t) {
- LOG.warn("Unexpected error reading responses on connection " + this, t);
- markClosed(new IOException("Error reading responses", t));
- }
- close();
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": stopped, remaining connections "
- + connections.size());
- }
- public void sendParam(Call call) {
- if (shouldCloseConnection.get()) {
- return;
- }
- DataOutputBuffer d=null;
- try {
- synchronized (this.out) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " sending #" + call.id);
- //for serializing the
- //data to be written
- d = new DataOutputBuffer();
- d.writeInt(call.id);
- call.param.write(d);
- byte[] data = d.getData();
- int dataLength = d.getLength();
- out.writeInt(dataLength); //first put the data length
- out.write(data, 0, dataLength);//write the data
- out.flush();
- }
- } catch(IOException e) {
- markClosed(e);
- } finally {
- //the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(d);
- }
- }
- }
Connection这个类要比之前的Call复杂得多,所以我省略了很多这里不会被讨论的代码。
Connection对应于一个连接,即一个socket。但同时,它又继承自Thread,所有它本身又对应于一个线程。可以看出,在Hadoop的RPC中,一个连接对应于一个线程。先看他的成员变量:
server: 这是远程server的地址;
socket: 对应的socket;
in / out: socket的输入流和输出流;
calls: 重要的成员变量。它是一个hash表, 维护了这个connection正在进行的所有call和它们对应的id之间的关系。当读取到一个response后,就通过id在这张表中找到对应的call;
再看看它的run()函数。这是Connection这个线程的启动函数,我贴的代码中这个函数没做任何的删减,你可以发现,刨除一些冗余代码,这个函数其实就只做了一件事:receiveResponse,即等待接收response。
OK。回到call()这个函数,看看它到底做了什么:
- public Writable call(Writable param, ConnectionId remoteId)
- throws InterruptedException, IOException {
- Call call = new Call(param);
- Connection connection = getConnection(remoteId, call);
- connection.sendParam(call); // send the parameter
- boolean interrupted = false;
- synchronized (call) {
- while (!call.done) {
- try {
- call.wait(); // wait for the result
- } catch (InterruptedException ie) {
- // save the fact that we were interrupted
- interrupted = true;
- }
- }
- if (interrupted) {
- // set the interrupt flag now that we are done waiting
- Thread.currentThread().interrupt();
- }
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- } else { // local exception
- throw wrapException(remoteId.getAddress(), call.error);
- }
- } else {
- return call.value;
- }
- }
- }
首先,它创建了一个新的call(这个call是Call类的实体,注意和call()函数的区分),然后根据remoteId找到对应的connection(Client类中维护了一个connection pool),然后调用connection.sendParam()。从前面找到这个函数,你会发现它就是将request写入到socket,发送出去。
但值得一提的是,它使用的write是最普通的blocking IO,也是同步IO(后面会看到,它读取response也是用的blcoking IO,所以,hadoop RPC虽然是异步机制,但是采用的是同步blocking IO,所以,异步消息机制还采用什么样的IO机制是没有关系的)。
接下来,调用了call.wait(),将线程阻塞在这里。直到在某个地方调用了call.notify(),它才重新运行起来,然后一通判断后返回call.value,即接收到的response。
所以,剩下的问题是,到底是哪调用了call.notify()?
回到connection的receiveResponse函数:
首先,它从socket的输入流中读到一个id,然后根据这个id找到对应的call,调用call.setValue将从socket中读取的response放入到call的value中,然后调用calls.remove(id)将这个call从队列中移除。这里要注意的是call.setValue,这个函数将value设置好之后,调用了call.notify()!
好了,让我们再重头将流程捋一遍:
这里其实有两个线程,一个线程是调用Client.call(),希望向远程server发送请求的线程,另外一个线程就是connection对应的那个线程。当然,虽然有两个线程,但server对应的只有一个socket。第一个线程创建call,然后调用call.sendParam将request通过这个socket发送出去;而第二个线程不断的从socket中读取response。因此,request的发送和response的接收被分隔到不同的线程中执行,而且这两个线程之间关于socket的读写并没有任何的同步机制,因此我认为这个RPC是异步消息机制实现的,只不过通过call.wait()/call.notify()使得对外的接口看上去像是同步。
好了,Hadoop的RPC介绍完了(虽然我略掉了很多内容,比如timeout机制我这里就没写),说说我个人的评价吧。我认为,Hadoop的这个设计还是挺巧妙的,底层采用的是异步机制,但对外的接口提供的又是一般人比较习惯的同步方式。但是,我觉着缺点不是没有,一个问题是一个链接就要产生一个线程,这个如果是在几千台的cluster中,仍然会带来巨大的线程context switch的开销;另一个问题是对于同一个remote server只有一个socket来进行数据的发送和接收,这样的设计网络的吞吐量很有可能上不去。(一家之言,欢迎指正)
未完待续~
相关推荐
在`RPC.Server`中,可以看到对请求的接收、处理和响应的逻辑。 六、总结 Hadoop的RPC机制是其分布式系统中的重要组成部分,它简化了分布式环境下的通信,提高了效率。理解并掌握RPC的工作原理对于优化Hadoop集群...
* 高性能:Hadoop RPC 使用异步非阻塞的方式来处理客户端的调用请求,从而提高了系统的性能。 * 可扩展性:Hadoop RPC 的架构设计使得它可以很容易地扩展到大规模的分布式系统中。 * 可靠性:Hadoop RPC 使用了多种...
Hadoop RPC是Hadoop框架中用于进程间通信的一种机制,它允许一个进程调用另一个远程进程中定义的方法,仿佛这个方法是在本地执行一样。这种透明性使得开发者可以专注于业务逻辑,而无需关心底层通信细节。 ### 工作...
5. **org.apache.hadoop.ipc**: 这个包提供了进程间通信(IPC)的基础工具,使得客户端和服务端能通过网络进行异步通信。`Protocol`接口定义了服务端提供的服务,而`RPC`类实现了RPC调用的逻辑。 6. **org.apache....
Hadoop的RPC框架就是基于这个理念构建的,它实现了客户端与服务器端之间的高效、安全的通信机制。 要使用Hadoop的RPC框架,你需要完成以下步骤: 1. **定义协议**:创建一个接口,声明客户端和服务器端需要交互的...
Java平台上的RPC框架有很多,如Hadoop的Hadoop RPC、Apache Thrift、Google的gRPC等,而“nfs-rpc”则是一个专门基于Java开发的高性能RPC框架。这个框架的设计目标是提供高效、稳定、易用的跨网络服务调用能力。 ...
Hadoop使用远程过程调用(RPC)机制来实现节点间的通信,比如NameNode与DataNode之间的通信。RPC允许一个程序调用另一个在网络另一端的程序,这在分布式环境中尤其重要。Hadoop的RPC是基于protobuf协议的,可以通过...
最后,netty-3.6.2.Final.jar是一个高性能的异步事件驱动的网络应用程序框架,它在Hadoop中用于网络通信,特别是在处理RPC(Remote Procedure Call)请求时,提供高效的网络I/O能力。 总的来说,Hadoop Eclipse ...
- **基于消息的RPC**:如AMQP(Advanced Message Queuing Protocol)上的RPC,通过消息队列实现异步RPC。 - **基于协议缓冲区的RPC**:如gRPC,使用Google的protobuf定义服务接口和数据结构,提供高性能、低延迟的...
与Hadoop的批处理不同,Storm专注于实时处理,可以实时处理消息并更新数据库,也可以进行连续计算和分布式远程过程调用(RPC)。Storm保证每个消息都会被处理,而且具有极低的延迟,小集群中每秒可处理百万级的消息...
Apache提供了多种RPC实现,如Apache Thrift、Apache Avro和Hadoop的RPC等。 **Apache Thrift** Apache Thrift是一种软件框架,用于构建可伸缩的、跨语言的服务。它将服务定义为接口定义语言(IDL),允许开发者...
综上所述,实现一个基于Netty的自定义RPC框架,需要理解Netty的异步I/O模型,设计合理的RPC通信协议,利用Zookeeper进行服务注册与发现,同时考虑服务的高可用性和性能优化。通过分析提供的压缩包文件,我们可以深入...
在SOA中,服务可以通过Web Services(如SOAP或RESTful API)进行交互,或者通过消息队列进行异步通信。 RPC和SOA之间的关系在于,RPC可以作为实现SOA的一种技术手段。在SOA架构中,服务间的通信可能通过RPC来完成,...
“Lecture.04远程调用.pdf”和“Lecture.05间接调用.pdf”可能会讨论在分布式环境中的通信机制,比如RPC(远程过程调用)和消息队列,它们是不同节点间协作的关键。远程调用允许程序跨越网络边界调用其他服务,而...
Avro是Hadoop生态系统中的一个关键组件,由Apache软件基金会开发,主要用作数据序列化系统。它提供了一种高效的、语言无关的、版本化的数据序列化机制,使得不同编程语言之间可以方便地交换数据。Avro RPC则是Avro的...
RPC(Remote Procedure Call)框架是软件开发中的一个重要概念,它允许一个程序在不理解底层网络细节的情况下,调用运行在另一个计算机上的程序。这个过程就像是本地调用一样,极大地简化了分布式系统之间的通信。本...
Dubbo是阿里巴巴开源的分布式服务框架,而Zookeeper是Apache Hadoop的一个子项目,主要用来实现分布式服务的配置管理、命名服务和分布式同步。这个服务端项目包含的源码经过了严格的测试,确保可以直接运行,为...
Dubbo-RPC分布式服务框架 Dubbo 是阿里巴巴开发的一个分布式服务框架,每天为2千多个服务提供大于30亿次的访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。 Dubbo 最新的版本是 2.5.3。 Dubbox 是当当网基于...
0.99.2版本的`HBaseRpcController`和`RpcServer`实现了异步调用和请求调度,极大地提升了系统吞吐量。同时,HBase还支持多种数据压缩算法,如Snappy和LZO,通过`Compression`模块的源码,可以了解其压缩和解压缩的...
2. 消息队列:允许进程异步通信,数据以消息的形式存储在队列中,等待接收方处理。 3. 共享内存:多个进程可以直接访问同一块内存区域,高效但需谨慎管理同步。 4. 套接字(Sockets):通用的网络通信接口,支持TCP/...