Hadoop IPC源码分析(一)
类RPC
RPC包含了一系列的静态类和静态方法。
使用PROTOCOL_ENGINES = new HashMap<Class,RpcEngine>();
作为实现的接口—>RPCEngine的一个缓存
使用PROXY_ENGINES = new HashMap<Class,RpcEngine>();
作为实现的接口代理->PPCEngine的一个缓存
接口代理使用JDK的Proxy.getProxyClass获取对应接口的代理类。
默认的RpcEngine的实现为 WritableRpcEngine.class ;
waitForProxy方法是在时间限制内周期的调用getProxy方法,而getProxy是从缓存中获取接口所对应的RpcEngine实例,具体的Proxy获取由RpcEgine接口实现类实现。
RPC类管理一系列的PpcEngine,getProxy getServer call stopProxy方法的实际工作是由RpcEgine实现的。
接口RpcEngine
接口定义了getProxy getServer call stopProxy几个方法。
getProxy用于获取客户端代理,getProxy方法包含了一个InetSocketAddress addr参数,用于指定PPC服务器的地址和一个SocketFactory factory用于创建客户端到服务器之间的连接。
getSever用于构建PRC服务器端,getServer有一个Object instance参数,此参数代表接口实现的类。PPC服务器将请求转发给instance实例处理。
call方法参数包含一个请求参数列表数组Object[][] params和一组PPC服务器地址InetSocketAddress[] addrs和需要执行的方法Method method。一次call请求可以同时请求多个服务器。
RpcEgine有两种实现AvroRpcEngine和WritableRpcEngine。
类WritableRpcEngine
WritableRpcEngine使用Invocation类表示一次方法调用,一次调用包含调用的函数名、参数类型和参数值信息。
Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();
每一个client和一个SocktFactory是对应的,一个节点上建立了几个SocketFactory对象就有几个client。这里说明client不是每次方法调用的时候都新建一个客户端,而是一个进程类所有的调用都用同一个客户端。
使用JDK RPC框架实现Proxy,Proxy将接口任务转交给Invocker处理,Invocker类实现了InvocationHandler接口。在Invocker类的实现中,由将方法调用转发给client类。
每次调用接口方法是,都会先获取一个client,增加client引用计数,调用完毕后,减少client引用计数,当client引用计数为0的时候,将client从缓存中移除。
Call方法的实现中,没有用到JDK的动态代理,直接将需要调用的方法给client实例处理了。
Server复写了Rpc.Server的call方法,Rpc的call方法实现的是一次方法的调用,与RpcEngine中的call方法不同的,在此类中的call方法中添加了一些统计功能。
这个类命名为WritablePpcEngine主要体现在两点:
1. 代理中的invoke方法中调用了clinet的call方法,call方法的参数Invocation是一个Writable实现类;
2. server的实现中,server的call方法接收的是一个Writable的类。
总之,调用过程中传递参数是通过Writable的实现类Invocation包装的。
下面分析两个真正做事的类Client类和Server类。
类Client
首先分析Client的call方法。Client的call方法接收Invocation参数,将参数打包为一个Call对象,每一个Call对象都有一个唯一的id,Call包含了此次方法调用是否成功,返回值的信息。
Call对象通过一个Connection对象发送出去,然后调用Object.wait()方法循环等待,如果中间线程被中断,忽略中断继续等待,当call对象被notify的时候返回,而后取出Call对象的value的值。
那么Call对象的value值是什么时候被设置的呢?Connection有一个receiveResponse方法,当收到相应Call的id是,将此id对应的call的value值设置,然后notify等待中的call。
Connection用一个Hashtable<Integer, Call> calls维护在此连接上的方法调用。
下面看一次方法调用时,怎样获得连接对象的。一个client使用Hashtable<ConnectionId, Connection> connections来维护在此client上的连接池。
一个Connection是有ConnectionId标识的,相同的远端地址,接口和用户有相同的ConnectionId。一个Call获取Connection的时候,获取到的Connection也将此Call加入到其维护的calls中。此后,connection会进行一些IO流的设置操作。
下面看connection在设置IO流的过程中做了些什么事情。
如果是第一次使用connection,有两件事情要做:
1. 向服务器写RPC请求头;
2. 启动此连接上等待RPC回应的线程。
如果不是第一次使用此connection,直接返回。
PRC请求头中开始几个字母为hrpc,向服务器表明这是一个RPC连接,然后是版本号和验证方式。
然后写入与请求的接口服务和请求用户身份信息。
此连接上的线程循环判断是否有工作要做,如果calls不为空,就调用receiveResponse方法接收服务器的响应。
响应由CallId、方法调用状态和调用返回值组成,如果方法调用状态成功,则将对应的Call从calls中移除,通知等待结果的调用方法。
ParallelCall和ParallelResult不分析。
类Server
Server的实现比Client要复杂。
首先分析类Server的构造函数和启动过程。在Server的构造函数中设置了监听端口,处理队列大小,元素为Call对象的阻塞队列,初始化Listener和Responder。
在server的start方法中,做了以下一些工作。
1. 启动Responder线程
2. 启动Listener线程
3. 启动handlerCount个Handler线程
然后分别看看这几个线程是做什么用,怎么工作的。
Listener
这个类的实现用到了concurrent并发包和nio的一些东西。
在Listener的构造函数中,打开了一个ServerSocketChannel,并为之绑定监听地址。每个Listner有一个selector,将服务器通道的OP_ACCEPT时间在selector上注册为感兴趣事件。建立一个固定大小的线程池,用于执行多个Reader线程,多个Reader线程以同一个readSelector为构造参数。
下面看看Reader到底是干什么的。
Reader线程阻塞在readSelector的select方法上,等待有感兴趣的事件发生或者有线程调用wakeup方法。当有感兴趣的事情发生时,遍历对应的SelectionKey进行读取操作。读取操作在doRead方法中完成。
每个SelectionKey关联了一个Server.Connection对象。得到了Connection对象后,就可以进行读取和处理Connection对象数据上的事情了。
现在来看看Connection对象怎么注册到readSelector上的。原来是在Listner线程中发生了selector感兴趣的OP_ACCEPT事件后。当有OP_ACCEPT事件发生时,说明客户端数据已经准备好了,调用accept后就得到服务器与客户端的套接字通道。
得到套接字通道后,以round-robin的方式选择一个Reader,将channel的OP_READ事件注册在readSelector上。并以当前的客户端服务器channel新建一个Connection,附在注册感兴趣事件返回的SelectionKey上面。最后将得到的connection加入到connectionList中进行维护。
现在回过头来看看Listner主线程在干什么,Listner循环阻塞在selector.select方法上面,获取感兴趣的OP_ACCEPT事件。
了解完了Listner类是干什么的,上面有提到处理数据是在count = c.readAndProcess();中完成的,下面分析Connection类。
类Connection
主要分析Connection的readAndProcess方法。
(待续)
分享到:
相关推荐
Hadoop的源码分析涉及对整个系统的拆解和理解,是一个复杂且细致的工作。通过对Hadoop源码的分析,开发者可以深入理解分布式系统的内部结构,掌握数据处理和存储的高级技巧,以及学习如何构建一个可靠、可扩展的...
* ipc:提供一个简单的 IPC 的实现,依赖于 io 提供的编解码功能。 * io:提供数据的编解码功能。 * net:封装部分网络功能,如 DNS 和 socket。 * security:提供用户和用户组信息。 * metrics:提供系统统计数据的...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
本项目深入分析了基于Java和Shell语言的Hadoop IPC模块,包含38个文件,其中21个Java源文件,6个JAR包文件,以及Shell脚本、属性文件、XML配置等辅助文件,旨在提取并设计该模块的核心源码,以增强对Hadoop分布式...
Hadoop是开源的分布式计算框架,它主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。...Hadoop的源码分析文档提供了宝贵的参考资料,有助于开发者更好地理解和利用这个强大的框架。
8. **Hadoop源码分析**:通过对这些源码的学习,我们可以理解Hadoop内部的工作流程,如数据分块、副本策略、任务调度、错误恢复等。这对于开发和优化Hadoop应用程序、调试集群问题以及理解分布式系统原理非常有价值...
《Hadoop源码分析——HDFS部分》 Hadoop,作为开源大数据处理的基石,其核心组件之一就是HDFS(Hadoop Distributed File System),这是一个高度容错性的分布式文件系统,设计用于运行在廉价硬件上,能够处理大规模...
该文件内容指出,对Hadoop源代码的深入理解和分析能够帮助人们更好地掌握Hadoop开发技术,并且提供了一系列的学习资源链接,包括视频教程、安装包集合、推荐书籍以及面试题目等,这些都是学习和深入研究Hadoop的重要...
### Hadoop源码分析知识点详解 #### 一、Hadoop及其核心技术背景 Hadoop作为一款开源的分布式计算框架,其核心思想来源于Google发布的几篇重要论文。这些论文详细阐述了Google构建其分布式计算平台的关键技术和...
Hadoop的源码分析对于理解其工作原理至关重要。源码中,`fs`包提供了对多种文件系统的抽象接口,使得Hadoop能够透明地操作本地文件系统或分布式文件系统,如HDFS和Amazon S3。`ipc`包则包含了简单IPC实现,用于不同...
总的来说,Hadoop源码分析涵盖了分布式文件系统的设计原理、分布式计算模型的实现以及相关的通信、安全和监控机制。深入理解Hadoop的源码,有助于开发者更好地利用这个框架来处理大数据问题,同时也能为优化分布式...
### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...
### Hadoop源代码分析知识点详解 #### 一、Hadoop背景与关键技术介绍 Hadoop作为一款开源的大数据处理框架,其设计灵感源自Google的一系列核心论文。这些论文详细阐述了Google构建其基础设施的方法论和技术原理,...
本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、...