`
huangshihang
  • 浏览: 12197 次
社区版块
存档分类
最新评论

Hadoop RPC Client端的简单实现

阅读更多

 

HadoopRPC代码中,Client负责维护客户端与服务器的连接,连接负责将客户端的请求发送到服务器端并接受服务器端的返回结果。

 

Client的内部对象关系如下:

 

1)一个Client对象维护着多个与服务器的连接;

 

2ConnectionsConnection集合,每个ConnectionConnectionId标识,ConnectionId中包含了Socket连接的服务器端口地址;

 

3Connection维护与服务器的连接,发送和接受数据,在Connection中存放了对服务器的每次请求Call,请求发起时,将Call加入Connection中,返回后从Connection 中删除。

(Hadoop的RPC中Client代码考虑的很细致,文中代码为删减后细节的代码)

(1)Call的代码如下:

 

 

static class Call{
        final int id;                         //标识Call
        final Writable rpcRequest;      //请求
        Writable rpcResponse;          //返回结果
        boolean done;                     //接收返回结果标志

        public Call(Writable param){
            final Integer id = callId.get();
            if(id == null){
                this.id = nextCallId();
            }else{
                callId.set(null);
                this.id = id;
            }
            this.rpcRequest = param;
        }

        public synchronized void callCompleted(){        //接收返回结果后将标志置为true,唤醒挂起的线程
            done = true;
            notify();
        }

        public synchronized void setRpcResponse(Writable rpcResponse){
            this.rpcResponse = rpcResponse;
            callCompleted();
        }
    }

 (2)Connect继承线程Thread类,在初始化后启动,不断的查看是否有返回结果,又返回结果则找到相应的Call

private class Connection extends Thread{
        private InetSocketAddress server;            //socket地址
        private final ConnectionId remoteId;
        private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();    //Call collections
        private Socket socket = null;                    
        private DataInputStream in;
        private DataOutputStream out;
        private final Object sendRpcRequestLock = new Object();               //并发控制锁

        public Connection(ConnectionId remoteId){
            this.remoteId = remoteId;
            server = remoteId.getAddress();
        }

        private synchronized boolean addCall(Call call){                //将请求加入HashTable中
            calls.put(call.id, call);
            notify();
            return true;
        }

        private synchronized void setUpConnection(){
            try {
                this.socket = socketFactory.createSocket();
                socket.connect(server);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private synchronized void setUpIOStreams(){                          //建立socket连接并打开输入输出流
            if(socket != null) {
                return;
            }
            System.out.println("connect the socket and create input and output stream");
            setUpConnection();
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
                this.in = new DataInputStream(new BufferedInputStream(inputStream));
                this.out = new DataOutputStream(new BufferedOutputStream(outputStream));
            } catch (IOException e) {
                e.printStackTrace();
            }
            start();
        }

        private void closeConnection(){
            if(socket == null){
                return;
            }
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            socket = null;
        }

        public void sendRPCRequest(Call call) throws IOException {         //发送请求,请求中包括callid和request,callid在返回结果时用到
            final DataOutputBuffer d = new DataOutputBuffer();
            System.out.println("prepare to write the data of the call.........");
            d.writeInt(call.id);
            call.rpcRequest.write(d);
            synchronized(sendRpcRequestLock){
                Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            synchronized (Connection.this.out) {
                                byte[] data = d.getData();
                                int totalLength = d.getLength();
                                out.write(data, 0, totalLength);
                                out.flush();
                            }
                        }
                        catch (IOException e) {
                            e.printStackTrace();
                        }finally {
                            try {
                                d.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });

                try {
                    senderFuture.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }
        }

        private void receiveRpcResponse() throws NoSuchMethodException, InvocationTargetException {                          //接收返回结果
            try {
                try {                                                //sleep()是为了测试,可以删除
                    sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Integer callId = in.readInt();
                Call call = calls.get(callId);
                Writable value = valueClass.getConstructor(Method.class,         Object[].class).newInstance(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});                      //这里的返回结果的类类型为自定义的类,实现Hadoop io的writable,RPC包括方法部分和参数部分,构造函数需要这两个参数
                value.readFields(in);
                calls.remove(callId);
                call.setRpcResponse(value);                         //返回结果时设置done参数并唤醒线程
                System.out.println("remove the call and the calls:" + calls.size() + ",receive the response:" + value);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }

        private synchronized boolean waitForWork(){
            if(calls.isEmpty()){
                try {
                    wait(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if(!calls.isEmpty()){
                return true;
            }else {
                return false;
            }
        }

        private synchronized void close(){
            connections.remove(remoteId);
            try {
                in.close();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            closeConnection();
            cleanUpCalls();
        }

        private void cleanUpCalls(){
            Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
            while (itor.hasNext()){
                itor.remove();
            }
        }

        public void run(){
            while(waitForWork()){
                System.out.println("prepare to accept the response...............");
                try {
                    receiveRpcResponse();
                } catch (NoSuchMethodException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
            close();
        }

    }

 (3)ConnectionId类:

public static class ConnectionId{
        InetSocketAddress address;

        ConnectionId(InetSocketAddress address){
            this.address = address;
        }

        InetSocketAddress getAddress(){ return this.address; }

    }

 (4)Client类的成员:

 

private static final AtomicInteger callIdCounter = new AtomicInteger();
    private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
    private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
    private SocketFactory socketFactory;
    private static final ExecutorService SEND_PARAMS_EXECUTOR = Executors.newCachedThreadPool(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("IPC Params sending Thread #%d").build()
    );

    private Class<? extends Writable> valueClass;

 (5)Client类方法:

 

public Writable call(Writable rpcRequest, ConnectionId remoteId){      //发送服务器请求时调用Client的call方法
        final Call call = new Call(rpcRequest);
        Connection connection = getConnection(remoteId, call);
        try {
            connection.sendRPCRequest(call);
        } catch (IOException e) {
            e.printStackTrace();
        }
        synchronized (call){
            while (!call.done){
                try {
                    System.out.println("waiting for the complete..........");
                    call.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        return call.rpcResponse;
    }

    private Connection getConnection(ConnectionId remoteId, Call call){
        Connection connection;
        do {
            synchronized (connections) {
                connection = connections.get(remoteId);
                if (connection == null) {
                    connection = new Connection(remoteId);
                    connections.put(remoteId, connection);
                }
            }
        }while(!connection.addCall(call));
        System.out.println("create or already have connection in connections:" + connection.getName() + "--" + call.id + ",calls size:" + connection.calls.size());
        connection.setUpIOStreams();
        return connection;
    }

    public static int nextCallId(){
        return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
    }

 

(6)测试,线程模拟服务接收请求并返回结果,这里请求和返回都是Invocation对象

public static void main(String[] args){
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                ServerSocket serverSocket;
                boolean flag = true;
                try {
                    serverSocket = new ServerSocket(8088);
                    while (flag) {
                        try {
                            sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Socket socket = serverSocket.accept();
                        System.out.println("accept socket at port:8088.............");
                        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
                        DataInputStream in = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
                        try {
                            Invocation invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});
                            System.out.println("prepare to read information from in of socket:<<<<<<<<<<<");
                            int id = in.readInt();
                            System.out.println("read int :" + id);
                            invocation.readFields(in);
                            System.out.println("read invocation :" + invocation);
                            out.writeInt(id);
                            invocation.write(out);
                            out.flush();
                            System.out.println("write procession is over>>>>>>>>>>>>>" + invocation);
                        } catch (NoSuchMethodException e) {
                            e.printStackTrace();
                        }
                        out.close();
                        in.close();
                        flag = false;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }
        });

        thread.start();


        Client.ConnectionId id = new Client.ConnectionId(new InetSocketAddress("127.0.0.1", 8088));

        Client client = new Client(Invocation.class);
        try {
            try {
                sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Invocation invocation = (Invocation)client.call(new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{}), id);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }
    }

 (7)自定义的Invocation类

public class Invocation implements Writable {
    private String methodName;
    private Class<?>[] parameterClasses;
    private Object[] parameters;
    private Configuration configure;

    public Object[] getParameters() {
        return parameters;
    }

    public Class<?>[] getParamterClasses() {
        return parameterClasses;
    }

    public String getMethodName() {
        return methodName;
    }
    public Configuration getConfigure() {
        return configure;
    }

    public void setConfigure(Configuration configure) {
        this.configure = configure;
    }

    public Invocation(){

    }

    public Invocation(Method method, Object[] parameters){
        this.methodName = method.getName();
        this.parameterClasses = method.getParameterTypes();
        this.parameters = parameters;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        UTF8.writeString(dataOutput, methodName);
        for(int i = 0; i < parameters.length; i++) {
            ObjectWritable.writeObject(dataOutput, parameters[i], parameterClasses[i], this.configure, true);
        }
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        methodName = UTF8.readString(dataInput);
        parameterClasses = new Class[parameters.length];
        ObjectWritable objectWritable = new ObjectWritable();
        for (int i = 0; i < parameters.length; i++) {
            parameters[i] =
                    ObjectWritable.readObject(dataInput, objectWritable, this.configure);
            parameterClasses[i] = objectWritable.getDeclaredClass();
        }
    }
}

 

 

分享到:
评论

相关推荐

    Hadoop_RPC详细分析.doc

    Hadoop RPC Client 是 RPC 客户端的实现,它提供了对 RPC 服务端的访问接口。RPC Client 可以分为三个部分:Client 抽象、Client 实现和 Client.Request。 * Client 抽象:提供了对 RPC 服务端的访问接口。 * ...

    hadoop rpc实例

    在这个实例中,我们将深入探讨Hadoop RPC的工作原理、客户端(`hadoop_rcp_client`)与服务器端(`hadoop_rpc_server`)的角色以及它们之间的交互过程。 ### Hadoop RPC概述 Hadoop RPC是Hadoop框架中用于进程间...

    Hadoop client server通讯分析

    在Hadoop中,客户端(Client)负责提交任务、读写数据,而服务器端则包括NameNode、DataNode和TaskTracker等组件,它们处理客户端请求,管理数据存储和任务调度。 二、HDFS通信 1. 客户端与NameNode交互: 当...

    java操作hadoop的RPC,源码

    - `RPC.Client`:客户端的实现,用于建立连接并发送请求。 - `VersionedProtocol`:所有RPC服务需要实现的接口,包含了版本信息,用于兼容性检查。 5. **安全性**: - Hadoop RPC支持安全模式,可以通过Kerberos...

    hadooprpc机制&&将avro引入hadooprpc机制初探

    接收Call调用负责接收来自RPCClient的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:交互过程如下图所示...

    Hadoop自己的Rpc框架使用Demo

    public class RpcClient { public static void main(String[] args) throws IOException, InterruptedException { InetSocketAddress address = new InetSocketAddress("localhost", 9999); MyService proxy = ...

    Hadoop的RPC通信程序

    本文将详细介绍如何使用Hadoop的RPC机制创建一个简单的协议接口、通信服务端和通信客户端程序。这不仅有助于理解分布式系统中的RPC协议,还能深入了解客户机与服务器之间的通信机制。 #### 二、创建RPC通信程序 ##...

    学习hadoop_源代码,RPC_部分

    Hadoop 的 RPC 机制类似于 Java 的 RMI(远程方法调用),都需要用户定义接口并在服务器端实现该接口。通过 `java.lang.reflect.Proxy` 类,客户端可以像调用本地方法一样调用远程服务。 ##### 3.2 设计决策 ...

    Hadoop源码分析(client部分)

    3. **Hadoop RPC通信机制**:Hadoop内部大量使用了RPC(Remote Procedure Call)机制来进行不同进程间的通信。这种机制允许在一台机器上运行的程序调用另一台机器上程序的功能,就像调用本地函数一样简单。 当用户...

    RPC应用的java实现

    Java中还有其他RPC实现,比如Hadoop的Hadoop RPC,它主要用于Hadoop生态系统中的进程间通信,以及Google的gRPC,它基于HTTP/2协议,支持protobuf进行高效的数据序列化,提供了更好的性能和扩展性。 总的来说,RPC是...

    hdfs源码.zip

    2.3 Hadoop RPC实现 63 2.3.1 RPC类实现 63 2.3.2 Client类实现 64 2.3.3 Server类实现 76 第3章 Namenode(名字节点) 88 3.1 文件系统树 88 3.1.1 INode相关类 89 3.1.2 Feature相关类 102 3.1.3 ...

    hadoop-jar.zip

    `hadoop-mapreduce-client-core-2.7.7.jar`是Hadoop MapReduce客户端的核心库,它主要实现了MapReduce编程模型。MapReduce是一种分布式计算模型,将大型任务分解成可并行执行的小任务——"map"阶段和"reduce"阶段。...

    Hadoop2.2.0集群安装

    ### Hadoop2.2.0集群安装:QJM实现HA及Hdfs-site配置详解 #### 一、Hadoop2.2.0完全分布式集群平台安装设置概述 在深入探讨Hadoop2.2.0的安装过程之前,我们需要了解Hadoop的基本架构以及其核心组件——HDFS...

    hadoop源码阅读总结

    #### 四、IPC/RPCClient工作原理 客户端采用代理模式实现RPC调用,主要步骤如下: 1. **代理对象创建**: - 调用`RPC.getProxy`方法创建代理对象。 - 实际返回的对象实现了所需接口,并重写了接口中的方法。 2....

    大数据处理方法和技术实验一-RPC和反射机制应用.docx

    1. **实现本地Hadoop RPC调用** - 创建两个项目:一个是服务器端,提供一个可调用的方法;另一个是客户端,用于调用服务器端提供的方法。 - 服务器端: - 设计并实现一个服务接口及其实现类,提供一个特定的功能...

    java集成hadoop-hbase用到的jar包

    3. `hadoop-client-*.jar`:包含了Hadoop所有客户端模块,便于一次性导入所有依赖。 其次,对于HBase,你需要以下jar包: 1. `hbase-client-*.jar`:提供HBase客户端API,用于连接HBase服务,执行CRUD操作。 2. `...

    Java如何实现简单的RPC框架

    RpcClient rpcClient = new RpcClient(); HelloService helloService = rpcClient.getProxy(HelloService.class); String result = helloService.sayHi("John"); System.out.println(result); } } ``` 实现一...

    hadoop源代码code归档整理

    主要关注`org.apache.hadoop.yarn`包,如`org.apache.hadoop.yarn.server.resourcemanager`和`org.apache.hadoop.yarn.client.api`。 5. 容错机制:Hadoop的容错机制是其稳定运行的重要保障。通过查看`org.apache....

Global site tag (gtag.js) - Google Analytics