
HDFS - Serialization

 

Serialization

 

Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the reverse process: turning a byte stream back into a series of structured objects.

 

Desirable properties of an RPC serialization format

1. Compact: makes efficient use of network bandwidth and storage space.

2. Fast: serialization and deserialization must perform well.

3. Extensible: the protocol can evolve to meet new requirements.

4. Interoperable: clients and servers need not be implemented in the same language.

 

Hadoop uses Writables, which are compact and fast, but not extensible or interoperable with languages other than Java.

 

The Writable interface

 

 

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
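A custom type implements these two methods symmetrically: write() serializes the fields to the output, and readFields() reads them back in the same order. A minimal sketch, using a hypothetical IntPairWritable that holds two int fields:

package com.bigdata.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class IntPairWritable implements Writable {

	private int first;
	private int second;

	// The no-arg constructor is required so the framework can
	// instantiate the type reflectively before calling readFields().
	public IntPairWritable() {}

	public IntPairWritable(int first, int second) {
		this.first = first;
		this.second = second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// Serialize the fields in a fixed order.
		out.writeInt(first);
		out.writeInt(second);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// Deserialize the fields in the same order they were written.
		first = in.readInt();
		second = in.readInt();
	}

	public int getFirst()  { return first; }
	public int getSecond() { return second; }
}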
 

 

 

package com.bigdata.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public final class WritableHelper {

	/** Serializes a Writable into a byte array. */
	public static byte[] serialize(Writable writable) throws IOException{
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(out);
		writable.write(dataOut);
		dataOut.close();
		return out.toByteArray();
	}
	
	/** Populates a Writable from a byte array and returns the original bytes. */
	public static byte[] deserialize(Writable writable , byte[] bytes) throws IOException{
		ByteArrayInputStream in = new ByteArrayInputStream(bytes);
		DataInputStream dataIn = new DataInputStream(in);
		writable.readFields(dataIn);
		dataIn.close();
		return bytes;
	}
	
	public static void main(String[] args) throws IOException {
		IntWritable writable = new IntWritable();
		writable.set(163);
		byte[] bytes = serialize(writable);
		System.out.println(bytes.length+"," + Bytes.toInt(bytes));
		deserialize(writable, bytes);
		System.out.println(bytes.length+"," + Bytes.toInt(bytes));
	}
}

 

 

 

WritableComparable and comparators

package org.apache.hadoop.io;
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
 
Hadoop also provides an optimized comparison path that lets records be compared without deserializing them:
package org.apache.hadoop.io;
import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes.
It does two things:
1. It implements the compare() method by deserializing the objects to be compared and invoking their compareTo().
2. It acts as a factory for RawComparator instances.
package com.bigdata.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;

public final class WritableHelper {

	/** Serializes a Writable into a byte array. */
	public static byte[] serialize(Writable writable) throws IOException{
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(out);
		writable.write(dataOut);
		dataOut.close();
		return out.toByteArray();
	}
	
	/** Populates a Writable from a byte array and returns the original bytes. */
	public static byte[] deserialize(Writable writable , byte[] bytes) throws IOException{
		ByteArrayInputStream in = new ByteArrayInputStream(bytes);
		DataInputStream dataIn = new DataInputStream(in);
		writable.readFields(dataIn);
		dataIn.close();
		return bytes;
	}
	
	public static void main(String[] args) throws IOException {
		IntWritable writable = new IntWritable();
		writable.set(163);
		byte[] bytes = serialize(writable);
		System.out.println(bytes.length+"," + Bytes.toInt(bytes));
		deserialize(writable, bytes);
		System.out.println(bytes.length+"," + Bytes.toInt(bytes));
		// Object-level comparison: both values are already deserialized.
		RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
		IntWritable w1 = new IntWritable(163);
		IntWritable w2 = new IntWritable(67);
		int result = comparator.compare(w1, w2);
		System.out.println(result);
		
		// Byte-level comparison: the serialized forms are compared directly,
		// without deserialization.
		byte[] b1 = serialize(w1);
		byte[] b2 = serialize(w2);
		result = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
		System.out.println(result);
	}
}
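The example above relies on the raw comparator that Hadoop registers for IntWritable. For your own key type, the usual pattern is to implement WritableComparable, then subclass WritableComparator to compare the serialized bytes directly and register it with WritableComparator.define(). A sketch, using a hypothetical IntPairWritableComparable:

package com.bigdata.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntPairWritableComparable implements WritableComparable<IntPairWritableComparable> {

	private int first;
	private int second;

	public void set(int first, int second) {
		this.first = first;
		this.second = second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(first);
		out.writeInt(second);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first = in.readInt();
		second = in.readInt();
	}

	// Object-level comparison, used once the records are deserialized.
	@Override
	public int compareTo(IntPairWritableComparable other) {
		int cmp = Integer.compare(first, other.first);
		return cmp != 0 ? cmp : Integer.compare(second, other.second);
	}

	// Byte-level comparator: compares the serialized form directly,
	// without deserializing the records.
	public static class Comparator extends WritableComparator {
		public Comparator() {
			super(IntPairWritableComparable.class);
		}

		@Override
		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			// Each record is two 4-byte ints; compare the first, then the second.
			int cmp = Integer.compare(readInt(b1, s1), readInt(b2, s2));
			if (cmp != 0) {
				return cmp;
			}
			return Integer.compare(readInt(b1, s1 + 4), readInt(b2, s2 + 4));
		}
	}

	static {
		// Register the raw comparator so WritableComparator.get() returns it.
		WritableComparator.define(IntPairWritableComparable.class, new Comparator());
	}
}

With the comparator registered, WritableComparator.get(IntPairWritableComparable.class) returns the byte-level comparator, so keys of this type can be sorted without being deserialized.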
 
Writable classes
ArrayWritable
TwoDArrayWritable
MapWritable
SortedMapWritable
BooleanWritable
ByteWritable
IntWritable
VIntWritable
FloatWritable
LongWritable
VLongWritable
DoubleWritable
NullWritable
Text
BytesWritable
MD5Hash
ObjectWritable
GenericWritable

Java primitive   Writable implementation   Serialized size (bytes)
boolean          BooleanWritable           1
byte             ByteWritable              1
short            ShortWritable             2
int              IntWritable               4
                 VIntWritable              1-5
float            FloatWritable             4
long             LongWritable              8
                 VLongWritable             1-9
double           DoubleWritable            8
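The variable-length types use only as many bytes as the value needs, which is where the 1-5 and 1-9 ranges come from. A quick check of the expected sizes, reusing the WritableHelper.serialize() method defined above (the class name VIntSizeDemo is illustrative):

package com.bigdata.io;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;

public class VIntSizeDemo {

	public static void main(String[] args) throws IOException {
		// Fixed-length encoding: always 4 bytes.
		System.out.println(WritableHelper.serialize(new IntWritable(163)).length);               // 4

		// Variable-length encoding: small values fit in a single byte,
		// larger values take a length byte plus the significant bytes.
		System.out.println(WritableHelper.serialize(new VIntWritable(127)).length);              // 1
		System.out.println(WritableHelper.serialize(new VIntWritable(163)).length);              // 2
		System.out.println(WritableHelper.serialize(new VIntWritable(Integer.MAX_VALUE)).length); // 5
	}
}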


Text is similar to Java's String and can hold up to 2 GB of data.
It supports charAt() and find(); find() is the counterpart of String's indexOf() method.
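A small sketch of these methods (the class name TextDemo is illustrative). Note that Text is indexed by byte offset into the UTF-8 encoding, and charAt() returns the code point as an int:

package com.bigdata.io;

import org.apache.hadoop.io.Text;

public class TextDemo {

	public static void main(String[] args) {
		Text t = new Text("hadoop");

		// getLength() is the number of bytes in the UTF-8 encoding.
		System.out.println(t.getLength());   // 6

		// charAt() takes a byte offset and returns the code point at that
		// position as an int, or -1 for an invalid offset.
		System.out.println(t.charAt(2));     // 100, the code point of 'd'

		// find() returns the byte offset of the first match, or -1 if absent.
		System.out.println(t.find("do"));    // 2
		System.out.println(t.find("xyz"));   // -1
	}
}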

Serialization frameworks
Avro

An Avro schema is usually written in JSON.

Avro data types and schemas
Avro primitive types

Type     Description                                                 Schema
null     The absence of a value                                      "null"
boolean  A binary value                                              "boolean"
int      32-bit signed integer                                       "int"
long     64-bit signed integer                                       "long"
float    Single-precision (32-bit) IEEE 754 floating-point number    "float"
double   Double-precision (64-bit) IEEE 754 floating-point number    "double"
bytes    Sequence of 8-bit unsigned bytes                            "bytes"
string   Sequence of Unicode characters                              "string"

Avro complex types

array: An ordered collection of objects. All objects in a particular array must have the same schema.

{
  "type": "array",
  "items": "long"
}

map: An unordered collection of key-value pairs. Keys must be strings, values may be of any type, although within a particular map all values must have the same schema.

{
  "type": "map",
  "values": "string"
}

record: A collection of named fields of any type.

{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}

enum: A set of named values.

{
  "type": "enum",
  "name": "Cutlery",
  "doc": "An eating utensil.",
  "symbols": ["KNIFE", "FORK", "SPOON"]
}

fixed: A fixed number of 8-bit unsigned bytes.

{
  "type": "fixed",
  "name": "Md5Hash",
  "size": 16
}

union: A union of schemas. A union is represented by a JSON array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union.

[
  "null",
  "string",
  {"type": "map", "values": "string"}
]

 



package com.bigdata.io.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class StringPair {

	/**
	 * @param args
	 * @throws IOException 
	 */
	public static void main(String[] args) throws IOException {
		Schema.Parser parser = new Schema.Parser();
		Schema schema = parser.parse(StringPair.class.getResourceAsStream("/StringPair.avsc"));
		
		//We can create an instance of an Avro record using the generic API as follows
		GenericRecord datum = new GenericData.Record(schema);
		datum.put("left", "L");
		datum.put("right", "R");
		
		// we serialize the record to an output stream
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
		Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
		writer.write(datum, encoder);
		encoder.flush();
		out.close();
		
		DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
		Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
		GenericRecord result = reader.read(null, decoder);
		String r1 = result.get("left").toString();
		String r2 = result.get("right").toString();
		System.out.println(r1+ ","+r2);
	}

}
The corresponding schema file, StringPair.avsc:
{
	"type": "record",
	"name":	"StringPair",
	"doc":	"A pair of strings.",
	"fields": [
		{"name":"left", "type": "string"},
		{"name":"right", "type": "string"}
	]
}
 

Avro data files
A data file contains metadata (the Avro schema and a sync marker) followed by a series of blocks holding serialized Avro objects.
Blocks are separated by the sync marker, so an Avro data file can be split, which makes it well suited to MapReduce processing.

Avro read/write example

package com.bigdata.io.avro;

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

public class AvroWriteToFile {

	public static void main(String[] args) throws IOException {
		Schema.Parser parser = new Schema.Parser();
		Schema schema = parser.parse(AvroWriteToFile.class.getResourceAsStream("/StringPair.avsc"));
		GenericRecord datum = new GenericData.Record(schema);
		datum.put("left", "L");
		datum.put("right", "R");
		
		// Write two records to an Avro data file; the schema is embedded in the file header.
		File file = new File("data.avro");
		DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
		DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
		dataFileWriter.create(schema, file);
		dataFileWriter.append(datum);
		datum.put("left", "is left");
		datum.put("right", "is right");
		dataFileWriter.append(datum);
		dataFileWriter.close();
		
		// Read all records back by iterating over the file.
		DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
		DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(file, reader);
		GenericRecord result = null;
		for (GenericRecord record : fileReader) {
			System.out.println(record.get("left")+","+record.get("right"));
		}
		// sync(0) moves the reader to the first sync point after the header,
		// so the records can be read again with hasNext()/next().
		fileReader.sync(0);
		System.out.println(fileReader.getBlockCount());
		while(fileReader.hasNext()){
			result = fileReader.next();
			System.out.println(result.get("left")+","+result.get("right"));
		}
		fileReader.close();
	}
}
When a description field is added to StringPair.avsc, it must be given a default value; that way data written with the original schema can still be read with the new schema, and the new schema can be used for new data. A sketch of this schema resolution follows the schema below.
{
	"type": "record",
	"name":	"StringPair",
	"doc":	"A pair of strings.",
	"fields": [
		{"name":"left", "type": "string"},
		{"name":"right", "type": "string"},
		{"name": "description", "type": ["null", "string"], "default": null}
	]
}
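A sketch of how that resolution works, assuming the original two-field schema is on the classpath as /StringPair.avsc and the new version above is stored under an illustrative name, /StringPairV2.avsc: a record written with the old (writer) schema is read with the new (reader) schema, and description is filled in from its default.

package com.bigdata.io.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionDemo {

	public static void main(String[] args) throws IOException {
		// Writer schema: the original two-field StringPair.
		Schema writerSchema = new Schema.Parser().parse(
				SchemaEvolutionDemo.class.getResourceAsStream("/StringPair.avsc"));
		// Reader schema: the new version with the optional description field.
		// A separate parser is used because both schemas share the name StringPair.
		Schema readerSchema = new Schema.Parser().parse(
				SchemaEvolutionDemo.class.getResourceAsStream("/StringPairV2.avsc"));

		// Write a record with the old schema.
		GenericRecord datum = new GenericData.Record(writerSchema);
		datum.put("left", "L");
		datum.put("right", "R");
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(writerSchema);
		Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
		writer.write(datum, encoder);
		encoder.flush();
		out.close();

		// Read it back with the new schema; the missing description field
		// is resolved from its default value (null).
		DatumReader<GenericRecord> reader =
				new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
		Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
		GenericRecord result = reader.read(null, decoder);
		System.out.println(result.get("left") + "," + result.get("right")
				+ "," + result.get("description"));   // expected: L,R,null
	}
}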
 

 