`
bit1129
  • 浏览: 1067798 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Avro一】Avro入门

    博客分类:
  • Avro
 
阅读更多

本文的目的主要是总结下基于Avro Schema代码生成,然后进行序列化和反序列化开发的基本流程。需要指出的是,Avro并不要求一定得根据Schema文件生成代码,这对于动态类型语言很有用。

 

1. 添加Maven依赖

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>learn</groupId>
    <artifactId>learn.avro</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--avro core-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.7.7</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <!--Maven goal that helps for code generation-->
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>

                            <!--sourceDirectory: where to find the schema file, schema: .avsc; protocol: .avpr-->
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

2.定义schema文件(users.avsc)

放置到src/main/avro目录下

{"namespace": "examples.avro.simple",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

namespace:包名

type: record表示记录,可以理解为几个field构成的一个record,类似于Java的JavaBean

name:类名

fiels:成员变量,favorite_number最后会解释成getFavoriteNumber和setFavoriteNumber

 

3. 生成User类

在Intellij Idea的Maven视图中,learn avro->Plugins->avro->avro:schema,右击avro:schema,执行Run Maven Build,生成schema对应的Java实体类

 

/**
 * Autogenerated by Avro
 * 
 * DO NOT EDIT DIRECTLY
 */
package examples.avro.simple;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.lang.CharSequence name;
  @Deprecated public java.lang.Integer favorite_number;
  @Deprecated public java.lang.CharSequence favorite_color;

  /**
   * Default constructor.  Note that this does not initialize fields
   * to their default values from the schema.  If that is desired then
   * one should use <code>newBuilder()</code>. 
   */
  public User() {}

  /**
   * All-args constructor.
   */
  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color) {
    this.name = name;
    this.favorite_number = favorite_number;
    this.favorite_color = favorite_color;
  }

  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  // Used by DatumWriter.  Applications should not call. 
  public java.lang.Object get(int field$) {
    switch (field$) {
    case 0: return name;
    case 1: return favorite_number;
    case 2: return favorite_color;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }
  // Used by DatumReader.  Applications should not call. 
  @SuppressWarnings(value="unchecked")
  public void put(int field$, java.lang.Object value$) {
    switch (field$) {
    case 0: name = (java.lang.CharSequence)value$; break;
    case 1: favorite_number = (java.lang.Integer)value$; break;
    case 2: favorite_color = (java.lang.CharSequence)value$; break;
    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
    }
  }

  /**
   * Gets the value of the 'name' field.
   */
  public java.lang.CharSequence getName() {
    return name;
  }

  /**
   * Sets the value of the 'name' field.
   * @param value the value to set.
   */
  public void setName(java.lang.CharSequence value) {
    this.name = value;
  }

  /**
   * Gets the value of the 'favorite_number' field.
   */
  public java.lang.Integer getFavoriteNumber() {
    return favorite_number;
  }

  /**
   * Sets the value of the 'favorite_number' field.
   * @param value the value to set.
   */
  public void setFavoriteNumber(java.lang.Integer value) {
    this.favorite_number = value;
  }

  /**
   * Gets the value of the 'favorite_color' field.
   */
  public java.lang.CharSequence getFavoriteColor() {
    return favorite_color;
  }

  /**
   * Sets the value of the 'favorite_color' field.
   * @param value the value to set.
   */
  public void setFavoriteColor(java.lang.CharSequence value) {
    this.favorite_color = value;
  }

  /** Creates a new User RecordBuilder */
  public static User.Builder newBuilder() {
    return new User.Builder();
  }
  
  /** Creates a new User RecordBuilder by copying an existing Builder */
  public static User.Builder newBuilder(User.Builder other) {
    return new User.Builder(other);
  }
  
  /** Creates a new User RecordBuilder by copying an existing User instance */
  public static User.Builder newBuilder(User other) {
    return new User.Builder(other);
  }
  
  /**
   * RecordBuilder for User instances.
   */
  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
    implements org.apache.avro.data.RecordBuilder<User> {

    private java.lang.CharSequence name;
    private java.lang.Integer favorite_number;
    private java.lang.CharSequence favorite_color;

    /** Creates a new Builder */
    private Builder() {
      super(User.SCHEMA$);
    }
    
    /** Creates a Builder by copying an existing Builder */
    private Builder(User.Builder other) {
      super(other);
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.favorite_number)) {
        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.favorite_color)) {
        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
        fieldSetFlags()[2] = true;
      }
    }
    
    /** Creates a Builder by copying an existing User instance */
    private Builder(User other) {
            super(User.SCHEMA$);
      if (isValidValue(fields()[0], other.name)) {
        this.name = data().deepCopy(fields()[0].schema(), other.name);
        fieldSetFlags()[0] = true;
      }
      if (isValidValue(fields()[1], other.favorite_number)) {
        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
        fieldSetFlags()[1] = true;
      }
      if (isValidValue(fields()[2], other.favorite_color)) {
        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
        fieldSetFlags()[2] = true;
      }
    }

    /** Gets the value of the 'name' field */
    public java.lang.CharSequence getName() {
      return name;
    }
    
    /** Sets the value of the 'name' field */
    public User.Builder setName(java.lang.CharSequence value) {
      validate(fields()[0], value);
      this.name = value;
      fieldSetFlags()[0] = true;
      return this; 
    }
    
    /** Checks whether the 'name' field has been set */
    public boolean hasName() {
      return fieldSetFlags()[0];
    }
    
    /** Clears the value of the 'name' field */
    public User.Builder clearName() {
      name = null;
      fieldSetFlags()[0] = false;
      return this;
    }

    /** Gets the value of the 'favorite_number' field */
    public java.lang.Integer getFavoriteNumber() {
      return favorite_number;
    }
    
    /** Sets the value of the 'favorite_number' field */
    public User.Builder setFavoriteNumber(java.lang.Integer value) {
      validate(fields()[1], value);
      this.favorite_number = value;
      fieldSetFlags()[1] = true;
      return this; 
    }
    
    /** Checks whether the 'favorite_number' field has been set */
    public boolean hasFavoriteNumber() {
      return fieldSetFlags()[1];
    }
    
    /** Clears the value of the 'favorite_number' field */
    public User.Builder clearFavoriteNumber() {
      favorite_number = null;
      fieldSetFlags()[1] = false;
      return this;
    }

    /** Gets the value of the 'favorite_color' field */
    public java.lang.CharSequence getFavoriteColor() {
      return favorite_color;
    }
    
    /** Sets the value of the 'favorite_color' field */
    public User.Builder setFavoriteColor(java.lang.CharSequence value) {
      validate(fields()[2], value);
      this.favorite_color = value;
      fieldSetFlags()[2] = true;
      return this; 
    }
    
    /** Checks whether the 'favorite_color' field has been set */
    public boolean hasFavoriteColor() {
      return fieldSetFlags()[2];
    }
    
    /** Clears the value of the 'favorite_color' field */
    public User.Builder clearFavoriteColor() {
      favorite_color = null;
      fieldSetFlags()[2] = false;
      return this;
    }

    @Override
    public User build() {
      try {
        User record = new User();
        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
        return record;
      } catch (Exception e) {
        throw new org.apache.avro.AvroRuntimeException(e);
      }
    }
  }
}

 

4. User类序列化和反序列化测试

package examples.avro.simple;

import java.io.File;
import java.io.Serializable;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class UserTest {
    public static void main(String[] args) throws Exception {
        
        ///无参构造函数,依赖设值注入
        User user1 = new User();
        user1.setName("Alyssa");
        user1.setFavoriteNumber(256);
        //user1没有对favorite color赋值 
        ///注意:User并没有实现Java的Serializable接口
        System.out.println(user1 instanceof Serializable);
        ///重载的构造方法
        User user2 = new User("Ben", 7, "red");

        //使用builder构建对象
        User user3 = User.newBuilder()
                .setName("Charlie")
                .setFavoriteColor("blue")
                .setFavoriteNumber(null)
                .build();

        System.out.println(user3.getName() + "," + user3.getFavoriteColor());


        ///1. 序列化包含Schema和3个User对象到c:/users.avro文件
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);   
        dataFileWriter.create(user1.getSchema(), new File("c:/users.avro"));
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
        dataFileWriter.close();

        ///2. 从c:/users.avro文件反序列化对象
        DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
        DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("c:/users.avro"), userDatumReader);
        User user = null;
        while (dataFileReader.hasNext()) {
            // Reuse user object by passing it to next(). This saves us from
            // allocating and garbage collecting many objects for files with
            // many items.
            user = dataFileReader.next(user);
            System.out.println(user);  ///输出JSON格式的数据,Avro对User类的toString进行了改写
        }
    }
}

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    avro-rpc程序示例

    **Avro RPC快速入门** 在"avro-rpc-quickstart-master"这个压缩包中,包含了Avro RPC的快速启动示例,包括客户端和服务器端的源代码。以下是快速启动的步骤: 1. **构建环境**: 确保已经安装了Java开发工具(JDK)...

    avro-rpc-quickstart:Apache Avro RPC快速入门

    作者: (在上关注我)概括,如果您愿意的话...介绍该项目中包含的示例应用程序模拟了一个远程服务Mail,其中Avro RPC用于使用该服务发送消息。 本文档详细介绍了如何使用Maven构建和运行示例。 Avro jar文件(以及它们

    kafkajs-avro:基于主题和版本的KafkaJS + Avro编码解码

    依存关系安装运行npm i -s kafkajs-avro或yarn add kafkajs-avro快速入门代码 import KafkaAvro from "kafkajs-avro"( async ( ) { const kafka = new KafkaAvro ( { clientId : "&lt;client&gt;" , brokers : [ ...

    avro-rpc-demo:avro rpc的演示代码

    第一个是官方网站的快速入门演示,它是数据序列化和反序列化的两种实现,分别称为SpecificMain,GenericMain; 第二个是特定样式的接口调用,名为MailMain *; 最后一个是通用样式接口调用,包括HTTP /和netty模式...

    avro-rest-js:在requestResponse中接收和发送AVRO缓冲区的客户端和服务器端javascript REST API示例

    如何创建一个简单的NodeJS Express REST API服务器来在请求/响应中接收和发送avro缓冲区 如何调用发送和接收avro缓冲区客户端javascript的REST API 徽章 入门 克隆项目 git clone ...

    kafkapython教程-Kafka快速入门(十二)-Python客户端.pdf

    在本篇Kafka快速入门教程中,我们主要探讨了如何使用Python客户端库`confluent-kafka`来与Apache Kafka进行交互。`confluent-kafka`是一个轻量级的Python模块,它对librdkafka进行了封装,支持Kafka 0.8以上的版本。...

    基于netty的spring入门程序

    该入门示例程序是异步、基于事件驱动的网络通讯框架,对于java开发入门以及netty开发入门程序员有极大的学习效果和提升作用。异步:支持多个请求同时处理 响应通过回调函数处理 例如ajax 事件驱动 :比如客户端对...

    Hadoop安装学习-入门教程

    Hadoop 安装 学习 入门教程 Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, ...

    schema_registry_converter:一个板条箱,用于将字节转换为更有用的东西,以及与Confluent Schema Registry兼容的另一种方式。 支持Avro,Protobuf,Json模式以及异步和阻塞

    #schema_registry_converter 该库提供了一种以与Java客户端兼容的方式使用Confluent Schema Registry的方法。 发行说明可以在上找到。消耗/解码,并且支持产生/编码。 也可以提供在解码时使用的... 要将其用于使用avro

    大数据系列-Hive入门与实战.pptx

    大数据系列-Hive入门与实战 Hive 是什么? ---------------- Hive 是一个构建在 Hadoop 之上的数据仓库平台,能够将 SQL 语句转译成 MapReduce 作业,并在 Hadoop 集群上执行。Hive 表是 HDFS 的一个文件目录,一...

    spring-kafka-event-sourcing-sampler:展示如何使用Spring Boot,Spring Kafka,Apache Avro和Apache Kafka构建基于事件的小型应用程序

    该解决方案使用Apache Kafka,我们可以轻松地将其集成到基于Spring Boot的应用程序中,该应用程序使用 (2.6.5),Apache Avro进行事件序列化和反序列化,并使用内存中的H2数据库,该数据库有助于我们的查询端基于...

    Flink入门与实战配套java源码

    《Flink入门与实战配套Java源码》是一个针对Apache Flink初学者和实践者的宝贵资料,它包含了使用Java编程语言实现的各种Flink示例。Apache Flink是一个流行的开源流处理和批处理框架,专为实时数据处理而设计,提供...

    Cloudera基础培训材料.pdf

    对于Cloudera的学习者来说,本培训材料提供了一个全面的入门指南,覆盖了从数据收集到数据处理的各个环节,旨在帮助使用者建立起对Cloudera平台和Hadoop生态系统的认识,从而能够有效地利用这些工具进行数据分析和...

    Python爬虫技术入门到高级第五章

    Python爬虫技术入门到高级第五章 本章节主要介绍了爬虫技术的基础知识,包括爬虫技术概述、网络协议和 HTTP 协议、Python 基础、爬虫工具介绍、数据存储和处理、动态网页爬取、反爬虫技术、数据清洗和预处理、...

    将字节转换为更有用的东西的板条箱,反之亦然,与 Confluent Schema Registry 兼容。 支持 Avro、Protobuf、Json 模式,以及异步和阻塞。

    #schema_registry_converter该库提供了一种以符合 Java 客户端的方式使用 Confluent Schema Registry 的方法...要使用它使用 avro async 进行转换,请使用:[dependencies ]schema_registry_converter = {version =" 2

    Hadoop.The.Definitive.Guide

    总体而言,本书是Hadoop入门与提高的必备读物,不仅仅覆盖了Hadoop的核心技术,还深入介绍了相关项目和实际应用案例,为读者提供了一个全面了解和掌握Hadoop技术的平台。对于那些希望在大数据时代通过Hadoop技术提升...

    Flink入门及实战-下.pptx

    ### Flink 入门及实战知识点详解 #### 一、Flink API 抽象级别概述 Flink 提供了多种 API 抽象级别,以适应不同的应用场景和需求。主要包括: - **DataStream API**:适用于批处理和流处理,提供丰富的转换操作。 ...

    flink学习资料(包含网盘视频地址)

    5. **连接器与格式**:熟悉Flink支持的各种输入输出源,如Kafka、RabbitMQ、HDFS、Cassandra等,以及如何使用Flink的序列化格式,如JSON、Avro、Parquet等。 6. **Table & SQL API**:学习如何使用Table API和SQL...

    尚硅谷大数据技术之Flume

    Flume 是 Cloudera 提供的一个高可用的、 高可靠的、分布式的海量日志采集、聚合和传输的系统。 Flume 基于流式架构,灵活简单。 1.1 Flume 定义 Flume 是一个高可用的、 高可靠的、分布式的海量日志采集、聚合和...

Global site tag (gtag.js) - Google Analytics