1.下载与安装
官方网站:http://avro.apache.org/
下载地址:http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz
安装之前确保已经装了maven
- cd /usr/local/src
- wget http://labs.renren.com/apache-mirror//avro/avro-1.5.1/avro-src-1.5.1.tar.gz
- tar zxvf avro-src-1.5.1.tar.gz
- cd avro-src-1.5.1/lang/java
- mvn clean install -DskipTests
安装后,avro-1.5.1.jar位于avro-src-1.5.1/lang/java/avro/target
2.消息结构与服务接口
Avro的模式主要由JSON对象来表示,Avro支持8种基本类型(Primitive Type)和6种复杂类型(Complex Type:records、enums、arrays、maps、unions 和fixed),基本类型可以由JSON字符串来表示。
Avro支持两种序列化编码方式:二进制编码和JSON编码,使用二进制编码会高效序列化,并且序列化后得到的结果会比较小。
基本类型:
- null: no value
- boolean: a binary value
- int: 32-bit signed integer
- long: 64-bit signed integer
- float: single precision (32-bit) IEEE 754 floating-point number
- double: double precision (64-bit) IEEE 754 floating-point number
- bytes: sequence of 8-bit unsigned bytes
- string: unicode character sequence
首先编写一个message.avpr文件,定义一个消息结构。
- {
- "namespace": "avro",
- "protocol": "messageProtocol",
- "doc": "This is a message.",
- "name": "Message",
-
- "types": [
- {"name":"message", "type":"record",
- "fields":[
- {"name":"name", "type":"string"},
- {"name":"type", "type":"int"},
- {"name":"price", "type":"double"},
- {"name":"valid", "type":"boolean"},
- {"name":"content", "type":"bytes"}
- ]}
- ],
-
- "messages": {
- "sendMessage":{
- "doc" : "test",
- "request" :[{"name":"message","type":"message" }],
- "response" :"message"
- }
- }
- }
其中定义了1种类型叫做message,有5个成员name、type、price、valid、content。还定义了1个消息服务叫做sendMessage,输入有一个参数,类型是message,返回message。
3.序列化
Avro有两种序列化编码:binary和JSON。
3.1.Binary Encoding
基本类型:
null:0字节
boolean:1个字节——0(false)或1(true)
int和long使用变长的zig-zag编码
float:4个字节
double:8个字节
bytes:1个long,后边跟着字节序列
string:1个long,后边跟着UTF-8编码的字符
3.2.records
按字段声明的顺序编码值,如下面一个record schema:
- {
- "type": "record",
- "name": "test",
- "fields" : [
- {"name": "a", "type": "long"},
- {"name": "b", "type": "string"}]
- }
实例化这个record,a字段的值是27(编码为0×36),b字段的值是“foo”(编码为06 66 6f 6f),那么这个record编码结果是:
3.3.enums
一个enum被编码为一个int,比如,考虑这个enum。
- {"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }
这将被编码为一个取值范围为[0,3]的int,0表示“A”,3表示“D”。
3.4.arrays
arrays编码为block序列,每个block包含一个long的count值,紧跟着的是array items,一个block的count为0表示该block是array的结尾。
3.5.maps
mapss编码为block序列,每个block包含一个long的count值,紧跟着的是key/value对,一个block的count为0表示该block是map的结尾。
3.6.union
union编码以一个long值开始,表示后边的数据是union中的哪种数据类型。
3.7.fixed
编码为指定数目的字节。
4.rpc通信实现
Avro的RPC实现不需要定义服务接口,但需要从.avpr文件中解析协议,协议中定义了消息结构和消息服务。message.avpr中定义了一个类型叫message,定义了一个服务叫sendMessage。
工具类Utils.java:
-
package avro;
-
-
import java.io.File;
-
import java.io.IOException;
-
import java.net.URL;
-
-
import org.apache.avro.Protocol;
-
-
public class Utils {
-
public static Protocol getProtocol() {
-
Protocol protocol = null;
-
try {
-
URL url = Utils.class.getClassLoader().getResource("message.avpr");
-
protocol = Protocol.parse(new File(url.getPath()));
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
return protocol;
-
}
- }
服务端实现Server.java:
-
package avro;
-
-
import org.apache.avro.Protocol;
-
import org.apache.avro.Protocol.Message;
-
import org.apache.avro.generic.GenericRecord;
-
import org.apache.avro.ipc.HttpServer;
-
import org.apache.avro.ipc.generic.GenericResponder;
-
-
public class Server extends GenericResponder {
-
private Protocol protocol = null;
-
private int port;
-
-
public Server(Protocol protocol, int port) {
-
super(protocol);
-
this.protocol = protocol;
-
this.port = port;
-
}
-
-
public Object respond(Message message, Object request) throws Exception {
-
GenericRecord req = (GenericRecord) request;
-
GenericRecord msg = (GenericRecord)(req.get("message"));
-
// process the request
- …
-
return msg;
-
}
-
-
public void run() {
-
try {
-
HttpServer server = new HttpServer(this, port);
-
-
server.start();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
-
public static void main(String[] args) {
-
if (args.length != 1) {
-
System.out.println("Usage: Server port");
-
System.exit(0);
-
}
-
int port = Integer.parseInt(args[0]);
-
new Server(Utils.getProtocol(), port).run();
-
}
- }
客户端实现Client.java:
-
package avro;
-
-
import java.net.URL;
-
import java.nio.ByteBuffer;
-
import java.util.Arrays;
-
-
import org.apache.avro.util.Utf8;
-
import org.apache.avro.Protocol;
-
import org.apache.avro.generic.GenericData;
-
import org.apache.avro.generic.GenericRecord;
-
import org.apache.avro.ipc.HttpTransceiver;
-
import org.apache.avro.ipc.Transceiver;
-
import org.apache.avro.ipc.generic.GenericRequestor;
-
-
public class Client {
-
private Protocol protocol = null;
-
private String host = null;
-
private int port = 0;
-
private int size = 0;
-
private int count = 0;
-
-
public Client(Protocol protocol, String host, int port, int size, int count) {
-
this.protocol = protocol;
-
this.host = host;
-
this.port = port;
-
this.size = size;
-
this.count = count;
-
}
-
-
public long sendMessage() throws Exception {
-
GenericRecord requestData = new GenericData.Record(
-
protocol.getType("message"));
-
// initiate the request data
- …
-
-
GenericRecord request = new GenericData.Record(protocol.getMessages()
-
.get("sendMessage").getRequest());
-
request.put("message", requestData);
-
-
Transceiver t = new HttpTransceiver(new URL("http://" + host + ":"
-
+ port));
-
GenericRequestor requestor = new GenericRequestor(protocol, t);
-
-
long start = System.currentTimeMillis();
-
for (int i = 0; i < count; i++) {
-
requestor.request("sendMessage", request);
-
}
-
long end = System.currentTimeMillis();
-
System.out.println(end - start);
-
return end - start;
-
}
-
-
public long run() {
-
long res = 0;
-
try {
-
res = sendMessage();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
return res;
-
}
-
-
public static void main(String[] args) throws Exception {
-
if (args.length != 4) {
-
System.out.println("Usage: Client host port dataSize count");
-
System.exit(0);
-
}
-
-
String host = args[0];
-
int port = Integer.parseInt(args[1]);
-
int size = Integer.parseInt(args[2]);
-
int count = Integer.parseInt(args[3]);
-
new Client(Utils.getProtocol(), host, port, size, count).run();
-
}
- }
5.参考资料
(1) Avro Documentation: http://avro.apache.org/docs/current/index.html
分享到:
相关推荐
kafka的生产消费示例程序,其中数据生产时需要avro序列化,消费时需要反序列化
Hadoop是一个分布式系统基础架构,由Apache基金会开发。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力高速运算和存储。Hadoop实现了一个分布式文件系统(Hadoop Distributed File ...
test_avro 使用 jetty 和 apache avro 进行 Spring Boot 概念验证 使用 jetty 运行 spring boot 项目: mvn spring-boot: run Avro 的概念证明在测试中,所以使用:mvn test 结尾。-
- 读取的数据可以使用`avro_value_get*()`函数获取,根据数据类型不同,如`avro_value_get_int()`、`avro_value_get_string()`等。 - 读取完毕后,别忘了调用`avro_file_reader_close()`关闭读取器。 4. **编译与...
from rec_avro import to_rec_avro_destructive , from_rec_avro_destructive , rec_avro_schema def json_objects (): return [{ 'a' : 'a' }, { 'b' : 'b' }] # For efficiency, to_rec_avro_destructive()
在这个“Apache Avro RPC简单示例”中,我们将深入探讨Avro如何实现RPC,并通过提供的压缩包文件`avro-rpc-quickstart-master`进行实战演练。 首先,理解Avro的基本概念是至关重要的。Avro的数据模型基于JSON,包括...
**Avro RPC简介** Avro是Hadoop生态系统中的一个关键组件,由Apache软件基金会开发,主要用作数据序列化系统。它提供了一种高效的、语言无关的、版本化的数据序列化机制,使得不同编程语言之间可以方便地交换数据。...
Kafka/Avro/Poseidon 示例此 repo 包含用于创建使用 Avro 编码的生产和消费 Kafka 消息的基本代码。 消费者使用poseidon_group gem 来使用高级组管理API。 这允许您在同一组中拥有多个读者。 Zookeeper 还将跟踪您的...
作者: (在上关注我)概括,如果您愿意的话...介绍该项目中包含的示例应用程序模拟了一个远程服务Mail,其中Avro RPC用于使用该服务发送消息。 本文档详细介绍了如何使用Maven构建和运行示例。 Avro jar文件(以及它们
达斯克·阿夫罗(Dask-Avro) Avro Dask阅读器。 免费软件:MIT许可证 文档: : 。 Python版本:2.7、3.5+ 特征 该项目为提供了格式的阅读器。 提供了一种方便的功能,可以读取一个或多个Avro文件并对其进行任意...
标题中的"avro_sample.rar"表明这是一个关于Avro的示例项目,Avro是一种数据序列化系统,由Apache Hadoop项目开发。它被设计用来高效地处理和交换各种语言之间的数据。Avro提供了丰富的数据模式定义,允许在不同的...
“ avro-rpc-demo”是Java实现的avro rpc的演示代码。 作者:zhexin Pan日期:20151103 简介此项目包括Java中的三个实现演示。 第一个是官方网站的快速入门演示,它是数据序列化和反序列化的两种实现,分别称为...
《Python库:avro_to_python-0.2.4》 在Python开发中,库扮演着至关重要的角色,它们提供了一系列预定义的功能,使开发者能够更高效地编写代码。本篇将详细介绍Python库"avro_to_python-0.2.4"的相关知识点。 1. *...
Apache Avro是Hadoop生态系统中的一个关键组件,它主要用于数据序列化和远程过程调用(RPC)。这个"avro-in-action: RPC与Apache Avro示例"项目显然旨在帮助开发者理解如何在实际应用中利用Avro进行高效的数据交互。...
通过Avro的RPC框架,服务端和客户端可以使用相同的schema进行通信,确保数据的正确交换。 ### Avro与Hadoop的关系 在Hadoop生态系统中,Avro通常与HDFS、MapReduce、Pig、Hive等组件结合使用。它提供了对Hadoop友好...
**Avro教程** 在IT行业中,数据序列化是至关重要的,它允许我们将对象的状态转换为可以在网络上传输或存储在磁盘上的字节流。Avro是Apache Hadoop项目的一部分,是一个高效的数据序列化系统,特别适用于分布式计算...
Avro工具jar包是Avro的一部分,主要用于处理Avro格式的数据,包括编译Avro模式,转换数据,以及合并或拆分Avro文件等。在这个版本1.8.2中,它已经被验证为功能正常且可直接从Maven官方库免费获取。 首先,让我们...
这是关于avro的avro-1.8.1版本的avro-tools的一个jar包
- **Compile Protocol工具**:将Avro的RPC协议文件编译为特定语言(如Java)的客户端和服务端代码。 - **Avro命令行界面**:提供了一些基本的Avro文件操作命令,如`cat`、`mkdatafile`等。 在实际开发中,这两个...
MapReduce读取Avro序列化文件测试数据,文件中存储了几个单词作为测试数据供读取