`
zqhxuyuan
  • 浏览: 32409 次
  • 性别: Icon_minigender_1
  • 来自: 福建
社区版块
存档分类
最新评论

Hadoop源码分析-RPC.Client

阅读更多


内部类

作用

Call 

用于封装Invocation对象,作为VO写到服务端,同时也用于存储从服务端返回的数据

Connection 

用以处理远程连接对象。继承了Thread

ConnectionId 

唯一确定一个连接

由于Client可能和多个Server通信, 典型的一次HDFS读, 需要和NameNode打交道, 也需要和某个/某些DataNode通信。这意味着某一个Client需要维护多个连接。同时为了减少不必要的连接, Client的做法是拿ConnectionId来做为Connection的ID。ConnectionId包括一个InetSocketAddress(IP地址+端口号或主机名+端口号)对象和一个用户信息对象。即同一个用户到同一个InetSocketAddress的通信将共享同一个连接。

 

连接被封装在类Client.Connection中, 所有的RPC调用, 都是通过Connection进行通信。一个RPC调用, 自然有输入参数, 输出参数和可能的异常, 同时为了区分在同一个Connection上的不同调用, 每个调用都有唯一的id。调用是否结束也需要一个标记, 所有的这些都体现在对象Client.Call中。Connection对象通过一个Hash表, 维护在这个连接上的所有Call。

一个RPC调用通过addCall, 把请求加到Connection里。为了能够在这个框架上传输Java的基本类型, String和Writable接口的实现类, 以及元素为以上类型的数组, 我们一般把Call需要的参数打包成为ObjectWritable对象。

Client.Connection会通过socket连接服务器, 连接成功后回校验客户端/服务器的版本号(Client.ConnectionwriteHeader()方法), 校验成功后就可以通过Writable对象来进行请求的发送/应答了。注意, 每个Client.Connection会起一个线程, 不断去读取socket, 并将收到的结果解包, 找出对应的Call, 设置Call并通知结果已经获取。

Call使用Obejct的wait和notify, 把RPC上的异步消息交互转成同步调用。

还有一点需要注意, 一个Client会有多个Client.Connection, 这是一个很自然的结果。


 

 

ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

第一个参数Invocation实现了Writable,作为Call的一部分

 

Client.call(param, remoteId)

  /** Make a call, passing param, to the IPC server defined by remoteId, returning the value. */
  public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
    Call call = new Call(param);  							//将传入的数据封装成call对象 
    Connection connection = getConnection(remoteId, call); 	//获得一个连接 
    connection.sendParam(call);  							// send the parameter 向服务端发送call对象 
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();           // wait for the result 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
      if (interrupted) { // set the interrupt flag now that we are done waiting 因中断异常而终止,设置标志interrupted为true 
        Thread.currentThread().interrupt();
      }
      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception: use the connection because it will reflect an ip change, unlike the remoteId 本地异常
          throw wrapException(connection.getRemoteAddress(), call.error);
        }
      } else {
        return call.value; //返回结果数据 
      }
    }
  }

 

还有一个并行RPC调用的重载方法

  /** Makes a set of calls in parallel.  Each parameter is sent to the corresponding address.
   * When all values are available, or have timed out or errored, the collected results are returned in an array.
   * The array contains nulls for calls that timed out or errored.  */
  public Writable[] call(Writable[] params, InetSocketAddress[] addresses, Class<?> protocol, UserGroupInformation ticket, Configuration conf){
    if (addresses.length == 0) return new Writable[0];
    ParallelResults results = new ParallelResults(params.length);
    synchronized (results) {
      for (int i = 0; i < params.length; i++) {
        ParallelCall call = new ParallelCall(params[i], results, i);
        try {
          ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i], protocol, ticket, 0, conf);
          Connection connection = getConnection(remoteId, call);
          connection.sendParam(call);  	// send each parameter
        } catch (IOException e) {
          results.size--;             		//  wait for one fewer result
        }
      }
      while (results.count != results.size) {
        try {
          results.wait();               	// wait for all results
        } catch (InterruptedException e) {}
      }
      return results.values;
    }
  }

Call

  /** A call waiting for a value. */
  private class Call {
    int id;            	// call id 调用次数
    Writable param;   	// parameter 参数(Invocation包含调用方法的方法名和方法的参数)
    Writable value;    	// value, null if error 返回值(调用方法的方法值)
    IOException error;  	// exception, null if value 调用出现异常?
    boolean done;     	// true when call is done 调用完成?

    protected Call(Writable param) {
      this.param = param;
      synchronized (Client.this) {
        this.id = counter++;
      }
    }

    /** Indicate when the call is complete and the value or error are available.  Notifies by default.  */
    protected synchronized void callComplete() {
      this.done = true;
      notify();      		// notify caller 当调用方法完成,通知调用者,即Invoker-->InvocationHandler--->接口代理-->客户端-->Client
    }
    /** Set the return value when there is no error.  Notify the caller the call is done. 返回RPC调用的方法执行结果 */
    public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();
    }
  }

并行RPC调用对应的ParallelCall

  /** Call implementation used for parallel calls. */
  private class ParallelCall extends Call {
    private ParallelResults results;
    private int index;
    
    public ParallelCall(Writable param, ParallelResults results, int index) {
      super(param);
      this.results = results;
      this.index = index;
    }

    /** Deliver result to result collector. */
    protected void callComplete() {
      results.callComplete(this);
    }
  }

  /** Result collector for parallel calls. */
  private static class ParallelResults {
    private Writable[] values;
    private int size;
    private int count;
    public ParallelResults(int size) {
      this.values = new Writable[size];
      this.size = size;
    }

    /** Collect a result. */
    public synchronized void callComplete(ParallelCall call) {
      values[call.index] = call.value;   	// store the value
      count++;                     	// count it
      if (count == size)        		// if all values are in
        notify();                    	// then notify waiting caller
    }
  }

Client.getConnection(remoteId, call)

  /** Get a connection from the pool, or create a new one and add it to the pool. Connections to a given ConnectionId are reused. */
  private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
    if (!running.get()) { // the client is stopped 连接关闭
      throw new IOException("The client is stopped");
    }
    Connection connection;
    //we could avoid this allocation for each RPC by having a connectionsId object and with set() method.通过ConnectionId,不用每次RPC调用都new一个连接  
  //We need to manage the refs for keys in HashMap properly. For now its ok. 将ConnectionId作为连接池Map的key来管理连接 
  //如果connections连接池中有对应的连接对象,就不需重新创建了;如果没有就需重新创建一个连接对象。  
  //但请注意,该连接对象只是存储了remoteId的信息,其实还并没有和服务端建立连接
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); //将call对象放入对应连接中的calls池 
    //we don't invoke the method below inside "synchronized (connections)" block above. 不在上面的同步块中完成下面的方法操作
    // The reason for that is if the server happens to be slow, 因为服务器建立一个连接会耗费比较长的时间,在同步块中执行会影响系统运行
    //it will take longer to establish a connection and that will slow the entire system down.
    connection.setupIOstreams(); //和服务端建立连接
    return connection;
  }

connection.setupIOstreams()

  /** Thread that reads responses and notifies callers.  Each connection owns a
   * socket connected to a remote address.  Calls are multiplexed through this
   * socket: responses may be delivered out of order. */
  private class Connection extends Thread {
    private InetSocketAddress server;  	// server ip:port  (客户端要连接到的)服务端的IP地址:端口号
    private String serverPrincipal;  		// server's krb5 principal name
    private ConnectionHeader header; 	// connection header 连接头
    private final ConnectionId remoteId; 	// connection id 每一次RPC调用的ConnectionId,唯一
    private AuthMethod authMethod; 	// authentication method 授权方法
    private boolean useSasl;
    private Token<? extends TokenIdentifier> token;
    private SaslRpcClient saslRpcClient;
    
    private Socket socket = null;        	// connected socket 客户端连接的Socket
    private DataInputStream in;			// 输入流	
    private DataOutputStream out;		// 输出流
    private int rpcTimeout;
    private int maxIdleTime; 			//connections will be culled if it was idle for maxIdleTime msecs
    private final RetryPolicy connectionRetryPolicy;
    private boolean tcpNoDelay; 		// if T then disable Nagle's Algorithm
    private int pingInterval; 			// how often sends ping to the server in msecs
    
    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); 	// currently active calls
    private AtomicLong lastActivity = new AtomicLong();					// last I/O activity time
    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  	// indicate if the connection is closed
  private IOException closeException; 									// close reason
  
  /** Update lastActivity with the current time. */
    private void touch() {
      lastActivity.set(System.currentTimeMillis());
  }
  
  /**
     * Add a call to this connection's call queue and notify a listener; synchronized. Returns false if called during shutdown.
     * @param call to add
     * @return true if the call was added.
     */
    private synchronized boolean addCall(Call call) {
      if (shouldCloseConnection.get())
        return false;
      calls.put(call.id, call);
      notify();
      return true;
    }
  
    private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); //创建连接用的Socket(NIO方式创建)
          this.socket.setTcpNoDelay(tcpNoDelay); 
          NetUtils.connect(this.socket, server, 20000); // connection time out is 20s 开始连接
          if (rpcTimeout > 0) {
            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
          }
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* Check for an address change and update the local reference.
           * Reset the failure counter if the address was changed*/
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          /* The max number of retries is 45, which amounts to 20s*45 = 15 minutes retries.*/
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          handleConnectionFailure(ioFailures++, ie);
        }
      }
    }
  
    /** Connect to the server and set up the I/O streams. It then sends a header to the server and starts the connection thread that waits for responses.
     * 客户端和服务器建立连接, 然后客户端会一直监听服务端传回的数据. 和05例子的监听器类似 */
    private synchronized void setupIOstreams() throws InterruptedException {
      if (socket != null || shouldCloseConnection.get()) {
        return;
      }
      try {
        short numRetries = 0;
        final short maxRetries = 15;
        Random rand = null;
        while (true) {
          setupConnection(); 										//建立连接 
          InputStream inStream = NetUtils.getInputStream(socket); 		//获得输入流(接收数据) 
          OutputStream outStream = NetUtils.getOutputStream(socket); 	//获得输出流(发送数据) 
          writeRpcHeader(outStream);
          // ... 使用Sasl安全机制的连接设置
          this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream))); //将输入流装饰成DataInputStream 
          this.out = new DataOutputStream(new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream 
          writeHeader();
          touch(); // update last activity time 更新最近活动的时间 
          start(); // start the receiver thread after the socket connection has been set up 连接建立启动接收线程等待服务端传回数据.Thread调用run()
          return;
        }
      } catch (Throwable t) {
        close();
      }
    }
    
    private void closeConnection() {
      try {
        socket.close();  // close the current connection 关闭当前连接
      } catch (IOException e) {
        LOG.warn("Not able to close a socket", e);
      }
      // set socket to null so that the next call to setupIOstreams can start the process of connect all over again.
      socket = null;
    }
    /* Write the RPC header 向输出流写入RPC调用的header*/
    private void writeRpcHeader(OutputStream outStream) throws IOException {
      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
      out.write(Server.HEADER.array()); // Write out the header, version and authentication method
      out.write(Server.CURRENT_VERSION);
      authMethod.write(out);
      out.flush();
    }
    /* Write the protocol header for each connection.Out is not synchronized because only the first thread does this.*/
    private void writeHeader() throws IOException {
      DataOutputBuffer buf = new DataOutputBuffer();
      header.write(buf); // Write out the ConnectionHeader
      int bufLen = buf.getLength();
      out.writeInt(bufLen); // Write out the payload length
      out.write(buf.getData(), 0, bufLen);
    }
    
    /* wait till someone signals us to start reading RPC response or it is idle too long, it is marked as to be closed, or the client is marked as not running.
     * 等待开始读取RPC调用的返回值, 空闲时间太长, 被标记为连接关闭, 客户端没有运行. 当准备读取数据时返回true
     * Return true if it is time to read a response; false otherwise. */
    private synchronized boolean waitForWork() {
      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity.get());
        if (timeout>0) {
          try {
            wait(timeout);
          } catch (InterruptedException e) {}
        }
      }
      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
        return true;
      } else if (shouldCloseConnection.get()) {
        return false;
      } else if (calls.isEmpty()) { // idle connection closed or stopped
        markClosed(null);
        return false;
      } else { // get stopped but there are still pending requests 
        markClosed((IOException)new IOException().initCause(new InterruptedException()));
        return false;
      }
    }
  
  // 建立连接后, start()会调用run(), 相当于05中的监听器. 监听服务端返回的数据并读取数据.
    public void run() {
      while (waitForWork()) {//wait here for work - read or close connection
        receiveResponse();
      }
      close();
    }

    /** Initiates a call by sending the parameter to the remote server. 发送Call对象给远程服务器,开始RPC调用
     * Note: this is not called from the Connection thread, but by other threads. 不是由当前客户端连接的线程调用 */
    public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {          
          //for serializing the data to be written 序列化Call对象,因为Call由id和传入的Invocation param组成,需要对所有属性进行序列化
          d = new DataOutputBuffer();  	//构造输出流缓冲区,用于客户端向服务端输出(写)数据
          d.writeInt(call.id);				//序列化Call的id
          call.param.write(d);				//序列化Call的param即Invocation对象
          byte[] data = d.getData();			//输出流的数据
          int dataLength = d.getLength();	//输出流的长度
          out.writeInt(dataLength);      	//first put the data length 首先写出数据的长度
          out.write(data, 0, dataLength);	//write the data 输出数据,向服务端写入数据
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally { //the buffer is just an in-memory buffer, but it is still polite to close early
        IOUtils.closeStream(d);
      }
    }  

    /* Receive a response. Because only one receiver, so no synchronization on in.接收响应 */
    private void receiveResponse() {
      if (shouldCloseConnection.get()) { return; }
      touch();
      int id = in.readInt();     		// try to read an id 阻塞读取输入流的id
      Call call = calls.get(id);		//在calls池中找到发送时的那个调用对象 
      int state = in.readInt();    	// read call status 阻塞读取RPC调用结果的状态
      if (state == Status.SUCCESS.state) {
        Writable value = ReflectionUtils.newInstance(valueClass, conf);
        value.readFields(in);   	// read value 读取调用结果.由此我们知道客户端接收服务端的输入流包含了3个数据
        call.setValue(value);		//将读取到的值赋给call对象,同时唤醒Client等待线程. value为方法的执行结果,设置value时会通知调用者 
        calls.remove(id);			//删除已处理的Call: 本次调用结束,从活动的调用Map中删除该调用
      }
    }
    
    /** Close the connection. */
    private synchronized void close() {
      if (!shouldCloseConnection.get()) {
        return;
      }
      // release the resources  first thing to do;take the connection out of the connection list
      synchronized (connections) {
        if (connections.get(remoteId) == this) {
          connections.remove(remoteId);
        }
      }
      // close the streams and therefore the socket
      IOUtils.closeStream(out);
      IOUtils.closeStream(in);
      disposeSasl();
      // clean up all calls
      cleanupCalls();
    }
    /* Cleanup all calls and mark them as done */
    private void cleanupCalls() {
      Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
      while (itor.hasNext()) {
        Call c = itor.next().getValue(); 
        c.setException(closeException); // local exception
        itor.remove();         
      }
    }
  }

 

  • 大小: 28.2 KB
  • 大小: 36.3 KB
分享到:
评论

相关推荐

    Hadoop源码分析(client部分)

    ### Hadoop源码分析(client部分) #### Hadoop概述 Hadoop是一个开源的分布式存储与计算框架,由Apache基金会维护。它为用户提供了处理和存储海量数据的能力,并且能够运行在低成本的商品硬件集群上。Hadoop的...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    Hadoop client server通讯分析

    在Hadoop中,客户端(Client)负责提交任务、读写数据,而服务器端则包括NameNode、DataNode和TaskTracker等组件,它们处理客户端请求,管理数据存储和任务调度。 二、HDFS通信 1. 客户端与NameNode交互: 当...

    hbase-0.98.1源码包

    源码分析可以从以下几个方面入手: 1. 源码结构:了解项目目录结构,如src/main/java下的org.apache.hadoop.hbase目录,包含了所有主要模块的源代码。 2. 主要组件:深入研究RegionServer、MasterServer、Client等...

    hadoop源码.zip

    本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...

    学习hadoop_源代码,RPC_部分

    ### Hadoop RPC 深入理解 #### 一、引言 ...通过以上分析可以看出,Hadoop RPC 不仅提供了一个高效的通信框架,还通过对细节的精确控制确保了高性能和服务的可靠性。这对于构建大型分布式系统来说至关重要。

    细细品味Hadoop_Hadoop集群(第11期)_HBase简介及安装.pdf

    4. **集成Hadoop生态系统**:HBase 无缝集成 Hadoop 生态系统中的其他工具,如 Hive、Pig、Hadoop MapReduce 等,可以方便地进行复杂的数据处理和分析。 5. **容错机制**:通过 Zookeeper 协同服务和 HDFS 的数据...

    HBase源码分析

    这个框架由多个组件构成,包括RPCServer和RPCClient,它们是HBase内部类,负责处理客户端请求并转发给相应的服务器。RPC.Server是Hadoop中的基础RPC服务实现,而HBase在其基础上进行了定制,以满足其特定需求,如...

    Hadoop分布式文件系统架构

    ### Hadoop分布式文件系统(HDFS)架构及源码分析 #### 一、HDFS的架构和设计分析 ##### 1.1 引言 Hadoop分布式文件系统(HDFS)是一种专门为运行在通用硬件上的分布式文件系统而设计的解决方案。与其他分布式文件...

    Apache RPC调用实例

    **Apache RPC调用实例** Apache远程过程调用(Remote Procedure Call, RPC)是一种通信协议,允许网络上的一个程序调用另一个远程系统上...在实际项目中,结合源码分析和相关工具,我们可以更好地理解和利用这些技术。

    Hadoop之HDFS源代码分析 pdf

    在Hadoop的RPC实现方法中,Client类、Server类、RPC类以及HDFS通信协议组是核心。这些组件共同协作实现远程过程调用,使得HDFS中的各个组件能够相互交流和协作。通过这些组件,HDFS能够处理客户端请求,并在集群内部...

    hbase 源码包

    HBase 0.94.4的源码分析有助于我们深入了解其内部机制,从而更好地进行系统设计和优化。无论是对于开发者还是管理员,掌握HBase的核心原理都将极大地提升在大数据领域的实践能力。通过不断学习和实践,我们可以更好...

    hdfs源码分析整理

    在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...

    hbase源码带中文注释

    源码分析有助于提升对空间和时间复杂度的理解。 8. **Test-HBase**: 测试模块,包含了HBase的单元测试和集成测试,这对于理解HBase的正确性验证和测试策略非常重要。 9. **HBase Hadoop Compat**: 类似于HBase ...

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...

    YARN应用开发与核心源码剖析.pdf

    通过对YARN应用开发流程和核心源码的剖析,我们可以更深刻地理解Hadoop YARN的工作原理及其在分布式计算环境中的重要作用。开发者不仅需要熟悉YARN的基本架构和组件,还需要掌握相关的RPC协议以及具体的应用开发步骤...

    MapReduce Job本地提交过程源码跟踪及分析

    MapReduce是Hadoop生态系统中的核心组件,主要用于处理和存储大规模数据。...通过阅读《Job本地提交过程源码分析及图解》这样的文档,我们可以深入学习MapReduce的工作原理,提升我们的Hadoop编程技能。

    CAT实时应用监控平台-其他

    CAT作为服务端项目基础组件,提供了Java,C/C++,Node.js,Python,Go等多语言客户端,已经在美团点评的基础架构中间件框架(MVC框架,RPC框架,数据库框架,缓存框架等,消息队列,配置系统等)深度集成,为美团...

Global site tag (gtag.js) - Google Analytics