`
tenght
  • 浏览: 51159 次
社区版块
存档分类
最新评论

[hadoop2.7.1]I/O之序列化(serializer)

 
阅读更多

先来看下org.apache.hadoop.io.serializer的类图(hadoop2.7.1):


由类图看:

接口三个:

1、Deserializer:定义反序列化接口;

2、Serializer:定义序列化接口;
3、Serialization:定义了一系列和序列化相关并相互依赖对象的接口。

依据这三个接口,分别实现了2个类,分别是支持Writable机制的WritableSerialization和支持Java序列化的JavaSerialization,这样一共是6个实现类。

SerilizationFactory:维护一个Serilization的ArrayList。它具有参数为Configuration的构造函数,把parameter io.serializations中逗号隔开的serialization都添加进来。

Deserializer:将字节流转为一个对象。这个接口的方法有:打开流,反序列化,关闭流

源码:

package org.apache.hadoop.io.serializer;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <p>
 * Provides a facility for deserializing objects of type <T> from an
 * {@link InputStream}.
 * </p>
 * 
 * <p>
 * Deserializers are stateful, but must not buffer the input since
 * other producers may read from the input between calls to
 * {@link #deserialize(Object)}.
 * </p>
 * @param <T>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Deserializer<T> {
  /**
   * <p>Prepare the deserializer for reading.</p>
   */
  void open(InputStream in) throws IOException;
  
  /**
   * <p>
   * Deserialize the next object from the underlying input stream.
   * If the object <code>t</code> is non-null then this deserializer
   * <i>may</i> set its internal state to the next object read from the input
   * stream. Otherwise, if the object <code>t</code> is null a new
   * deserialized object will be created.
   * </p>
   * @return the deserialized object
   */
  T deserialize(T t) throws IOException;
  
  /**
   * <p>Close the underlying input stream and clear up any resources.</p>
   */
  void close() throws IOException;
}


Serializer:将一个对象转换为一个字节流的实现实例,该接口的方法有:打开流,序列化,关闭流

源码:

package org.apache.hadoop.io.serializer;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <p>
 * Provides a facility for serializing objects of type <T> to an
 * {@link OutputStream}.
 * </p>
 * 
 * <p>
 * Serializers are stateful, but must not buffer the output since
 * other producers may write to the output between calls to
 * {@link #serialize(Object)}.
 * </p>
 * @param <T>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serializer<T> {
  /**
   * <p>Prepare the serializer for writing.</p>
   */
  void open(OutputStream out) throws IOException;
  
  /**
   * <p>Serialize <code>t</code> to the underlying output stream.</p>
   */
  void serialize(T t) throws IOException;
  
  /**
   * <p>Close the underlying output stream and clear up any resources.</p>
   */  
  void close() throws IOException;
}

Serialization:使用抽象工厂的设计模式,封装了一对Serializer/Deserializer,判断是否支持输入的类,根据输入的类给出序列化接口和反序列化接口。

源码:

package org.apache.hadoop.io.serializer;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <p>
 * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
 * </p>
 * @param <T>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serialization<T> {
  
  /**
   * Allows clients to test whether this {@link Serialization}
   * supports the given class.
   */
  boolean accept(Class<?> c);
  
  /**
   * @return a {@link Serializer} for the given class.
   */
  Serializer<T> getSerializer(Class<T> c);

  /**
   * @return a {@link Deserializer} for the given class.
   */
  Deserializer<T> getDeserializer(Class<T> c);
}

SerializationFactory :序列化工厂,初始化时从配置项io.serializations中获取序列化工具,默认使用org.apache.hadoop.io.serializer.WritableSerialization作为序列化工具。通过调用getSerializer和getDeserializer来获取序列化与反序列化工具。

源码:

package org.apache.hadoop.io.serializer;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.serializer.avro.AvroReflectSerialization;
import org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * <p>
 * A factory for {@link Serialization}s.
 * </p>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class SerializationFactory extends Configured {
  
  private static final Log LOG =
    LogFactory.getLog(SerializationFactory.class.getName());

  private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
  
  /**
   * <p>
   * Serializations are found by reading the <code>io.serializations</code>
   * property from <code>conf</code>, which is a comma-delimited list of
   * classnames.
   * </p>
   */
  public SerializationFactory(Configuration conf) {
    super(conf);
    for (String serializerName : conf.getTrimmedStrings(
      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
      new String[]{WritableSerialization.class.getName(),
        AvroSpecificSerialization.class.getName(),
        AvroReflectSerialization.class.getName()})) {
      add(conf, serializerName);
    }
  }
  
  @SuppressWarnings("unchecked")
  private void add(Configuration conf, String serializationName) {
    try {
      Class<? extends Serialization> serializionClass =
        (Class<? extends Serialization>) conf.getClassByName(serializationName);
      serializations.add((Serialization)
      ReflectionUtils.newInstance(serializionClass, getConf()));
    } catch (ClassNotFoundException e) {
      LOG.warn("Serialization class not found: ", e);
    }
  }

  public <T> Serializer<T> getSerializer(Class<T> c) {
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      return serializer.getSerializer(c);
    }
    return null;
  }

  public <T> Deserializer<T> getDeserializer(Class<T> c) {
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      return serializer.getDeserializer(c);
    }
    return null;
  }

  @SuppressWarnings("unchecked")
  public <T> Serialization<T> getSerialization(Class<T> c) {
    for (Serialization serialization : serializations) {
      if (serialization.accept(c)) {
        return (Serialization<T>) serialization;
      }
    }
    return null;
  }
  
}

下面对SerializationFactory生产Serializations做个简单的解析说明:

首先来看其构造函数里的一个全局参数:CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,它的值定义如下:

  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
  public static final String  IO_SERIALIZATIONS_KEY = "io.serializations";

而使用SerializationFactory的构造函数:publicSerializationFactory(Configurationconf) 时,使用配置文件:Configuration:core-default.xml,core-site.xml。如:
SerializationFactoryfactory=newSerializationFactory(conf);

而在hadoop2.7.1中默认配置文件core-default.xml的io.serializations的属性如下:

<property>
  <name>io.serializations</name>
  <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization<alue>
  <description>A list of serialization classes that can be used for
  obtaining serializers and deserializers.</description>
</property>

由此,通过SerializationFactory生产的Serializations有三个:
org.apache.hadoop.io.serializer.WritableSerialization,
org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,
org.apache.hadoop.io.serializer.avro.AvroReflectSerialization

通过其方法public <T> Serializer<T> getSerializer(Class<T> c),public <T> Serialization<T> getSerialization(Class<T> c)便能得到相应的Serialization:

  public <T> Serializer<T> getSerializer(Class<T> c) {
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      return serializer.getSerializer(c);
    }
    return null;
  }

<div>@SuppressWarnings("unchecked")
public<T>Serialization<T>getSerialization(Class<T>c){
for(Serializationserialization:serializations){
if(serialization.accept(c))<strong></strong>{                          //注1
return(Serialization<T>)serialization;
}
}
returnnull;
}</div>

注1:if (serialization.accept(c))将会调用相应类的accept函数,例如:如果serialization的值为:org.apache.hadoop.io.serializer.WritableSerialization,则将调用:

  @InterfaceAudience.Private
  @Override
  public boolean accept(Class<?> c) {
    return Writable.class.isAssignableFrom(c);
  }

如果serialization的值为:org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,则将调用:

  @InterfaceAudience.Private
  @Override
  public boolean accept(Class<?> c) {
    return SpecificRecord.class.isAssignableFrom(c);          //注2
  }

注2:

public boolean isAssignableFrom(Class<?>cls)

判定此 Class 对象所表示的类或接口与指定的 Class 参数所表示的类或接口是否相同,或是否是其超类或超接口。如果是则返回 true;否则返回 false。如果该 Class 表示一个基本类型,且指定的 Class 参数正是该 Class 对象,则该方法返回 true;否则返回 false

特别地,通过身份转换或扩展引用转换,此方法能测试指定 Class 参数所表示的类型能否转换为此 Class 对象所表示的类型。有关详细信息,请参阅 Java Language Specification 的第 5.1.1 和 5.1.4 节。

参数:
cls - 要检查的 Class 对象
返回:
表明 cls 类型的对象能否赋予此类对象的 boolean
抛出:
NullPointerException - 如果指定的 Class 参数为 null。








分享到:
评论

相关推荐

    hadoop2.7.1 windows缺少的文件winutils.exe,hadoop.dll

    下载winutils.exe,hadoop.dll放到hadoop环境的bin目录,建议尽量使用版本匹配的,必然hadoop-2.6就使用2.6版本的。2.7版本就使用2.7.。理论上2.7版本可以使用在2.6版本上

    hadoop-2.7.1.zip

    与Hadoop 2.7.1一同提及的还有hive-1.2.1,Hive是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。在Hive 1.2.1中,可能包含的改进有: 1. 性能优化,包括更快的查询执行...

    Hadoop安装教程_单机/伪分布式配置_Hadoop2.7.1/Ubuntu 16.04

    Hadoop安装教程_单机/伪分布式配置_Hadoop2.7.1/Ubuntu 16.04 本教程主要讲述了在 Ubuntu 16.04 环境下安装 Hadoop 2.7.1 的步骤,包括单机模式、伪分布式模式和分布式模式三种安装方式。以下是本教程的知识点总结...

    hadoop2.7.1.rar

    标题中的"hadoop2.7.1.rar"表明这是一个关于Apache Hadoop的压缩文件,具体版本为2.7.1。Hadoop是一个开源框架,主要用于分布式存储和计算,它由Apache软件基金会开发,广泛应用于大数据处理领域。这个压缩包可能是...

    hadoop-2.7.1

    10. **安装与部署**:安装Hadoop 2.7.1通常涉及解压`hadoop-2.7.1.tar.gz`,配置环境变量,格式化HDFS,启动守护进程等步骤。集群部署还需要考虑节点间的通信和数据同步。 综上所述,Hadoop 2.7.1是一个强大的大...

    hadoop2.7.1 eclipse插件

    hadoop2.7.1的eclipse插件,编译环境,eclipse 4.4(luna) ,jdk1.7,ant1.9.6,maven3.3,hadoop2.7.1,centos6.7,jdk1.7 要注意的是开发黄金下jdk版本必须是jdk1.7及以上,否则无法使用

    hadoop2.7.1的Windows版本

    5. 运行`bin/start-all.sh`启动Hadoop服务,或者使用Hadoop提供的图形化管理工具如Hadoop UI进行监控。 总之,要在Windows上运行Hadoop 2.7.1,我们需要解决与Unix/Linux不兼容的问题,包括使用模拟Linux环境、修改...

    Hadoop2.7.1中文文档

    Hadoop2.7.1是Hadoop发展中的一个重要版本,它在前一个版本的基础上进行了一系列的优化和改进,增强了系统的稳定性和性能。这个压缩包文件包含的是Hadoop2.7.1的中文文档,对于学习和理解Hadoop的运作机制、配置以及...

    hadoop 2.7.1

    在使用Hadoop时,需要注意数据的分块策略,合理设置Block Size以优化I/O效率。同时,为了保证数据安全,定期进行NameNode的快照备份是必要的。此外,监控系统性能,如磁盘使用率、CPU和内存使用情况,以及网络带宽,...

    hadoop.zip hadoop2.7.1安装包

    在Hadoop2.7.1安装包中,`hadoop-2.7.1.tar.gz`是主要的发布文件,包含了Hadoop的所有组件和依赖库。这个tarball文件通常在Linux环境下使用,通过解压缩可以得到Hadoop的源代码和二进制文件。用户需要配置环境变量、...

    hadoop2.7.1平台搭建

    hadoop2.7.1平台搭建

    eclipse hadoop2.7.1 plugin 配置

    `eclipse hadoop2.7.1 plugin`是为了方便开发者在Eclipse中进行Hadoop项目开发而设计的插件。本文将详细介绍如何配置Eclipse以支持Hadoop 2.7.1,并讨论相关的知识点。 首先,配置Eclipse Hadoop插件的步骤如下: ...

    winutils.exe_hadoop-2.7.1

    《Hadoop Winutils.exe在2.7.1版本中的应用与配置详解》 Hadoop作为一个分布式计算框架,广泛应用于大数据处理领域。在Windows环境中,Winutils.exe和hadoop.dll是Hadoop的重要组成部分,它们为Hadoop在Windows上的...

    hadoop2.7.1-win32.zip

    标题 "hadoop2.7.1-win32.zip" 指示了这是一个适用于Windows 32位操作系统的Hadoop版本,具体为2.7.1。Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大量计算机节点上处理和存储海量数据。这个...

    hadoop-2.7.1.tar.gz

    标题中的"hadoop-2.7.1.tar.gz"是一个压缩包文件,它是Apache Hadoop的2.7.1版本。Hadoop是一个开源框架,主要用于分布式存储和计算,它使得处理和存储海量数据变得可能。".tar.gz"是Linux/Unix系统中常用的文件压缩...

    hadoop-2.7.1.rar

    同时,此版本还对 HDFS 和 MapReduce 进行了性能优化,例如更快的数据读写速度、更高效的磁盘 I/O 和网络通信等。 在实际应用中,Hadoop 2.7.1 可用于各种场景,如日志分析、推荐系统、图像处理、生物信息学研究等...

    hadoop2.7.1版本的hadoop.dll,winutils.exe

    Hadoop 2.7.1是这个框架的一个重要版本,它包含了各种优化和改进,以提高数据处理的效率和稳定性。在这个版本中,有两个关键的组件是hadoop.dll和winutils.exe,它们在Windows环境下运行Hadoop时扮演着至关重要的...

    Spark所需的hadoop2.7.1相关资源

    Hadoop2.7.1是Hadoop的一个重要版本,它带来了许多改进和优化,而Spark则是一个快速、通用且可扩展的数据处理框架,尤其在处理大规模数据时表现出色。Spark与Hadoop的兼容性是确保大数据工作流流畅运行的关键。 ...

    使用Maven编译Hadoop(2.7.1)

    http://archive.apache.org/dist/hadoop/core/hadoop-2.7.1/hadoop-2.7.1-src.tar.gz ``` - **解压操作**: ``` # tar -zxvf hadoop-2.7.1-src.tar.gz -C /opt ``` 解压完成后,进入到 `/opt/hadoop-2.7.1-...

Global site tag (gtag.js) - Google Analytics