记录一下hadoop 数据类型章节的笔记,以便后期使用,本文是边学习边记录,持续更新中
Hadoop 常用自带的数据类型和Java数据类型配比如下
在此首先明确定义下序列化
参考百度百科
序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。
Hadoop自定义类型必须实现的一个接口 Writable 代码如下
write 方法:Serialize the fields of this object to out
readFields:Deserialize the fields of this object from in
实现该接口后,还需要手动实现一个静态方法,在该方法中返回自定义类型的无参构造方法
for example
官方完成例子
WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.
如果该自定义类型作为key,那么需要实现 WritableComparable 接口,这个接口实现了两个接口 ,分别为 Comparable<T>, Writable
类似上一段代码 主要新增 compareTo 方法 代码如下
特殊的类型 NullWritable
NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。
特殊的类型 ObjectWritable
ObjectWritable 是对java 基本类型的一个通用封装:用于客户端与服务器间传输的Writable对象,也是对RPC传输对象的封装,因为RPC上交换的信息只能是JAVA的基础数据类型,String或者Writable类型,而ObjectWritable是对其子类的抽象封装
ObjectWritable会往流里写入如下信息:
对象类名,对象自己的串行化结果
其序列化和反序列化方法如下:
特殊的类型 GenericWritable
例如一个reduce中的输入从多个map中获,然而各个map的输出value类型都不同,这就需要 GenericWritable 类型 map端用法如下
在reduce 中用法如下
自定义MyGenericWritable如下
Hadoop 常用自带的数据类型和Java数据类型配比如下
Hadoop类型 | Java类型 | 描述 |
BooleanWritable | boolean | 布尔型 |
IntWritable | int | 整型 |
FloatWritable | float | 浮点float |
DoubleWritable | double | 浮点型double |
ByteWritable | byte | 整数类型byte |
Text | String | 字符串型 |
ArrayWritable | Array | 数组型 |
在此首先明确定义下序列化
参考百度百科
序列化 (Serialization)将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。
Hadoop自定义类型必须实现的一个接口 Writable 代码如下
public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
write 方法:Serialize the fields of this object to out
readFields:Deserialize the fields of this object from in
实现该接口后,还需要手动实现一个静态方法,在该方法中返回自定义类型的无参构造方法
for example
public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; }
官方完成例子
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }
WritableComparables can be compared to each other, typically via Comparators. Any type which is to be used as a key in the Hadoop Map-Reduce framework should implement this interface.
如果该自定义类型作为key,那么需要实现 WritableComparable 接口,这个接口实现了两个接口 ,分别为 Comparable<T>, Writable
类似上一段代码 主要新增 compareTo 方法 代码如下
public int compareTo(MyWritableComparable w) { int thisValue = this.value; int thatValue = ((IntWritable)o).value; return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); }
特殊的类型 NullWritable
NullWritable是Writable的一个特殊类,序列化的长度为0,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符,如在MapReduce中,如果你不需要使用键或值,你就可以将键或值声明为NullWritable,NullWritable是一个不可变的单实例类型。
特殊的类型 ObjectWritable
ObjectWritable 是对java 基本类型的一个通用封装:用于客户端与服务器间传输的Writable对象,也是对RPC传输对象的封装,因为RPC上交换的信息只能是JAVA的基础数据类型,String或者Writable类型,而ObjectWritable是对其子类的抽象封装
ObjectWritable会往流里写入如下信息:
对象类名,对象自己的串行化结果
其序列化和反序列化方法如下:
public void readFields(DataInput in) throws IOException { readObject(in, this, this.conf); } public void write(DataOutput out) throws IOException { writeObject(out, instance, declaredClass, conf); } public static void writeObject(DataOutput out, Object instance, Class declaredClass, Configuration conf) throws IOException { //对象为空则抽象出内嵌数据类型NullInstance if (instance == null) { // null instance = new NullInstance(declaredClass, conf); declaredClass = Writable.class; } //先写入类名 UTF8.writeString(out, declaredClass.getName()); // always write declared /* * 封装的对象为数组类型,则逐个序列化(序列化为length+对象的序列化内容) * 采用了迭代 */ if (declaredClass.isArray()) { // array int length = Array.getLength(instance); out.writeInt(length); for (int i = 0; i < length; i++) { writeObject(out, Array.get(instance, i), declaredClass.getComponentType(), conf); } //为String类型直接写入 } else if (declaredClass == String.class) { // String UTF8.writeString(out, (String)instance); }//基本数据类型写入 else if (declaredClass.isPrimitive()) { // primitive type if (declaredClass == Boolean.TYPE) { // boolean out.writeBoolean(((Boolean)instance).booleanValue()); } else if (declaredClass == Character.TYPE) { // char out.writeChar(((Character)instance).charValue()); } else if (declaredClass == Byte.TYPE) { // byte out.writeByte(((Byte)instance).byteValue()); } else if (declaredClass == Short.TYPE) { // short out.writeShort(((Short)instance).shortValue()); } else if (declaredClass == Integer.TYPE) { // int out.writeInt(((Integer)instance).intValue()); } else if (declaredClass == Long.TYPE) { // long out.writeLong(((Long)instance).longValue()); } else if (declaredClass == Float.TYPE) { // float out.writeFloat(((Float)instance).floatValue()); } else if (declaredClass == Double.TYPE) { // double out.writeDouble(((Double)instance).doubleValue()); } else if (declaredClass == Void.TYPE) { // void } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } //枚举类型写入 } else if (declaredClass.isEnum()) { // enum UTF8.writeString(out, ((Enum)instance).name()); //hadoop的Writable类型写入 } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable UTF8.writeString(out, instance.getClass().getName()); ((Writable)instance).write(out); } else { throw new IOException("Can't write: "+instance+" as "+declaredClass); } } public static Object readObject(DataInput in, Configuration conf) throws IOException { return readObject(in, null, conf); } /** Read a {<a href="http://my.oschina.net/link1212" class="referer" target="_blank">@link</a> Writable}, {<a href="http://my.oschina.net/link1212" class="referer" target="_blank">@link</a> String}, primitive type, or an array of * the preceding. */ @SuppressWarnings("unchecked") public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf) throws IOException { //获取反序列化的名字 String className = UTF8.readString(in); //假设为基本数据类型 Class<?> declaredClass = PRIMITIVE_NAMES.get(className); /* * 判断是否为基本数据类型,不是则为空,则为Writable类型, * 对于Writable类型从Conf配置文件中读取类名, * 在这里只是获取类名,而并没有反序列化对象 */ if (declaredClass == null) { try { declaredClass = conf.getClassByName(className); } catch (ClassNotFoundException e) { throw new RuntimeException("readObject can't find class " + className, e); } } //基本数据类型 Object instance; //为基本数据类型,逐一反序列化 if (declaredClass.isPrimitive()) { // primitive types if (declaredClass == Boolean.TYPE) { // boolean instance = Boolean.valueOf(in.readBoolean()); } else if (declaredClass == Character.TYPE) { // char instance = Character.valueOf(in.readChar()); } else if (declaredClass == Byte.TYPE) { // byte instance = Byte.valueOf(in.readByte()); } else if (declaredClass == Short.TYPE) { // short instance = Short.valueOf(in.readShort()); } else if (declaredClass == Integer.TYPE) { // int instance = Integer.valueOf(in.readInt()); } else if (declaredClass == Long.TYPE) { // long instance = Long.valueOf(in.readLong()); } else if (declaredClass == Float.TYPE) { // float instance = Float.valueOf(in.readFloat()); } else if (declaredClass == Double.TYPE) { // double instance = Double.valueOf(in.readDouble()); } else if (declaredClass == Void.TYPE) { // void instance = null; } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } } else if (declaredClass.isArray()) { // array int length = in.readInt(); instance = Array.newInstance(declaredClass.getComponentType(), length); for (int i = 0; i < length; i++) { Array.set(instance, i, readObject(in, conf)); } } else if (declaredClass == String.class) { // String类型的反序列化 instance = UTF8.readString(in); } else if (declaredClass.isEnum()) { // enum的反序列化 instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in)); } else { // Writable Class instanceClass = null; String str = ""; try { //剩下的从Conf对象中获取类型Class str = UTF8.readString(in); instanceClass = conf.getClassByName(str); } catch (ClassNotFoundException e) { throw new RuntimeException("readObject can't find class " + str, e); } /* * 带用了WritableFactories工厂去new instanceClass(实现了Writable接口)对象出来 * 在调用实现Writable对象自身的反序列化方法 */ Writable writable = WritableFactories.newInstance(instanceClass, conf); writable.readFields(in); instance = writable; if (instanceClass == NullInstance.class) { // null declaredClass = ((NullInstance)instance).declaredClass; instance = null; } } //最后存储反序列化后待封装的ObjectWritable对象 if (objectWritable != null) { // store values objectWritable.declaredClass = declaredClass; objectWritable.instance = instance; } return instance; }
特殊的类型 GenericWritable
例如一个reduce中的输入从多个map中获,然而各个map的输出value类型都不同,这就需要 GenericWritable 类型 map端用法如下
context.write(new Text(str), new MyGenericWritable(new LongWritable(1))); context.write(new Text(str), new MyGenericWritable(new Text("1")));
在reduce 中用法如下
for (MyGenericWritable time : values){ //获取MyGenericWritable对象 Writable writable = time.get(); //如果当前是LongWritable类型 if (writable instanceof LongWritable){ count += ((LongWritable) writable).get(); } //如果当前是Text类型 if (writable instanceof Text){ count += Long.parseLong(((Text)writable).toString()); } }
自定义MyGenericWritable如下
class MyGenericWritable extends GenericWritable{ //无参构造函数 public MyGenericWritable() { } //有参构造函数 public MyGenericWritable(Text text) { super.set(text); } //有参构造函数 public MyGenericWritable(LongWritable longWritable) { super.set(longWritable); } @Override protected Class<? extends Writable>[] getTypes() { return new Class[]{LongWritable.class,Text.class}; }
发表评论
-
Sort-based Shuffle的设计与实现
2016-03-15 08:49 818原文 http://www.cnblogs.com/hsea ... -
spark的几个重要概念
2015-12-04 14:09 0本节主要记录以下几个概念 一:RDD的五大特点 二:RDD 窄 ... -
spark部署安装调试
2015-12-02 11:28 743本节记录spark下载-->编译-->安装--&g ... -
spark基本概念
2015-11-12 10:45 796记录一下课堂笔记: ... -
hadoop计算能力调度器配置
2015-10-29 10:39 1024问题出现 hadoop默认调度器是FIFO,其原理就是先按照作 ... -
HBase在各大应用中的优化和改进
2015-10-28 14:59 705Facebook之前曾经透露过Facebook的hbase架构 ... -
一篇很好的解决系统问题过程描述文章
2015-09-23 08:40 506在网上看到的一篇解决h ... -
通过GeoHash核心原理来分析hbase rowkey设计
2015-09-08 15:49 3526注:本文是结合hbase ... -
从OpenTsdb来分析rowkey设计
2015-09-06 16:04 4957讨论此问题前,先理解 ... -
HBase中asynchbase的使用方式
2015-08-25 10:32 8207Hbase的原生java 客户端是完全同步的,当你使用原生AP ... -
Mapreduce优化的点滴
2015-07-16 15:18 840注:转载 1. 使用自定义Writable 自带的Text ... -
napreduce shuffle 过程记录
2015-07-10 11:23 769在我看来 hadoop的核心是mapre ... -
ZooKeeper伪分布式集群安装及使用
2015-02-13 08:29 9271. zookeeper介绍 ZooKeeper是一个为分 ... -
hadoop-mahout 核心算法总结
2015-02-07 10:08 1569其实大家都知道hadoop为我们提供了一个大的框架,真正的 ... -
推荐引擎内部原理--mahout
2015-01-22 11:11 576转载自:https://www.ibm.com/devel ... -
hadoop 动态添加删除节点
2015-01-20 13:39 675转自:http://www.cnblogs.com/rill ... -
hbase hadoop zookeeper
2015-01-19 14:47 0hadoop 部署手册 http://www.iteblo ... -
mapreduce 开发以及部署
2015-01-16 13:56 850前面几篇文章的梳理让我对hadoop新yarn 框架有了一 ... -
hadoop yarn几个问题的记录
2015-01-13 11:48 665本文主要介绍以下几 ... -
hadoop集群部署时候的几个问题记录
2015-01-13 10:24 750本章部署一个hadoop 集群 ...
相关推荐
在Hadoop生态系统中,自定义类型编程是开发者经常会遇到的需求,尤其当处理的数据类型不局限于Hadoop默认支持的基本类型(如IntWritable、Text等)时。本教程将深入探讨如何在MapReduce作业中创建和使用自定义数据...
在标题“Hadoop 自定义 Partitioner 源代码”中,我们可以理解为讨论的是如何创建和理解 Partitioner 的源代码,以便于开发者可以更好地控制 MapReduce job 中的数据分片过程。自定义 Partitioner 可能涉及到以下...
总结来说,自定义数据类型是Hadoop MapReduce模型中不可或缺的一部分,它扩展了Hadoop处理数据的能力,使其能够处理更加复杂和多样化的数据类型。通过实现WritableComparable接口,开发者可以定义自己的数据结构,并...
Hadoop学习笔记—5.自定义类型处理手机上网日志-附件资源
- 对于“增删改查”操作,我们可以定义自定义的Repository接口,扩展Spring Data Hadoop提供的方法,比如`save()`, `delete()`, `findById()`, `findAll()`等。 - 而具体的数据操作逻辑,如MapReduce任务,可以...
- 集群规划:如何根据硬件资源和预期负载来规划节点数量和类型。 - 配置文件详解:如core-site.xml, hdfs-site.xml, mapred-site.xml等,这些配置文件是搭建和管理Hadoop集群的关键。 - NameNode和DataNode:...
- **2.x版本系列**:引入了YARN (Yet Another Resource Negotiator),这是一种新的资源管理和任务调度系统,使得Hadoop能够更好地支持多种类型的数据处理应用程序。 此外,市场上还有几家知名的Hadoop发行商,如...
2. **DumpTypedBytes**: 这可能是用于处理二进制数据的工具类,它可能在读取或写入特定格式的数据时被调用,尤其是在处理自定义数据类型时。 3. **Environment.class**: 这可能涉及到 Hadoop Streaming 中的任务...
1.3WordCount源码分析中,1.3.1特别数据类型介绍了Hadoop自定义的几种数据类型,它们在实现MapReduce程序中扮演重要角色。1.3.2旧的WordCount分析与1.3.3新的WordCount分析,从源码层面解读了旧版和新版的WordCount...
- 如果需要自定义,可以下载hadoop-eclipse-plugin-1.0.4.jar,并将其复制到Eclipse安装目录下的plugins文件夹。 - 之后按照上述简单配置方法进行设置。 二、Hadoop使用基础 1. **Map/Reduce界面**: - 在...
Hadoop提供了一套API,允许开发者处理各种数据格式,如TextInputFormat、SequenceFileInputFormat等,以及自定义InputFormat以适应特定的数据源。 4. **fs**: 文件系统接口(FileSystem API)位于此目录中,它抽象...
你可能会看到如何处理文本数据、CSV数据或其他结构化数据的示例,以及如何自定义Partitioner、Combiner和OutputFormat。 4. **数据分桶与分区**:源代码可能涉及到如何根据特定键值进行数据分区,以优化数据的分布...
用户自定义函数 过滤UDF 计算UDF 加载UDF 数据处理操作 加载和存储数据 过滤数据 分组与连接数据 对数据进行排序 组合和分割数据 Pig实战 并行处理 参数代换 第12章 Hive 1.1 安装Hive ...
《Spark 2.4.7 无Hadoop版本详解及应用》 Spark 2.4.7 是 Apache Spark 的一个重要版本,它是一个流行的...对于那些已经有自定义存储解决方案或不需要Hadoop功能的开发者和数据科学家来说,这个版本无疑是理想的选择。
- MapReduce运行框架的三种类型,分别是本地模式、伪分布式模式和完全分布式模式。 - mapreduce.admin.user.env设置为空值时,在不同操作系统上会有哪些不同的值。 通过这些配置项的对比,用户能够更好地理解Hadoop...
7. **数据输入与输出**:理解Hadoop如何处理不同类型的数据源,如文本文件、CSV、JSON等,并学习使用InputFormat和OutputFormat自定义数据格式。 8. **Hadoop应用实例**:通过具体的案例,如网页日志分析、推荐系统...
用户可以通过自定义InputFormat和OutputFormat来处理特定类型的数据。 8. **故障恢复与容错机制**:Hadoop具有内置的故障检测和恢复机制,如心跳检测、数据块的冗余复制等,以确保系统的稳定运行。如果DataNode或...
Hadoop 提供了分布式文件系统(HDFS)、运算资源调度系统(YARN)以及分布式运算编程框架(MapReduce)等核心组件,支持用户自定义业务逻辑来处理海量数据。 #### Hadoop 背景介绍 - **起源与发展**:Hadoop 最初...
标准安装通常适用于大多数情况,而高级安装则允许用户自定义更多的设置。 - **配置网络**:此步骤包括设置主机名、网络接口等基本信息,以确保Cloudera Manager能够正确地识别和连接到集群中的所有节点。 - **选择...
示例代码会展示如何编写自定义的Mapper和Reducer类来执行特定的数据处理任务。 3. **YARN(Yet Another Resource Negotiator)**:YARN是Hadoop的资源管理系统,负责调度和管理集群上的计算资源。它允许不同类型的...