`

Hadoop-远程过程调用

 
阅读更多

 

 

 

Hadoop IPC类图如下


 
 

 

连接

//为了提高通讯效率,连接是可以复用的,通过ConnectionId来区分不同的连接
class ConnectionId {
     InetSocketAddress address;         //远端服务器的地址
     UserGroupInformation ticket;       //用户和用户组的信息
     Class<?> protocol;                     //IPC接口对应的类对象
}

//ConnectionHeader类是客户端和服务端TCP连接建立之后交换的第一条消息,包括ConnectionId中的
//用户信息和IPC接口信息,用于确认用户是否有权利连接
ConnectionHeader


//服务端连接对象
public class Connection {
    private boolean rpcHeaderRead = false; //是否已读如入了RPC版本号
    private boolean headerRead = false;  //是否读入了连接消息头

    private SocketChannel channel;
    private ByteBuffer data;
    private ByteBuffer dataLengthBuffer;
    private LinkedList<Call> responseQueue;
    private volatile int rpcCount = 0; //当前正在处理的RPC数量
    private long lastContact;
    private int dataLength;
    private Socket socket;
    // Cache the remote host & port info so that even if the socket is 
    // disconnected, we can say where it used to connect to.
    private String hostAddress;
    private int remotePort;
    private InetAddress addr;
    
    ConnectionHeader header = new ConnectionHeader();
    Class<?> protocol;
    boolean useSasl;
    SaslServer saslServer;
    private AuthMethod authMethod;
    private boolean saslContextEstablished;
    private boolean skipInitialSaslHandshake;
    private ByteBuffer rpcHeaderBuffer;
    private ByteBuffer unwrappedData;
    private ByteBuffer unwrappedDataLengthBuffer;
    UserGroupInformation user = null;
}

//客户端连接
  private class Connection extends Thread {
    private InetSocketAddress server;             //IPC服务端地址
    private String serverPrincipal;  // server's krb5 principal name
    private ConnectionHeader header;              //连接消息头
    private final ConnectionId remoteId;                //IPC连接标识
    private AuthMethod authMethod; // authentication method
    private boolean useSasl;
    private Token<? extends TokenIdentifier> token;
    private SaslRpcClient saslRpcClient;
    
    private Socket socket = null;                 // connected socket
    private DataInputStream in;
    private DataOutputStream out;
    private int rpcTimeout;
    private int maxIdleTime; //connections will be culled if it was idle for
         //maxIdleTime msecs
    private int maxRetries; //the max. no. of retries for socket connections
    private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
    private int pingInterval; // how often sends ping to the server in msecs
    
    // currently active calls
    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
    private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
    private IOException closeException; // close reason
}

 

Call

//客户端
  private class Call {
    int id;                                       // call id
    Writable param;                               // parameter
    Writable value;                               // value, null if error
    IOException error;                            // exception, null if value
    boolean done;    
}

//服务端
  private static class Call {
    private int id;                               // the client's call id
    private Writable param;                       // the parameter passed
    private Connection connection;                // connection to client
    private long timestamp; 
}

//客户端和服务端通过各自的Call对象发送调用
客户端还有ParallelCall 用于同时发送多个远程IPC调用

 

 

服务端处理

//处理监听事件的线程
class Listener extends Thread {
	//创建SeverSocketChannel,并注册ACCEPT事件
	public Listener() {
		acceptChannel = ServerSocketChannel.open();	
		acceptChannel.configureBlocking(false);

      	// Bind the server socket to the local host and port
      	bind(acceptChannel.socket(), address, backlogLength);
      	port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      	// create a selector;
      	selector= Selector.open();
      	readers = new Reader[readThreads];
      	readPool = Executors.newFixedThreadPool(readThreads);
      	for (int i = 0; i < readThreads; i++) {
        	Selector readSelector = Selector.open();
        	Reader reader = new Reader(readSelector);
        	readers[i] = reader;
        	readPool.execute(reader);
      	}
      	acceptChannel.register(selector, SelectionKey.OP_ACCEPT);	
	}

	//处理ACCEPT事件
	public void run() {
	selector.select();
	Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
	while (iter.hasNext()) {
		key = iter.next();
		iter.remove();
		if (key.isValid()) {
	    	if (key.isAcceptable())
	      		doAccept(key);
	  		}
		}            	
	}

}

//Reader线程,用于处理读事件并交由Handler线程处理
class Reader implements Runnable {
	public void run() {
		readSelector.select();
		while (adding) {
			this.wait(1000);
		}
		Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
		while (iter.hasNext()) {
			key = iter.next();
			iter.remove();
			if (key.isValid()) {
	  			if (key.isReadable()) {
	    			doRead(key);
	  			}
			}
		}		              	
	}
}

//异步的处理写事件
class Responder extends Thread {
	public void run() {
		waitPending();     // If a channel is being registered, wait.
		writeSelector.select(PURGE_INTERVAL);
		Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
		while (iter.hasNext()) {
			SelectionKey key = iter.next();
            iter.remove();
			if (key.isValid() && key.isWritable()) {
				doAsyncWrite(key);
			}            
		}
		
		synchronized (writeSelector.keys()) {
			calls = new ArrayList<Call>(writeSelector.keys().size());
			iter = writeSelector.keys().iterator();
			while (iter.hasNext()) {
				SelectionKey key = iter.next();
				Call call = (Call)key.attachment();
				if (call != null && key.channel() == call.connection.channel) { 
					calls.add(call);
				}
			}
		}		
	}

}



void doAsyncWrite(SelectionKey key) {
	synchronized(call.connection.responseQueue) {
		processResponse(call.connection.responseQueue, false);
	}
}


//inHandler用于表示是Handler中直接调用写操作
//还是Responer线程的异步写操作
void processResponse(LinkedList<Call> responseQueue,boolean inHandler) {
	call = responseQueue.removeFirst();
          SocketChannel channel = call.connection.channel;
	int count =  (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
                 channel.write(buffer) : channelIO(null, channel, buffer);          
}


void doRead(SelectionKey key) {
	Connection c = (Connection)key.attachment();
	count = c.readAndProcess();
}

 

 

 

 

 

 

  • 大小: 90.3 KB
分享到:
评论

相关推荐

    apache-hadoop-3.1.3-winutils-master.zip

    将`hadoop.dll`放在正确的位置,可以确保Hadoop客户端能够正确地调用这些功能。 3. **配置HDFS客户端**: 要在Windows上配置HDFS客户端,首先需要设置HADOOP_HOME环境变量,指向解压后的Hadoop目录。接着,需要在...

    hadoop-2.6.0-bin-master-PC端远程调用HDFS

    这个压缩包包含了Hadoop 2.6.0的二进制版本,特别为在Windows PC上远程调用HDFS进行了优化。首先,你需要下载并解压这个文件到本地目录。解压后,你会看到一个名为"hadoop-2.6.0"的文件夹,里面包含了一系列Hadoop的...

    hadoop-common-2.2.0-bin_32bit_&_64bit

    总结来说,"hadoop-common-2.2.0-bin_32bit_&_64bit"包含了在Windows上运行Hadoop所需的二进制文件,特别是对于解决远程调试过程中的空指针异常问题,确保hadoop.dll和winutils.exe的正确配置至关重要。同时,了解并...

    windos_x64-hadoop-2.6.0-hadoop.dll-winutils.exe

    之后,开发者就可以在Eclipse中创建Hadoop项目,编写MapReduce程序,并直接在本地或远程Hadoop集群上进行调试和运行。 4. **环境变量配置**:在使用这两个文件之前,需要正确配置HADOOP_HOME和PATH环境变量,指向...

    hadoop-2.7.2-hbase-jar.zip

    这些JAR文件的使用方式通常是在Hadoop集群的每个节点上部署,或者在客户端应用中作为类路径的一部分引用,以便进行远程调用。 在实际使用中,开发人员会通过HBase的API(如Admin API和Table API)来创建表、管理...

    hadoop-3.0.1winutils.rar

    3. **SSH配置**:尽管Windows不常用SSH,但Hadoop的一些功能如YARN和HDFS的远程管理需要SSH支持。可以使用OpenSSH for Windows或者第三方工具如PuTTY。 4. **安全认证**:如果涉及到安全性,可能需要配置Kerberos以...

    hadoop-fs指令学习.pdf

    此外,Hadoop使用了**远程过程调用(Remote Procedure Call, RPC)**机制来实现NameNode与DataNode之间的通信。RPC机制允许一个程序在不同的地址空间调用另一个程序,而无需了解底层网络协议的细节。 #### 五、Hadoop...

    hadoop-winutils大合集

    当Hadoop在windows下运行或调用远程Hadoop集群的时候,需要该辅助程序才能运行。winutils是Windows中的二进制文件,适用于不同版本的Hadoop系统并构建在Windows VM上,该VM用以在Windows系统中测试Hadoop相关的应用...

    hadoop-common-2.2.0-bin-master

    - **RPC 框架**:Hadoop 使用 RPC 来通信,支持服务发现和远程方法调用。 - **工具**:如 `fsck` (文件系统检查工具) 和 `balancer` (数据均衡工具)。 对于 Spark,它可以使用 Hadoop 的 YARN(Yet Another ...

    hadoop-eclipse-plugin含WINDOWS下调试文档

    1. 确保Hadoop服务在本地或者远程集群上正常运行。 2. 确认Hadoop的版本与插件版本相匹配,不兼容的版本可能导致错误。 3. Windows环境下可能会遇到路径问题,确保所有路径都正确无误,尤其是Hadoop的本地路径设置。...

    hadoop-common-2.7.3-bin-master + win10-64

    例如,使用Socket编程实现数据传输,以及通过HTTP的RESTful API进行远程过程调用(RPC)。 3. **安全性**:Hadoop支持认证、授权和审计功能,如Kerberos,以确保集群的安全性。在2.7.3版本中,安全机制得到了进一步...

    hadoop-shell.pdf

    Hadoop还有一个重要的机制是RPC(远程过程调用)。Hadoop使用自定义的RPC来在集群中的各个节点之间进行通信。RPC机制允许Hadoop的不同组件像本地方法调用一样进行通信,而实际上它们分布在不同的机器上。 Hadoop...

    winutils-master最高包含兼容Hadoop-3.4.0版本

    总之,`winutils-master`是Hadoop在Windows环境下的关键组成部分,它使得开发者可以在本地进行Hadoop相关项目的开发和测试,而不必依赖于远程集群或复杂的环境配置。这个压缩包的使用大大简化了在Windows上运行...

    远程调用执行Hadoop Map/Reduce

    本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...

    Hadoop-1.1.2上路 v1.0

    1. **Hadoop Common**: 这是 Hadoop 的基础模块,包含了一系列通用工具和服务,如文件系统抽象、网络通信库、序列化机制以及远程过程调用(RPC)框架。 2. **HDFS (Hadoop Distributed File System)**: HDFS 是一个...

    修改hadoop中的io写的,远程调用对象的东西。

    在标题中提到的“修改Hadoop中的io写的,远程调用对象的东西”可能指的是对Hadoop的分布式文件系统(HDFS)进行定制化开发,以便更高效地处理数据或实现特定功能。下面我们将深入探讨这个主题。 1. **Hadoop I/O...

    hadoop jar合集

    4. **hadoop-client**:这是一个聚合模块,包含了访问Hadoop集群所需的所有依赖项,使得开发者可以在本地或远程机器上编写和运行Hadoop应用程序。它不仅包含了上述三个JAR文件中的类,还包含了其他必要的库,如...

    hadoop2 MR运行修改jar

    在这个过程中,涉及到很多核心的JAR文件,如`hadoop-mapreduce-client-common-2.2.0.jar`和`hadoop-mapreduce-client-jobclient-2.2.0.jar`。这两个JAR文件在Hadoop生态系统中扮演着重要角色。 1. `hadoop-...

Global site tag (gtag.js) - Google Analytics