`
jackyhongvip
  • 浏览: 160786 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RPC_AVRO范例

    博客分类:
  • RPC
 
阅读更多

1.下载与安装

  官方网站:http://avro.apache.org/

  下载地址:http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz

  安装之前确保已经装了maven

  1. cd /usr/local/src
  2. wget http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz
  3. tar zxvf avro-src-1.5.1.tar.gz
  4. cd avro-src-1.5.1/lang/java
  5. mvn clean install -DskipTests

  安装后,avro-1.5.1.jar位于avro-src-1.5.1/lang/java/avro/target

 

2.消息结构与服务接口

  Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。

  Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。

  基本类型:

  1. null: no value
  2. boolean: a binary value
  3. int: 32-bit signed integer
  4. long: 64-bit signed integer
  5. float: single precision (32-bit) IEEE 754 floating-point number
  6. double: double precision (64-bit) IEEE 754 floating-point number
  7. bytes: sequence of 8-bit unsigned bytes
  8. string: unicode character sequence

  首先编写一个message.avpr文件,定义一个消息结构。

  1. {
  2.     "namespace": "avro",
  3.     "protocol": "messageProtocol",
  4.     "doc": "This is a message.",
  5.     "name": "Message",
  6.  
  7.     "types": [
  8.         {"name":"message", "type":"record",
  9.             "fields":[
  10.                 {"name":"name", "type":"string"},
  11.                 {"name":"type", "type":"int"},
  12.                 {"name":"price", "type":"double"},
  13.                 {"name":"valid", "type":"boolean"},
  14.                 {"name":"content", "type":"bytes"}
  15.         ]}
  16.     ],
  17.  
  18.     "messages":    {
  19.         "sendMessage":{
  20.             "doc" : "test",
  21.             "request" :[{"name":"message","type":"message" }],
  22.             "response" :"message"
  23.         }         
  24.     }   
  25. }

  其中定义了1种类型叫做message,有5个成员name、type、price、valid、content。还定义了1个消息服务叫做sendMessage,输入有一个参数,类型是message,返回message。

3.序列化

  Avro有两种序列化编码:binary和JSON。

3.1.Binary Encoding

  基本类型:

    null:0字节

    boolean:1个字节——0(false)或1(true)

    int和long使用变长的zig-zag编码

    float:4个字节

    double:8个字节

    bytes:1个long,后边跟着字节序列

    string:1个long,后边跟着UTF-8编码的字符

3.2.records

  按字段声明的顺序编码值,如下面一个record schema:

  1. {
  2.     "type": "record",
  3.     "name": "test",
  4.     "fields" : [
  5.         {"name": "a", "type": "long"},
  6.         {"name": "b", "type": "string"}]
  7. }

  实例化这个record,a字段的值是27(编码为0×36),b字段的值是“foo”(编码为06 66 6f 6f),那么这个record编码结果是:

  1. 36 06 66 6f 6f

3.3.enums

  一个enum被编码为一个int,比如,考虑这个enum。

  1. {"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }

  这将被编码为一个取值范围为[0,3]的int,0表示“A”,3表示“D”。

3.4.arrays

  arrays编码为block序列,每个block包含一个long的count值,紧跟着的是array items,一个block的count为0表示该block是array的结尾。

3.5.maps

  mapss编码为block序列,每个block包含一个long的count值,紧跟着的是key/value对,一个block的count为0表示该block是map的结尾。

3.6.union

  union编码以一个long值开始,表示后边的数据是union中的哪种数据类型。

3.7.fixed

  编码为指定数目的字节。

4.rpc通信实现

  Avro的RPC实现不需要定义服务接口,但需要从.avpr文件中解析协议,协议中定义了消息结构和消息服务。message.avpr中定义了一个类型叫message,定义了一个服务叫sendMessage。

  工具类Utils.java:

  1. package avro;
  2.  
  3. import java.io.File;
  4. import java.io.IOException;
  5. import java.net.URL;
  6.  
  7. import org.apache.avro.Protocol;
  8.  
  9. public class Utils {
  10.     public static Protocol getProtocol() {
  11.         Protocol protocol = null;
  12.         try {
  13.             URL url = Utils.class.getClassLoader().getResource("message.avpr");
  14.             protocol = Protocol.parse(new File(url.getPath()));
  15.         } catch (IOException e) {
  16.             e.printStackTrace();
  17.         }
  18.         return protocol;
  19.     }
  20. }

  服务端实现Server.java:

  1. package avro;
  2.  
  3. import org.apache.avro.Protocol;
  4. import org.apache.avro.Protocol.Message;
  5. import org.apache.avro.generic.GenericRecord;
  6. import org.apache.avro.ipc.HttpServer;
  7. import org.apache.avro.ipc.generic.GenericResponder;
  8.  
  9. public class Server extends GenericResponder {
  10.     private Protocol protocol = null;
  11.     private int port;
  12.  
  13.     public Server(Protocol protocol, int port) {
  14.         super(protocol);
  15.         this.protocol = protocol;
  16.         this.port = port;
  17.     }
  18.  
  19.     public Object respond(Message message, Object request) throws Exception {
  20.         GenericRecord req = (GenericRecord) request;
  21.         GenericRecord msg = (GenericRecord)(req.get("message"));
  22.         // process the request
  23.         …
  24.         return msg;
  25.     }
  26.  
  27.     public void run() {
  28.         try {
  29.             HttpServer server = new HttpServer(this, port);
  30.  
  31.             server.start();
  32.         } catch (Exception e) {
  33.             e.printStackTrace();
  34.         }
  35.     }
  36.  
  37.     public static void main(String[] args) {
  38.         if (args.length != 1) {
  39.             System.out.println("Usage: Server port");
  40.             System.exit(0);
  41.         }
  42.         int port = Integer.parseInt(args[0]);
  43.         new Server(Utils.getProtocol(), port).run();
  44.     }
  45. }

  客户端实现Client.java:

  1. package avro;
  2.  
  3. import java.net.URL;
  4. import java.nio.ByteBuffer;
  5. import java.util.Arrays;
  6.  
  7. import org.apache.avro.util.Utf8;
  8. import org.apache.avro.Protocol;
  9. import org.apache.avro.generic.GenericData;
  10. import org.apache.avro.generic.GenericRecord;
  11. import org.apache.avro.ipc.HttpTransceiver;
  12. import org.apache.avro.ipc.Transceiver;
  13. import org.apache.avro.ipc.generic.GenericRequestor;
  14.  
  15. public class Client {
  16.     private Protocol protocol = null;
  17.     private String host = null;
  18.     private int port = 0;
  19.     private int size = 0;
  20.     private int count = 0;
  21.  
  22.     public Client(Protocol protocol, String host, int port, int size, int count) {
  23.         this.protocol = protocol;
  24.         this.host = host;
  25.         this.port = port;
  26.         this.size = size;
  27.         this.count = count;
  28.     }
  29.  
  30.     public long sendMessage() throws Exception {
  31.         GenericRecord requestData = new GenericData.Record(
  32.                 protocol.getType("message"));
  33.         // initiate the request data
  34.         …
  35.  
  36.         GenericRecord request = new GenericData.Record(protocol.getMessages()
  37.                 .get("sendMessage").getRequest());
  38.         request.put("message", requestData);
  39.  
  40.         Transceiver t = new HttpTransceiver(new URL("http://" + host + ":"
  41.                 + port));
  42.         GenericRequestor requestor = new GenericRequestor(protocol, t);
  43.  
  44.         long start = System.currentTimeMillis();
  45.         for (int i = 0; i < count; i++) {
  46.             requestor.request("sendMessage", request);
  47.         }
  48.         long end = System.currentTimeMillis();
  49.         System.out.println(end - start);
  50.         return end - start;
  51.     }
  52.  
  53.     public long run() {
  54.         long res = 0;
  55.         try {
  56.             res = sendMessage();
  57.         } catch (Exception e) {
  58.             e.printStackTrace();
  59.         }
  60.         return res;
  61.     }
  62.  
  63.     public static void main(String[] args) throws Exception {
  64.         if (args.length != 4) {
  65.             System.out.println("Usage: Client host port dataSize count");
  66.             System.exit(0);
  67.         }
  68.  
  69.         String host = args[0];
  70.         int port = Integer.parseInt(args[1]);
  71.         int size = Integer.parseInt(args[2]);
  72.         int count = Integer.parseInt(args[3]);
  73.         new Client(Utils.getProtocol(), host, port, size, count).run();
  74.     }
  75. }

5.参考资料

  (1) Avro Documentation: http://avro.apache.org/docs/current/index.html

分享到:
评论

相关推荐

    kafka.rar_DEMO_avro_consumer_kafka_producer

    kafka的生产消费示例程序,其中数据生产时需要avro序列化,消费时需要反序列化

    Hadoop大数据零基础实战培训教程_Avro数据序列化系统.rar

    Hadoop是一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存储。Hadoop实现了一个分布式文件系统(Hadoop Distributed File ...

    test_avro:在 spring 中使用 apache avro

    test_avro 使用 jetty 和 apache avro 进行 Spring Boot 概念验证 使用 jetty 运行 spring boot 项目: mvn spring-boot: run Avro 的概念证明在测试中,所以使用:mvn test 结尾。-

    Avro C API接口库接口调用示例

    - 读取的数据可以使用`avro_value_get*()`函数获取,根据数据类型不同,如`avro_value_get_int()`、`avro_value_get_string()`等。 - 读取完毕后,别忘了调用`avro_file_reader_close()`关闭读取器。 4. **编译与...

    rec-avro:Avro模式和数据转换器支持存储任意嵌套的python数据结构

    from rec_avro import to_rec_avro_destructive , from_rec_avro_destructive , rec_avro_schema def json_objects (): return [{ 'a' : 'a' }, { 'b' : 'b' }] # For efficiency, to_rec_avro_destructive()

    Apache Avro RPC简单示例

    在这个“Apache Avro RPC简单示例”中,我们将深入探讨Avro如何实现RPC,并通过提供的压缩包文件`avro-rpc-quickstart-master`进行实战演练。 首先,理解Avro的基本概念是至关重要的。Avro的数据模型基于JSON,包括...

    avro-rpc程序示例

    **Avro RPC简介** Avro是Hadoop生态系统中的一个关键组件,由Apache软件基金会开发,主要用作数据序列化系统。它提供了一种高效的、语言无关的、版本化的数据序列化机制,使得不同编程语言之间可以方便地交换数据。...

    kafka_avro_poseidon_example:集成kafkaavroposeidon的示例项目

    Kafka/Avro/Poseidon 示例此 repo 包含用于创建使用 Avro 编码的生产和消费 Kafka 消息的基本代码。 消费者使用poseidon_group gem 来使用高级组管理API。 这允许您在同一组中拥有多个读者。 Zookeeper 还将跟踪您的...

    avro-rpc-quickstart:Apache Avro RPC快速入门

    作者: (在上关注我)概括,如果您愿意的话...介绍该项目中包含的示例应用程序模拟了一个远程服务Mail,其中Avro RPC用于使用该服务发送消息。 本文档详细介绍了如何使用Maven构建和运行示例。 Avro jar文件(以及它们

    dask-avro:适用于Dask的Avro阅读器

    达斯克·阿夫罗(Dask-Avro) Avro Dask阅读器。 免费软件:MIT许可证 文档: : 。 Python版本:2.7、3.5+ 特征 该项目为提供了格式的阅读器。 提供了一种方便的功能,可以读取一个或多个Avro文件并对其进行任意...

    avro_sample.rar

    标题中的"avro_sample.rar"表明这是一个关于Avro的示例项目,Avro是一种数据序列化系统,由Apache Hadoop项目开发。它被设计用来高效地处理和交换各种语言之间的数据。Avro提供了丰富的数据模式定义,允许在不同的...

    avro-rpc-demo:avro rpc的演示代码

    “ avro-rpc-demo”是Java实现的avro rpc的演示代码。 作者:zhexin Pan日期:20151103 简介此项目包括Java中的三个实现演示。 第一个是官方网站的快速入门演示,它是数据序列化和反序列化的两种实现,分别称为...

    Python库 | avro_to_python-0.2.4.tar.gz

    《Python库:avro_to_python-0.2.4》 在Python开发中,库扮演着至关重要的角色,它们提供了一系列预定义的功能,使开发者能够更高效地编写代码。本篇将详细介绍Python库"avro_to_python-0.2.4"的相关知识点。 1. *...

    avro-in-action:RPC与Apache Avro示例

    Apache Avro是Hadoop生态系统中的一个关键组件,它主要用于数据序列化和远程过程调用(RPC)。这个"avro-in-action: RPC与Apache Avro示例"项目显然旨在帮助开发者理解如何在实际应用中利用Avro进行高效的数据交互。...

    apache avro 简介

    通过Avro的RPC框架,服务端和客户端可以使用相同的schema进行通信,确保数据的正确交换。 ### Avro与Hadoop的关系 在Hadoop生态系统中,Avro通常与HDFS、MapReduce、Pig、Hive等组件结合使用。它提供了对Hadoop友好...

    avro_tutorial

    **Avro教程** 在IT行业中,数据序列化是至关重要的,它允许我们将对象的状态转换为可以在网络上传输或存储在磁盘上的字节流。Avro是Apache Hadoop项目的一部分,是一个高效的数据序列化系统,特别适用于分布式计算...

    avro-tool工具jar包

    Avro工具jar包是Avro的一部分,主要用于处理Avro格式的数据,包括编译Avro模式,转换数据,以及合并或拆分Avro文件等。在这个版本1.8.2中,它已经被验证为功能正常且可直接从Maven官方库免费获取。 首先,让我们...

    avro的avro-1.8.1的jar

    这是关于avro的avro-1.8.1版本的avro-tools的一个jar包

    Java读写avro所需jar

    - **Compile Protocol工具**:将Avro的RPC协议文件编译为特定语言(如Java)的客户端和服务端代码。 - **Avro命令行界面**:提供了一些基本的Avro文件操作命令,如`cat`、`mkdatafile`等。 在实际开发中,这两个...

    word_count.avro

    MapReduce读取Avro序列化文件测试数据,文件中存储了几个单词作为测试数据供读取

Global site tag (gtag.js) - Google Analytics