huangshihang

A Simple Implementation of the Hadoop RPC Server Side


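The server side is mainly responsible for receiving the requests sent by the client, processing them, and finally returning the result to the client.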

The server side of Hadoop RPC is built on NIO and involves concepts such as channels and selectors. The Server class mainly contains the Listener, Connection, Call, Handler and Responder classes.

1. The Listener and Reader classes

private class Listener extends Thread {
        private ServerSocketChannel acceptChannel = null;                         
        private Selector selector = null;
        private Reader[] readers = null;
        private int currentReader = 0;
        private InetSocketAddress address;

        public Listener() throws IOException {
            address = new InetSocketAddress(bindAddress, port);
            acceptChannel = ServerSocketChannel.open();
            acceptChannel.configureBlocking(false);
            bind(acceptChannel.socket(), address);
            selector = Selector.open();
            readers = new Reader[readThreads];
            for (int i = 0; i < readThreads; i++) {
                System.out.println(">>>start reader" + i + "......");
                Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port);
                readers[i] = reader;
                reader.start();
            }
            System.out.println(">>>register listener selector on port " + port + "......");
            acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
            this.setName("IPC Server listener on:" + acceptChannel.socket().getLocalPort());
            this.setDaemon(true);
        }

        private class Reader extends Thread {
            private volatile boolean adding = false;
            private final Selector readSelector;

            Reader(String name) throws IOException {
                super(name);
                this.readSelector = Selector.open();
            }

            @Override
            public void run() {
                doRunLoop();
            }

            public synchronized void doRunLoop(){
                while (running){
                    SelectionKey key = null;
                    try {
                        readSelector.select();
                        while(adding){
                            this.wait(1000);
                        }

                        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                        while(iter.hasNext()){
                            key = iter.next();
                            iter.remove();
                            if(key.isValid() && key.isReadable()){
                                doRead(key);
                            }
                            key = null;
                        }
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            public void doRead(SelectionKey key){
                Connection c = (Connection)key.attachment();
                if(c == null){
                    return;
                }
                int count = 0;
                try {
                    System.out.println(">>>reader read and process " + this.toString() + "......");
                    count = c.readAndProcess();
                } catch (IOException e) {
                    e.printStackTrace();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                if(count < 0) {
                    closeConnection(c);
                    c = null;
                }
            }

            public void startAdd() {
                adding = true;
                readSelector.wakeup();
            }

            public synchronized void finishAdd() {
                adding = false;
                this.notify();
            }

            public synchronized SelectionKey registerChannel(SocketChannel channel) throws ClosedChannelException {
                System.out.println(">>>register reader on channel:"+ this.toString() + "......");
                return channel.register(readSelector, SelectionKey.OP_READ);
            }
        }

        @Override
        public void run() {
            while (running) {
                SelectionKey key = null;
                try {
                    getSelector().select();
                    Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
                    while (iter.hasNext()) {
                        key = iter.next();
                        iter.remove();
                        if (key.isValid() && key.isAcceptable()) {
                            doAccept(key);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            synchronized (this) {
                try {
                    acceptChannel.close();
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selector = null;
                acceptChannel = null;

                while(!connectionList.isEmpty()){
                    closeConnection(connectionList.remove(0));
                }

            }
        }

        void doAccept(SelectionKey key) throws IOException {
            Connection c = null;
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel;
            while ((channel = server.accept()) != null) {
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(tcpNoDelay);
                Reader reader = getReader();
                reader.startAdd();
                System.out.println(">>>start add reader" + reader.toString() + "...");
                SelectionKey readKey = reader.registerChannel(channel);
                System.out.println(">>>create connection...");
                c = new Connection(readKey, channel);
                readKey.attach(c);
                synchronized (connectionList) {
                    connectionList.add(numConnections, c);
                    numConnections++;
                }
                reader.finishAdd();
            }
        }

        Reader getReader() {
            currentReader = (currentReader + 1) % readers.length;
            return readers[currentReader];
        }

        synchronized Selector getSelector() {
            return selector;
        }
    }
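
The `getReader()` method above hands new connections to the readers in round-robin order. The following is a minimal sketch of that selection only. The class name `RoundRobinDemo` is hypothetical, and plain integer indexes stand in for the `Reader` threads. It shows one detail worth noticing: because the index is advanced before it is used, the first connection goes to reader 1 rather than reader 0.

```java
public class RoundRobinDemo {
    private final int readerCount;   // stands in for readers.length
    private int currentReader = 0;

    public RoundRobinDemo(int readerCount) {
        this.readerCount = readerCount;
    }

    /** Mirrors getReader(): advance the index first, then return it. */
    public int nextReader() {
        currentReader = (currentReader + 1) % readerCount;
        return currentReader;
    }

    public static void main(String[] args) {
        RoundRobinDemo demo = new RoundRobinDemo(3);
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            order.append(demo.nextReader()).append(' ');
        }
        System.out.println(order.toString().trim()); // prints "1 2 0 1 2"
    }
}
```

With a single reader the index is always 0, so every connection lands on the same selector.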

 

2. The Connection class

public class Connection {
        private SocketChannel channel;
        private ByteBuffer dataLengthBuffer;
        private ByteBuffer data;
        private int dataLength;
        private LinkedList<Call> responseQueue;

        public Connection(SelectionKey key, SocketChannel channel) {
            this.channel = channel;
            this.dataLengthBuffer = ByteBuffer.allocate(4);
            this.data = null;
            this.responseQueue = new LinkedList<Call>();
        }

        public int readAndProcess() throws IOException, InterruptedException {
            int count = -1;

            if(dataLengthBuffer.remaining() > 0){
                System.out.println(">>>read the data length from the channel:" + channel.toString() + ".......");
                count = channelRead(channel, dataLengthBuffer);
                if(count < 0 || dataLengthBuffer.remaining() > 0){
                    return count;
                }
            }
            System.out.println(">>>read the data from the channel:" + channel.toString() + ".......");
            if(data == null){
                dataLengthBuffer.flip();
                dataLength = dataLengthBuffer.getInt();
                data = ByteBuffer.allocate(dataLength);
            }

            count = channelRead(channel, data);
            System.out.println(">>>finished reading the data from the channel and prepare to process the rpc.......");
            if(data.remaining() == 0){
                dataLengthBuffer.clear();
                data.flip();
                processOneRpc(data.array());
                data = null;
            }

            return count;
        }

        private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
            final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
            int id = 0;
            Writable invocation = null;
            try {
                invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});
                id = dis.readInt();
                invocation.readFields(dis);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
                return; // no Invocation could be built, so there is nothing to enqueue
            }
            System.out.println(">>> create the call according to the data: id#" + id + ":" + invocation.toString());
            Call call = new Call(id, invocation, this);
            callQueue.put(call);
        }

        public void close(){

        }

    }
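
`readAndProcess()` above implies a simple wire format: a 4-byte length prefix, followed by a payload that starts with the 4-byte call id and continues with the serialized request. Since a non-blocking read may deliver only part of a frame, the decoder has to remember where it stopped. The sketch below isolates that two-phase state machine using only `ByteBuffer`. The class `FrameDecoder` and its `encode`/`feed`/`frames` methods are hypothetical names for illustration, not part of the server code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FrameDecoder {
    private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
    private ByteBuffer data;                        // allocated once the length is known
    private final List<byte[]> frames = new ArrayList<>();

    /** Builds one frame: [length][callId][body], where length covers callId + body. */
    public static ByteBuffer encode(int callId, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(4 + 4 + body.length);
        frame.putInt(4 + body.length);
        frame.putInt(callId);
        frame.put(body);
        frame.flip();
        return frame;
    }

    /** Consumes whatever bytes are available and collects every completed payload. */
    public void feed(ByteBuffer in) {
        while (in.hasRemaining()) {
            if (data == null) {
                // Phase 1: fill the 4-byte length prefix.
                while (lengthBuf.hasRemaining() && in.hasRemaining()) {
                    lengthBuf.put(in.get());
                }
                if (lengthBuf.hasRemaining()) {
                    return;                         // prefix still incomplete, wait for more bytes
                }
                lengthBuf.flip();
                data = ByteBuffer.allocate(lengthBuf.getInt());
                lengthBuf.clear();                  // ready for the next frame
            }
            // Phase 2: fill the payload.
            while (data.hasRemaining() && in.hasRemaining()) {
                data.put(in.get());
            }
            if (!data.hasRemaining()) {
                frames.add(data.array());
                data = null;
            }
        }
    }

    public List<byte[]> frames() {
        return frames;
    }
}
```

Feeding the first three bytes of a frame produces nothing; feeding the rest yields one payload whose first `int` is the call id.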

 

3. The Call class

public static class Call {
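        private final int callId;             // id of the call; the client uses it to match the returned result
        private final Writable rpcRequest;    // the wrapped request
        private final Connection connection;   // the connection, which holds the channel
        private ByteBuffer rpcResponse;         // the response to send back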

        public Call(int id, Writable param, Connection connection) {
            this.callId = id;
            this.rpcRequest = param;
            this.connection = connection;
        }

        public void setResponse(ByteBuffer response){
            this.rpcResponse = response;
        }
    }

4. The Handler class

private class Handler extends Thread{
        public Handler(int instanceNumber){
            this.setDaemon(true);
            this.setName("IPC Server handler " + instanceNumber + " on port " + port);
        }

        @Override
        public void run(){
            ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
            while(running){
                Writable value = null;
                try {
                    final Call call = callQueue.take();
                    System.out.println(">>>call the service on the server...");
                    value = call(call);
                    synchronized (call.connection.responseQueue){
                        System.out.println(">>>prepare to respond the call...");
                        setupResponse(buf, call, value);
                        responder.doRespond(call);

                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }
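
The Handler is the consumer half of a producer/consumer pair: readers put `Call` objects on `callQueue`, and handlers block in `take()` until one arrives. A minimal, self-contained sketch of that hand-off follows. It is an illustration under simplifying assumptions: `CallQueueDemo` is a hypothetical class, strings stand in for `Call` objects, and a `CountDownLatch` (not used by the server) lets the demo know when all calls have been handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class CallQueueDemo {
    /** Queues each request, lets one handler thread echo it, and returns the results in order. */
    public static List<String> run(List<String> requests) {
        BlockingQueue<String> callQueue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(requests.size());

        Thread handler = new Thread(() -> {
            try {
                while (true) {
                    String call = callQueue.take();  // blocks until a call is enqueued
                    handled.add("echo:" + call);     // stands in for call(call) and setupResponse
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // interrupted on shutdown: leave the loop
            }
        }, "demo handler");
        handler.setDaemon(true);
        handler.start();

        try {
            for (String request : requests) {
                callQueue.put(request);              // the role processOneRpc plays for each Call
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            handler.interrupt();                     // same mechanism stop() uses on handlers
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"))); // prints [echo:a, echo:b, echo:c]
    }
}
```

With a single handler the responses keep the order of the queue; with several handlers that ordering is no longer guaranteed, which is one reason each response carries its call id.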

5. The Responder class

private class Responder extends Thread{
        private final Selector writeSelector;
        private int pending;

        final static int PURGE_INTERVAL = 900000; // 15mins

        Responder() throws IOException {
            this.setName("IPC Server Responder");
            this.setDaemon(true);
            writeSelector = Selector.open();
            pending = 0;
        }

        @Override
        public void run(){
            while(running){
                try {
                    waitPending();
                    writeSelector.select(PURGE_INTERVAL);
                    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if(key.isValid() && key.isWritable()){
                            doAsyncWrite(key);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            try {
                writeSelector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        void doRespond(Call call){
            synchronized (call.connection.responseQueue){
                call.connection.responseQueue.addLast(call);
                if(call.connection.responseQueue.size() == 1){
                    System.out.println(">>>only one response then directly respond the call......");
                    processResponse(call.connection.responseQueue, true);
                }
            }
        }

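        private void doAsyncWrite(SelectionKey key) throws IOException {
            Call call = (Call) key.attachment();
            if(call == null){
                return;
            }
            if(key.channel() != call.connection.channel){
                throw new IOException("doAsyncWrite: bad channel");
            }

            synchronized (call.connection.responseQueue){
                System.out.println(">>>doAsyncwrite...........");
                if(processResponse(call.connection.responseQueue, false)){
                    // Nothing left to write: stop watching OP_WRITE, otherwise the selector
                    // keeps reporting the channel as writable and the Responder loop spins.
                    try {
                        key.interestOps(0);
                    } catch (CancelledKeyException e) {
                        // the key was cancelled elsewhere; nothing more to do
                    }
                }
            }
        }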

        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler){
            boolean done  = false;
            Call call = null;
            int numElements = 0;
            synchronized (responseQueue){
                if((numElements = responseQueue.size()) == 0){
                    return true;
                }

                call = responseQueue.removeFirst();
                SocketChannel channel = call.connection.channel;
                try {
                    int numBytes = channelWrite(channel, call.rpcResponse);
                    if(numBytes < 0){
                        return true;
                    }

                    if(!call.rpcResponse.hasRemaining()){
                        System.out.println(">>>data writing is finished.....");
                        call.rpcResponse = null;
                        if(numElements == 1){
                            done = true;
                        }else{
                            done = false;
                        }
                    }else{
                        System.out.println(">>>data writing is not finished and register writeselector on the channel.....");
                        call.connection.responseQueue.addFirst(call);
                        if(inHandler){
                            incPending();
                            try {
                                writeSelector.wakeup();
                                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
                            }catch (ClosedChannelException e){
                                done = true;
                            }finally {
                                decPending();
                            }
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            return done;
        }

        private synchronized void incPending(){
            pending++;
        }

        private synchronized void decPending(){
            pending--;
            notify();
        }

        private synchronized void waitPending() throws InterruptedException {
            while(pending > 0){
                wait();
            }
        }
    }
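
The core idea of the Responder is: try to write directly, and only if the channel does not accept the whole buffer, register for `OP_WRITE` and finish the write later when the selector reports the channel as writable. The sketch below reproduces that pattern in a single thread. It is an illustration under stated assumptions: `PartialWriteDemo` is a hypothetical class, a `java.nio.channels.Pipe` stands in for the client socket, and the loop drains the pipe itself to play the role of the remote peer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class PartialWriteDemo {
    /** Pushes size bytes through a non-blocking pipe and returns the number of bytes written. */
    public static long writeAll(int size) {
        try (Selector writeSelector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                source.configureBlocking(false);

                ByteBuffer response = ByteBuffer.allocate(size);
                ByteBuffer drain = ByteBuffer.allocate(64 * 1024);
                SelectionKey key = null;
                long written = 0;

                while (true) {
                    written += sink.write(response);   // may accept only part of the buffer
                    if (!response.hasRemaining()) {
                        break;                         // response fully written
                    }
                    // Incomplete write: ask the selector to report when the sink is writable.
                    if (key == null) {
                        key = sink.register(writeSelector, SelectionKey.OP_WRITE);
                    } else {
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                    // Simulate the peer consuming data so the pipe has room again.
                    while (source.read(drain) > 0) {
                        drain.clear();
                    }
                    drain.clear();
                    writeSelector.select(1000);        // wait (bounded) for writability
                    writeSelector.selectedKeys().clear();
                    key.interestOps(0);                // stop watching until the next short write
                }
                return written;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAll(4 * 1024 * 1024) + " bytes written");
    }
}
```

Clearing the interest set once the buffer is empty matters: a channel with free buffer space is always writable, so a key left registered for `OP_WRITE` makes `select()` return immediately on every pass.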

6. Members of the Server class

volatile private boolean running = true;
    private String bindAddress;
    private int port;
    private BlockingDeque<Call> callQueue;
    private int handlerCount;
    private Handler[] handlers = null;
    private Responder responder = null;

    private List<Connection> connectionList =
            Collections.synchronizedList(new LinkedList<Connection>());

    private Listener listener = null;
    private int numConnections = 0;
    private int readThreads;
    private final boolean tcpNoDelay;
    private static int NIO_BUFFER_LIMIT = 8 * 1024;

Methods of the Server class

protected Server(String bindAddress, int port, int numReader) throws IOException {
        this.tcpNoDelay = false;
        this.bindAddress = bindAddress;
        this.port = port;
        this.readThreads = numReader;
        this.callQueue = new LinkedBlockingDeque<Call>();
        listener = new Listener();
        responder = new Responder();
        handlerCount = 1;
    }

    public synchronized void start(){
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        for(int i = 0; i < handlerCount; i++){
            handlers[i] = new Handler(i);
            handlers[i].start();
        }

    }

    public synchronized void stop(){
        running = false;
        if (handlers != null) {
            for (int i = 0; i < handlerCount; i++) {
                if (handlers[i] != null) {
                    handlers[i].interrupt();
                }
            }
        }
        listener.interrupt();
        responder.interrupt();
        notifyAll();
    }

    public static void bind(ServerSocket socket, InetSocketAddress address) throws IOException {
        socket.bind(address);
        if (!socket.isBound()) {
            throw new BindException("could not find a free port...");
        }
    }

    private void closeConnection(Connection connection){
        synchronized (connectionList){
            if(connectionList.remove(connection))
                numConnections--;
        }
        connection.close();
    }

    private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
       int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
               channel.read(buffer) : channelIO(channel, null, buffer);
        return count;
    }

    private static int channelIO(ReadableByteChannel readCh,
                                WritableByteChannel writeCh,
                                ByteBuffer buf) throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;

        while(buf.remaining() > 0){
            try {
                int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + ioSize);
                ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
                if(ret < ioSize){
                    // short read/write or end of stream: stop instead of spinning on a non-blocking channel
                    break;
                }
            } finally {
                buf.limit(originalLimit);
            }
        }

        int nBytes = initialRemaining - buf.remaining();
        return (nBytes > 0) ? nBytes : ret;
    }

    private int channelWrite(SocketChannel channel, ByteBuffer rpcResponse) throws IOException {
        int count = (rpcResponse.remaining() <= NIO_BUFFER_LIMIT)?
                channel.write(rpcResponse):channelIO(null, channel, rpcResponse);
        return count;
    }

    private void setupResponse(ByteArrayOutputStream responseBuf, Call call, Writable rv){
        responseBuf.reset();
        DataOutputStream out = new DataOutputStream(responseBuf);
        try {
            final DataOutputBuffer buf = new DataOutputBuffer();
            rv.write(buf);
            byte[] data = buf.getData();
            int fullLength = buf.getLength();
//            out.writeInt(fullLength);
            out.writeInt(call.callId);
            out.write(data, 0, buf.getLength());
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(">>>set response of the call#" + call.callId + "........");
        call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
    }

    public Writable call(Call call){
        return call.rpcRequest;
    }
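
`channelRead`, `channelWrite` and `channelIO` split large transfers into pieces of at most `NIO_BUFFER_LIMIT` bytes by temporarily lowering the buffer's limit. The following sketch shows the read side of that technique against an in-memory channel, so it can be run without a socket. `ChunkedReadDemo`, `chunkedRead` and `readFromMemory` are hypothetical names, and the loop stops as soon as a read comes back short, which is what keeps it from spinning when no more data is available.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ChunkedReadDemo {
    /** Reads into buf in pieces of at most chunk bytes; returns bytes read, or -1 at end of stream. */
    static int chunkedRead(ReadableByteChannel ch, ByteBuffer buf, int chunk) throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;
        while (buf.remaining() > 0) {
            try {
                int ioSize = Math.min(buf.remaining(), chunk);
                buf.limit(buf.position() + ioSize);   // expose only one chunk to the channel
                ret = ch.read(buf);
                if (ret < ioSize) {
                    break;                            // short read or end of stream
                }
            } finally {
                buf.limit(originalLimit);             // always restore the caller's limit
            }
        }
        int nBytes = initialRemaining - buf.remaining();
        return (nBytes > 0) ? nBytes : ret;
    }

    /** Convenience wrapper: reads from a zero-filled in-memory source of the given length. */
    public static int readFromMemory(int sourceLength, int bufferLength, int chunk) {
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(new byte[sourceLength]));
        try {
            return chunkedRead(ch, ByteBuffer.allocate(bufferLength), chunk);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromMemory(20000, 30000, 8192)); // prints 20000
    }
}
```

When the source is already exhausted the method returns -1, which is the value `doRead` relies on to close the connection.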

 
