`
zqhxuyuan
  • 浏览: 32422 次
  • 性别: Icon_minigender_1
  • 来自: 福建
社区版块
存档分类
最新评论

Hadoop源码分析-RPC

阅读更多

方法

说明

waitForProxy

保证namenode启动正常且连接正常,主要由SecondayNode、Datanode、JobTracker使用

stopProxy

停止代理

getProxy

创建代理实例,获得代理实例的versioncode,再与getProxy()传入的versioncode做对比, 相同返回代理,不同抛出VersionMismatch异常

getServer

创建并返回一个Server实例,由TaskTracker、JobTracker、NameNode、DataNode使用

call

向一系列服务器发送一系列请求,在源码中没见到那个类使用该方法。但注释提到了:Expert,应该是给系统管理员使用的接口

 

内部类

说明

ClientCache

缓存Client对象

Invocation

用于封装方法名和参数,作为数据传输层。每次RPC调用传的参数实体类,其中Invocation包括了调用方法和配置文件

Invoker

具体的调用类,采用动态代理机制,继承InvocationHandler,

有remoteId和client成员,id用以标识异步请求对象,client用以调用实现代码

Server

Server的具体类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,使用反射机制调用具体的方法

VersionMismatch

版本不匹配异常,三个参数interfaceName, clientVersion, serverVersion

 

Invocation类仅作为VO,ClientCache类只是作为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。重点是Invoker类

Invoker使用了Java的动态代理:

Dynamic Proxy是由两个class实现的:java.lang.reflect.Proxy 和 java.lang.reflect.InvocationHandler, 后者是一个接口。

动态代理类是指在运行时生成的class, 在生成它时你必须提供一组interface给它, 然后该class就宣称它实现了这些interface,即接口的代理。

Dynamic Proxy是典型的Proxy模式, 它不会替你作实质性的工作, 在生成它的实例时你必须提供一个handler(InvocationHandler), 由它接管实际的工作。

这个handler, 在Hadoop的RPC中, 就是Invoker对象,Invoker实现了InvocationHandler接口。

可以简单地理解:通过一个接口来生成一个类, 这个类上的所有方法调用, 都会传递到你生成类时传递的InvocationHandler实现中。

 

在Hadoop的RPC中, Invoker实现了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。

Invoker会把所有跟这次调用相关的调用方法名, 参数类型列表, 参数列表打包, 然后利用Client, 通过socket传递到服务器端。

就是说, 在proxy类上的任何调用, 都通过Client发送到远方的服务器上。

 

Invoker使用Invocation。Invocation封装了一个远程调用的所有相关信息, 它的主要属性有: 

methodName, 调用方法名, parameterClasses, 调用方法参数的类型列表和parameters, 调用方法参数。注意, 它实现了Writable接口, 可以串行化。

 

RPC.Server实现了org.apache.hadoop.ipc.Server, 你可以把一个对象, 通过RPC, 升级成为一个服务器。

服务器接收到请求,接收到的是Invocation对象, 反序列化后, 得到方法名, 方法参数列表和参数列表。

利用Java反射, 我们就可以调用对应的对象的方法。

调用的结果再通过socket, 返回给客户端, 客户端把结果解包后, 就可以返回给Dynamic Proxy的使用者了。

 

接口协议

把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就可以进行通信了 

Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套作为协议的接口

/**
 * Superclass of all protocols that use Hadoop RPC. 所有RPC协议接口的父接口
 */
public interface VersionedProtocol {
  /**
   * Return protocol version corresponding to protocol interface. 返回对应的协议接口的协议版本
   * @param protocol The classname of the protocol interface 协议接口的类名
   * @param clientVersion The version of the protocol that the client speaks 客户端版本
   * @return the version that the server will speak 服务器版本
   */
  public long getProtocolVersion(String protocol, long clientVersion) throws IOException;
}

实现VersionedProtocol接口的接口

 

HDFS相关

协议接口

 

ClientDatanodeProtocol

client与datanode交互的接口,操作不多,只有一个block恢复的方法。

那么,其它数据请求的方法呢?client与datanode主要交互是通过流式的socket实现,源码在DataXceiver

ClientProtocol

client与Namenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等

DatanodeProtocol

Datanode与Namenode交互的接口,如心跳、blockreport等

NamenodeProtocol

SecondaryNode与Namenode交互的接口

Mapreduce相关

协议接口

 

InterDatanodeProtocol

Datanode内部交互的接口,用来更新block的元数据

InnerTrackerProtocol

TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol相似

JobSubmissionProtocol

JobClient与JobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作

TaskUmbilicalProtocol

Task中子进程与母进程交互的接口,子进程即map reduce等操作,母进程即TaskTracker,该接口会汇报子进程的运行状态

其它

协议接口

 

AdminOperationProtocol

不用用户操作的接口,提供一些管理操作,如刷新JobTracker的node列表

RefreshAuthorizationPolicyProtocol

 

RefreshUserMappingsProtocol 

 

 

Invocation

  /** A method invocation, including the method name and its parameters.*/
  private static class Invocation implements Writable, Configurable { //实现hadoop的序列化接口Writable,因为要在Client和Server之间传输该对象
    private String methodName;  // The name of the method invoked. 
    private Class[] parameterClasses;  // The parameter classes. 
    private Object[] parameters;  // The parameter instances. 
    private Configuration conf;

    public Invocation() {}
    public Invocation(Method method, Object[] parameters) {
      this.methodName = method.getName();
      this.parameterClasses = method.getParameterTypes();
      this.parameters = parameters;
  }
  // 序列化
    public void readFields(DataInput in) throws IOException {
      methodName = UTF8.readString(in);
      parameters = new Object[in.readInt()];
      parameterClasses = new Class[parameters.length];
      ObjectWritable objectWritable = new ObjectWritable();
      for (int i = 0; i < parameters.length; i++) { //数组类型,每个数组元素也都需要序列化
        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
        parameterClasses[i] = objectWritable.getDeclaredClass();
      }
    }
  // 反序列化
    public void write(DataOutput out) throws IOException {
      UTF8.writeString(out, methodName);
      out.writeInt(parameterClasses.length);
      for (int i = 0; i < parameterClasses.length; i++) {
        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], conf);
      }
  } 
  }

ClientCache

  /* Cache a client using its socket factory as the hash key */
  static private class ClientCache {
    private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();

    /**
     * Construct & cache an IPC client with the user-provided SocketFactory if no cached client exists.
     * @param conf Configuration
     * @return an IPC client
     */
    private synchronized Client getClient(Configuration conf, SocketFactory factory) {
      // Construct & cache client.  The configuration is only used for timeout, and Clients have connection pools.
      // So we can either (a) lose some connection pooling and leak sockets,
      // or (b) use the same timeout for all configurations.
      // Since the IPC is usually intended globally, not per-job, we choose (a).
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }
    /**
     * Construct & cache an IPC client with the default SocketFactory if no cached client exists.
     */
    private synchronized Client getClient(Configuration conf) {
      return getClient(conf, SocketFactory.getDefault());
    }

    /**
     * Stop a RPC client connection 
     * A RPC client is closed only when its reference count becomes zero.
     */
    private void stopClient(Client client) {
      synchronized (this) {
        client.decCount();
        if (client.isZeroReference()) {
          clients.remove(client.getSocketFactory());
        }
      }
      if (client.isZeroReference()) {
        client.stop();
      }
    }
  }
  private static ClientCache CLIENTS=new ClientCache();

  static Client getClient(Configuration conf) { //for unit testing only
    return CLIENTS.getClient(conf);
  }

Invoker

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;

    private Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
      this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
      return value.get();
    }
    
    /* close the IPC client that's responsible for this invoker's RPCs */ 
    synchronized private void close() {
      if (!isClosed) {
        isClosed = true;
        CLIENTS.stopClient(client);
      }
    }
  }

 一般我们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 而上面invoke() 中却没有,这是为什么? 其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,所以这里的invoke()方法必然需要进行网络通信.要让服务端能知道客户端想要调用的是哪个接口,接口和其他参数比如要调用的地址等封装为remoteId, 这是Client的内部类ConnectionId,唯一确定一个连接

 

waitForProxy()

  static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol,
	  long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, long connTimeout) throws IOException { 
    long startTime = System.currentTimeMillis();
    IOException ioe;
    while (true) {
      try {
        return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
      } catch(ConnectException se) {  // namenode has not been started
        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
        ioe = se;
      } catch(SocketTimeoutException te) {  // namenode is busy
        LOG.info("Problem connecting to server: " + addr);
        ioe = te;
      }
      // check if timed out
      if (System.currentTimeMillis()-connTimeout >= startTime) {
        throw ioe;
      }
      // wait for retry
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ie) {} // IGNORE
    }
  }

getProxy()

  /** Construct a client-side proxy object that implements the named protocol,talking to a server at the named address. */
  public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol, long clientVersion, InetSocketAddress addr, 
 	  UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
    if (UserGroupInformation.isSecurityEnabled()) {
      SaslRpcServer.init(conf);
    }
    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
    VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, invoker);
    long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
    if (serverVersion == clientVersion) {
      return proxy;
    } else {
      throw new VersionMismatch(protocol.getName(), clientVersion, serverVersion);
    }
  }

和05中的RPC类的getProxy()方法相似。

getProxy()会生成协议接口VersionedProtocol的代理对象,当客户端调用接口的方法,会回调Invoker对象的invoke方法

Invoker实现了Java的InvocationHandler接口,和例子一样,客户端会发送封装好的Invocation对象给服务端。

Invocation对象封装了客户端想要调用的服务端的接口,方法,参数。

服务端会调用具体的接口的方法,并返回方法的执行结果,类型为ObjectWritable

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
      return value.get();
    }

 

下一节分析client.call()是怎么将Invocation对象从客户端想服务端发送。

在分析Client之前,要明确以下几点目标:

客户端和服务端的连接是怎样建立的?
2. 客户端是怎样给服务端发送数据的?
3. 客户端是怎样获取服务端的返回数据的

 

现在来总结下Hadoop的RPC和我们自己实现的RPC的映射关系

角色

作用

05例子的对应类

Client

RPC服务的客户端

Client

RPC

实现了一个简单的RPC模型

RPC

Server

服务端的抽象类

Server接口

RPC.Server

服务端的具体类

RPC.RPCServer

VersionedProtocol

所有使用RPC服务的类都要实现该接口,在创建代理时用来判断代理对象是否创建正确

Echo接口

Invoker

动态代理

InvocationHandler

Invocation (RPC)  

Call (Client/Server)

封装客户端要调用的接口,方法,参数;以及服务端返回的方法执行结果

Invocation

Connection(Client)

Listener(Server)

处理远程连接对象: 监听客户端写入; 转发给服务端调用具体方法; 向客户端写回数据

Listener

 

RPC中关于服务端的操作: RPC.Server内部类, call(), getServer() 在后面分析RPC.Server时一起分析

  • 大小: 42.4 KB
  • 大小: 25.7 KB
分享到:
评论

相关推荐

    Hadoop RPC机制分析

    本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、...

    Hadoop源码分析(client部分)

    ### Hadoop源码分析(client部分) #### Hadoop概述 Hadoop是一个开源的分布式存储与计算框架,由Apache基金会维护。它为用户提供了处理和存储海量数据的能力,并且能够运行在低成本的商品硬件集群上。Hadoop的...

    hadoop 源码分析 文档

    Hadoop是开源的分布式计算框架,它主要由两个核心组件构成:HDFS(Hadoop Distributed File System)和MapReduce。...Hadoop的源码分析文档提供了宝贵的参考资料,有助于开发者更好地理解和利用这个强大的框架。

    Hadoop源码分析HDFS数据流

    Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...

    hadoop 源码分析全

    ### Hadoop源码分析知识点详解 #### 一、Hadoop及其核心技术背景 Hadoop作为一款开源的分布式计算框架,其核心思想来源于Google发布的几篇重要论文。这些论文详细阐述了Google构建其分布式计算平台的关键技术和...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    hadoop源码分析

    Hadoop,作为大数据处理领域的重要框架,其源码分析对于开发者来说具有极高的价值。本文将深入探讨Hadoop的核心组件,包括Configuration、JobClient、JobConf以及JobTracker、TaskTracker等,并详细解析Hadoop作业...

    rpc架构与hadoop分享

    ### Hadoop架构设计与源码分析 #### Hadoop简介 Hadoop是一个能够对大量数据进行分布式处理的软件框架,由Apache基金会开发。它能够提供高可靠性、高扩展性以及高效的数据处理能力,被广泛应用于大数据处理领域。 ...

    Hadoop 培训课程(2)HDFS

    Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- ...HDFS的分布式存储架构的源码分析**

    hadoop段海涛老师八天实战视频

    10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...

    hadoop-2.6.0-src:原始解析

    源码分析YARN的工作原理,可以了解到它如何进行容器分配、任务调度以及资源监控,这对于我们理解分布式系统的资源管理和优化至关重要。 此外,Hadoop的源码还包含了大量用于容错、数据复制、网络通信的算法和策略。...

    hbase-0.98.1源码包

    源码分析可以从以下几个方面入手: 1. 源码结构:了解项目目录结构,如src/main/java下的org.apache.hadoop.hbase目录,包含了所有主要模块的源代码。 2. 主要组件:深入研究RegionServer、MasterServer、Client等...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    10-hdfs下载数据源码分析-getFileSystem2.avi 第三天 mapreduce的原理和编程 01-hdfs源码跟踪之打开输入流.avi 02-hdfs源码跟踪之打开输入流总结.avi 03-mapreduce介绍及wordcount.avi 04-wordcount的编写和...

    gene-rpc-framework

    - Java提供了多种RPC框架,如Hadoop的Hadoop RPC、Apache Thrift、Google的gRPC以及本案例中的Gene-RPC-Framework。它们都利用Java的网络编程接口来实现跨进程通信。 3. **服务提供者和服务消费者**: - 服务提供...

    hadoop2.9.x源码编译工具包

    4. **Hadoop源码结构**:Hadoop源码结构分为多个模块,如hadoop-common、hadoop-hdfs、hadoop-mapreduce等,每个模块对应不同的功能。编译时,Maven会按照模块顺序进行,确保依赖关系得到正确处理。理解这些模块的...

    hadoop源码.zip

    本篇文章将围绕"Hadoop源码"这一主题,深度探讨HDFS的底层实现机制,帮助读者从源码层面理解其工作原理。 一、HDFS概述 HDFS是基于Google发表的GFS论文设计的分布式文件系统,旨在处理和存储大量数据。它采用了主从...

Global site tag (gtag.js) - Google Analytics