`
standalone
  • 浏览: 611366 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

学习hadoop之基于protocol buffers的 RPC

阅读更多
现在版本的hadoop各种server、client RPC端通信协议的实现是基于google的protocol buffers的,如果对这个不熟悉,读code的时候会比较痛苦一些,所以花了些时间学习了一下,然后仿照写了个比较简单的例子,麻雀虽小,五脏俱全,看懂了我这个或许对你读hadoop的code有帮助! :)

我现在实现一个简单的server-client方式的calculator,client将计算请求序列化成protocol buffers形式然后发给server端,server端反序列化后将完成计算然后将结果序列化后返回给client端。

先看一下最后整体的package结构(模仿hadoop的包命名,便于比较)

package org.tao.pbtest.api:
org.tao.pbtest.api.Calculator
org.tao.pbtest.api.CalculatorPB
org.tao.pbtest.api.CalculatorPBServiceImpl

package org.tao.pbtest.server.business
org.tao.pbtest.server.business.CalculatorService

package org.tao.pbtest.ipc
org.tao.pbtest.ipc.Server

package org.tao.pbtest.proto
org.tao.pbtest.proto.Calculator
org.tao.pbtest.proto.CalculatorMsg

package org.tao.pbtest.proto.test
org.tao.pbtest.proto.test.TestCalculator


  • step 1:


  • 首先看一下Calculator这个接口:
    package org.tao.pbtest.api;
    
    public interface Calculator {
       public int add(int a, int b);
       public int minus(int a, int b);
    }
    


    这个计算器就进行简单的两种运算,两个整数的加减。

  • step 2:

  • 然后定义两个proto文件:CalculatorMsg.proto和Calculator.proto。

    第一个是运算的参数消息、返回结果消息,输入时两个整数,返回结果是一个整数。具体protocol buffers的语法此处不做解释了,可以参看google的文档。

    option java_package = "org.tao.pbtest.proto";
    option java_outer_classname = "CalculatorMsg";
    option  java_generic_services = true;
    option java_generate_equals_and_hash = true;
    
    message RequestProto {
       required string methodName = 1;
       required int32 num1 = 2;
       required int32 num2 = 3;
    }
    
    message ResponseProto {
       required int32 result = 1;
    }
    


    第二个proto文件定义service:
    option java_package = "org.tao.pbtest.proto";
    option java_outer_classname = "Calculator";
    option java_generic_service = true;
    option java_generate_equals_and_hash = true;
    
    import "CalculatorMsg.proto"
    
    service CalculatorService {
       rpc add(RequestProto) returns (ResponseProto);
       rpc minus(RequestProto) returns (ResponseProto);
    }
    


    然后用protoc将此两个文件编译,生成两个java文件:

    org.tao.pbtest.proto.Calculator
    org.tao.pbtest.proto.CalculatorMsg

  • step 3:

  • 然后定义一个CalculatorPB接口extends刚才生成的org.tao.pbtest.proto.Calculator.CalculatorService.BlockingInterface, 这是一个过渡作用的接口。

    package org.tao.pbtest.server.api;
    
    import org.tao.pbtest.proto.Calculator.CalculatorService.BlockingService;
    
    public interface CalculatorPB extends BlockingInterface {
    }
    
    


  • step 4:

  • 还需要一个发送、接受信息的ipc server/client端。这里偷懒只实现一个最最简单的server端,什么并发啊,异常处理啊,nio啊统统不考虑,因为这不是重点。

    package org.tao.pbtest.ipc;
    
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.*;
    import com.google.protobuf.*;
    import com.google.protobuf.Descriptors.MethodDescriptor;
    import org.tao.pbtest.proto.CalculatorMsg.RequestProto;
    import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;
    
    public class Server extends Thread {
       private Class<?> protocol;
       private BlockingService impl;
       private int port;
       private ServerSocket ss;
    
       public Server(Class<?> protocol, BlockingService protocolImpl, int port){
          this.protocol = protocol;
          this.impl = protocolImpl; 
          this.port = port;
       }
    
       public void run(){
          Socket clientSocket = null;
          DataOutputStream dos = null;
          DataInputStream dis = null;
          try {
               ss = new ServerSocket(port);
           }catch(IOException e){
           }    
           int testCount = 10; //进行10次计算后就退出
    
           while(testCount-- > 0){
              try {
                   clientSocket = ss.accept();
                   dos = new DataOutputStream(clientSocket.getOutputStream());
                   dis = new DataInputStream(clientSocket.getInputStream());
                   int dataLen = dis.readInt();
                   byte[] dataBuffer = new byte[dataLen];
                   int readCount = dis.read(dataBuffer);
                   byte[] result = processOneRpc(dataBuffer);
    
                   dos.writeInt(result.length);
                   dos.write(result);
                   dos.flush();
               }catch(Exception e){
               }
           }
           try { 
               dos.close();
               dis.close();
               ss.close();
           }catch(Exception e){
           };
    
       }
    
       public byte[] processOneRpc (byte[] data) throws Exception {
          RequestProto request = RequestProto.parseFrom(data);
          String methodName = request.getMethodName();
          MethodDescriptor methodDescriptor = impl.getDescriptorForType().findMethodByName(methodName);
          Message response = impl.callBlockingMethod(methodDescriptor, null, request);
          return response.toByteArray();
       }
    }
    
      

  • step 5:
  • CalculatorServer.java,实现计算器服务的类,此类依赖ipc Server接受请求并处理计算请求,注意到其自身实现了Calculator接口,本质上的计算是由其来完成的。也就是,Server接受客户端请求要执行方法M,Server对象里有实现了CalculatorPB接口的对象A,那么请求就交给A处理(A其实是CalculatorPBServiceImpl类的对象,此类后面介绍),此时A对应的M方法的参数是pb的形式,另外A对象里其实包含对CalculatorService的一个引用,所以在A的M方法里,先对参数反序列化,然后将参数交给CalculatorService处理。

    package org.tao.pbtest.server.business;
    
    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    
    import org.tao.pbtest.ipc.Server;
    import org.tao.pbtest.server.api.Calculator;
    
    import com.google.protobuf.BlockingService;
    
    public class CalculatorService implements Calculator {    
        
        private Server server = null;
        private final Class protocol = Calculator.class;
        private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        private final String protoPackage = "org.tao.pbtest.proto";
        private final String host = "localhost";
        private final int port = 8038;
        
        public CalculatorService (){
            
        }
        
        @Override
        public int add(int a, int b) {
            // TODO Auto-generated method stub
            return a+b;
        }
    
        public int minus(int a, int b){
            return a-b;
        }
        
        
        public void init(){
            createServer();        
        }
        
        
        /*
         * return org.tao.pbtest.server.api.CalculatorPBServiceImpl
         */
        public Class<?> getPbServiceImplClass(){
            String packageName = protocol.getPackage().getName();
            String className = protocol.getSimpleName();
            String pbServiceImplName =  packageName + "." + className +  "PBServiceImpl";        
            Class<?> clazz = null;
            try{
                clazz = Class.forName(pbServiceImplName, true, classLoader);
            }catch(ClassNotFoundException e){
                System.err.println(e.toString());
            }
            return clazz;
        }
        
        /*
         * return org.tao.pbtest.proto.Calculator$CalculatorService
         */
        public Class<?> getProtoClass(){
            String className = protocol.getSimpleName();
            String protoClazzName =  protoPackage + "." + className + "$" + className + "Service";
            Class<?> clazz = null;
            try{
                clazz = Class.forName(protoClazzName, true, classLoader);
            }catch(ClassNotFoundException e){
                System.err.println(e.toString());
            }
            return clazz;
        }
        
        public void createServer(){
            Class<?> pbServiceImpl = getPbServiceImplClass();
            Constructor<?> constructor = null;
            try{
                constructor = pbServiceImpl.getConstructor(protocol);
                constructor.setAccessible(true);
            }catch(NoSuchMethodException e){
                System.err.print(e.toString());
            }
            
            Object service = null;  // instance of CalculatorPBServiceImpl
            try {
                service = constructor.newInstance(this);
            }catch(InstantiationException e){
            } catch (IllegalArgumentException e) {
            } catch (IllegalAccessException e) {
            } catch (InvocationTargetException e) {
            }
            
            /*
             * interface: org.tao.pbtest.server.CalculatorPB
             */
            Class<?> pbProtocol = service.getClass().getInterfaces()[0];
                    
            /*
             * class: org.tao.pbtest.proto.Calculator$CalculatorService
             */
            Class<?> protoClazz = getProtoClass();
            
            Method method = null;
            try {
    
                // pbProtocol.getInterfaces()[] 即是接口 org.tao.pbtest.proto.Calculator$CalculatorService$BlockingInterface
    
                method = protoClazz.getMethod("newReflectiveBlockingService", pbProtocol.getInterfaces()[0]);
                method.setAccessible(true);
            }catch(NoSuchMethodException e){
                System.err.print(e.toString());
            }
            
            try{
                createServer(pbProtocol, (BlockingService)method.invoke(null, service));
            }catch(InvocationTargetException e){
            } catch (IllegalArgumentException e) {
            } catch (IllegalAccessException e) {
            }
            
        }
        
        public void createServer(Class pbProtocol, BlockingService service){
            server = new Server(pbProtocol, service, port);
            server.start();
        }
        
        public static void main(String[] args){
            CalculatorService cs = new CalculatorService();
            cs.init();
        }
    }
    
    


  • step 6:
  • 刚才提到的PB格式跟最终实现的桥梁类:CalculatorPBServiceImpl

    package org.tao.pbtest.server.api;
    
    import org.tao.pbtest.proto.CalculatorMsg.RequestProto;
    import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;
    
    import com.google.protobuf.RpcController;
    import com.google.protobuf.ServiceException;
    
    public class CalculatorPBServiceImpl implements CalculatorPB {
    
        public Calculator real;
        
        public CalculatorPBServiceImpl(Calculator impl){
            this.real = impl;
        }
        
        @Override
        public ResponseProto add(RpcController controller, RequestProto request) throws ServiceException {
            // TODO Auto-generated method stub
            ResponseProto proto = ResponseProto.getDefaultInstance();
            ResponseProto.Builder build = ResponseProto.newBuilder();
            int add1 = request.getNum1();
            int add2 = request.getNum2();
            int sum = real.add(add1, add2);
            ResponseProto result = null;
            build.setResult(sum);
            result = build.build();
            return result;
        }
    
        @Override
        public ResponseProto minus(RpcController controller, RequestProto request) throws ServiceException {
            // TODO Auto-generated method stub
            ResponseProto proto = ResponseProto.getDefaultInstance();
            ResponseProto.Builder build = ResponseProto.newBuilder();
            int add1 = request.getNum1();
            int add2 = request.getNum2();
            int sum = real.minus(add1, add2);
            ResponseProto result = null;
            build.setResult(sum);
            result = build.build();
            return result;
        }
    
    }
    
    



  • step 7:

  • 最后,偷懒没写客户端的东西,只是写了一个简单的测试例子:
    package org.tao.pbtest.proto.test;
    
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;
    import java.util.Random;
    
    import org.tao.pbtest.proto.CalculatorMsg.RequestProto;
    import org.tao.pbtest.proto.CalculatorMsg.ResponseProto;
    import org.tao.pbtest.server.api.Calculator;
    
    public class TestCalculator implements Calculator {
    
        public int doTest(String op, int a, int b){
            // TODO Auto-generated method stub
            Socket s = null;
            DataOutputStream out = null;
            DataInputStream in = null;
            int ret = 0;
            try {
                s= new Socket("localhost", 8038);
                out = new DataOutputStream(s.getOutputStream());
                in = new DataInputStream(s.getInputStream());
                
                RequestProto.Builder builder = RequestProto.newBuilder();
                builder.setMethodName(op);
                builder.setNum1(a);
                builder.setNum2(b);
                RequestProto request = builder.build();
                
                byte [] bytes = request.toByteArray();
                out.writeInt(bytes.length);
                out.write(bytes);
                out.flush();
                
                int dataLen = in.readInt();
                byte[] data = new byte[dataLen];
                int count = in.read(data);
                if(count != dataLen){
                    System.err.println("something bad happened!");
                }
                
                ResponseProto result = ResponseProto.parseFrom(data);
                System.out.println(a + " " + op + " " +  b + "=" + result.getResult());            
                ret =  result.getResult();
                
            }catch(Exception e){
                e.printStackTrace();
                System.err.println(e.toString());
            }finally {
                try{
                in.close();
                out.close();
                s.close();
                }catch(IOException e){
                    e.printStackTrace();
                }
            }
            return ret;
        }
        @Override
        public int add(int a, int b) {
            // TODO Auto-generated method stub
            return doTest("add", a, b);
        }
    
        @Override
        public int minus(int a, int b) {
            // TODO Auto-generated method stub
            return doTest("minus", a, b);
        }
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            // TODO Auto-generated method stub
            TestCalculator tc = new TestCalculator();
            int testCount = 5;
            Random rand = new Random();
            while(testCount-- > 0){
                int a = rand.nextInt(100);
                int b = rand.nextInt(100);
                tc.add(a,b);
                tc.minus(a, b);
            }        
            
        }
    
    }
    
    


    输出:

    76 add 14=90
    76 minus 14=62
    20 add 84=104
    20 minus 84=-64
    4 add 16=20
    4 minus 16=-12
    56 add 4=60
    56 minus 4=52
    46 add 50=96
    46 minus 50=-4
    4
    0
    分享到:
    评论
    发表评论

    文章已被作者锁定,不允许评论。

    相关推荐

      Hadoop RPC机制分析

      在Hadoop中,远程过程调用(RPC)是核心组件之一,它使得节点间的通信变得高效且可靠。本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在...

      java_RPC_hadoop.zip

      2. **Hadoop RPC**:Hadoop的RPC系统基于Google的Protocol Buffers,它提供了一种高效的二进制序列化方式。在Hadoop中,每个RPC服务都有一个特定的协议定义,包含了服务接口、方法签名和数据类型。服务端注册服务,...

      java操作hadoop的RPC,源码

      - `ProtobufRpcEngine`:Hadoop使用Google的Protocol Buffers进行序列化和反序列化,此引擎负责处理这些操作。 - `RPC.Server`:服务器端的实现,负责接收和处理客户端的请求。 - `RPC.Client`:客户端的实现,...

      hadoop中RPC协议的小测试例子(吴超老师)

      在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...

      Hadoop client server通讯分析

      Hadoop的RPC基于Java的Protocol Buffers,提供高效、灵活的序列化和反序列化能力,确保跨网络的数据交换效率。 五、安全通信与身份验证 Hadoop支持Kerberos等安全协议,用于确保客户端和服务端的身份验证和授权。...

      Hadoop序列化机制

      Hadoop提供了两种主要的序列化框架:Writable和Protocol Buffers,以及更现代的Apache Avro、Thrift和Kryo。 1. Writable接口:这是Hadoop最初提供的序列化机制,适用于Java对象。所有可序列化的类都需要实现...

      编译hadoop所需要的额外工具

      总之,Apache Ant、FindBugs和Protocol Buffers是Hadoop开发过程中不可或缺的工具,它们分别负责构建管理、静态代码分析和数据序列化,共同保证了Hadoop的稳定性和可靠性。了解并熟练使用这些工具,对于理解和开发...

      hadoop2.9.x源码编译工具包

      2. **ProtocolBuffer**:Google的Protocol Buffers(简称protobuf)是一种语言无关、平台无关的数据序列化协议。在Hadoop中,protobuf用于定义数据交换格式,特别是HDFS和MapReduce中的RPC通信。要编译Hadoop源码,...

      java集成hadoop-hbase用到的jar包

      Hadoop是一个分布式存储和计算框架,而HBase是一个构建在Hadoop之上的非关系型数据库(NoSQL),特别适合处理大规模数据。这里我们将详细探讨Java如何与这两个组件进行交互,并重点关注所需的jar包。 首先,Java...

      Hadoop源代码分析.zip

      此外,Hadoop还引入了更高效的Protocol Buffers和Avro等序列化框架,以提高性能和兼容性。 MapReduce和HDFS之间的通信涉及到多个组件,例如JobTracker(在Hadoop 1.x中)或Resource Manager(在Hadoop 2.x的YARN中...

      Hadoop技术内幕 深入解析HADOOP COMMON和HDFS架构设计与实现原理

      1. **网络通信库**:如Apache Avro和Protocol Buffers,它们提供了高效的序列化和远程过程调用(RPC)机制,使得Hadoop中的节点间能高效地交换数据。 2. **配置管理**:通过Hadoop配置文件,用户可以定制Hadoop集群...

      《Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理 》的源代码

      1. **网络通信**:如Socket通信、Netty库的使用,以及RPC(远程过程调用)协议的实现,如Avro或Protocol Buffers用于服务间的通信。 2. **I/O处理**:Hadoop的BlockInputStream和BlockOutputStream类,用于高效读写...

      protobuf-2.5.0,Linux版。

      在Hadoop源码中,protobuf主要用于定义各种RPC(远程过程调用)接口和数据交换格式,例如HDFS的BlockProtos、NameNode的Protocol Buffers等。通过protobuf,开发者可以清晰地定义服务接口和数据结构,而无需关心底层...

      protocol Buffer

      Protocol Buffers(简称protobuf)是由谷歌公司开发的一种高效、跨平台的数据序列化协议。它允许开发者定义数据结构,然后将这些数据结构转换成二进制格式进行存储或在网络上传输。这种格式不仅紧凑,而且解析速度快...

      通用大数据存储与分析处理平台_Hadoop.docx

      Hadoop生态系统还包括其他工具和系统,如Accumulo(安全的、分布式的键值存储系统),Dremel(用于交互式分析的系统),Drill(用于SQL查询的系统),Tez(更高效的MapReduce替代),Impala(实时查询系统),以及...

      RPC调用框架比较分析

      1. Protobuf RPC(Protocol Buffers Remote Procedure Call) - **Protobuf** 是Google开发的一种数据序列化协议,可以将结构化数据序列化,可用于数据存储、通信协议等方面。 - **protobuf-2.5.0.tar.bz2** 是一...

      protobuf2.5.0源码及win32文件

      标题 "protobuf2.5.0源码及win32文件" 涉及的主要知识点是Protocol Buffers(protobuf),这是Google开发的一种数据序列化协议,用于结构化数据的存储和交换。protobuf提供了一种高效、灵活且易于使用的机制,允许...

      protobuf-2.5.0.tar.gz

      标题中的"protobuf-2.5.0.tar.gz"是一个压缩包文件,其中包含了Protocol Buffers(简称protobuf)的源代码,版本为2.5.0。protobuf是Google开发的一种数据序列化协议,常用于结构化数据的存储和通信,它可以将复杂的...

      大数据课程体系.docx

      6. **Google Protobuf**:Hadoop底层数据交换可能涉及Protocol Buffers,一种高效的序列化协议,用于跨平台的数据传输。 7. **数据库与数据处理**:HBase是一个分布式、面向列的NoSQL数据库,适合处理大数据。Hive...

    Global site tag (gtag.js) - Google Analytics