Client的主要包含下面几个内部类:
主要的几个类说明:
- 1. Call,表示一次rpc的调用请求
- 2. Connection,表示一个client与server之间的连接,一个连接一个线程启动
- 3. ConnectionId:连接的标记(包括server地址,协议,其他一些连接的配置项信息)
Connection类:
主要属性说明:
- private InetSocketAddress server; //IPC服务器地址
- private final ConnectionId remoteId;//连接标识
- private Socket socket = null; //TCP连接的Socket对象
- private DataInputStream in;
- private DataOutputStream out;
- private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();//当前正在处理的远程调用
- private AtomicLong lastActivity = new AtomicLong();//IPC连接的最后一次通信时间
- private AtomicBoolean shouldCloseConnection = new AtomicBoolean();//连接关闭标识
- private IOException closeException;//导致IPC连接关闭的异常
主要方法介绍:
touch方法,更新最后一次通信时间
addCall方法:将远程调用放入calls集合中,并且唤醒waitForWork方法
setupConnection方法:创建Socket连接,创建过程中失败会进行重连直到达到次数限制
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; //死循环,除非创建socke连接成功,或者重连次数达到限制抛出异常 while (true) { try { //创建一个TCP Socket对象 this.socket = socketFactory.createSocket(); this.socket.setTcpNoDelay(tcpNoDelay); this.socket.setKeepAlive(true); /* * Bind the socket to the host specified in the principal name of the * client, to ensure Server matching address of the client connection * to host name in principal passed. */ //通过HOST名称连接 UserGroupInformation ticket = remoteId.getTicket(); if (ticket != null && ticket.hasKerberosCredentials()) { KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class); if (krbInfo != null && krbInfo.clientPrincipal() != null) { String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName()); // If host name is a valid local address then bind socket to it InetAddress localAddr = NetUtils.getLocalInetAddress(host); if (localAddr != null) { this.socket.bind(new InetSocketAddress(localAddr, 0)); } } } //连接到服务器 NetUtils.connect(this.socket, server, connectionTimeout); if (rpcTimeout > 0) { pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval } //设置超时时间 this.socket.setSoTimeout(pingInterval); return; } catch (ConnectTimeoutException toe) { /* Check for an address change and update the local reference. * Reset the failure counter if the address was changed */ //检查地址是否变了,如果变了则更新,且重置失败计数器 if (updateAddress()) { timeoutFailures = ioFailures = 0; } //关闭当前连接,并计算失败次数是否超过超时重试次数,如果是抛出异常 handleConnectionTimeout(timeoutFailures++, maxRetriesOnSocketTimeouts, toe); } catch (IOException ie) { //检查地址是否变了,如果变了则更新,且重置失败计数器 if (updateAddress()) { timeoutFailures = ioFailures = 0; } //关闭当前连接,并计算失败次数是否超过失败重试次数,如果是抛出异常 handleConnectionFailure(ioFailures++, ie); } } }
writeConnectionHeader方法:往服务端发送头信息,服务端根据头信息协议版本检查,接口检查,权限检查
/** * Write the connection header - this is sent when connection is established * +----------------------------------+ * | "hrpc" 4 bytes | * +----------------------------------+ * | Version (1 byte) | * +----------------------------------+ * | Service Class (1 byte) | * +----------------------------------+ * | AuthProtocol (1 byte) | * +----------------------------------+ */ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method out.write(RpcConstants.HEADER.array());//IPCd的魔数hrpc out.write(RpcConstants.CURRENT_VERSION);//版本 out.write(serviceClass);//请求的类 out.write(authProtocol.callId);//权限协议 out.flush(); }
waitForWork方法:这是一个阻塞方法
private synchronized boolean waitForWork() { //calls为空&&连接没标识关闭&&client运行中 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { long timeout = maxIdleTime- (Time.now()-lastActivity.get()); if (timeout>0) { try { wait(timeout);//阻塞timeout时间,或者被唤醒(addCall方法中会调用) } catch (InterruptedException e) {} } } //calls不为空&&连接没标识关闭&&client运行中 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) { return true; } else if (shouldCloseConnection.get()) { return false; } else if (calls.isEmpty()) { //连接空闲,就关闭 markClosed(null); return false; } else { // get stopped but there are still pending requests markClosed((IOException)new IOException().initCause( new InterruptedException())); return false; } }
run方法:接受服务端返回,从此方法可以看出IPC连接可以复用,run方法会一直接收返回直到关闭
public void run() { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": starting, having connections " + connections.size()); try { //阻塞,直到连接 while (waitForWork()) {//wait here for work - read or close connection receiveRpcResponse();//接受服务端返回数据 } } catch (Throwable t) { // This truly is unexpected, since we catch IOException in receiveResponse // -- this is only to be really sure that we don't leave a client hanging // forever. LOG.warn("Unexpected error reading responses on connection " + this, t); markClosed(new IOException("Error reading responses", t)); } close(); if (LOG.isDebugEnabled()) LOG.debug(getName() + ": stopped, remaining connections " + connections.size()); }
receiveRpcResponse方法:接收服务器返回数据
private void receiveRpcResponse() { if (shouldCloseConnection.get()) { return; } touch();//更新最后通信时间 try { //数据长度 int totalLen = in.readInt(); //返回头信息 RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); checkResponse(header);//校验头信息 int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); Call call = calls.get(callId); RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value calls.remove(callId);//从calls中删除本次Call call.setRpcResponse(value); // verify that length was correct // only for ProtobufEngine where len can be verified easily //校验长度正确性,当返回数据是ProtobufRpcEngine.RpcWrapper类型 if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); //总长度!=头报文长度+数据报文长度就是有问题 if (totalLen != headerLen + resWrapper.getLength()) { throw new RpcClientException( "RPC response length mismatch on rpc success"); } } } else { // Rpc Request failed // Verify that length was correct //请求失败,总长度应该等于报文头长度 if (totalLen != headerLen) { throw new RpcClientException( "RPC response length mismatch on rpc error"); } //异常类名 final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; //错误信息 final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; //错误码 final RpcErrorCodeProto erCode = (header.hasErrorDetail() ? header.getErrorDetail() : null); if (erCode == null) { LOG.warn("Detailed error code not set by server on rpc error"); } RemoteException re = ( (erCode == null) ? new RemoteException(exceptionClassName, errorMsg) : new RemoteException(exceptionClassName, errorMsg, erCode)); if (status == RpcStatusProto.ERROR) { calls.remove(callId);//从calls中删除本次Call call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection //请求失败关闭连接connection markClosed(re); } } } catch (IOException e) { markClosed(e); } }
相关推荐
赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...
hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...
赠送jar包:hadoop-mapreduce-client-common-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.5.1-sources.jar; 赠送Maven依赖信息...
赠送jar包:hadoop-mapreduce-client-common-2.7.3.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.7.3-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.7.3-sources.jar; 赠送Maven依赖信息...
赠送jar包:hadoop-mapreduce-client-common-2.5.1.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.5.1-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.5.1-sources.jar; 赠送Maven依赖信息...
赠送jar包:hadoop-mapreduce-client-common-2.6.5.jar; 赠送原API文档:hadoop-mapreduce-client-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-mapreduce-client-common-2.6.5-sources.jar; 赠送Maven依赖信息...
java运行依赖jar包
curator-client-2.7.1.jar curator-framework-2.7.1.jar curator-recipes-2.7.1.jar gson-2.2.4.jar guava-11.0.2.jar hadoop-annotations-2.10.0.jar hadoop-auth-2.10.0.jar hadoop-common-2.10.0-tests.jar ...
Apache Hadoop (hadoop-3.3.4.tar.gz)项目为可靠、可扩展的分布式计算开发开源软件。官网下载速度非常缓慢,因此将hadoop-3.3.4 版本放在这里,欢迎大家来下载使用! Hadoop 架构是一个开源的、基于 Java 的编程...
hadoop-mapreduce-client-common-2.7.1,java开发的jar包需要的直接下载
2. **Java库**:Hadoop是用Java编写的,因此压缩包中会包含各种jar文件,如`hadoop-core.jar`, `hadoop-hdfs.jar`, `hadoop-mapreduce-client-core.jar`等,这些库文件提供了Hadoop的API,使得开发者可以编写分布式...
CentOS7是最常见的Linux发行版之一,它稳定且适合作为Hadoop的运行平台。首先,确保系统已经更新到最新版本,并安装了必要的依赖库,如Java运行环境(JRE)和开发工具集(Development Tools)。Hadoop需要Java 8或更...
[INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............
1. **hadoop-common**: 这是Hadoop的基础模块,包含网络通信、安全、I/O和配置等通用功能。 2. **hadoop-hdfs-project**: 包含了HDFS的所有组件,如NameNode、DataNode和Secondary NameNode,以及客户端库和HDFS...
[INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............
1. **HDFS相关**:HDFS是Hadoop的核心组件之一,它提供了一个高容错、高吞吐量的数据存储系统。依赖的jar包包括hadoop-hdfs,它包含了HDFS的客户端API和服务器端实现,使得应用程序能够读写HDFS上的数据。 2. **...
1. **hadoop-client-2.7.7.jar**:这个jar包是Hadoop客户端的核心库,包含了与Hadoop交互的各种接口和工具,比如提交作业、监控任务状态等。它提供了对HDFS(Hadoop Distributed File System)、MapReduce和YARN...
[INFO] hadoop-mapreduce-client-common .................... SUCCESS [19.076s] [INFO] hadoop-mapreduce-client-shuffle ................... SUCCESS [2.650s] [INFO] hadoop-mapreduce-client-app ...............
1. `hadoop-hdfs`:此模块是HDFS的实现,包括NameNode、DataNode和Client等组件。NameNode负责元数据管理,DataNode存储实际数据,Client则为应用程序提供接口。源码中可以深入了解HDFS的Block、 Lease、Heartbeat...
1. **hadoop-common**:这是Hadoop的核心组件,包含网络、安全、I/O和系统工具等通用功能。它提供了文件系统接口,支持多种底层存储系统,如本地文件系统、HDFS和其他分布式文件系统。 2. **hadoop-hdfs**:HDFS...