- 浏览: 614039 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
月光杯:
问题解决了吗?
Exceptions in HDFS -
iostreamin:
神,好厉害,这是我找到的唯一可以ac的Java代码,厉害。
[leetcode] word ladder II -
standalone:
One answer I agree with:引用Whene ...
How many string objects are created? -
DiaoCow:
不错!,一开始对这些确实容易犯迷糊
erlang中的冒号 分号 和 句号 -
standalone:
Exception in thread "main& ...
one java interview question
现在版本的hadoop各种server、client RPC端通信协议的实现是基于google的protocol buffers的,如果对这个不熟悉,读code的时候会比较痛苦一些,所以花了些时间学习了一下,然后仿照写了个比较简单的例子,麻雀虽小,五脏俱全,看懂了我这个或许对你读hadoop的code有帮助! :)
我现在实现一个简单的server-client方式的calculator,client将计算请求序列化成protocol buffers形式然后发给server端,server端反序列化后将完成计算然后将结果序列化后返回给client端。
先看一下最后整体的package结构(模仿hadoop的包命名,便于比较)
package org.tao.pbtest.api:
org.tao.pbtest.api.Calculator
org.tao.pbtest.api.CalculatorPB
org.tao.pbtest.api.CalculatorPBServiceImpl
package org.tao.pbtest.server.business
org.tao.pbtest.server.business.CalculatorService
package org.tao.pbtest.ipc
org.tao.pbtest.ipc.Server
package org.tao.pbtest.proto
org.tao.pbtest.proto.Calculator
org.tao.pbtest.proto.CalculatorMsg
package org.tao.pbtest.proto.test
org.tao.pbtest.proto.test.TestCalculator
step 1:
首先看一下Calculator这个接口:
这个计算器就进行简单的两种运算,两个整数的加减。
step 2:
然后定义两个proto文件:CalculatorMsg.proto和Calculator.proto。
第一个是运算的参数消息、返回结果消息,输入时两个整数,返回结果是一个整数。具体protocol buffers的语法此处不做解释了,可以参看google的文档。
第二个proto文件定义service:
然后用protoc将此两个文件编译,生成两个java文件:
org.tao.pbtest.proto.Calculator
org.tao.pbtest.proto.CalculatorMsg
step 3:
然后定义一个CalculatorPB接口extends刚才生成的org.tao.pbtest.proto.Calculator.CalculatorService.BlockingInterface, 这是一个过渡作用的接口。
step 4:
还需要一个发送、接受信息的ipc server/client端。这里偷懒只实现一个最最简单的server端,什么并发啊,异常处理啊,nio啊统统不考虑,因为这不是重点。
step 5:
CalculatorServer.java,实现计算器服务的类,此类依赖ipc Server接受请求并处理计算请求,注意到其自身实现了Calculator接口,本质上的计算是由其来完成的。也就是,Server接受客户端请求要执行方法M,Server对象里有实现了CalculatorPB接口的对象A,那么请求就交给A处理(A其实是CalculatorPBServiceImpl类的对象,此类后面介绍),此时A对应的M方法的参数是pb的形式,另外A对象里其实包含对CalculatorService的一个引用,所以在A的M方法里,先对参数反序列化,然后将参数交给CalculatorService处理。
step 6:
刚才提到的PB格式跟最终实现的桥梁类:CalculatorPBServiceImpl
step 7:
最后,偷懒没写客户端的东西,只是写了一个简单的测试例子:
输出:
76 add 14=90
76 minus 14=62
20 add 84=104
20 minus 84=-64
4 add 16=20
4 minus 16=-12
56 add 4=60
56 minus 4=52
46 add 50=96
46 minus 50=-4
我现在实现一个简单的server-client方式的calculator,client将计算请求序列化成protocol buffers形式然后发给server端,server端反序列化后将完成计算然后将结果序列化后返回给client端。
先看一下最后整体的package结构(模仿hadoop的包命名,便于比较)
package org.tao.pbtest.api:
org.tao.pbtest.api.Calculator
org.tao.pbtest.api.CalculatorPB
org.tao.pbtest.api.CalculatorPBServiceImpl
package org.tao.pbtest.server.business
org.tao.pbtest.server.business.CalculatorService
package org.tao.pbtest.ipc
org.tao.pbtest.ipc.Server
package org.tao.pbtest.proto
org.tao.pbtest.proto.Calculator
org.tao.pbtest.proto.CalculatorMsg
package org.tao.pbtest.proto.test
org.tao.pbtest.proto.test.TestCalculator
首先看一下Calculator这个接口:
package org.tao.pbtest.api; public interface Calculator { public int add(int a, int b); public int minus(int a, int b); }
这个计算器就进行简单的两种运算,两个整数的加减。
然后定义两个proto文件:CalculatorMsg.proto和Calculator.proto。
第一个是运算的参数消息、返回结果消息,输入时两个整数,返回结果是一个整数。具体protocol buffers的语法此处不做解释了,可以参看google的文档。
option java_package = "org.tao.pbtest.proto"; option java_outer_classname = "CalculatorMsg"; option java_generic_services = true; option java_generate_equals_and_hash = true; message RequestProto { required string methodName = 1; required int32 num1 = 2; required int32 num2 = 3; } message ResponseProto { required int32 result = 1; }
第二个proto文件定义service:
option java_package = "org.tao.pbtest.proto"; option java_outer_classname = "Calculator"; option java_generic_service = true; option java_generate_equals_and_hash = true; import "CalculatorMsg.proto" service CalculatorService { rpc add(RequestProto) returns (ResponseProto); rpc minus(RequestProto) returns (ResponseProto); }
然后用protoc将此两个文件编译,生成两个java文件:
org.tao.pbtest.proto.Calculator
org.tao.pbtest.proto.CalculatorMsg
然后定义一个CalculatorPB接口extends刚才生成的org.tao.pbtest.proto.Calculator.CalculatorService.BlockingInterface, 这是一个过渡作用的接口。
package org.tao.pbtest.server.api; import org.tao.pbtest.proto.Calculator.CalculatorService.BlockingService; public interface CalculatorPB extends BlockingInterface { }
还需要一个发送、接受信息的ipc server/client端。这里偷懒只实现一个最最简单的server端,什么并发啊,异常处理啊,nio啊统统不考虑,因为这不是重点。
package org.tao.pbtest.ipc; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.*; import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; import org.tao.pbtest.proto.CalculatorMsg.RequestProto; import org.tao.pbtest.proto.CalculatorMsg.ResponseProto; public class Server extends Thread { private Class<?> protocol; private BlockingService impl; private int port; private ServerSocket ss; public Server(Class<?> protocol, BlockingService protocolImpl, int port){ this.protocol = protocol; this.impl = protocolImpl; this.port = port; } public void run(){ Socket clientSocket = null; DataOutputStream dos = null; DataInputStream dis = null; try { ss = new ServerSocket(port); }catch(IOException e){ } int testCount = 10; //进行10次计算后就退出 while(testCount-- > 0){ try { clientSocket = ss.accept(); dos = new DataOutputStream(clientSocket.getOutputStream()); dis = new DataInputStream(clientSocket.getInputStream()); int dataLen = dis.readInt(); byte[] dataBuffer = new byte[dataLen]; int readCount = dis.read(dataBuffer); byte[] result = processOneRpc(dataBuffer); dos.writeInt(result.length); dos.write(result); dos.flush(); }catch(Exception e){ } } try { dos.close(); dis.close(); ss.close(); }catch(Exception e){ }; } public byte[] processOneRpc (byte[] data) throws Exception { RequestProto request = RequestProto.parseFrom(data); String methodName = request.getMethodName(); MethodDescriptor methodDescriptor = impl.getDescriptorForType().findMethodByName(methodName); Message response = impl.callBlockingMethod(methodDescriptor, null, request); return response.toByteArray(); } }
package org.tao.pbtest.server.business; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import org.tao.pbtest.ipc.Server; import org.tao.pbtest.server.api.Calculator; import com.google.protobuf.BlockingService; public class CalculatorService implements Calculator { private Server server = null; private final Class protocol = Calculator.class; private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); private final String protoPackage = "org.tao.pbtest.proto"; private final String host = "localhost"; private final int port = 8038; public CalculatorService (){ } @Override public int add(int a, int b) { // TODO Auto-generated method stub return a+b; } public int minus(int a, int b){ return a-b; } public void init(){ createServer(); } /* * return org.tao.pbtest.server.api.CalculatorPBServiceImpl */ public Class<?> getPbServiceImplClass(){ String packageName = protocol.getPackage().getName(); String className = protocol.getSimpleName(); String pbServiceImplName = packageName + "." + className + "PBServiceImpl"; Class<?> clazz = null; try{ clazz = Class.forName(pbServiceImplName, true, classLoader); }catch(ClassNotFoundException e){ System.err.println(e.toString()); } return clazz; } /* * return org.tao.pbtest.proto.Calculator$CalculatorService */ public Class<?> getProtoClass(){ String className = protocol.getSimpleName(); String protoClazzName = protoPackage + "." + className + "$" + className + "Service"; Class<?> clazz = null; try{ clazz = Class.forName(protoClazzName, true, classLoader); }catch(ClassNotFoundException e){ System.err.println(e.toString()); } return clazz; } public void createServer(){ Class<?> pbServiceImpl = getPbServiceImplClass(); Constructor<?> constructor = null; try{ constructor = pbServiceImpl.getConstructor(protocol); constructor.setAccessible(true); }catch(NoSuchMethodException e){ System.err.print(e.toString()); } Object service = null; // instance of CalculatorPBServiceImpl try { service = constructor.newInstance(this); }catch(InstantiationException e){ } catch (IllegalArgumentException e) { } catch (IllegalAccessException e) { } catch (InvocationTargetException e) { } /* * interface: org.tao.pbtest.server.CalculatorPB */ Class<?> pbProtocol = service.getClass().getInterfaces()[0]; /* * class: org.tao.pbtest.proto.Calculator$CalculatorService */ Class<?> protoClazz = getProtoClass(); Method method = null; try { // pbProtocol.getInterfaces()[] 即是接口 org.tao.pbtest.proto.Calculator$CalculatorService$BlockingInterface method = protoClazz.getMethod("newReflectiveBlockingService", pbProtocol.getInterfaces()[0]); method.setAccessible(true); }catch(NoSuchMethodException e){ System.err.print(e.toString()); } try{ createServer(pbProtocol, (BlockingService)method.invoke(null, service)); }catch(InvocationTargetException e){ } catch (IllegalArgumentException e) { } catch (IllegalAccessException e) { } } public void createServer(Class pbProtocol, BlockingService service){ server = new Server(pbProtocol, service, port); server.start(); } public static void main(String[] args){ CalculatorService cs = new CalculatorService(); cs.init(); } }
package org.tao.pbtest.server.api; import org.tao.pbtest.proto.CalculatorMsg.RequestProto; import org.tao.pbtest.proto.CalculatorMsg.ResponseProto; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; public class CalculatorPBServiceImpl implements CalculatorPB { public Calculator real; public CalculatorPBServiceImpl(Calculator impl){ this.real = impl; } @Override public ResponseProto add(RpcController controller, RequestProto request) throws ServiceException { // TODO Auto-generated method stub ResponseProto proto = ResponseProto.getDefaultInstance(); ResponseProto.Builder build = ResponseProto.newBuilder(); int add1 = request.getNum1(); int add2 = request.getNum2(); int sum = real.add(add1, add2); ResponseProto result = null; build.setResult(sum); result = build.build(); return result; } @Override public ResponseProto minus(RpcController controller, RequestProto request) throws ServiceException { // TODO Auto-generated method stub ResponseProto proto = ResponseProto.getDefaultInstance(); ResponseProto.Builder build = ResponseProto.newBuilder(); int add1 = request.getNum1(); int add2 = request.getNum2(); int sum = real.minus(add1, add2); ResponseProto result = null; build.setResult(sum); result = build.build(); return result; } }
最后,偷懒没写客户端的东西,只是写了一个简单的测试例子:
package org.tao.pbtest.proto.test; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.util.Random; import org.tao.pbtest.proto.CalculatorMsg.RequestProto; import org.tao.pbtest.proto.CalculatorMsg.ResponseProto; import org.tao.pbtest.server.api.Calculator; public class TestCalculator implements Calculator { public int doTest(String op, int a, int b){ // TODO Auto-generated method stub Socket s = null; DataOutputStream out = null; DataInputStream in = null; int ret = 0; try { s= new Socket("localhost", 8038); out = new DataOutputStream(s.getOutputStream()); in = new DataInputStream(s.getInputStream()); RequestProto.Builder builder = RequestProto.newBuilder(); builder.setMethodName(op); builder.setNum1(a); builder.setNum2(b); RequestProto request = builder.build(); byte [] bytes = request.toByteArray(); out.writeInt(bytes.length); out.write(bytes); out.flush(); int dataLen = in.readInt(); byte[] data = new byte[dataLen]; int count = in.read(data); if(count != dataLen){ System.err.println("something bad happened!"); } ResponseProto result = ResponseProto.parseFrom(data); System.out.println(a + " " + op + " " + b + "=" + result.getResult()); ret = result.getResult(); }catch(Exception e){ e.printStackTrace(); System.err.println(e.toString()); }finally { try{ in.close(); out.close(); s.close(); }catch(IOException e){ e.printStackTrace(); } } return ret; } @Override public int add(int a, int b) { // TODO Auto-generated method stub return doTest("add", a, b); } @Override public int minus(int a, int b) { // TODO Auto-generated method stub return doTest("minus", a, b); } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub TestCalculator tc = new TestCalculator(); int testCount = 5; Random rand = new Random(); while(testCount-- > 0){ int a = rand.nextInt(100); int b = rand.nextInt(100); tc.add(a,b); tc.minus(a, b); } } }
输出:
76 add 14=90
76 minus 14=62
20 add 84=104
20 minus 84=-64
4 add 16=20
4 minus 16=-12
56 add 4=60
56 minus 4=52
46 add 50=96
46 minus 50=-4
发表评论
文章已被作者锁定,不允许评论。
-
hadoop-2.2.0 build failure due to missing dependancy
2014-01-06 13:18 754The bug and fix is at https://i ... -
HDFS中租约管理源代码分析
2013-07-05 18:05 0HDFS中Client写文件的时候要获得一个租约,用来保证Cl ... -
Question on HBase source code
2013-05-22 15:05 1117I'm reading source code of hbas ... -
Using the libjars option with Hadoop
2013-05-20 15:03 969As I have said in my last post, ... -
学习hadoop之基于protocol buffers的 RPC
2012-11-15 22:59 2现在版本的hadoop各种server、client RPC端 ... -
Hadoop RPC 一问
2012-11-14 14:43 121看代码时候发现好像有个地方做得多余,不知道改一下会不会有好处, ... -
Hadoop Version Graph
2012-11-14 11:47 930可以到这里看全文: http://cloudblog.8km ... -
Hadoop 2.0 代码分析---MapReduce
2012-10-25 18:27 7094本文参考hadoop的版本: hadoop-2.0.1-alp ... -
how to study hadoop?
2012-04-27 15:34 1533From StackOverflow http://stack ... -
首相发怒记之hadoop篇
2012-03-23 12:14 794我在youtube上看到的,某位能翻*墙的看一下吧,挺好笑的。 ... -
一个HDFS Error
2011-06-11 21:53 1534ERROR: hdfs.DFSClient: Excep ... -
hadoop cluster at ebay
2011-06-11 21:39 1161Friday, December 17, 2010Hadoop ... -
[转]hadoop at ebay
2011-06-11 21:09 1198http://www.ebaytechblog.com/201 ... -
【读书笔记】Data warehousing and analytics infrastructure at facebook
2011-03-18 22:03 1953这好像是sigmod2010上的paper。 读了之后做了以 ... -
impact of total region numbers?
2011-01-19 16:31 933这几天tune了hbase的几个参数,有些有意思的结果。具体看 ... -
Will all HFiles managed by a regionserver kept open
2011-01-19 10:29 1486code 没看仔细,所以在hbase 的mail list上面 ... -
problems in building hadoop
2010-12-22 10:28 1010When I try to modify some code ... -
HDFS scalability: the limits to growth
2010-11-30 12:52 2046Abstract: The Hadoop Distr ... -
hadoop-0.20.2+737 and hbase-0.20.6 not compatible?
2010-11-11 13:28 1319master log里面发现 0 region server ... -
Implementing WebGIS on Hadoop: A Case Study of Improving Small File IO Performan
2010-07-26 22:46 1678Implementing WebGIS on Hadoo ...
相关推荐
在Hadoop中,远程过程调用(RPC)是核心组件之一,它使得节点间的通信变得高效且可靠。本文将深入探讨Hadoop的RPC机制,解析其工作原理,并结合源码分析其内部实现。 一、RPC简介 RPC是一种让程序能够调用运行在...
2. **Hadoop RPC**:Hadoop的RPC系统基于Google的Protocol Buffers,它提供了一种高效的二进制序列化方式。在Hadoop中,每个RPC服务都有一个特定的协议定义,包含了服务接口、方法签名和数据类型。服务端注册服务,...
- `ProtobufRpcEngine`:Hadoop使用Google的Protocol Buffers进行序列化和反序列化,此引擎负责处理这些操作。 - `RPC.Server`:服务器端的实现,负责接收和处理客户端的请求。 - `RPC.Client`:客户端的实现,...
在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...
Hadoop的RPC基于Java的Protocol Buffers,提供高效、灵活的序列化和反序列化能力,确保跨网络的数据交换效率。 五、安全通信与身份验证 Hadoop支持Kerberos等安全协议,用于确保客户端和服务端的身份验证和授权。...
Hadoop提供了两种主要的序列化框架:Writable和Protocol Buffers,以及更现代的Apache Avro、Thrift和Kryo。 1. Writable接口:这是Hadoop最初提供的序列化机制,适用于Java对象。所有可序列化的类都需要实现...
总之,Apache Ant、FindBugs和Protocol Buffers是Hadoop开发过程中不可或缺的工具,它们分别负责构建管理、静态代码分析和数据序列化,共同保证了Hadoop的稳定性和可靠性。了解并熟练使用这些工具,对于理解和开发...
2. **ProtocolBuffer**:Google的Protocol Buffers(简称protobuf)是一种语言无关、平台无关的数据序列化协议。在Hadoop中,protobuf用于定义数据交换格式,特别是HDFS和MapReduce中的RPC通信。要编译Hadoop源码,...
Hadoop是一个分布式存储和计算框架,而HBase是一个构建在Hadoop之上的非关系型数据库(NoSQL),特别适合处理大规模数据。这里我们将详细探讨Java如何与这两个组件进行交互,并重点关注所需的jar包。 首先,Java...
此外,Hadoop还引入了更高效的Protocol Buffers和Avro等序列化框架,以提高性能和兼容性。 MapReduce和HDFS之间的通信涉及到多个组件,例如JobTracker(在Hadoop 1.x中)或Resource Manager(在Hadoop 2.x的YARN中...
1. **网络通信库**:如Apache Avro和Protocol Buffers,它们提供了高效的序列化和远程过程调用(RPC)机制,使得Hadoop中的节点间能高效地交换数据。 2. **配置管理**:通过Hadoop配置文件,用户可以定制Hadoop集群...
1. **网络通信**:如Socket通信、Netty库的使用,以及RPC(远程过程调用)协议的实现,如Avro或Protocol Buffers用于服务间的通信。 2. **I/O处理**:Hadoop的BlockInputStream和BlockOutputStream类,用于高效读写...
在Hadoop源码中,protobuf主要用于定义各种RPC(远程过程调用)接口和数据交换格式,例如HDFS的BlockProtos、NameNode的Protocol Buffers等。通过protobuf,开发者可以清晰地定义服务接口和数据结构,而无需关心底层...
Protocol Buffers(简称protobuf)是由谷歌公司开发的一种高效、跨平台的数据序列化协议。它允许开发者定义数据结构,然后将这些数据结构转换成二进制格式进行存储或在网络上传输。这种格式不仅紧凑,而且解析速度快...
Hadoop生态系统还包括其他工具和系统,如Accumulo(安全的、分布式的键值存储系统),Dremel(用于交互式分析的系统),Drill(用于SQL查询的系统),Tez(更高效的MapReduce替代),Impala(实时查询系统),以及...
1. Protobuf RPC(Protocol Buffers Remote Procedure Call) - **Protobuf** 是Google开发的一种数据序列化协议,可以将结构化数据序列化,可用于数据存储、通信协议等方面。 - **protobuf-2.5.0.tar.bz2** 是一...
标题 "protobuf2.5.0源码及win32文件" 涉及的主要知识点是Protocol Buffers(protobuf),这是Google开发的一种数据序列化协议,用于结构化数据的存储和交换。protobuf提供了一种高效、灵活且易于使用的机制,允许...
标题中的"protobuf-2.5.0.tar.gz"是一个压缩包文件,其中包含了Protocol Buffers(简称protobuf)的源代码,版本为2.5.0。protobuf是Google开发的一种数据序列化协议,常用于结构化数据的存储和通信,它可以将复杂的...
6. **Google Protobuf**:Hadoop底层数据交换可能涉及Protocol Buffers,一种高效的序列化协议,用于跨平台的数据传输。 7. **数据库与数据处理**:HBase是一个分布式、面向列的NoSQL数据库,适合处理大数据。Hive...