Server端的主要负责接收client端发送的请求并处理,最后返回处理结果给客户端。
Hadoop RPC的Server端采用了NIO技术,涉及到channel,selector等概念。Server类中主要有Listener,Connect,Call,Handler,Responder等类。
1、Listener类和Reader类
private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; private Selector selector = null; private Reader[] readers = null; private int currentReader = 0; private InetSocketAddress address; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); bind(acceptChannel.socket(), address); selector = Selector.open(); readers = new Reader[readThreads]; for (int i = 0; i < readThreads; i++) { System.out.println(">>>start reader" + i + "......"); Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port" + port); readers[i] = reader; reader.start(); } System.out.println(">>>register listener selector on port" + port + "......"); acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on:" + acceptChannel.socket().getLocalPort()); this.setDaemon(true); } private class Reader extends Thread { private volatile boolean adding = false; private final Selector readSelector; Reader(String name) throws IOException { super(name); this.readSelector = Selector.open(); } @Override public void run() { doRunLoop(); } public synchronized void doRunLoop(){ while (running){ SelectionKey key = null; try { readSelector.select(); while(adding){ this.wait(1000); } Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); while(iter.hasNext()){ key = iter.next(); iter.remove(); if(key.isValid() && key.isReadable()){ doRead(key); } key = null; } }catch (InterruptedException e){ e.printStackTrace(); }catch (IOException e) { e.printStackTrace(); } } } public void doRead(SelectionKey key){ Connection c = (Connection)key.attachment(); if(c == null){ return; } int count = 0; try { System.out.println(">>>reader read and process " + this.toString() + "......"); count = c.readAndProcess(); } catch (IOException e) { e.printStackTrace(); }catch (InterruptedException e){ e.printStackTrace(); } if(count < 0) { closeConnection(c); c = null; } } public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized void finishAdd() { adding = false; this.notify(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws ClosedChannelException { System.out.println(">>>register reader on channel:"+ this.toString() + "......"); return channel.register(readSelector, SelectionKey.OP_READ); } } @Override public void run() { while (running) { SelectionKey key = null; try { getSelector().select(); Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid() && key.isAcceptable()) { doAccept(key); } } } catch (IOException e) { e.printStackTrace(); } } synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { e.printStackTrace(); } selector = null; acceptChannel = null; while(!connectionList.isEmpty()){ closeConnection(connectionList.remove(0)); } } } void doAccept(SelectionKey key) throws IOException { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); reader.startAdd(); System.out.println(">>>start add reader" + reader.toString() + "..."); SelectionKey readKey = reader.registerChannel(channel); System.out.println(">>>create connection..."); c = new Connection(readKey, channel); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } reader.finishAdd(); } } Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } synchronized Selector getSelector() { return selector; } }
2、Connection类
public class Connection { private SocketChannel channel; private ByteBuffer dataLengthBuffer; private ByteBuffer data; private int dataLength; private LinkedList<Call> responseQueue; public Connection(SelectionKey key, SocketChannel channel) { this.channel = channel; this.dataLengthBuffer = ByteBuffer.allocate(4); this.data = null; this.responseQueue = new LinkedList<Call>(); } public int readAndProcess() throws IOException, InterruptedException { int count = -1; if(dataLengthBuffer.remaining() > 0){ System.out.println(">>>read the data length from the channel:" + channel.toString() + "......."); count = channelRead(channel, dataLengthBuffer); if(count < 0 || dataLengthBuffer.remaining() > 0){ return count; } } System.out.println(">>>read the data from the channel:" + channel.toString() + "......."); if(data == null){ dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); data = ByteBuffer.allocate(dataLength); } count = channelRead(channel, data); System.out.println(">>>finished reading the data from the channel and prepare to process the rpc......."); if(data.remaining() == 0){ dataLengthBuffer.clear(); data.flip(); processOneRpc(data.array()); data = null; } return count; } private void processOneRpc(byte[] buf) throws IOException, InterruptedException { final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = 0; Writable invocation = null; try { invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{}); id = dis.readInt(); invocation.readFields(dis); } catch (NoSuchMethodException e) { e.printStackTrace(); } System.out.println(">>> create the call according to the data: id#" + id + ":" + invocation.toString()); Call call = new Call(id, invocation, this); callQueue.put(call); } public void close(){ } }
3、Call类
public static class Call { private final int callId; //标识调用的id,在客户端处理返回结果时用到 private final Writable rpcRequest; //封装请求 private final Connection connection; //连接中包含channel信息 private ByteBuffer rpcResponse; //返回结果 public Call(int id, Writable param, Connection connection) { this.callId = id; this.rpcRequest = param; this.connection = connection; } public void setResponse(ByteBuffer response){ this.rpcResponse = response; } }
4、Handler类
private class Handler extends Thread{ public Handler(int instanceNumber){ this.setDaemon(true); this.setName("IPC Server handler " + instanceNumber + "on port" + port); } @Override public void run(){ ByteArrayOutputStream buf = new ByteArrayOutputStream(10240); while(running){ Writable value = null; try { final Call call = callQueue.take(); System.out.println(">>>call the service on the server..."); value = call(call); synchronized (call.connection.responseQueue){ System.out.println(">>>prepare to respond the call..."); setupResponse(buf, call, value); responder.doRespond(call); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
5、Responder类
private class Responder extends Thread{ private final Selector writeSelector; private int pending; final static int PURGE_INTERVAL = 900000; // 15mins Responder() throws IOException { this.setName("IPC Server Responder"); this.setDaemon(true); writeSelector = Selector.open(); pending = 0; } @Override public void run(){ while(running){ try { waitPending(); writeSelector.select(PURGE_INTERVAL); Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); while(iter.hasNext()){ SelectionKey key = iter.next(); iter.remove(); if(key.isValid() && key.isWritable()){ doAsyncWrite(key); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } try { writeSelector.close(); } catch (IOException e) { e.printStackTrace(); } } void doRespond(Call call){ synchronized (call.connection.responseQueue){ call.connection.responseQueue.addLast(call); System.out.println(">>>only one response then directly respond the call......"); if(call.connection.responseQueue.size() == 1){ processResponse(call.connection.responseQueue, true); } } } private void doAsyncWrite(SelectionKey key) throws IOException { Call call = (Call) key.attachment(); if(call == null){ return; } if(key.channel() != call.connection.channel){ throw new IOException("doAsyncWrite: bad channel"); } synchronized (call.connection.responseQueue){ System.out.println(">>>doAsyncwrite..........."); processResponse(call.connection.responseQueue, false); } } private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler){ boolean done = false; Call call = null; int numElements = 0; synchronized (responseQueue){ if((numElements = responseQueue.size()) == 0){ return true; } call = responseQueue.removeFirst(); SocketChannel channel = call.connection.channel; try { int numBytes = channelWrite(channel, call.rpcResponse); if(numBytes < 0){ return true; } if(!call.rpcResponse.hasRemaining()){ System.out.println(">>>data writing is finished....."); call.rpcResponse = null; if(numElements == 1){ done = true; }else{ done = false; } }else{ System.out.println(">>>data writing is not finished and register writeselector on the channel....."); call.connection.responseQueue.addFirst(call); if(inHandler){ incPending(); try { writeSelector.wakeup(); channel.register(writeSelector, SelectionKey.OP_WRITE, call); }catch (ClosedChannelException e){ done = true; }finally { decPending(); } } } } catch (IOException e) { e.printStackTrace(); } } return done; } private synchronized void incPending(){ pending++; } private synchronized void decPending(){ pending--; notify(); } private synchronized void waitPending() throws InterruptedException { while(pending > 0){ wait(); } } }
6、Server类的成员
volatile private boolean running = true; private String bindAddress; private int port; private BlockingDeque<Call> callQueue; private int handlerCount; private Handler[] handlers = null; private Responder responder = null; private List<Connection> connectionList = Collections.synchronizedList(new LinkedList<Connection>()); private Listener listener = null; private int numConnections = 0; private int readThreads; private final boolean tcpNoDelay; private static int NIO_BUFFER_LIMIT = 8 * 1024;
Server类的方法
protected Server(String bindAddress, int port, int numReader) throws IOException { this.tcpNoDelay = false; this.bindAddress = bindAddress; this.port = port; this.readThreads = numReader; this.callQueue = new LinkedBlockingDeque<Call>(); listener = new Listener(); responder = new Responder(); handlerCount = 1; } public synchronized void start(){ responder.start(); listener.start(); handlers = new Handler[handlerCount]; for(int i = 0; i < handlerCount; i++){ handlers[i] = new Handler(i); handlers[i].start(); } } public synchronized void stop(){ running = false; running = false; if (handlers != null) { for (int i = 0; i < handlerCount; i++) { if (handlers[i] != null) { handlers[i].interrupt(); } } } listener.interrupt(); responder.interrupt(); notifyAll(); } public static void bind(ServerSocket socket, InetSocketAddress address) throws IOException { socket.bind(address); if (!socket.isBound()) { throw new BindException("could not find a free port..."); } } private void closeConnection(Connection connection){ synchronized (connectionList){ if(connectionList.remove(connection)) numConnections--; } connection.close(); } private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? channel.read(buffer) : channelIO(channel, null, buffer); return count; } private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException { int originalLimit = buf.limit(); int initialRemaining = buf.remaining(); int ret = 0; while(buf.remaining() > 0){ try { int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); buf.limit(buf.position() + ioSize); ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); } finally { buf.limit(originalLimit); } } int nBytes = initialRemaining - buf.remaining(); return (nBytes > 0) ? nBytes : ret; } private int channelWrite(SocketChannel channel, ByteBuffer rpcResponse) throws IOException { int count = (rpcResponse.remaining() <= NIO_BUFFER_LIMIT)? channel.write(rpcResponse):channelIO(null, channel, rpcResponse); return count; } private void setupResponse(ByteArrayOutputStream responseBuf, Call call, Writable rv){ responseBuf.reset(); DataOutputStream out = new DataOutputStream(responseBuf); try { final DataOutputBuffer buf = new DataOutputBuffer(); rv.write(buf); byte[] data = buf.getData(); int fullLength = buf.getLength(); // out.writeInt(fullLength); out.writeInt(call.callId); out.write(data, 0, buf.getLength()); } catch (IOException e) { e.printStackTrace(); } System.out.println(">>>set response of the call#" + call.callId + "........"); call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray())); } public Writable call(Call call){ return call.rpcRequest; }
相关推荐
Hadoop RPC Server 是 RPC 服务端的实现,它是 RPC 服务的核心组件。RPC Server 可以分为三个部分:Server 抽象、Server 实现和 Server.Listener。 * Server 抽象:提供 Call 队列,用于存储客户端的调用请求。 * ...
在这个实例中,我们将深入探讨Hadoop RPC的工作原理、客户端(`hadoop_rcp_client`)与服务器端(`hadoop_rpc_server`)的角色以及它们之间的交互过程。 ### Hadoop RPC概述 Hadoop RPC是Hadoop框架中用于进程间...
本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、...
- `RPC.Server`:服务器端的实现,负责接收和处理客户端的请求。 - `RPC.Client`:客户端的实现,用于建立连接并发送请求。 - `VersionedProtocol`:所有RPC服务需要实现的接口,包含了版本信息,用于兼容性检查...
RPCServer实现了一种抽象的RPC服务,同时提供Call队列。RPCServer作为服务提供者由两个部分组成:接收Call调用和处理Call调用。接收Call调用负责接收来自RPCClient的调用请求,编码成Call对象后放入到Call队列中。这...
public class RpcServer { public static void main(String[] args) throws IOException { MyServiceImpl service = new MyServiceImpl(); Server server = RPC.getServer(service, "localhost", 9999, 1, true, ...
* RPC server端实现类 */ public class SayRpcServer implements Isay { @Override public String say(String userName) { System.out.println("server received data -> [" + userName + "]"); return ...
Hadoop中的通信主要通过RPC实现,这是一种远程调用协议,使得客户端可以像调用本地方法一样调用远程服务器上的方法。Hadoop的RPC基于Java的Protocol Buffers,提供高效、灵活的序列化和反序列化能力,确保跨网络的...
Hadoop 的 RPC 机制类似于 Java 的 RMI(远程方法调用),都需要用户定义接口并在服务器端实现该接口。通过 `java.lang.reflect.Proxy` 类,客户端可以像调用本地方法一样调用远程服务。 ##### 3.2 设计决策 ...
在这个“weekend110--rpcserver”和“weekend110”的小测试例子中,我们可以预期看到吴超老师如何创建一个简单的RPC服务器和客户端。可能包含以下内容: - 服务器端实现一个接口,该接口定义了可供客户端调用的方法...
Hadoop中的RPC机制是基于Java的IPC(Inter-Process Communication)实现的,它在设计时考虑了性能、效率和可控制性,因此与RMI(Remote Method Invocation)等其他RPC方案有所不同。 1. **RPC原理**: Hadoop的RPC...
RPC的基本架构基于Client/Server模型,其中客户端发起请求,服务器端执行请求并返回结果。这个过程涉及以下步骤: 1. 建立RPC服务,定义传输协议,通常是TCP或UDP。 2. 客户端将调用参数封装并发送到服务器的指定...
2.3 Hadoop RPC实现 63 2.3.1 RPC类实现 63 2.3.2 Client类实现 64 2.3.3 Server类实现 76 第3章 Namenode(名字节点) 88 3.1 文件系统树 88 3.1.1 INode相关类 89 3.1.2 Feature相关类 102 3.1.3 ...
Hadoop IPC/RPCServer主要基于NIO(Non-blocking I/O)模型实现,其核心处理流程如下: 1. **监听与连接建立**: - **Listener**:监听特定端口,关注`OP_ACCEPT`事件,等待客户端连接。 - 当客户端尝试连接时,`...
### Hadoop2.2.0集群安装:QJM实现HA及Hdfs-site配置详解 #### 一、Hadoop2.2.0完全分布式集群平台安装设置概述 在深入探讨Hadoop2.2.0的安装过程之前,我们需要了解Hadoop的基本架构以及其核心组件——HDFS...
HDFS通信部分使用org.apache.hadoop.ipc,可以很快使用RPC.Server.start()构造一个节点,具体业务功能还需自己实现。针对HDFS的业务则为数据流的读写,NameNode/DataNode的通信等。 MapReduce主要在org.apache....
2. **Avro**:负责Hadoop中的RPC通信。 3. **Chukwa**:主要用于收集和分析分布式系统的数据。 4. **HBase**:提供了一个类似于Google Bigtable的分布式数据库服务,适用于存储结构化数据。 5. **HDFS (Hadoop ...
3. RPC(Remote Procedure Call)模块:Hadoop使用RPC进行节点间的通信,比如NameNode与DataNode之间的通信。源代码中的`org.apache.hadoop.ipc`包包含了相关实现。 4. YARN(Yet Another Resource Negotiator):...