`
h140465
  • 浏览: 21887 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Hadoop-common之Client(1)

 
阅读更多

Client的主要包含下面几个内部类:

       主要的几个类说明: 

  • 1. Call,表示一次rpc的调用请求
  • 2. Connection,表示一个client与server之间的连接,一个连接一个线程启动
  • 3. ConnectionId:连接的标记(包括server地址,协议,其他一些连接的配置项信息)

Connection类:

         主要属性说明:

  • private InetSocketAddress server; //IPC服务器地址
  • private final ConnectionId remoteId;//连接标识
  • private Socket socket = null; //TCP连接的Socket对象
  • private DataInputStream in;
  • private DataOutputStream out;
  • private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//当前正在处理的远程调用
  • private AtomicLong lastActivity = new AtomicLong();//IPC连接的最后一次通信时间
  • private AtomicBoolean shouldCloseConnection = new AtomicBoolean();//连接关闭标识
  • private IOException closeException;//导致IPC连接关闭的异常

        主要方法介绍:

            touch方法,更新最后一次通信时间

            addCall方法:将远程调用放入calls集合中,并且唤醒waitForWork方法

               setupConnection方法:创建Socket连接,创建过程中失败会进行重连直到达到次数限制     

private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      //死循环,除非创建socke连接成功,或者重连次数达到限制抛出异常
      while (true) {
        try {
          //创建一个TCP Socket对象
          this.socket = socketFactory.createSocket();
          this.socket.setTcpNoDelay(tcpNoDelay);
          this.socket.setKeepAlive(true);
          
          /*
           * Bind the socket to the host specified in the principal name of the
           * client, to ensure Server matching address of the client connection
           * to host name in principal passed.
           */
          //通过HOST名称连接
          UserGroupInformation ticket = remoteId.getTicket();
          if (ticket != null && ticket.hasKerberosCredentials()) {
            KerberosInfo krbInfo = 
              remoteId.getProtocol().getAnnotation(KerberosInfo.class);
            if (krbInfo != null && krbInfo.clientPrincipal() != null) {
              String host = 
                SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());
              
              // If host name is a valid local address then bind socket to it
              InetAddress localAddr = NetUtils.getLocalInetAddress(host);
              if (localAddr != null) {
                this.socket.bind(new InetSocketAddress(localAddr, 0));
              }
            }
          }
          //连接到服务器
          NetUtils.connect(this.socket, server, connectionTimeout);
          if (rpcTimeout > 0) {
            pingInterval = rpcTimeout;  // rpcTimeout overwrites pingInterval
          }
          //设置超时时间
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (ConnectTimeoutException toe) {
          /* Check for an address change and update the local reference.
           * Reset the failure counter if the address was changed
           */
          //检查地址是否变了,如果变了则更新,且重置失败计数器
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
          //关闭当前连接,并计算失败次数是否超过超时重试次数,如果是抛出异常
          handleConnectionTimeout(timeoutFailures++,
              maxRetriesOnSocketTimeouts, toe);
        } catch (IOException ie) {
          //检查地址是否变了,如果变了则更新,且重置失败计数器
          if (updateAddress()) {
            timeoutFailures = ioFailures = 0;
          }
        //关闭当前连接,并计算失败次数是否超过失败重试次数,如果是抛出异常
          handleConnectionFailure(ioFailures++, ie);
        }
      }
    }

            writeConnectionHeader方法:往服务端发送头信息,服务端根据头信息协议版本检查,接口检查,权限检查           

/**
     * Write the connection header - this is sent when connection is established
     * +----------------------------------+
     * |  "hrpc" 4 bytes                  |      
     * +----------------------------------+
     * |  Version (1 byte)                |
     * +----------------------------------+
     * |  Service Class (1 byte)          |
     * +----------------------------------+
     * |  AuthProtocol (1 byte)           |      
     * +----------------------------------+
     */
    private void writeConnectionHeader(OutputStream outStream)
        throws IOException {
      DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
      // Write out the header, version and authentication method
      out.write(RpcConstants.HEADER.array());//IPCd的魔数hrpc
      out.write(RpcConstants.CURRENT_VERSION);//版本
      out.write(serviceClass);//请求的类
      out.write(authProtocol.callId);//权限协议
      out.flush();
    }

             waitForWork方法:这是一个阻塞方法   

private synchronized boolean waitForWork() {
      //calls为空&&连接没标识关闭&&client运行中
      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
        long timeout = maxIdleTime-
              (Time.now()-lastActivity.get());
        if (timeout>0) {
          try {
            wait(timeout);//阻塞timeout时间,或者被唤醒(addCall方法中会调用)
          } catch (InterruptedException e) {}
        }
      }
    //calls不为空&&连接没标识关闭&&client运行中
      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
        return true;
      } else if (shouldCloseConnection.get()) {
        return false;
      } else if (calls.isEmpty()) { //连接空闲,就关闭
        markClosed(null);
        return false;
      } else { // get stopped but there are still pending requests 
        markClosed((IOException)new IOException().initCause(
            new InterruptedException()));
        return false;
      }
    }

            run方法:接受服务端返回,从此方法可以看出IPC连接可以复用,run方法会一直接收返回直到关闭  

public void run() {
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": starting, having connections " 
            + connections.size());

      try {
    	//阻塞,直到连接
        while (waitForWork()) {//wait here for work - read or close connection
          receiveRpcResponse();//接受服务端返回数据
        }
      } catch (Throwable t) {
        // This truly is unexpected, since we catch IOException in receiveResponse
        // -- this is only to be really sure that we don't leave a client hanging
        // forever.
        LOG.warn("Unexpected error reading responses on connection " + this, t);
        markClosed(new IOException("Error reading responses", t));
      }
      
      close();
      
      if (LOG.isDebugEnabled())
        LOG.debug(getName() + ": stopped, remaining connections "
            + connections.size());
    }

           receiveRpcResponse方法:接收服务器返回数据

  

 private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();//更新最后通信时间
      
      try {
    	//数据长度
        int totalLen = in.readInt();
        //返回头信息
        RpcResponseHeaderProto header = 
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);//校验头信息

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);

        Call call = calls.get(callId);
        RpcStatusProto status = header.getStatus();
        if (status == RpcStatusProto.SUCCESS) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          calls.remove(callId);//从calls中删除本次Call
          call.setRpcResponse(value);
          
          // verify that length was correct
          // only for ProtobufEngine where len can be verified easily
          //校验长度正确性,当返回数据是ProtobufRpcEngine.RpcWrapper类型
          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
            ProtobufRpcEngine.RpcWrapper resWrapper = 
                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
            //总长度!=头报文长度+数据报文长度就是有问题
            if (totalLen != headerLen + resWrapper.getLength()) { 
              throw new RpcClientException(
                  "RPC response length mismatch on rpc success");
            }
          }
        } else { // Rpc Request failed
          // Verify that length was correct
         //请求失败,总长度应该等于报文头长度
          if (totalLen != headerLen) {
            throw new RpcClientException(
                "RPC response length mismatch on rpc error");
          }
          //异常类名
          final String exceptionClassName = header.hasExceptionClassName() ?
                header.getExceptionClassName() : 
                  "ServerDidNotSetExceptionClassName";
          //错误信息
          final String errorMsg = header.hasErrorMsg() ? 
                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
          //错误码
          final RpcErrorCodeProto erCode = 
                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
          if (erCode == null) {
             LOG.warn("Detailed error code not set by server on rpc error");
          }
          RemoteException re = 
              ( (erCode == null) ? 
                  new RemoteException(exceptionClassName, errorMsg) :
              new RemoteException(exceptionClassName, errorMsg, erCode));
          if (status == RpcStatusProto.ERROR) {
            calls.remove(callId);//从calls中删除本次Call
            call.setException(re);
          } else if (status == RpcStatusProto.FATAL) {
            // Close the connection
        	//请求失败关闭连接connection
            markClosed(re);
          }
        }
      } catch (IOException e) {
        markClosed(e);
      }
    }

 

 

 

分享到:
评论

相关推荐

    hadoop-mapreduce-client-common-2.6.5-API文档-中英对照版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop-mapreduce-client-common-2.5.1-API文档-中文版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.5.1-sources.jar; 赠送Maven依赖信息...

    hadoop-mapreduce-client-common-2.7.3-API文档-中英对照版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.7.3.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.7.3-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.7.3-sources.jar; 赠送Maven依赖信息...

    hadoop-mapreduce-client-common-2.5.1-API文档-中英对照版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.5.1-sources.jar; 赠送Maven依赖信息...

    hadoop-mapreduce-client-common-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...

    hadoop-mapreduce-client-common-2.6.0.jar

    java运行依赖jar包

    hadoop-2.10.0jar.zip

    curator-client-2.7.1.jar curator-framework-2.7.1.jar curator-recipes-2.7.1.jar gson-2.2.4.jar guava-11.0.2.jar hadoop-annotations-2.10.0.jar hadoop-auth-2.10.0.jar hadoop-common-2.10.0-tests.jar ...

    hadoop-3.3.4 版本(最新版)

    Apache Hadoop (hadoop-3.3.4.tar.gz)项目为可靠、可扩展的分布式计算开发开源软件。官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程...

    hadoop-mapreduce-client-common-2.7.1.jar

    hadoop-mapreduce-client-common-2.7.1,java开发的jar包需要的直接下载

    hadoop-common-release-1.1.2.zip

    2. **Java库**:Hadoop是用Java编写的,因此压缩包中会包含各种jar文件,如`hadoop-core.jar`, `hadoop-hdfs.jar`, `hadoop-mapreduce-client-core.jar`等,这些库文件提供了Hadoop的API,使得开发者可以编写分布式...

    hadoop-3.1.3.tar.gz

    CentOS7是最常见的Linux发行版之一,它稳定且适合作为Hadoop的运行平台。首先,确保系统已经更新到最新版本,并安装了必要的依赖库,如Java运行环境(JRE)和开发工具集(Development Tools)。Hadoop需要Java 8或更...

    hadoop-2.2.0-x64.tar.gz part3

    [INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............

    hadoop-3.1.3.tar.gz编译后的源码包

    1. **hadoop-common**: 这是Hadoop的基础模块,包含网络通信、安全、I/O和配置等通用功能。 2. **hadoop-hdfs-project**: 包含了HDFS的所有组件,如NameNode、DataNode和Secondary NameNode,以及客户端库和HDFS...

    hadoop-2.2.0-x64.tar.gz part2

    [INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............

    hadoop-2.2.0依赖的jar包

    1. **HDFS相关**:HDFS是Hadoop的核心组件之一,它提供了一个高容错、高吞吐量的数据存储系统。依赖的jar包包括hadoop-hdfs,它包含了HDFS的客户端API和服务器端实现,使得应用程序能够读写HDFS上的数据。 2. **...

    hadoop-2.7.7-dependence.zip

    1. **hadoop-client-2.7.7.jar**:这个jar包是Hadoop客户端的核心库,包含了与Hadoop交互的各种接口和工具,比如提交作业、监控任务状态等。它提供了对HDFS(Hadoop Distributed File System)、MapReduce和YARN...

    hadoop-2.2.0-x64.tar.gz part1

    [INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............

    hadoop-src源代码

    1. `hadoop-hdfs`:此模块是HDFS的实现,包括NameNode、DataNode和Client等组件。NameNode负责元数据管理,DataNode存储实际数据,Client则为应用程序提供接口。源码中可以深入了解HDFS的Block、 Lease、Heartbeat...

    Hadoop-2.2.0源码包

    1. **hadoop-common**:这是Hadoop的核心组件,包含网络、安全、I/O和系统工具等通用功能。它提供了文件系统接口,支持多种底层存储系统,如本地文件系统、HDFS和其他分布式文件系统。 2. **hadoop-hdfs**:HDFS...

Global site tag (gtag.js) - Google Analytics