1. Avro RPC简介
1.1. RPC
- RPC逻辑上分为二层,一是传输层,负责网络通信;二是协议层,将数据按照一定协议格式打包和解包
- 从序列化方式来看,Apache Thrift 和Google的Protocol Buffers和Avro应该是属于同一个级别的框架,都能跨语言,性能优秀,数据精简,但是Avro的动态模式(不用生成代码,而且性能很好)这个特点让人非常喜欢,比较适合RPC的数据交换。
1.2. Avro RPC的主要特点
Avro RPC 是一个支持跨语言实现的RPC服务框架。非常轻量级,实现简洁,使用方便,同时支持使用者进行二次开发,逻辑上该框架分为两层:
- 网络传输层使用Netty的Nio实现。
- 协议层可扩展,目前支持的数据序列化方式有Avro,Protocol Buffers ,Json, Hessian,Java序列化。 使用者可以注册自己的协议格式及序列化方式。
Avro RPC主要特点:
- 客户端传输层与应用层逻辑分离,传输层主要职责包括创建连接,连接查找与复用,传输数据,接收服务端回复后回调应用层;
- 客户端支持同步调用和异步调用。服务异步化能很好的提高系统吞吐量,建议使用异步调用。为防止异步发送请求过快,客户端增加了“请求流量限制”功能;
- 服务端有一个协议注册工厂和序列化注册工厂。这样方便针对不同的应用场景来定制服务方式。RPC应该只是服务方式的一种。在分布式的系统架构中,分布式节点之间的通信会存在多种方式,比如MQ的TOP消息,一个消息可以有多个订阅者。因此avro-rpc不仅仅是一个RPC服务框架,还是一个分布式通信的一个基础骨架,提供了很好的扩展性;
- Avro序列化框架是Hadoop下的一个子项目,其特点是数据序列化不带标签,因此序列化后的数据非常小。支持动态解析, 不像Thrift 与 Protocol Buffers必须根据IDL来生成代码,这样侵入性有点强。性能很好,基本上和 Protocol Buffers差不多;
2. Avro RPC开发
2.1 Maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>learn</groupId> <artifactId>learn.avro</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!--avro core--> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> </dependency> <!--avro rpc support--> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> <version>1.7.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.7</version> <executions> <execution> <phase>generate-sources</phase> <goals> <!--Maven goal that helps for code generation--> <goal>schema</goal> <!--For RPC used--> <goal>protocol</goal> <goal>idl-protocol</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
2.2 定义协议schema文件(在src/main/avro/mail.avpr)
{"namespace": "examples.avro.rpc", "protocol": "Mail", "types": [ {"name": "Message", "type": "record", "fields": [ {"name": "to", "type": "string"}, {"name": "from", "type": "string"}, {"name": "body", "type": "string"} ] } ], "messages": { "send": { "request": [{"name": "message", "type": "Message"}], "response": "string" } } }
2.3 生成代码:
在Intellij Idea的Maven视图中,learn avro->Plugins->avro->avro:protocol,右击avro:protocol,执行Run Maven Build,生成protocol schema对应的Java实体类
2.3.1 Mail接口
/** * Autogenerated by Avro * * DO NOT EDIT DIRECTLY */ package examples.avro.rpc; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public interface Mail { public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Mail\",\"namespace\":\"example.proto\",\"types\":[{\"type\":\"record\",\"name\":\"Message\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}],\"messages\":{\"send\":{\"request\":[{\"name\":\"message\",\"type\":\"Message\"}],\"response\":\"string\"}}}"); ///Mail接口有1个方法send,参数是Message,Message是一个Avro类,可以序列化和反序列化 java.lang.CharSequence send(Message message) throws org.apache.avro.AvroRemoteException; @SuppressWarnings("all") public interface Callback extends Mail { public static final org.apache.avro.Protocol PROTOCOL = Mail.PROTOCOL; void send(Message message, org.apache.avro.ipc.Callback<CharSequence> callback) throws java.io.IOException; } }
2.3.2 Message类(根据schema文件生成)
/** * Autogenerated by Avro * * DO NOT EDIT DIRECTLY */ package examples.avro.rpc; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class Message extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"example.proto\",\"fields\":[{\"name\":\"to\",\"type\":\"string\"},{\"name\":\"from\",\"type\":\"string\"},{\"name\":\"body\",\"type\":\"string\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public java.lang.CharSequence to; @Deprecated public java.lang.CharSequence from; @Deprecated public java.lang.CharSequence body; /** * Default constructor. Note that this does not initialize fields * to their default values from the schema. If that is desired then * one should use <code>newBuilder()</code>. */ public Message() {} /** * All-args constructor. */ public Message(java.lang.CharSequence to, java.lang.CharSequence from, java.lang.CharSequence body) { this.to = to; this.from = from; this.body = body; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(int field$) { switch (field$) { case 0: return to; case 1: return from; case 2: return body; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } // Used by DatumReader. Applications should not call. @SuppressWarnings(value="unchecked") public void put(int field$, java.lang.Object value$) { switch (field$) { case 0: to = (java.lang.CharSequence)value$; break; case 1: from = (java.lang.CharSequence)value$; break; case 2: body = (java.lang.CharSequence)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } /** * Gets the value of the 'to' field. */ public java.lang.CharSequence getTo() { return to; } /** * Sets the value of the 'to' field. * @param value the value to set. */ public void setTo(java.lang.CharSequence value) { this.to = value; } /** * Gets the value of the 'from' field. */ public java.lang.CharSequence getFrom() { return from; } /** * Sets the value of the 'from' field. * @param value the value to set. */ public void setFrom(java.lang.CharSequence value) { this.from = value; } /** * Gets the value of the 'body' field. */ public java.lang.CharSequence getBody() { return body; } /** * Sets the value of the 'body' field. * @param value the value to set. */ public void setBody(java.lang.CharSequence value) { this.body = value; } /** Creates a new Message RecordBuilder */ public static Message.Builder newBuilder() { return new Message.Builder(); } /** Creates a new Message RecordBuilder by copying an existing Builder */ public static Message.Builder newBuilder(Message.Builder other) { return new Message.Builder(other); } /** Creates a new Message RecordBuilder by copying an existing Message instance */ public static Message.Builder newBuilder(Message other) { return new Message.Builder(other); } /** * RecordBuilder for Message instances. */ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Message> implements org.apache.avro.data.RecordBuilder<Message> { private java.lang.CharSequence to; private java.lang.CharSequence from; private java.lang.CharSequence body; /** Creates a new Builder */ private Builder() { super(Message.SCHEMA$); } /** Creates a Builder by copying an existing Builder */ private Builder(Message.Builder other) { super(other); if (isValidValue(fields()[0], other.to)) { this.to = data().deepCopy(fields()[0].schema(), other.to); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.from)) { this.from = data().deepCopy(fields()[1].schema(), other.from); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.body)) { this.body = data().deepCopy(fields()[2].schema(), other.body); fieldSetFlags()[2] = true; } } /** Creates a Builder by copying an existing Message instance */ private Builder(Message other) { super(Message.SCHEMA$); if (isValidValue(fields()[0], other.to)) { this.to = data().deepCopy(fields()[0].schema(), other.to); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.from)) { this.from = data().deepCopy(fields()[1].schema(), other.from); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.body)) { this.body = data().deepCopy(fields()[2].schema(), other.body); fieldSetFlags()[2] = true; } } /** Gets the value of the 'to' field */ public java.lang.CharSequence getTo() { return to; } /** Sets the value of the 'to' field */ public Message.Builder setTo(java.lang.CharSequence value) { validate(fields()[0], value); this.to = value; fieldSetFlags()[0] = true; return this; } /** Checks whether the 'to' field has been set */ public boolean hasTo() { return fieldSetFlags()[0]; } /** Clears the value of the 'to' field */ public Message.Builder clearTo() { to = null; fieldSetFlags()[0] = false; return this; } /** Gets the value of the 'from' field */ public java.lang.CharSequence getFrom() { return from; } /** Sets the value of the 'from' field */ public Message.Builder setFrom(java.lang.CharSequence value) { validate(fields()[1], value); this.from = value; fieldSetFlags()[1] = true; return this; } /** Checks whether the 'from' field has been set */ public boolean hasFrom() { return fieldSetFlags()[1]; } /** Clears the value of the 'from' field */ public Message.Builder clearFrom() { from = null; fieldSetFlags()[1] = false; return this; } /** Gets the value of the 'body' field */ public java.lang.CharSequence getBody() { return body; } /** Sets the value of the 'body' field */ public Message.Builder setBody(java.lang.CharSequence value) { validate(fields()[2], value); this.body = value; fieldSetFlags()[2] = true; return this; } /** Checks whether the 'body' field has been set */ public boolean hasBody() { return fieldSetFlags()[2]; } /** Clears the value of the 'body' field */ public Message.Builder clearBody() { body = null; fieldSetFlags()[2] = false; return this; } @Override public Message build() { try { Message record = new Message(); record.to = fieldSetFlags()[0] ? this.to : (java.lang.CharSequence) defaultValue(fields()[0]); record.from = fieldSetFlags()[1] ? this.from : (java.lang.CharSequence) defaultValue(fields()[1]); record.body = fieldSetFlags()[2] ? this.body : (java.lang.CharSequence) defaultValue(fields()[2]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); } } } }
2.3.3 AvroServer类
package examples.avro.rpc; import org.apache.avro.ipc.NettyServer; import org.apache.avro.ipc.Server; import org.apache.avro.ipc.specific.SpecificResponder; import org.apache.avro.util.Utf8; import java.io.IOException; import java.net.InetSocketAddress; //Server端的实现Mai服务 class MailImpl implements Mail { public Utf8 send(Message message) { System.out.println("Message Received:" + message); return new Utf8("Received your message: " + message.getFrom().toString() + " with body " + message.getBody().toString()); } } public class AvroServer { private static Server server; public static void main(String[] args) throws Exception { System.out.println("Starting server"); startServer(); Thread.sleep(1000); System.out.println("Server started"); Thread.sleep(60 * 1000); server.close(); } private static void startServer() throws IOException { server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111)); } }
2.3.3 AvroClient类
package examples.avro.rpc; import org.apache.avro.ipc.NettyTransceiver; import org.apache.avro.ipc.specific.SpecificRequestor; import org.apache.avro.util.Utf8; import java.net.InetSocketAddress; public class AvroClient { public static void main(String[] args) throws Exception { NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111)); ///获取Mail接口的proxy实现 Mail proxy = SpecificRequestor.getClient(Mail.class, client); System.out.println("Client of Mail Proxy is built"); // fill in the Message record and send it args = new String[]{"to:Tom", "from:Jack", "body:How are you"}; Message message = new Message(); message.setTo(new Utf8(args[0])); message.setFrom(new Utf8(args[1])); message.setBody(new Utf8(args[2])); System.out.println("RPC call with message: " + message.toString()); ///底层给服务器发送send方法调用 System.out.println("Result: " + proxy.send(message)); // cleanup client.close(); } }
本文支持对Avro RPC的粗浅尝试,Avro Client端用的同步通信方式
相关推荐
Apache Avro是一个面向数据序列化的开源框架,由Apache软件基金会开发,主要应用于大数据处理和分布式系统。它提供了高效的、跨语言的数据交换格式和API,特别适合远程过程调用(RPC)以及存储和处理大规模数据集。...
Avro RPC则是Avro的一个重要特性,它提供了一种标准的远程过程调用(RPC)框架,使得基于Avro的数据可以在客户端和服务端之间安全、高效地传输。 **Avro RPC的核心概念** 1. **Schema**: Avro的核心是其Schema定义...
AVRO是一种跨语言的序列化框架,用于实现数据序列化以及远程过程调用(RPC)。它是Apache软件基金会旗下的一个项目。AVRO提供了丰富的数据结构类型、快速序列化以及支持动态类型语言和静态类型语言。 数据定义语言...
通过Avro的RPC框架,服务端和客户端可以使用相同的schema进行通信,确保数据的正确交换。 ### Avro与Hadoop的关系 在Hadoop生态系统中,Avro通常与HDFS、MapReduce、Pig、Hive等组件结合使用。它提供了对Hadoop友好...
8. **RPC Support**: Avro还提供了远程过程调用(RPC)框架,允许跨网络的服务间通信,且保持了数据的一致性和效率。 在实际应用中,开发者通常会使用Avro工具生成数据访问类,这些类可以帮助我们将Java对象直接...
4. **测试用例**:可能有JUnit或其他测试框架的测试类,用于验证Avro数据序列化和RPC功能的正确性。 5. **文档**:可能包括README文件,解释如何运行示例,以及每个示例的目的。 学习这个项目,你将了解如何: - *...
Avro 1.8.2版本为Hadoop新型序列化框架规范定义了相关细节,提供了标准化的序列化和反序列化机制,可以用于远程过程调用(RPC)和持久化数据的存储。 ### 标题和描述知识点 标题“Avro 1.8.2 序列化规范”直接指出了...
本文将探讨手写RPC框架的一些核心概念和组件。 首先,RPC架构的核心是网络传输。在实现RPC时,我们需要设计一个能够发送网络请求的机制,将目标类、方法信息以及参数从客户端传输到服务端。常见的网络传输库有BIO...
本篇文章将对几种常见的RPC框架进行比较分析,包括Protobuf RPC、Avro和Thrift,探讨它们的特点、优缺点以及适用场景。 1. Protobuf RPC(Protocol Buffers Remote Procedure Call) - **Protobuf** 是Google开发...
#### 二、Avro简介及其在RPC中的应用 **Avro**是一个强大的数据序列化框架,最初由Apache Hadoop项目开发。它不仅支持动态语言,还提供了丰富的数据结构和高效的二进制数据格式。Avro在RPC中主要负责数据的序列化与...
avro提供了一种紧凑、高效的二进制数据格式,同时支持动态类型,使得数据能够在不预先知道数据结构的情况下进行序列化和反序列化。avro的schema同样保存在资源目录下,其文件格式是JSON。 这个性能测试工程旨在比较...
HARPC(High Availability RPC)是基于Thrift的跨语言、高可用的RPC框架。具备高性能、高可用、轻量级等特点 * 跨语言通信 * 方便的使Java、Python、C++三种程序可以相互通信 * 负载均衡和容灾处理 * 方便的实现...
在高并发场景下,设计良好的RPC框架能够有效地处理请求,保证服务的稳定性和性能。 标题“RPC轻量级框架高并发处理”暗示我们将讨论一种适用于高负载环境的RPC实现,其特点是轻量、高效,能够应对大量并发请求。轻...
Thrift和Avro是两种广泛使用的数据序列化和远程过程调用(RPC)框架,它们在分布式系统中扮演着重要角色。本文将详细介绍这两种技术及其关键特性。 首先,让我们了解Thrift。Thrift是由Facebook开发的一种跨语言的...
Avro还提供了一种远程过程调用(RPC)框架,允许跨网络高效地调用服务。它基于Avro模式,确保客户端和服务器之间的兼容性。 **Avro工具** Avro提供了一系列命令行工具,如`avro-tools`,用于操作Avro文件,如转换...
Thrift和Avro是两种广泛使用的数据序列化和远程过程调用(RPC)框架,它们在分布式系统中扮演着重要角色。在这个实例中,我们将深入理解这两种技术,并探讨它们各自的特性和应用场景。 Thrift是由Facebook开发的一...
以阿里巴巴的Dubbo为例,Dubbo是一款广泛使用的Java服务化框架,其二进制RPC方式就体现了效率优势。Dubbo在客户端本地创建一个Proxy(Stub),作为远程调用的代理。客户端的调用请求通过Stub进行封装,包括Dubbo协议...
在分布式系统中,RPC框架扮演着至关重要的角色,它简化了服务间的通信,使得开发者可以像操作本地方法一样进行跨网络的函数调用。Apache提供了多种RPC实现,如Apache Thrift、Apache Avro和Hadoop的RPC等。 **...
4. **Protocol Buffers**:Avro支持远程过程调用(RPC),通过定义协议,不同服务可以相互通信。这使得Avro成为构建分布式系统中的有力工具。 5. **Language Agnostic**:Avro提供了多种编程语言的实现,包括Java、...
这个Java RPC框架的核心设计目标就是利用RDMA的特性,提供轻量级的接口,使得开发者可以轻松地在分布式系统中实现高性能的通信。它可能包括以下关键组件: 1. **RDMA客户端和服务器**:客户端发起RPC请求,服务器...