`
zy19982004
  • 浏览: 662075 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
F6f66edc-1c1a-3859-b76b-a22e740b7aa7
Hadoop学习
浏览量:252013
社区版块
存档分类
最新评论

Hadoop学习八:Hadoop-Hdfs RPC源码 Client

 
阅读更多

一.Client类图

 

二.详细描述

  1.  ConnectionId:This class holds the address and the user ticket. The client connections to servers are uniquely identified by <remoteAddress, protocol, ticket>。一个connection由一个ConnectionId唯一标识;所以要重写ConnectionId的equals和hashcode方法。
  2. ConnectionHeader:The IPC connection header sent by the client to the server on connection establishment.
  3. Connection:继承Thread。代表client到server的一个连接。我在文中将Connection对象称为“连接”。
      /** Thread that reads responses and notifies callers.  Each connection owns a
       * socket connected to a remote address.  Calls are multiplexed through this
       * socket: responses may be delivered out of order. */
      private class Connection extends Thread {
    	 //一个连接的基本信息
        private InetSocketAddress server;             // server ip:port,注意是服务端
        private ConnectionHeader header;              // connection header
        private final ConnectionId remoteId;                // connection id
        private Socket socket = null;                 // connected socket
        private DataInputStream in;
        private DataOutputStream out;
        
        //所有的调用
        private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
        
        //1.构造函数只初始化一些基本信息
        public Connection(ConnectionId remoteId) throws IOException {
          this.remoteId = remoteId;
          this.server = remoteId.getAddress();
          header = new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket, authMethod);
          this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
              remoteId.getAddress().toString() +" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
          this.setDaemon(true);
        }
    
        //2.与IPC server建立socket连接
        private synchronized void setupConnection() throws IOException {
          while (true) {
            try {
              this.socket = socketFactory.createSocket();
          }
        }
        /** Connect to the server and set up the I/O streams. It then sends
         * a header to the server and starts
         * the connection thread that waits for responses.
         */
        private synchronized void setupIOstreams() throws InterruptedException {
          try {
            while (true) {
              //2.与IPC server建立socket连接
              setupConnection();
              //3.创建流
              InputStream inStream = NetUtils.getInputStream(socket);
              OutputStream outStream = NetUtils.getOutputStream(socket);
              //4.发送RPC报头
              writeRpcHeader(outStream);
              this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream)));
              this.out = new DataOutputStream(new BufferedOutputStream(outStream));
              //5.发送connection header到server
              writeHeader();
              //6.启动自己(线程),接受response
              start();
              return;
            }
          } catch (Throwable t) {
            if (t instanceof IOException) {
              markClosed((IOException)t);
            } else {
              markClosed(new IOException("Couldn't set up IO streams", t));
            }
            close();
          }
        }
    
        //4.发送RPC报头
        private void writeRpcHeader(OutputStream outStream) throws IOException {
          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
          //public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
          //public static final byte CURRENT_VERSION = 4;
          out.write(Server.HEADER.array());
          out.write(Server.CURRENT_VERSION);
          authMethod.write(out);
          out.flush();
        }
        
        //5.发送connection header到server
        private void writeHeader() throws IOException {
          DataOutputBuffer buf = new DataOutputBuffer();
          header.write(buf);
          
          int bufLen = buf.getLength();
          out.writeInt(bufLen);
          out.write(buf.getData(), 0, bufLen);
        }
        
        //6.启动自己(线程),等待接受response,接受完后关闭此连接
        public void run() {
          while (waitForWork()) {//Return true if it is time to read a response; false otherwise.
            receiveResponse();
          }
          //关闭此连接
          close();
        }
    
        //7.发送请求:长度+内容
        public void sendParam(Call call) {
          DataOutputBuffer d=null;
            synchronized (this.out) {
              d = new DataOutputBuffer();
              d.writeInt(call.id);
              call.param.write(d);
              byte[] data = d.getData();
              int dataLength = d.getLength();
              out.writeInt(dataLength);      //first put the data length
              out.write(data, 0, dataLength);//write the data
              out.flush();
            }
        }  
    
        //6.接受response,把结果赋值给Call对象
        private void receiveResponse() {
            int id = in.readInt();                    // try to read an id
            Call call = calls.get(id);
            int state = in.readInt();     // read call status
            if (state == Status.SUCCESS.state) {
              Writable value = ReflectionUtils.newInstance(valueClass, conf);
              value.readFields(in);                 // read value
              call.setValue(value);
              calls.remove(id);
            } 
        }
        
    }
  4. Call对象:RPC是基于反射的,每次方法调用都对应一个Call对象,我在文中将Call对象称为“调用”。
        private class Call {
            int id;                                       // call id,唯一标示一个Call
            Writable param;                        // parameter,创建Call对象时赋值
            Writable value;                          // value, Connecion线程接受到server的response后赋值给value
            boolean done;                           // true when call is done
    
            protected Call(Writable param) {
              this.param = param;
              synchronized (Client.this) {
                this.id = counter++;
              }
            }
        }
  5. ParallelCall:继承Call,还是一个Call对象,只是这些Call对象共享一个ParallelResults。
            private class ParallelCall extends Call {
                private ParallelResults results; //多个Call共享一个ParallelResults
                private int index;
                
                public ParallelCall(Writable param, ParallelResults results, int index) {
                  super(param);
                  this.results = results;
                  this.index = index;
                }
    
                /** Deliver result to result collector. */
                protected void callComplete() {
                  results.callComplete(this);
                }
              }
     
  6. ParallelResults:一组Call对象的返回结果。
    private static class ParallelResults {
                private Writable[] values;
                private int size;   //一共有多少个Call要返回
                private int count;  //实际已经返回几个
    
                public ParallelResults(int size) {
                  this.values = new Writable[size];
                  this.size = size;
                }
    
                /** Collect a result. */
                public synchronized void callComplete(ParallelCall call) {
                  values[call.index] = call.value;            // store the value
                  count++;                                    // count it
                  if (count == size)                          // if all values are in
                    notify();                                 // then notify waiting caller
                }
              } 
  7. Client:IPC client端。调用client的call方法,传入Writable作为参数,返回一个Writable作为结果。
    /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
     * parameter, and return a {@link Writable} as their value.  A service runs on
     * a port and is defined by a parameter class and a value class.
     * 
     * @see Server
     */
    public class Client {
      //缓存client到server的所有连接
      private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
    
      private Class<? extends Writable> valueClass;   //Call对象value的类型
      private int counter;                            //创建一个Call的时候,用counter++作为Call的id
      final private Configuration conf;
    
      private SocketFactory socketFactory;           //服务器端的ip+port创建的socketFactory
      
      //1.初始化Client
      public Client(Class<? extends Writable> valueClass, Configuration conf, 
          SocketFactory factory) {
        this.valueClass = valueClass;
        this.conf = conf;
        this.socketFactory = factory; //初始化client时传入
      }
    
      //2.用client发送一个请求
      public Writable call(Writable param, ConnectionId remoteId)  
                           throws InterruptedException, IOException {
    	  //创建Call对象
        Call call = new Call(param);
        //创建Connection对象
        Connection connection = getConnection(remoteId, call);
        //发送请求,参考Connection类代码7
        connection.sendParam(call);                 
        ...
        return call.value;
      }
    
      //2.用client一次发送多个请求
      public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
          Class<?> protocol, UserGroupInformation ticket, Configuration conf)
          throws IOException, InterruptedException {
    	 //创建一个结果集
        ParallelResults results = new ParallelResults(params.length);
        synchronized (results) {
          for (int i = 0; i < params.length; i++) {
        	 //创建每个Call
            ParallelCall call = new ParallelCall(params[i], results, i);
            try {
             //创建每个Connection对象,不同的Call存放到不同的连接上(Each parameter is sent to the corresponding address.)。
              ConnectionId remoteId = ConnectionId.getConnectionId(addresses[i],
                  protocol, ticket, 0, conf);
              Connection connection = getConnection(remoteId, call);
              //发送请求,参考Connection类代码7
              connection.sendParam(call);            
            } catch (IOException e) {
            }
          }
          while (results.count != results.size) {
            try {
              results.wait();                    // wait for all results
            } catch (InterruptedException e) {}
          }
          //放回所有结果
          return results.values;
        }
      }
    
      /** Get a connection from the pool, or create a new one and add it to the
       * pool.  Connections to a given ConnectionId are reused. */
      //获得一个连接,首先从缓存中去;取不到,创建一个,并放到缓存中。
       private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
        Connection connection;
        do {
          synchronized (connections) {
            connection = connections.get(remoteId);
            if (connection == null) {
             //参考Connection类代码1
              connection = new Connection(remoteId);
              connections.put(remoteId, connection);
            }
          }
        } while (!connection.addCall(call));//往创建的连接里加入call
        //参考Connection类代码23456
        connection.setupIOstreams();
        return connection;
      }
    
    }
     
分享到:
评论

相关推荐

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

    《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》是一本深入探讨Hadoop核心组件的书籍,其源代码提供了对Hadoop内部工作原理的直观理解。这本书主要关注两个关键部分:Hadoop Common和HDFS...

    hdfs源码分析整理

    在分布式文件系统中,HDFS(Hadoop Distributed File System)扮演着核心角色,而HDFS的源码分析则是深入了解HDFS架构和实现机理的关键。本文将对HDFS源码进行详细的分析和整理,涵盖了HDFS的目录结构、对象序列化、...

    Hadoop client server通讯分析

    在Hadoop中,客户端(Client)负责提交任务、读写数据,而服务器端则包括NameNode、DataNode和TaskTracker等组件,它们处理客户端请求,管理数据存储和任务调度。 二、HDFS通信 1. 客户端与NameNode交互: 当...

    hadoop源码阅读总结

    ### Hadoop源码阅读总结:IPC/RPC 通信机制详解 #### 一、概述 Hadoop作为分布式计算框架,其内部各个组件之间的通信主要通过RPC(Remote Procedure Call)实现。本文将详细介绍Hadoop中RPC机制的工作原理,特别是...

    Hadoop源码分析(client部分)

    ### Hadoop源码分析(client部分) #### Hadoop概述 Hadoop是一个开源的分布式存储与计算框架,由Apache基金会维护。它为用户提供了处理和存储海量数据的能力,并且能够运行在低成本的商品硬件集群上。Hadoop的...

    hdfs源码.zip

    1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...

    hadoop源码.zip

    总结,Hadoop HDFS源码的学习是一项深入理解大数据存储技术的重要任务。通过源码,我们可以更清晰地看到HDFS是如何在分布式环境下实现高可用性和容错性的,这对于提升开发和运维技能,以及解决实际问题具有重大意义...

    Hadoop之HDFS源代码分析 pdf

    在Hadoop的RPC实现方法中,Client类、Server类、RPC类以及HDFS通信协议组是核心。这些组件共同协作实现远程过程调用,使得HDFS中的各个组件能够相互交流和协作。通过这些组件,HDFS能够处理客户端请求,并在集群内部...

    学习hadoop_源代码,RPC_部分

    ### Hadoop RPC 深入理解 #### 一、引言 随着大数据处理需求的日益增长,Apache Hadoop 作为一款流行的开源分布式计算框架,在处理海量数据方面展现出了极高的性能和灵活性。其中,Hadoop 的远程过程调用(RPC)...

    hbase-0.98.1源码包

    HBase是Apache软件基金会开发的一个开源、分布式、版本化、基于列族的NoSQL数据库,它构建在Hadoop文件系统(HDFS)之上,专为处理海量数据而设计。源码包“hbase-0.98.1-src.tar.gz”提供了HBase 0.98.1版本的完整...

    细细品味Hadoop_Hadoop集群(第11期)_HBase简介及安装.pdf

    **HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写访问大量稀疏数据的应用场景。HBase 的设计灵感来源于 ...

    Hadoop分布式文件系统架构

    ### Hadoop分布式文件系统(HDFS)架构及源码分析 #### 一、HDFS的架构和设计分析 ##### 1.1 引言 Hadoop分布式文件系统(HDFS)是一种专门为运行在通用硬件上的分布式文件系统而设计的解决方案。与其他分布式文件...

    hbase源码带中文注释

    通过阅读源码,可以理解HBase如何与YARN、HDFS和其他Hadoop组件交互。 4. **HBase Thrift**: 提供了一个Thrift接口,使得非Java语言也能访问HBase。通过Thrift,你可以用Python、C++、Ruby等语言与HBase通信。研究...

    MapReduce Job本地提交过程源码跟踪及分析

    MapReduce是Hadoop生态系统中的核心组件,主要用于处理和存储大规模数据。...通过阅读《Job本地提交过程源码分析及图解》这样的文档,我们可以深入学习MapReduce的工作原理,提升我们的Hadoop编程技能。

    hbase 源码包

    4. **RPC通信**:`org.apache.hadoop.hbase.ipc`包下的RpcServer实现了客户端与服务器之间的远程过程调用,处理客户端请求。 5. **版本控制与并发控制**:每个Cell都有时间戳,用于版本控制;`org.apache.hadoop....

    HBase源码分析

    《深入剖析HBase源码:理解其核心机制》 HBase,作为一款基于分布式存储的NoSQL数据库,广泛应用于...通过对HBase源码的深入学习,我们可以更好地掌握其工作原理,从而更好地利用HBase处理大规模的非结构化数据。

Global site tag (gtag.js) - Google Analytics