1. 代理对象的生成
DFS中构造方法开始:
// 创建代理对象
// NameNodeProxies :所有远程访问NameNode都必须通过它创建代理对象
proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
ClientProtocol.class, nnFallbackToSimpleAuth);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
NameNodeProxies中createNonHAProxy(..)方法
// xface就是你传入进来要代理的接口,也就是协议,不同协议对应着不同的代理对象创建方法
if (xface == ClientProtocol.class) {
// 用户与NameNode同行基于ClientProtocol,进入这个方法生成proxy
proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
withRetries, fallbackToSimpleAuth);
} else if (xface == JournalProtocol.class) {
proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
} else if (xface == NamenodeProtocol.class) {
proxy = (T) createNNProxyWithNamenodeProtocol(nnAddr, conf, ugi,
withRetries);
} else if (xface == GetUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithGetUserMappingsProtocol(nnAddr, conf, ugi);
} else if (xface == RefreshUserMappingsProtocol.class) {
proxy = (T) createNNProxyWithRefreshUserMappingsProtocol(nnAddr, conf, ugi);
} else if (xface == RefreshAuthorizationPolicyProtocol.class) {
proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
conf, ugi);
} else if (xface == RefreshCallQueueProtocol.class) {
proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to NameNode: " +
((xface != null) ? xface.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
createNNProxyWithClientProtocol(...) :// 我只列出了我觉得重要的步骤
/*
RPC的注释:
A simple RPC mechanism. A protocol is a Java interface(协议是一个java j接口). All parameters and return types must be one of(所有的参数和返回值必须是以下类型):
a primitive type, boolean, byte, char, short, int, long, float, double, or void; or
a String; or
a Writable; or
an array of the above types
All methods in the protocol should throw only IOException.(所有的方法只能抛出IOException) No field data of the protocol instance is transmitted.
*/
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
........
/*
主要这一步连接到了远程主机 , 得到一个代理对象 $Proxy16 注意代理类的名字下面会用到
Get a protocol proxy that contains a proxy connection to a remote server and a
set of methods that are supported by the server
这个方法可以深入去看下, 加下面代码分析
*/
// 注意下这个地方为什么返回的这种类型的对象,因为这个方法开头第一句就向RPC注册了这个对象
//PB是Protocol Buffer 的缩写 而Protocol Buffer 是一种类似json ,xml的数据传输格式
//$Proxy16
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
fallbackToSimpleAuth).getProxy();
.......
/*
protocolTranslator : 协议翻译器
ClientNameNodeProtocolTranslatorPB:
转发ClientProtocol调用作为RPC调用传给NNserver
This class forwards(转发) NN's ClientProtocol calls as RPC calls to the NN server
while translating from the parameter types used in ClientProtocol to the
new PB types.
*/
ClientProtocol translatorProxy =
new ClientNamenodeProtocolTranslatorPB(proxy);
/*
返回ClientProtocol的代理对象 $Proxy17
*/
return (ClientProtocol) RetryProxy.create(
ClientProtocol.class,
new DefaultFailoverProxyProvider<ClientProtocol>(
ClientProtocol.class, translatorProxy),
methodNameToPolicyMap,
defaultPolicy);
RPC.getProtocolProxy(....)
// 通过RPC引擎获得代理 这里是ProtobufEngine 它是RpcEngine的一种实现
return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
fallbackToSimpleAuth);
再接着进入getProxy(..)
// 这个类很重要 实现了InvocationHandler 在他的构造方法中建立了与服务端的连接
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
// 这里JDK的动态代理机制,InvocationHandler是Invoker ProtocolProxy只是一个包装类
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
进入RetryProxy.create(..) :
到这里代理对象就生成了 $proxy17 被包装在ProxyAndInfo中
public static <T> Object create(Class<T> iface,
FailoverProxyProvider<T> proxyProvider,
Map<String,RetryPolicy> methodNameToPolicyMap,
RetryPolicy defaultPolicy) {
// 这里不就是JDK的动态代理生成机制
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
// InvocationHandler实现类,代理调用方法的时候,就会调用他的invoke()方法
new RetryInvocationHandler<T>(proxyProvider, defaultPolicy,
methodNameToPolicyMap)
);
}
2. 客户端与服务单连接的建立及客户单Rpc 请求的传递
接下来就以客户端调用getBlockLocation()为例:
RetryInvocationHandler invoke():
if (isRpc) {
Client.setCallIdAndRetryCount(callId, retries);
}
try { // 核心的一步,调用方法
Object ret = invokeMethod(method, args);
hasMadeASuccessfulCall = true;
return ret;
}
invokeMethod(...)
protected Object invokeMethod(Method method, Object[] args) throws Throwable {
try {
if (!method.isAccessible()) {
method.setAccessible(true);
}
// 注意这里他调用的是代理对象的方法 我们一般调用的都是具体实现类的方法
// $Proxy17.getBlockLocation()?
return method.invoke(currentProxy.proxy, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
接下来因为无法进入$Proxy17的源码,无法确定实际做了啥,通过设断点可知,他接下来进入了
$Proxy16中,调用它的getBlockLocations()方法
.....
// 获得方法信息 方法名字,所属协议,协议版本
/*
methodName: "getBlockLocations"
declaringClassProtocolName: "org.apache.hadoop.hdfs.protocol.ClientProtocol"
clientProtocolVersion: 1
*/
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
....
/*
args[1] 方法的一些参数信息 , 我这里是获取文件的块位置,所以就得指明是哪个文件
src: "/d12/hello"
offset: 0
length: 1342177280
*/
//注意这里Message就是protocol Buffer定义的一种数据传输格式,类似json , xml
//在hdfs ,rpc 是通过这种形式在客户端和服务端之间传递消息的。
Message theRequest = (Message) args[1];
// 这个应当是方法调用后的返回结果
final RpcResponseWrapper val;
try {
// 这里就是重头戏了,真正的调用了 RequestWrapper继承了Writable,所以满足序列化的要求
//Make a call, passing rpcRequest
, to the IPC server defined by remoteId
, returning the rpc respond
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
}
.....
Message prototype = null;
try {
prototype = getReturnProtoType(method);
} catch (Exception e) {
throw new ServiceException(e);
}
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.theResponseRead).build();
.....
return returnMessage;
深入client的call方法:
// 返回的结果是writable类型 , 可以把writable理解成hadoop提供的便于在网络上传输的对象
// request , response都是该类型的子类
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass) throws IOException {
return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
}
在进入call :
//将请求封装成Rpc call
final Call call = createCall(rpcKind, rpcRequest);
// 基于连接池的概念,真正建立了与服务端的联系 见下面方法分析
Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
....
// 将RpcRequest传递给服务端
connection.sendRpcRequest(call);
.....
synchronized (call) {
while (!call.done) {
try {
call.wait(); // wait for the result
} catch (InterruptedException ie) {
// save the fact that we were interrupted
interrupted = true;
}
}
...
return call.getRpcResponse();
getConnection(..) :
do {
synchronized (connections) {
//connections就是一个连接池 hashtable
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId, serviceClass);
connections.put(remoteId, connection);
}
}
//注意这个add方法,将call加入这个连接的call队列中!并且会notify()也就是会叫醒一个listener
} while (!connection.addCall(call));
/*
这个方法真正建立了与server的连接,并且将requestheader传递给了服务端
Connect to the server and set up the I/O streams. It then sends
a header to the server and starts the connection thread that waits for responses.
建立了一个连接线程 等待响应!如果不单独建立一个线程的话,那么就会一直阻塞在这里..
*/
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
addCall() :
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
return false;
// Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
//它是位于Connection中!
calls.put(call.id, call);
// notify一个listener
notify();
return true;
}
setIOStreams(..) :
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
writeConnectionHeader(outStream);
......
// start the receiver thread after the socket connection has been set up
//这里启动了一个新的线程
start();
return;
再看一下客户单把RPC call写给服务端senRpcRequest()的过程:
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
synchronized (sendRpcRequestLock) {
// 传参线程池来传递,也就是一个连接中又分出一个线程出来了.
//Future的作用,就是计算耗时的操作的时候,主线程可以继续去做其他的事情,然后
//计算完成,一个通过返回值获得结果。
//这里不就是一种异步操作的体现吗?
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
}
senderFuture.get();
到这里,与服务端建立了连接,把call写给了
服务端,接下来客户端就是等待响应。
客户端接收响应,前面我们看setIOStream的时候,启动一个Connection线程,那么它接下来就会传递call给服务端
同时,等待着响应,看它的run方法就知道了:
//waitForWork()必然内部有一个wait()等待其他线程来notify()
//这里就不展开他的代码了
/*
它的官方注释:
wait till someone signals us to start reading RPC response or
it is idle(发呆,没反应) too long, it is marked as to be closed,
or the client is marked as not running.
Return true if it is time to read a response; false otherwise.
*/
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
}
receiveRpcResponse() :
int totalLen = in.readInt();
// 获得响应的头信息
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
checkResponse(header);
int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
//每一个Call对象都被唯一标示
int callId = header.getCallId();
....
//得到原来的Call对象
Call call = calls.get(callId);
//得到请求调用的状态,是否成功
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
// read value 放入value
value.readFields(in);
calls.remove(callId);
// 将value注入到call中,这地方有一点注意了,在前面的call()方法中,我们看到
//call.wait()方法,它在一直等待,直到有结果返回,那么这里注入的同时会调用
//completeCall()方法,唤醒等待的线程。
call.setRpcResponse(value);
/** Indicate when the call is complete and the
* value or error are available. Notifies by default. */
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller
}
到这里服务端的响应成功返回,被Connection线程成功接收到,并注入到Call中,同时唤醒
主线程。那么接下来,就看看服务端是怎么接收客户端连接,接收信息,调用方法,返回结果的。
3. 服务端接收请求及请求的处理
思考这样一个问题,服务端你觉得应该是什么时候启动的?
应当是文件系统启动的时候,serve就应当会启动的!
通过查看nameNode的initialize()方法,就会看到:
...
// NameNodeRpcServer rpcServer;
//这个创建方法实际上就是一个构造方法
//就是server启动前的准备工作,为rpcServer的所有属性都赋值。
rpcServer = createRpcServer(conf);
...
//这一步才是真正的启动线程。
//开启通用服务
startCommonServices(conf);
这个
NameNodeRpcServer类实现了NamenodeProtocols,这个NameNodeProtocols也就是NameNode所支持的所有协议
look:
public interface NamenodeProtocols
extends ClientProtocol,
DatanodeProtocol,
NamenodeProtocol,
RefreshAuthorizationPolicyProtocol,
RefreshUserMappingsProtocol,
RefreshCallQueueProtocol,
GenericRefreshProtocol,
GetUserMappingsProtocol,
HAServiceProtocol,
TraceAdminProtocol {
}
NameNodeRpcServer中的几个重要字段 :
/** The RPC server that listens to requests from DataNodes来自DataNode的请求 */
private final RPC.Server serviceRpcServer;
private final InetSocketAddress serviceRPCAddress;
/** The RPC server that listens to requests from clients来自Client */
//注意这里是不是与RPC.server挂钩了。可是它是抽象类啊。
protected final RPC.Server clientRpcServer;
protected final InetSocketAddress clientRpcAddress;
接下来只看来自client的。
clientRpcServer对象的创建
来自NameNodeRpcServer的构造方法:
this.clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService).setBindAddress(bindHost)
.setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
server build():
//调用RPC引擎,因为在它里面有RPC.server的具体实现 比如ProtobufEngine中就有Server具体实现类
//不同的引擎有不同的server启动方法 是不是为了这个才把server的具体类方法在ProtocolEngine的实现类中了
getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
clientRpcServer的启动:
上面NameNode的初始化方法中的startCommonServices():
//我只关注这句话
rpcServer.start();
void start() {
clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
}
那么接下来,就进入Server中了:
理一下Server类间的关系:Server(抽象基类) ,RPC.Server继承自Server的抽象类,ProtobufEngine中的Server则是继承自RPC.Server的具体类
public synchronized void start() {
//响应,监听,处理线程都启动
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
由上面我们可以想上面分析Client代码一样总结出Server类中的几个总要属性:
//这四个类都是Server的内部类
//前三个类都继承自Thread
//监听线程,接收Socket
private Listener listener = null;
//响应线程,将结果返回给客户端
private Responder responder = null;
//处理线程,处理队里中的call,真正调用方法的地方
private Handler[] handlers = null;
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
// 调用队列 , 阻塞队列
private CallQueueManager<Call> callQueue;
所以,现在来解决第一个问题:
要看服务端是怎么接收客户端的连接的就要看Listener:
先看Listener的构造方法:
//基于NIO,打开选择器
selector= Selector.open();
//多个读线程
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
//生成Listener的同时就会启动多个读线程
reader.start();
再看Listener的listener方法:
//这一步就是连接管理器它新开线程在那不停的扫描,看哪个连接是否
//一直在发呆..我觉得这个地方对这中情形的实现还是非常有参考价值的...
connectionManager.startIdleScan();
//runnig则是表示server是否还在运行.
while (running) {
//接下来就是NIO的接收连接的操作...
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
//这里接收连接
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
doAccept():
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
//通道处理
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
//得到一个读线程
Reader reader = getReader();
//将该通道包装成一个Connection,并添加到连接管理器的set中
Connection c = connectionManager.register(channel);
// 连接与key关联起来,下面会用到
key.attach(c); // so closeCurrentConnection can get the object
//将改连接对象与相应的读线程关联起来
//但是这里有一点要注意,一个Reader可能同时管理多个连接
//所以在Reader内部会维持一个阻塞队列。
reader.addConnection(c);
}
}
public void addConnection(Connection conn) throws InterruptedException {
//加入阻塞队列
pendingConnections.put(conn);
//唤醒阻塞在该选择器上的线程
readSelector.wakeup();
}
到这里,服务端接收到了客户端的连接,并把它包装成Connection对象,并且关联到了相应的读线程,那么接下来就是
怎么读的问题了;
读的话,则自然是上面的Reader :
看它的run():// 这真是一个学习NIO的好机会啊!
// 它的run实际调用的就是下面这个方法
private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
//取的当前读线程的队列中的所有连接的通道,并注册读事件
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select();
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
//读处理 here
doRead(key);
}
}
key = null;
}
}
}
doRead():
//取得与该key关联的Connection对象
Connection c = (Connection)key.attachment();
.....
//读与处理了
count = c.readAndProcess();
readAndProcess()://将request装换成Call
public int readAndProcess()
{
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);
}
//请求头
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
//版本号
int version = connectionHeaderBuf.get(0);
.....
}
.....
//读取请求
count = channelRead(channel, data);
if (data.remaining() == 0) {
...
//Process an RPC Request - handle connection setup and decoding of request into
//a Call 处理RPC请求,请求编码成Call对象
processOneRpc(data.array());
...
}
return count;
}
}
processOneRpc(...) : // 处理一个RPC请求
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
......
//这里处理
processRpcRequest(header, dis);
processRpcReqeust(...) :
...
Writable rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
//请求读入rpcRequest中
rpcRequest.readFields(dis);
...
//这里就将request包装成了Server中的call
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
//加入队列,可能阻塞在这里
callQueue.put(call);
incRpcCount(); // Increment the rpc count
到这里,服务端就接收到客户端的请求并将它转换成了Call对象,加入到call队列,那么接下来的就是处理的,handler的事情:
Handler这个内部类最简单了,将两个方法,一个构造方法,把自己设成daemon,还有一个就是run()方法
还记得前面启动clientRpcServer的时候,会同时启动多个handler线程,这样的,这多个handler线程就会一直在等待,只要阻塞队列callQueue
中有对象,就会被一个线程获得,然后处理....
run():
while (running) {
.....
//就是在这个地方可能会发生阻塞!
final Call call = callQueue.take();
......
//调用方法,这里Server中并没有实现
//而是调用的ProtobufEngine中的call方法
Writable value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
........
// 接下来就是响应的过程....
synchronized (call.connection.responseQueue) {
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);
........
responder.doRespond(call);
}
}
看一下call方法,是采用的com.google中的方法:
//解析获得方法信息
RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
RequestHeaderProto rpcRequest = request.requestHeader;
String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
......
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName);
......
Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType()
.mergeFrom(request.theRequestRead).build();
Message result;
........
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
//到这里就是方法调用了
result = service.callBlockingMethod(methodDescriptor, null, param);
.....
return new RpcResponseWrapper(result);
到这里,handler就将请求处理并返回了,那么接下来来看看responder的处理:
接着handler的run()方法来:
doRespond():
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
//加入响应队列
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
//这个方法的最后一个decPending(),就会notify responder线程,因为它一直都是在wait()
processResponse(call.connection.responseQueue, true);
}
}
}
最后看一下Responder的run():
//会调用duRunLoop() , 直接看它
while (running) {
....
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
//这个写的过程中会调用processResponse()方法
doAsyncWrite(key);
}
}
}
.......
//对于某些存在于响应队列中时间过程的response会调用doPurg();
doPurg();
}
到此,RPC客户端,服务端的源码分析完毕。
分享到:
相关推荐
hdfs源码分析整理 在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录...
通过阅读和理解Hadoop源码分析-HDFS部分.pdf文档,我们可以更深入地理解这些组件的工作原理,掌握HDFS在处理大数据时的内部机制。这对于我们优化HDFS的性能,解决实际问题,以及开发相关的分布式应用都具有重要的...
Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...
本文将重点探讨HDFS的源码分析,基于《Hadoop2.x HDFS源码剖析》这本书中的参考注释。首先,我们来看HDFS的核心组件——NameNode和DataNode。 1. NameNode:作为HDFS的元数据管理节点,NameNode负责维护文件系统的...
在Hadoop的RPC实现方法中,Client类、Server类、RPC类以及HDFS通信协议组是核心。这些组件共同协作实现远程过程调用,使得HDFS中的各个组件能够相互交流和协作。通过这些组件,HDFS能够处理客户端请求,并在集群内部...
Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- ...HDFS的分布式存储架构的源码分析**
《Hadoop深入浅出之HDFS介绍》 ...通过Shell命令,用户可以方便地进行文件操作,而RPC机制和源码分析则为深入理解HDFS的运行机制提供了可能。对于需要处理大量数据的企业和研究机构,HDFS提供了一种可靠的解决方案。
Hadoop是开源的分布式计算框架,它主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。...Hadoop的源码分析文档提供了宝贵的参考资料,有助于开发者更好地理解和利用这个强大的框架。
### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...
源码分析可以帮助理解数据流动的细节。 4. `DFSClient`:客户端与HDFS交互的主要接口,负责文件的读写操作。通过源码,我们可以看到如何构建RPC请求,以及如何处理来自NameNode和DataNode的响应。 五、源码学习的...
6. **HDFS源码分析**:研究HDFS如何实现高效的数据存储和访问机制,这对于理解大数据处理中的数据管理至关重要。 7. **Google ProtoBuf源代码分析**:了解Google的ProtoBuf是如何实现高效的序列化和反序列化,有助于...
10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...
总的来说,HBase的源码分析涉及到客户端与服务器的交互、RPC通信机制、数据存储流程以及系统架构等多个层面。理解这些核心机制对于优化HBase性能、排查问题以及进行二次开发都至关重要。通过对HBase源码的深入学习,...
### Hadoop源码分析知识点详解 #### 一、Hadoop及其核心技术背景 Hadoop作为一款开源的分布式计算框架,其核心思想来源于Google发布的几篇重要论文。这些论文详细阐述了Google构建其分布式计算平台的关键技术和...
NameNode源码分析(RPC是基础) DataNode源码分析 FileSystem源码分析(如何与NameNode通信ClientProtocol) JobTracker源码分析 TaskTracker源码分析 网站日志分析项目(这个项目分析可以让你更加掌握好所学的知识...
总的来说,Hadoop源码分析涵盖了分布式文件系统的设计原理、分布式计算模型的实现以及相关的通信、安全和监控机制。深入理解Hadoop的源码,有助于开发者更好地利用这个框架来处理大数据问题,同时也能为优化分布式...
### Dubbo实战与源码分析 #### Dubbo简介 Dubbo是阿里巴巴开源的一款高性能、轻量级的微服务框架,它提供了一整套微服务解决方案,包括服务注册与发现、负载均衡、服务代理、断路器等功能。 #### Dubbo的核心组件...