`

第六章:小朱笔记hadoop之源码分析-ipc分析 第四节:RPC类分析

 
阅读更多

第六章:小朱笔记hadoop之源码分析-ipc分析

第四节:RPC类分析

RPC类是对Server、Client的具体化。在RPC类中规定,客户程序发出请求调用时,参数类型必须是Invocation;从服务器返回的值类型必须是ObjectWritable。
RPC类是对Server、Client的包装,简化用户的使用。如果一个类需充当服务器,只需通过RPC类的静态方法getServer获得Server实例,然后start。同时此类提供协议接口的实现。如果一个类充当客户端,可以通过getProxy或者waitForProxy获得一个实现了协议接口的proxy object,与服务器端交互。

RPC类中有5个静态内部类,分别为:

写道
Invocation :用于封装方法名和参数,作为数据传输层,相当于VO吧。
ClientCache :用于存储client对象,用socket factory作为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server : org.apache.hadoop.ipc.Server的具体实现类,实现了抽象类的call方法,获得传入参数的call实例,再获取method方法,反射调用即可。
VersionMismatch:版本不匹配异常。

 

(1)Invocation

 

    /** A method invocation, including the method name and its parameters. */
    private static class Invocation implements Writable, Configurable {
        private String methodName; // 方法名
        private Class[] parameterClasses; // 参数类型集合
        private Object[] parameters; // 参数值
        private Configuration conf; // 配置类实例
        ......
    }

 

 

(2)ClientCache

 

    /* Cache a client using its socket factory as the hash key */
    static private class ClientCache {
        // 该内部类定义了一个缓存Map
        private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();

        /**
         * Construct & cache an IPC client with the user-provided SocketFactory
         * if no cached client exists.
         * 通过客户端org.apache.hadoop.ipc.Client的SocketFactory可以快速取出对应的Client实例
         * 
         * @param conf
         *            Configuration
         * @return an IPC client
         */
        /**
         * 从缓存Map中取出一个IPC Client实例,如果缓存够中不存在,就创建一个兵加入到缓存Map中
         */
        private synchronized Client getClient(Configuration conf, SocketFactory factory) {
            // Construct & cache client. The configuration is only used for
            // timeout,
            // and Clients have connection pools. So we can either (a) lose some
            // connection pooling and leak sockets, or (b) use the same timeout
            // for all
            // configurations. Since the IPC is usually intended globally, not
            // per-job, we choose (a).
            Client client = clients.get(factory);
            if (client == null) {
                client = new Client(ObjectWritable.class, conf, factory); // 通过反射实例化一个ObjectWritable对象,构造Client实例
                clients.put(factory, client);
            } else {
                client.incCount();
            }
            return client;
        }

        /**
         * Construct & cache an IPC client with the default SocketFactory if no
         * cached client exists.
         * 
         * @param conf
         *            Configuration
         * @return an IPC client
         */
        private synchronized Client getClient(Configuration conf) {
            return getClient(conf, SocketFactory.getDefault());
        }

        /**
         * Stop a RPC client connection A RPC client is closed only when its
         * reference count becomes zero.
         */
        private void stopClient(Client client) {
            synchronized (this) {
                client.decCount(); // 该client实例的引用计数减1
                if (client.isZeroReference()) { // 如果client实例的引用计数此时为0
                    clients.remove(client.getSocketFactory()); // 从缓存中删除
                }
            }
            if (client.isZeroReference()) { // 如果client实例引用计数为0,需要关闭
                client.stop(); // 停止所有与该client实例相关的线程
            }
        }
    }

 

 

(3)Invoker

 

 private static class Invoker implements InvocationHandler {
        private Client.ConnectionId remoteId;
        private Client client;
        private boolean isClosed = false;

        public Invoker(Class<? extends VersionedProtocol> protocol, InetSocketAddress address, UserGroupInformation ticket,
                Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
            this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf);
            this.client = CLIENTS.getClient(conf, factory);
        }

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final boolean logDebug = LOG.isDebugEnabled();
            long startTime = 0;
            if (logDebug) {
                startTime = System.currentTimeMillis();
            }
            // 构造一个RPC.Invocation实例作为参数传递给调用程序,执行调用,返回值为value
            ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
            if (logDebug) {
                long callTime = System.currentTimeMillis() - startTime;
                LOG.debug("Call: " + method.getName() + " " + callTime);
            }
            return value.get();
        }

        /* close the IPC client that's responsible for this invoker's RPCs */
        synchronized private void close() {
            if (!isClosed) {
                isClosed = true;
                CLIENTS.stopClient(client);
            }
        }
    }

 

 

 

(4)Server

public Writable call(Class<?> protocol, Writable param, long receivedTime)throws IOException {
Invocation call = (Invocation)param;

// 通过反射,根据调用方法名和方法参数类型得到Method实例  
Method method =protocol.getMethod(call.getMethodName(),
call.getParameterClasses()); 

method.setAccessible(true);// 设置反射的对象在使用时取消Java语言访问检查,提高效率  
        
Object value = method.invoke(instance, call.getParameters());// 执行调用(instance是调用底层方法的对象,第二个参数是方法调用的参数)     
}

 

 

 

分享到:
评论

相关推荐

    hadoop-lzo-0.4.21-SNAPSHOT jars

    集成Hadoop-LZO到你的Hadoop环境,你需要将`hadoop-lzo-0.4.21-SNAPSHOT.jar`添加到Hadoop的类路径中,并配置Hadoop的相关参数,例如在`core-site.xml`中设置`io.compression.codecs`属性,指定支持LZO压缩。...

    hadoop-yarn-server-common-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-yarn-server-common-2.6.5.jar; 赠送原API文档:hadoop-yarn-server-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-server-common-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-...

    flink-shaded-hadoop-2-uber-2.7.2-10.0.jar

    Flink1.10.1编译hadoop2.7.2 编译flink-shaded-hadoop-2-uber

    hadoop-eclipse-plugin-2.7.0.jar

    《Hadoop Eclipse Plugin 2.7.0:高效开发与调试工具》 Hadoop Eclipse Plugin 2.7.0是一款专门为Hadoop生态系统设计的Eclipse集成插件,它极大地简化了开发者在Eclipse环境中对Hadoop应用程序的创建、调试和管理...

    hadoop-common-2.7.3-API文档-中文版.zip

    赠送jar包:hadoop-common-2.7.3.jar; 赠送原API文档:hadoop-common-2.7.3-javadoc.jar; 赠送源代码:hadoop-common-2.7.3-sources.jar; 赠送Maven依赖信息文件:hadoop-common-2.7.3.pom; 包含翻译后的API文档...

    Hadoop权威指南----读书笔记.pdf

    Hadoop权威指南----读书笔记

    hadoop-mapreduce-examples-2.6.5.jar

    hadoop-mapreduce-examples-2.6.5.jar 官方案例源码

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    **Hadoop Core源码分析** Hadoop-core-0.20.2是Hadoop早期版本的核心组件,它包含了Hadoop的文件系统接口、分布式计算模型MapReduce以及其他的工具和库。在源码中,我们可以看到以下几个关键部分: 1. **HDFS接口*...

    hadoop-common-2.7.1-bin-maste

    Hadoop Common是Apache Hadoop项目的核心组件之一,它提供了Hadoop生态系统中所有其他模块所必需的基础设施和服务。在本文中,我们将深入探讨Hadoop Common 2.7.1版本,解析其重要特性、工作原理以及在实际应用中的...

    编译hadoophadoop-3.2.2-src源码

    编译hadoophadoop-3.2.2-src的源码

    Hadoop源码分析 完整版 共55章

    ### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar

    Flink-1.11.2与Hadoop3集成JAR包,放到flink安装包的lib目录下,可以避免Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.这个报错,实现...

    Hadoop默认端口清单-防火墙申请使用

    ### Hadoop默认端口清单详解 #### Hadoop概述 Hadoop是一个开源软件框架,用于分布式存储和处理大型数据集。其核心组件包括HDFS(Hadoop Distributed File System)、MapReduce和YARN(Yet Another Resource ...

    hadoop-common-2.7.3-bin-master

    Hadoop Common是Apache Hadoop项目的核心组件之一,提供了Hadoop系统运行所需的通用工具和服务。在本篇中,我们将深入探讨Hadoop Common 2.7.3版本在Windows环境下的安装、配置以及如何进行HDFS(Hadoop Distributed...

    Hadoop源码分析(完整版)

    Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...

    Hadoop rpc源码

    Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...

    hadoop2x-eclipse-plugin

    【Hadoop2x-Eclipse-Plugin插件详解】 在大数据处理领域,Apache Hadoop是一个不可或缺的开源框架,它主要用于分布式存储和计算。Eclipse作为Java开发的主流集成开发环境(IDE),提供了强大的代码编辑、调试和管理...

    hadoop1-2-1源码

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心由两个主要部分组成:HDFS(Hadoop Distributed File System)和MapReduce。本资源提供的“hadoop1-2-1源码”是Hadoop 1.2.1版本的源代码,这个版本...

Global site tag (gtag.js) - Google Analytics