`

hadoop 如何自定义类型

 
阅读更多
记录一下hadoop 数据类型章节的笔记,以便后期使用,本文是边学习边记录,持续更新中

Hadoop 常用自带的数据类型和Java数据类型配比如下

Hadoop类型Java类型描述
BooleanWritableboolean布尔型
IntWritableint整型
FloatWritablefloat浮点float
DoubleWritabledouble浮点型double
ByteWritablebyte整数类型byte
TextString字符串型
ArrayWritableArray数组型



在此首先明确定义下序列化
参考百度百科
序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。


Hadoop自定义类型必须实现的一个接口 Writable 代码如下

public interface Writable {

  void write(DataOutput out) throws IOException;

  void readFields(DataInput in) throws IOException;
}


write 方法:Serialize the fields of this object to out

readFields:Deserialize the fields of this object from in

实现该接口后,还需要手动实现一个静态方法,在该方法中返回自定义类型的无参构造方法

for example
 public static MyWritable read(DataInput in) throws IOException {
         MyWritable w = new MyWritable();
         w.readFields(in);
         return w;
       }



官方完成例子

public class MyWritable implements Writable {
       // Some data     
       private int counter;
       private long timestamp;
       
       public void write(DataOutput out) throws IOException {
         out.writeInt(counter);
         out.writeLong(timestamp);
       }
       
       public void readFields(DataInput in) throws IOException {
         counter = in.readInt();
         timestamp = in.readLong();
       }
       
       public static MyWritable read(DataInput in) throws IOException {
         MyWritable w = new MyWritable();
         w.readFields(in);
         return w;
       }
     }




WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.


如果该自定义类型作为key,那么需要实现 WritableComparable 接口,这个接口实现了两个接口 ,分别为 Comparable<T>, Writable

类似上一段代码 主要新增 compareTo 方法 代码如下

  public int compareTo(MyWritableComparable w) {
         int thisValue = this.value;
         int thatValue = ((IntWritable)o).value;
         return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
       }



特殊的类型 NullWritable

NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。

特殊的类型 ObjectWritable
ObjectWritable 是对java 基本类型的一个通用封装:用于客户端与服务器间传输的Writable对象,也是对RPC传输对象的封装,因为RPC上交换的信息只能是JAVA的基础数据类型,String或者Writable类型,而ObjectWritable是对其子类的抽象封装
ObjectWritable会往流里写入如下信息:

对象类名,对象自己的串行化结果

其序列化和反序列化方法如下:

public void readFields(DataInput in) throws IOException {
    readObject(in, this, this.conf);
  }
   
  public void write(DataOutput out) throws IOException {
    writeObject(out, instance, declaredClass, conf);
  }

public static void writeObject(DataOutput out, Object instance,
                               Class declaredClass, 
                               Configuration conf) throws IOException {
  //对象为空则抽象出内嵌数据类型NullInstance
  if (instance == null) {                       // null
    instance = new NullInstance(declaredClass, conf);
    declaredClass = Writable.class;
  }
  //先写入类名
  UTF8.writeString(out, declaredClass.getName()); // always write declared
  /*
   * 封装的对象为数组类型,则逐个序列化(序列化为length+对象的序列化内容)
   * 采用了迭代
   */
   
  if (declaredClass.isArray()) {                // array
    int length = Array.getLength(instance);
    out.writeInt(length);
    for (int i = 0; i < length; i++) {
      writeObject(out, Array.get(instance, i),
                  declaredClass.getComponentType(), conf);
    }
    //为String类型直接写入
  } else if (declaredClass == String.class) {   // String
    UTF8.writeString(out, (String)instance);
     
  }//基本数据类型写入 
  else if (declaredClass.isPrimitive()) {     // primitive type
 
    if (declaredClass == Boolean.TYPE) {        // boolean
      out.writeBoolean(((Boolean)instance).booleanValue());
    } else if (declaredClass == Character.TYPE) { // char
      out.writeChar(((Character)instance).charValue());
    } else if (declaredClass == Byte.TYPE) {    // byte
      out.writeByte(((Byte)instance).byteValue());
    } else if (declaredClass == Short.TYPE) {   // short
      out.writeShort(((Short)instance).shortValue());
    } else if (declaredClass == Integer.TYPE) { // int
      out.writeInt(((Integer)instance).intValue());
    } else if (declaredClass == Long.TYPE) {    // long
      out.writeLong(((Long)instance).longValue());
    } else if (declaredClass == Float.TYPE) {   // float
      out.writeFloat(((Float)instance).floatValue());
    } else if (declaredClass == Double.TYPE) {  // double
      out.writeDouble(((Double)instance).doubleValue());
    } else if (declaredClass == Void.TYPE) {    // void
    } else {
      throw new IllegalArgumentException("Not a primitive: "+declaredClass);
    }
    //枚举类型写入
  } else if (declaredClass.isEnum()) {         // enum
    UTF8.writeString(out, ((Enum)instance).name());
    //hadoop的Writable类型写入
  } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
    UTF8.writeString(out, instance.getClass().getName());
    ((Writable)instance).write(out);
 
  } else {
    throw new IOException("Can't write: "+instance+" as "+declaredClass);
  }
}

public static Object readObject(DataInput in, Configuration conf)
    throws IOException {
    return readObject(in, null, conf);
  }
     
  /** Read a {<a href="http://my.oschina.net/link1212" class="referer" target="_blank">@link</a> Writable}, {<a href="http://my.oschina.net/link1212" class="referer" target="_blank">@link</a> String}, primitive type, or an array of
   * the preceding. */
  @SuppressWarnings("unchecked")
  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
      //获取反序列化的名字
    String className = UTF8.readString(in);
    //假设为基本数据类型
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    /*
     * 判断是否为基本数据类型,不是则为空,则为Writable类型,
     * 对于Writable类型从Conf配置文件中读取类名,
     * 在这里只是获取类名,而并没有反序列化对象
     */
     
    if (declaredClass == null) {
      try {
        declaredClass = conf.getClassByName(className);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + className, e);
      }
    }    
    //基本数据类型
    Object instance;
    //为基本数据类型,逐一反序列化
    if (declaredClass.isPrimitive()) {            // primitive types
 
      if (declaredClass == Boolean.TYPE) {             // boolean
        instance = Boolean.valueOf(in.readBoolean());
      } else if (declaredClass == Character.TYPE) {    // char
        instance = Character.valueOf(in.readChar());
      } else if (declaredClass == Byte.TYPE) {         // byte
        instance = Byte.valueOf(in.readByte());
      } else if (declaredClass == Short.TYPE) {        // short
        instance = Short.valueOf(in.readShort());
      } else if (declaredClass == Integer.TYPE) {      // int
        instance = Integer.valueOf(in.readInt());
      } else if (declaredClass == Long.TYPE) {         // long
        instance = Long.valueOf(in.readLong());
      } else if (declaredClass == Float.TYPE) {        // float
        instance = Float.valueOf(in.readFloat());
      } else if (declaredClass == Double.TYPE) {       // double
        instance = Double.valueOf(in.readDouble());
      } else if (declaredClass == Void.TYPE) {         // void
        instance = null;
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
 
    } else if (declaredClass.isArray()) {              // array
      int length = in.readInt();
      instance = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(instance, i, readObject(in, conf));
      }
       
    } else if (declaredClass == String.class) {        // String类型的反序列化
      instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {         // enum的反序列化
      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else {                                      // Writable
      Class instanceClass = null;
      String str = "";
      try {
          //剩下的从Conf对象中获取类型Class
        str = UTF8.readString(in);
        instanceClass = conf.getClassByName(str);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + str, e);
      }
      /*
       * 带用了WritableFactories工厂去new instanceClass(实现了Writable接口)对象出来
       * 在调用实现Writable对象自身的反序列化方法
       */
      
      Writable writable = WritableFactories.newInstance(instanceClass, conf);
      writable.readFields(in);
      instance = writable;
 
      if (instanceClass == NullInstance.class) {  // null
        declaredClass = ((NullInstance)instance).declaredClass;
        instance = null;
      }
    }
    //最后存储反序列化后待封装的ObjectWritable对象
    if (objectWritable != null) {                 // store values
      objectWritable.declaredClass = declaredClass;
      objectWritable.instance = instance;
    }
 
    return instance;
       
  }


特殊的类型 GenericWritable
例如一个reduce中的输入从多个map中获,然而各个map的输出value类型都不同,这就需要 GenericWritable 类型  map端用法如下
 context.write(new Text(str), new MyGenericWritable(new LongWritable(1)));
context.write(new Text(str), new MyGenericWritable(new Text("1"))); 

在reduce 中用法如下
for (MyGenericWritable time : values){  
                //获取MyGenericWritable对象  
                Writable writable = time.get();  
                //如果当前是LongWritable类型  
                if (writable instanceof LongWritable){  
                      
                    count += ((LongWritable) writable).get();  
                }  
                //如果当前是Text类型  
                if (writable instanceof Text){  
                    count += Long.parseLong(((Text)writable).toString());  
                }  
            }  


自定义MyGenericWritable如下
class MyGenericWritable extends GenericWritable{  
  
    //无参构造函数  
    public MyGenericWritable() {  
          
    }  
      
    //有参构造函数  
    public MyGenericWritable(Text text) {  
        super.set(text);  
    }  
      
    //有参构造函数  
    public MyGenericWritable(LongWritable longWritable) {  
        super.set(longWritable);  
    }  
  
      
    @Override  
    protected Class<? extends Writable>[] getTypes() {  
          
        return new Class[]{LongWritable.class,Text.class};  
    }  

分享到:
评论

相关推荐

    hadoop自定义类型编程

    在Hadoop生态系统中,自定义类型编程是开发者经常会遇到的需求,尤其当处理的数据类型不局限于Hadoop默认支持的基本类型(如IntWritable、Text等)时。本教程将深入探讨如何在MapReduce作业中创建和使用自定义数据...

    Hadoop 自定义 Partitioner 源代码

    在标题“Hadoop 自定义 Partitioner 源代码”中,我们可以理解为讨论的是如何创建和理解 Partitioner 的源代码,以便于开发者可以更好地控制 MapReduce job 中的数据分片过程。自定义 Partitioner 可能涉及到以下...

    MapReduce模型--自定义数据类型

    总结来说,自定义数据类型是Hadoop MapReduce模型中不可或缺的一部分,它扩展了Hadoop处理数据的能力,使其能够处理更加复杂和多样化的数据类型。通过实现WritableComparable接口,开发者可以定义自己的数据结构,并...

    Hadoop学习笔记—5.自定义类型处理手机上网日志-附件资源

    Hadoop学习笔记—5.自定义类型处理手机上网日志-附件资源

    springboot对hadoop增删改查源码及hadoop图片访问

    - 对于“增删改查”操作,我们可以定义自定义的Repository接口,扩展Spring Data Hadoop提供的方法,比如`save()`, `delete()`, `findById()`, `findAll()`等。 - 而具体的数据操作逻辑,如MapReduce任务,可以...

    Hadoop 官方文档(中文版)

    - 集群规划:如何根据硬件资源和预期负载来规划节点数量和类型。 - 配置文件详解:如core-site.xml, hdfs-site.xml, mapred-site.xml等,这些配置文件是搭建和管理Hadoop集群的关键。 - NameNode和DataNode:...

    大数据之路选择Hadoop还是MaxCompute?Hadoop开源与MaxCompute对比材料

    - **2.x版本系列**:引入了YARN (Yet Another Resource Negotiator),这是一种新的资源管理和任务调度系统,使得Hadoop能够更好地支持多种类型的数据处理应用程序。 此外,市场上还有几家知名的Hadoop发行商,如...

    hadoop-streaming-2.8.0_jar_2.8.0_hadoop_streaming_

    2. **DumpTypedBytes**: 这可能是用于处理二进制数据的工具类,它可能在读取或写入特定格式的数据时被调用,尤其是在处理自定义数据类型时。 3. **Environment.class**: 这可能涉及到 Hadoop Streaming 中的任务...

    Hadoop集群-WordCount运行详解.pdf

    1.3WordCount源码分析中,1.3.1特别数据类型介绍了Hadoop自定义的几种数据类型,它们在实现MapReduce程序中扮演重要角色。1.3.2旧的WordCount分析与1.3.3新的WordCount分析,从源码层面解读了旧版和新版的WordCount...

    Hadoop集群服务器与本地连接

    - 如果需要自定义,可以下载hadoop-eclipse-plugin-1.0.4.jar,并将其复制到Eclipse安装目录下的plugins文件夹。 - 之后按照上述简单配置方法进行设置。 二、Hadoop使用基础 1. **Map/Reduce界面**: - 在...

    hadoop 1.2.1核心源码

    Hadoop提供了一套API,允许开发者处理各种数据格式,如TextInputFormat、SequenceFileInputFormat等,以及自定义InputFormat以适应特定的数据源。 4. **fs**: 文件系统接口(FileSystem API)位于此目录中,它抽象...

    实战hadoop源代码

    你可能会看到如何处理文本数据、CSV数据或其他结构化数据的示例,以及如何自定义Partitioner、Combiner和OutputFormat。 4. **数据分桶与分区**:源代码可能涉及到如何根据特定键值进行数据分区,以优化数据的分布...

    Hadoop权威指南 第二版(中文版)

     用户自定义函数  过滤UDF  计算UDF  加载UDF  数据处理操作  加载和存储数据  过滤数据  分组与连接数据  对数据进行排序  组合和分割数据  Pig实战  并行处理  参数代换 第12章 Hive  1.1 安装Hive ...

    spark-2.4.7-bin-without-hadoop

    《Spark 2.4.7 无Hadoop版本详解及应用》 Spark 2.4.7 是 Apache Spark 的一个重要版本,它是一个流行的...对于那些已经有自定义存储解决方案或不需要Hadoop功能的开发者和数据科学家来说,这个版本无疑是理想的选择。

    Hadoop3.x系统文档

    - MapReduce运行框架的三种类型,分别是本地模式、伪分布式模式和完全分布式模式。 - mapreduce.admin.user.env设置为空值时,在不同操作系统上会有哪些不同的值。 通过这些配置项的对比,用户能够更好地理解Hadoop...

    Hadoop入门手册

    7. **数据输入与输出**:理解Hadoop如何处理不同类型的数据源,如文本文件、CSV、JSON等,并学习使用InputFormat和OutputFormat自定义数据格式。 8. **Hadoop应用实例**:通过具体的案例,如网页日志分析、推荐系统...

    hadoop中文文档

    用户可以通过自定义InputFormat和OutputFormat来处理特定类型的数据。 8. **故障恢复与容错机制**:Hadoop具有内置的故障检测和恢复机制,如心跳检测、数据块的冗余复制等,以确保系统的稳定运行。如果DataNode或...

    hadoop入门学习文档

    Hadoop 提供了分布式文件系统(HDFS)、运算资源调度系统(YARN)以及分布式运算编程框架(MapReduce)等核心组件,支持用户自定义业务逻辑来处理海量数据。 #### Hadoop 背景介绍 - **起源与发展**:Hadoop 最初...

    Cloudera Hadoop 安装指南

    标准安装通常适用于大多数情况,而高级安装则允许用户自定义更多的设置。 - **配置网络**:此步骤包括设置主机名、网络接口等基本信息,以确保Cloudera Manager能够正确地识别和连接到集群中的所有节点。 - **选择...

    《Hadoop权威指南》示例代码

    示例代码会展示如何编写自定义的Mapper和Reducer类来执行特定的数据处理任务。 3. **YARN(Yet Another Resource Negotiator)**:YARN是Hadoop的资源管理系统,负责调度和管理集群上的计算资源。它允许不同类型的...

Global site tag (gtag.js) - Google Analytics