`
zy19982004
  • 浏览: 662071 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:252008
社区版块
存档分类
最新评论

Hadoop学习十:Hadoop-Hdfs RPC源码 RPC

 
阅读更多

一.RPC类图

二.详细描述

  1. Server:继承org.apache.hadoop.ipc.Server(Hadoop学习九:Hadoop-hdfs RPC源码 Server)。我们称之为RPC Server。
      /** An RPC Server. */
      public static class Server extends org.apache.hadoop.ipc.Server {
        //创建一个RPC server
        //注意调用父类IP.Server的构造函数时传入的就是Invocation.class
        public Server(Object instance, Configuration conf, String bindAddress,  int port,
                      int numHandlers, boolean verbose, 
                      SecretManager<? extends TokenIdentifier> secretManager) 
            throws IOException {
          super(bindAddress, port, Invocation.class, numHandlers, conf,
              classNameBase(instance.getClass().getName()), secretManager);
        }
    
        //重写ipc.Server的call方法
        public Writable call(Class<?> protocol, Writable param, long receivedTime){
            Invocation call = (Invocation)param;
            if (verbose) log("Call: " + call);
    
            Method method = protocol.getMethod(call.getMethodName(),
            method.setAccessible(true);
            long startTime = System.currentTimeMillis();
            //真正执行远程命令就体现在这里:client的方法最终在server上被执行,就是所谓的rpc
            Object value = method.invoke(instance, call.getParameters());
    
            return new ObjectWritable(method.getReturnType(), value);
        }
      }
  2. Invocation: 我要在远程server上运行一条命令,也就是一个方法,我们把这个方法的方法名,参数,类型封装成一个Invocation对象。
       //每次方法调用都会实例化一个Invocation对象
       //将方法的方法名,参数类型,值封装成一个Invocation对象
      private static class Invocation implements Writable, Configurable {
        private String methodName;
        private Class[] parameterClasses;
        private Object[] parameters;
        private Configuration conf;
    
        public Invocation(Method method, Object[] parameters) {
          this.methodName = method.getName();
          this.parameterClasses = method.getParameterTypes();
          this.parameters = parameters;
        }
      }
  3. Invoker: 继承InvocationHandler,重写了invoke方法,invoke方法里面就是IPC Client向IPC Server发送Invocation。
      //java反射
      private static class Invoker implements InvocationHandler {
        private Client.ConnectionId remoteId;
        private Client client;
        private boolean isClosed = false;
        //只有内部RPC.getProx时调用
        private Invoker(Class<? extends VersionedProtocol> protocol,
            InetSocketAddress address, UserGroupInformation ticket,
            Configuration conf, SocketFactory factory,
            int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
          this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
              ticket, rpcTimeout, connectionRetryPolicy, conf);
          this.client = CLIENTS.getClient(conf, factory);
        }
    
        //重写invoke方法
        //每次方法调用都将封装成Invocation对象,被发往server端
        //返回server端运行此方法的结果
        public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
          ObjectWritable value = (ObjectWritable)
            client.call(new Invocation(method, args), remoteId);//client向server发生消息
          if (logDebug) {
            long callTime = System.currentTimeMillis() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
          }
          return value.get();
        }
      }
  4. ClientCache:缓存IPC Client对象。
  5. RPC:前两章学习了IPC Client和IPC Server,RPC就是综合了这两者以及java反射机制,为我们提供了许多静态方法,我们只需要调用RPC.*,即可获得代理类,而不需要关心它们是怎么完成的。
    public class RPC {
      private RPC() {}                                  // no public ctor
      
      //相比getProxy,waitForProxy肯定能获得一个代理类VersionedProtocol
      static VersionedProtocol waitForProxy(Class<? extends VersionedProtocol> protocol,
                                                   long clientVersion,
                                                   InetSocketAddress addr,
                                                   Configuration conf,
                                                   int rpcTimeout,
                                                   long connTimeout)
                                                   throws IOException { 
            return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
            catch(ConnectException se) {  // namenode has not been started
            LOG.info("Server at " + addr + " not available yet, Zzzzz..."); //你是在卖萌
        }
      }
    
      //获得代理类,剩下的只需用代理类执行命令就行了
      public static VersionedProtocol getProxy(
          Class<? extends VersionedProtocol> protocol,
          long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
          Configuration conf, SocketFactory factory, int rpcTimeout,
          RetryPolicy connectionRetryPolicy) throws IOException {
    
        final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
            rpcTimeout, connectionRetryPolicy);
        //反射获得代理类
        VersionedProtocol proxy = (VersionedProtocol)Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[]{protocol}, invoker);
        long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion);
        //协议检查
        if (serverVersion == clientVersion) {
          return proxy;
        }
      }
    
      //每次发送一组方法调用到指定server
      //并没有经过反射机制
      //而是直接发送方法名,参数类型,值到指定server;并获取server执行这些方法的结果集
      //如果你确实有一组远程命令要执行,并且知道这些命令的的方法名,参数类型,值,并为每个命令指定执行server,那你可以调用此方法
      public static Object[] call(Method method, Object[][] params,
                                  InetSocketAddress[] addrs, 
                                  UserGroupInformation ticket, Configuration conf)
        throws IOException, InterruptedException {
        //实例化Invocation
        Invocation[] invocations = new Invocation[params.length];
        for (int i = 0; i < params.length; i++)
          invocations[i] = new Invocation(method, params[i]);
        Client client = CLIENTS.getClient(conf);
        try {//发生Invocation
        Writable[] wrappedValues = 
          client.call(invocations, addrs, method.getDeclaringClass(), ticket, conf);
        
        if (method.getReturnType() == Void.TYPE) {
          return null;
        }
    
        Object[] values =
          (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
        for (int i = 0; i < values.length; i++)
          if (wrappedValues[i] != null)
            values[i] = ((ObjectWritable)wrappedValues[i]).get();
        
        return values;
        } finally {
          CLIENTS.stopClient(client);
        }
      }
     
      //获得RPC Server
      public static Server getServer(final Object instance, final String bindAddress, final int port,
                                     final int numHandlers,
                                     final boolean verbose, Configuration conf,
                                     SecretManager<? extends TokenIdentifier> secretManager) 
        throws IOException {
        return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
      }
      
    }
     

 

0
0
分享到:
评论

相关推荐

    hadoop段海涛老师八天实战视频

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    新版Hadoop视频教程 段海涛老师Hadoop八天完全攻克Hadoop视频教程 Hadoop开发

    第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...

    Hadoop 培训课程(2)HDFS

    Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...

    hadoop-2.5.2:1.HDFS源码分析,代码注释参考自《 Hadoop2.x HDFS源码剖析》

    本文将重点探讨HDFS的源码分析,基于《Hadoop2.x HDFS源码剖析》这本书中的参考注释。首先,我们来看HDFS的核心组件——NameNode和DataNode。 1. NameNode:作为HDFS的元数据管理节点,NameNode负责维护文件系统的...

    HDFS源码剖析带书签目录高清.zip

    《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    hdfs源码分析整理

    在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...

    Hadoop源码分析HDFS数据流

    Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...

    hadoop-2.8.1-src.tar.gz

    2. **hadoop-hdfs**: Hadoop分布式文件系统(HDFS)的源码,负责数据的分布式存储。HDFS的设计目标是高度容错性和高吞吐量的数据访问。它将大文件分割成块,并在多台机器上复制,以确保数据的可靠性。 3. **hadoop-...

    hdfs源码.zip

    1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...

    hadoop源码.zip

    总结,Hadoop HDFS源码的学习是一项深入理解大数据存储技术的重要任务。通过源码,我们可以更清晰地看到HDFS是如何在分布式环境下实现高可用性和容错性的,这对于提升开发和运维技能,以及解决实际问题具有重大意义...

    HDFS源码解析

    《HDFS源码解析——揭示分布式文件系统的内在...总之,HDFS源码解析是一次探索分布式存储奥秘的旅程,通过对源码的深入学习,我们可以了解到如何构建一个健壮、高效的分布式文件系统,从而更好地适应大数据时代的需求。

    rpc架构与hadoop分享

    ### RPC架构概述 RPC(Remote Procedure Call Protocol,...通过学习Dubbo和Hadoop的相关知识,不仅可以深入了解分布式系统的架构原理和技术细节,还能够在实际工作中更好地应对大规模数据处理和微服务架构的挑战。

    hadoop2.9.x源码编译工具包

    在Hadoop中,protobuf用于定义数据交换格式,特别是HDFS和MapReduce中的RPC通信。要编译Hadoop源码,你需要安装protobuf的编译器,并确保其在PATH环境变量中。Hadoop源码中包含protobuf的配置,Maven会自动调用...

    Hadoop2.7.3源码Eclipse工程

    【标题】"Hadoop2.7.3源码Eclipse工程"揭示了这个压缩包包含的是Hadoop 2.7.3版本的源代码,并且是为Eclipse IDE准备的项目工程,便于开发者在Eclipse环境中进行源码级别的学习、调试和开发。 【描述】中的信息说明...

    hadoop-2.6.5:编译好的hadoop2.6.5,主要用于研究HDFS2

    总之,Hadoop 2.6.5版本对于HDFS2的研究提供了丰富的资源和工具,无论是对于初学者还是资深开发者,都是一个极佳的学习和探索平台。通过这个版本,我们可以深入了解分布式存储系统的设计理念,提升处理大数据问题的...

Global site tag (gtag.js) - Google Analytics