本文的目的主要是总结下基于Avro Schema代码生成,然后进行序列化和反序列化开发的基本流程。需要指出的是,Avro并不要求一定得根据Schema文件生成代码,这对于动态类型语言很有用。
1. 添加Maven依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>learn</groupId> <artifactId>learn.avro</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!--avro core--> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.7</version> <executions> <execution> <phase>generate-sources</phase> <goals> <!--Maven goal that helps for code generation--> <goal>schema</goal> <goal>protocol</goal> <goal>idl-protocol</goal> </goals> <configuration> <!--sourceDirectory: where to find the schema file, schema: .avsc; protocol: .avpr--> <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
2.定义schema文件(users.avsc)
放置到src/main/avro目录下
{"namespace": "examples.avro.simple", "type": "record", "name": "User", "fields": [ {"name": "name", "type": "string"}, {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]} ] }
namespace:包名
type: record表示记录,可以理解为几个field构成的一个record,类似于Java的JavaBean
name:类名
fiels:成员变量,favorite_number最后会解释成getFavoriteNumber和setFavoriteNumber
3. 生成User类
在Intellij Idea的Maven视图中,learn avro->Plugins->avro->avro:schema,右击avro:schema,执行Run Maven Build,生成schema对应的Java实体类
/** * Autogenerated by Avro * * DO NOT EDIT DIRECTLY */ package examples.avro.simple; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public java.lang.CharSequence name; @Deprecated public java.lang.Integer favorite_number; @Deprecated public java.lang.CharSequence favorite_color; /** * Default constructor. Note that this does not initialize fields * to their default values from the schema. If that is desired then * one should use <code>newBuilder()</code>. */ public User() {} /** * All-args constructor. */ public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) { this.name = name; this.favorite_number = favorite_number; this.favorite_color = favorite_color; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(int field$) { switch (field$) { case 0: return name; case 1: return favorite_number; case 2: return favorite_color; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } // Used by DatumReader. Applications should not call. @SuppressWarnings(value="unchecked") public void put(int field$, java.lang.Object value$) { switch (field$) { case 0: name = (java.lang.CharSequence)value$; break; case 1: favorite_number = (java.lang.Integer)value$; break; case 2: favorite_color = (java.lang.CharSequence)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } /** * Gets the value of the 'name' field. */ public java.lang.CharSequence getName() { return name; } /** * Sets the value of the 'name' field. * @param value the value to set. */ public void setName(java.lang.CharSequence value) { this.name = value; } /** * Gets the value of the 'favorite_number' field. */ public java.lang.Integer getFavoriteNumber() { return favorite_number; } /** * Sets the value of the 'favorite_number' field. * @param value the value to set. */ public void setFavoriteNumber(java.lang.Integer value) { this.favorite_number = value; } /** * Gets the value of the 'favorite_color' field. */ public java.lang.CharSequence getFavoriteColor() { return favorite_color; } /** * Sets the value of the 'favorite_color' field. * @param value the value to set. */ public void setFavoriteColor(java.lang.CharSequence value) { this.favorite_color = value; } /** Creates a new User RecordBuilder */ public static User.Builder newBuilder() { return new User.Builder(); } /** Creates a new User RecordBuilder by copying an existing Builder */ public static User.Builder newBuilder(User.Builder other) { return new User.Builder(other); } /** Creates a new User RecordBuilder by copying an existing User instance */ public static User.Builder newBuilder(User other) { return new User.Builder(other); } /** * RecordBuilder for User instances. */ public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User> implements org.apache.avro.data.RecordBuilder<User> { private java.lang.CharSequence name; private java.lang.Integer favorite_number; private java.lang.CharSequence favorite_color; /** Creates a new Builder */ private Builder() { super(User.SCHEMA$); } /** Creates a Builder by copying an existing Builder */ private Builder(User.Builder other) { super(other); if (isValidValue(fields()[0], other.name)) { this.name = data().deepCopy(fields()[0].schema(), other.name); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.favorite_number)) { this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.favorite_color)) { this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); fieldSetFlags()[2] = true; } } /** Creates a Builder by copying an existing User instance */ private Builder(User other) { super(User.SCHEMA$); if (isValidValue(fields()[0], other.name)) { this.name = data().deepCopy(fields()[0].schema(), other.name); fieldSetFlags()[0] = true; } if (isValidValue(fields()[1], other.favorite_number)) { this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); fieldSetFlags()[1] = true; } if (isValidValue(fields()[2], other.favorite_color)) { this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); fieldSetFlags()[2] = true; } } /** Gets the value of the 'name' field */ public java.lang.CharSequence getName() { return name; } /** Sets the value of the 'name' field */ public User.Builder setName(java.lang.CharSequence value) { validate(fields()[0], value); this.name = value; fieldSetFlags()[0] = true; return this; } /** Checks whether the 'name' field has been set */ public boolean hasName() { return fieldSetFlags()[0]; } /** Clears the value of the 'name' field */ public User.Builder clearName() { name = null; fieldSetFlags()[0] = false; return this; } /** Gets the value of the 'favorite_number' field */ public java.lang.Integer getFavoriteNumber() { return favorite_number; } /** Sets the value of the 'favorite_number' field */ public User.Builder setFavoriteNumber(java.lang.Integer value) { validate(fields()[1], value); this.favorite_number = value; fieldSetFlags()[1] = true; return this; } /** Checks whether the 'favorite_number' field has been set */ public boolean hasFavoriteNumber() { return fieldSetFlags()[1]; } /** Clears the value of the 'favorite_number' field */ public User.Builder clearFavoriteNumber() { favorite_number = null; fieldSetFlags()[1] = false; return this; } /** Gets the value of the 'favorite_color' field */ public java.lang.CharSequence getFavoriteColor() { return favorite_color; } /** Sets the value of the 'favorite_color' field */ public User.Builder setFavoriteColor(java.lang.CharSequence value) { validate(fields()[2], value); this.favorite_color = value; fieldSetFlags()[2] = true; return this; } /** Checks whether the 'favorite_color' field has been set */ public boolean hasFavoriteColor() { return fieldSetFlags()[2]; } /** Clears the value of the 'favorite_color' field */ public User.Builder clearFavoriteColor() { favorite_color = null; fieldSetFlags()[2] = false; return this; } @Override public User build() { try { User record = new User(); record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); } } } }
4. User类序列化和反序列化测试
package examples.avro.simple; import java.io.File; import java.io.Serializable; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; public class UserTest { public static void main(String[] args) throws Exception { ///无参构造函数,依赖设值注入 User user1 = new User(); user1.setName("Alyssa"); user1.setFavoriteNumber(256); //user1没有对favorite color赋值 ///注意:User并没有实现Java的Serializable接口 System.out.println(user1 instanceof Serializable); ///重载的构造方法 User user2 = new User("Ben", 7, "red"); //使用builder构建对象 User user3 = User.newBuilder() .setName("Charlie") .setFavoriteColor("blue") .setFavoriteNumber(null) .build(); System.out.println(user3.getName() + "," + user3.getFavoriteColor()); ///1. 序列化包含Schema和3个User对象到c:/users.avro文件 DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); dataFileWriter.create(user1.getSchema(), new File("c:/users.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); dataFileWriter.close(); ///2. 从c:/users.avro文件反序列化对象 DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class); DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("c:/users.avro"), userDatumReader); User user = null; while (dataFileReader.hasNext()) { // Reuse user object by passing it to next(). This saves us from // allocating and garbage collecting many objects for files with // many items. user = dataFileReader.next(user); System.out.println(user); ///输出JSON格式的数据,Avro对User类的toString进行了改写 } } }
相关推荐
**Avro RPC快速入门** 在"avro-rpc-quickstart-master"这个压缩包中,包含了Avro RPC的快速启动示例,包括客户端和服务器端的源代码。以下是快速启动的步骤: 1. **构建环境**: 确保已经安装了Java开发工具(JDK)...
作者: (在上关注我)概括,如果您愿意的话...介绍该项目中包含的示例应用程序模拟了一个远程服务Mail,其中Avro RPC用于使用该服务发送消息。 本文档详细介绍了如何使用Maven构建和运行示例。 Avro jar文件(以及它们
依存关系安装运行npm i -s kafkajs-avro或yarn add kafkajs-avro快速入门代码 import KafkaAvro from "kafkajs-avro"( async ( ) { const kafka = new KafkaAvro ( { clientId : "<client>" , brokers : [ ...
第一个是官方网站的快速入门演示,它是数据序列化和反序列化的两种实现,分别称为SpecificMain,GenericMain; 第二个是特定样式的接口调用,名为MailMain *; 最后一个是通用样式接口调用,包括HTTP /和netty模式...
如何创建一个简单的NodeJS Express REST API服务器来在请求/响应中接收和发送avro缓冲区 如何调用发送和接收avro缓冲区客户端javascript的REST API 徽章 入门 克隆项目 git clone ...
在本篇Kafka快速入门教程中,我们主要探讨了如何使用Python客户端库`confluent-kafka`来与Apache Kafka进行交互。`confluent-kafka`是一个轻量级的Python模块,它对librdkafka进行了封装,支持Kafka 0.8以上的版本。...
该入门示例程序是异步、基于事件驱动的网络通讯框架,对于java开发入门以及netty开发入门程序员有极大的学习效果和提升作用。异步:支持多个请求同时处理 响应通过回调函数处理 例如ajax 事件驱动 :比如客户端对...
Hadoop 安装 学习 入门教程 Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, ...
#schema_registry_converter 该库提供了一种以与Java客户端兼容的方式使用Confluent Schema Registry的方法。 发行说明可以在上找到。消耗/解码,并且支持产生/编码。 也可以提供在解码时使用的... 要将其用于使用avro
大数据系列-Hive入门与实战 Hive 是什么? ---------------- Hive 是一个构建在 Hadoop 之上的数据仓库平台,能够将 SQL 语句转译成 MapReduce 作业,并在 Hadoop 集群上执行。Hive 表是 HDFS 的一个文件目录,一...
该解决方案使用Apache Kafka,我们可以轻松地将其集成到基于Spring Boot的应用程序中,该应用程序使用 (2.6.5),Apache Avro进行事件序列化和反序列化,并使用内存中的H2数据库,该数据库有助于我们的查询端基于...
《Flink入门与实战配套Java源码》是一个针对Apache Flink初学者和实践者的宝贵资料,它包含了使用Java编程语言实现的各种Flink示例。Apache Flink是一个流行的开源流处理和批处理框架,专为实时数据处理而设计,提供...
对于Cloudera的学习者来说,本培训材料提供了一个全面的入门指南,覆盖了从数据收集到数据处理的各个环节,旨在帮助使用者建立起对Cloudera平台和Hadoop生态系统的认识,从而能够有效地利用这些工具进行数据分析和...
Python爬虫技术入门到高级第五章 本章节主要介绍了爬虫技术的基础知识,包括爬虫技术概述、网络协议和 HTTP 协议、Python 基础、爬虫工具介绍、数据存储和处理、动态网页爬取、反爬虫技术、数据清洗和预处理、...
#schema_registry_converter该库提供了一种以符合 Java 客户端的方式使用 Confluent Schema Registry 的方法...要使用它使用 avro async 进行转换,请使用:[dependencies ]schema_registry_converter = {version =" 2
总体而言,本书是Hadoop入门与提高的必备读物,不仅仅覆盖了Hadoop的核心技术,还深入介绍了相关项目和实际应用案例,为读者提供了一个全面了解和掌握Hadoop技术的平台。对于那些希望在大数据时代通过Hadoop技术提升...
### Flink 入门及实战知识点详解 #### 一、Flink API 抽象级别概述 Flink 提供了多种 API 抽象级别,以适应不同的应用场景和需求。主要包括: - **DataStream API**:适用于批处理和流处理,提供丰富的转换操作。 ...
5. **连接器与格式**:熟悉Flink支持的各种输入输出源,如Kafka、RabbitMQ、HDFS、Cassandra等,以及如何使用Flink的序列化格式,如JSON、Avro、Parquet等。 6. **Table & SQL API**:学习如何使用Table API和SQL...
Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、分布式的海量日志采集、聚合和...