Hadoop IPC类图如下
连接
//为了提高通讯效率,连接是可以复用的,通过ConnectionId来区分不同的连接 class ConnectionId { InetSocketAddress address; //远端服务器的地址 UserGroupInformation ticket; //用户和用户组的信息 Class<?> protocol; //IPC接口对应的类对象 } //ConnectionHeader类是客户端和服务端TCP连接建立之后交换的第一条消息,包括ConnectionId中的 //用户信息和IPC接口信息,用于确认用户是否有权利连接 ConnectionHeader //服务端连接对象 public class Connection { private boolean rpcHeaderRead = false; //是否已读如入了RPC版本号 private boolean headerRead = false; //是否读入了连接消息头 private SocketChannel channel; private ByteBuffer data; private ByteBuffer dataLengthBuffer; private LinkedList<Call> responseQueue; private volatile int rpcCount = 0; //当前正在处理的RPC数量 private long lastContact; private int dataLength; private Socket socket; // Cache the remote host & port info so that even if the socket is // disconnected, we can say where it used to connect to. private String hostAddress; private int remotePort; private InetAddress addr; ConnectionHeader header = new ConnectionHeader(); Class<?> protocol; boolean useSasl; SaslServer saslServer; private AuthMethod authMethod; private boolean saslContextEstablished; private boolean skipInitialSaslHandshake; private ByteBuffer rpcHeaderBuffer; private ByteBuffer unwrappedData; private ByteBuffer unwrappedDataLengthBuffer; UserGroupInformation user = null; } //客户端连接 private class Connection extends Thread { private InetSocketAddress server; //IPC服务端地址 private String serverPrincipal; // server's krb5 principal name private ConnectionHeader header; //连接消息头 private final ConnectionId remoteId; //IPC连接标识 private AuthMethod authMethod; // authentication method private boolean useSasl; private Token<? extends TokenIdentifier> token; private SaslRpcClient saslRpcClient; private Socket socket = null; // connected socket private DataInputStream in; private DataOutputStream out; private int rpcTimeout; private int maxIdleTime; //connections will be culled if it was idle for //maxIdleTime msecs private int maxRetries; //the max. no. of retries for socket connections private boolean tcpNoDelay; // if T then disable Nagle's Algorithm private int pingInterval; // how often sends ping to the server in msecs // currently active calls private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>(); private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason }
Call
//客户端 private class Call { int id; // call id Writable param; // parameter Writable value; // value, null if error IOException error; // exception, null if value boolean done; } //服务端 private static class Call { private int id; // the client's call id private Writable param; // the parameter passed private Connection connection; // connection to client private long timestamp; } //客户端和服务端通过各自的Call对象发送调用 客户端还有ParallelCall 用于同时发送多个远程IPC调用
服务端处理
//处理监听事件的线程 class Listener extends Thread { //创建SeverSocketChannel,并注册ACCEPT事件 public Listener() { acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // Bind the server socket to the local host and port bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } acceptChannel.register(selector, SelectionKey.OP_ACCEPT); } //处理ACCEPT事件 public void run() { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } } } //Reader线程,用于处理读事件并交由Handler线程处理 class Reader implements Runnable { public void run() { readSelector.select(); while (adding) { this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } } } } //异步的处理写事件 class Responder extends Thread { public void run() { waitPending(); // If a channel is being registered, wait. writeSelector.select(PURGE_INTERVAL); Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isValid() && key.isWritable()) { doAsyncWrite(key); } } synchronized (writeSelector.keys()) { calls = new ArrayList<Call>(writeSelector.keys().size()); iter = writeSelector.keys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); Call call = (Call)key.attachment(); if (call != null && key.channel() == call.connection.channel) { calls.add(call); } } } } } void doAsyncWrite(SelectionKey key) { synchronized(call.connection.responseQueue) { processResponse(call.connection.responseQueue, false); } } //inHandler用于表示是Handler中直接调用写操作 //还是Responer线程的异步写操作 void processResponse(LinkedList<Call> responseQueue,boolean inHandler) { call = responseQueue.removeFirst(); SocketChannel channel = call.connection.channel; int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.write(buffer) : channelIO(null, channel, buffer); } void doRead(SelectionKey key) { Connection c = (Connection)key.attachment(); count = c.readAndProcess(); }
相关推荐
将`hadoop.dll`放在正确的位置,可以确保Hadoop客户端能够正确地调用这些功能。 3. **配置HDFS客户端**: 要在Windows上配置HDFS客户端,首先需要设置HADOOP_HOME环境变量,指向解压后的Hadoop目录。接着,需要在...
这个压缩包包含了Hadoop 2.6.0的二进制版本,特别为在Windows PC上远程调用HDFS进行了优化。首先,你需要下载并解压这个文件到本地目录。解压后,你会看到一个名为"hadoop-2.6.0"的文件夹,里面包含了一系列Hadoop的...
总结来说,"hadoop-common-2.2.0-bin_32bit_&_64bit"包含了在Windows上运行Hadoop所需的二进制文件,特别是对于解决远程调试过程中的空指针异常问题,确保hadoop.dll和winutils.exe的正确配置至关重要。同时,了解并...
之后,开发者就可以在Eclipse中创建Hadoop项目,编写MapReduce程序,并直接在本地或远程Hadoop集群上进行调试和运行。 4. **环境变量配置**:在使用这两个文件之前,需要正确配置HADOOP_HOME和PATH环境变量,指向...
这些JAR文件的使用方式通常是在Hadoop集群的每个节点上部署,或者在客户端应用中作为类路径的一部分引用,以便进行远程调用。 在实际使用中,开发人员会通过HBase的API(如Admin API和Table API)来创建表、管理...
3. **SSH配置**:尽管Windows不常用SSH,但Hadoop的一些功能如YARN和HDFS的远程管理需要SSH支持。可以使用OpenSSH for Windows或者第三方工具如PuTTY。 4. **安全认证**:如果涉及到安全性,可能需要配置Kerberos以...
此外,Hadoop使用了**远程过程调用(Remote Procedure Call, RPC)**机制来实现NameNode与DataNode之间的通信。RPC机制允许一个程序在不同的地址空间调用另一个程序,而无需了解底层网络协议的细节。 #### 五、Hadoop...
当Hadoop在windows下运行或调用远程Hadoop集群的时候,需要该辅助程序才能运行。winutils是Windows中的二进制文件,适用于不同版本的Hadoop系统并构建在Windows VM上,该VM用以在Windows系统中测试Hadoop相关的应用...
- **RPC 框架**:Hadoop 使用 RPC 来通信,支持服务发现和远程方法调用。 - **工具**:如 `fsck` (文件系统检查工具) 和 `balancer` (数据均衡工具)。 对于 Spark,它可以使用 Hadoop 的 YARN(Yet Another ...
1. 确保Hadoop服务在本地或者远程集群上正常运行。 2. 确认Hadoop的版本与插件版本相匹配,不兼容的版本可能导致错误。 3. Windows环境下可能会遇到路径问题,确保所有路径都正确无误,尤其是Hadoop的本地路径设置。...
例如,使用Socket编程实现数据传输,以及通过HTTP的RESTful API进行远程过程调用(RPC)。 3. **安全性**:Hadoop支持认证、授权和审计功能,如Kerberos,以确保集群的安全性。在2.7.3版本中,安全机制得到了进一步...
Hadoop还有一个重要的机制是RPC(远程过程调用)。Hadoop使用自定义的RPC来在集群中的各个节点之间进行通信。RPC机制允许Hadoop的不同组件像本地方法调用一样进行通信,而实际上它们分布在不同的机器上。 Hadoop...
总之,`winutils-master`是Hadoop在Windows环境下的关键组成部分,它使得开发者可以在本地进行Hadoop相关项目的开发和测试,而不必依赖于远程集群或复杂的环境配置。这个压缩包的使用大大简化了在Windows上运行...
本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...
1. **Hadoop Common**: 这是 Hadoop 的基础模块,包含了一系列通用工具和服务,如文件系统抽象、网络通信库、序列化机制以及远程过程调用(RPC)框架。 2. **HDFS (Hadoop Distributed File System)**: HDFS 是一个...
在标题中提到的“修改Hadoop中的io写的,远程调用对象的东西”可能指的是对Hadoop的分布式文件系统(HDFS)进行定制化开发,以便更高效地处理数据或实现特定功能。下面我们将深入探讨这个主题。 1. **Hadoop I/O...
4. **hadoop-client**:这是一个聚合模块,包含了访问Hadoop集群所需的所有依赖项,使得开发者可以在本地或远程机器上编写和运行Hadoop应用程序。它不仅包含了上述三个JAR文件中的类,还包含了其他必要的库,如...
在这个过程中,涉及到很多核心的JAR文件,如`hadoop-mapreduce-client-common-2.2.0.jar`和`hadoop-mapreduce-client-jobclient-2.2.0.jar`。这两个JAR文件在Hadoop生态系统中扮演着重要角色。 1. `hadoop-...