Serialization
Serialization is the process of turning structured objects into a byte stream so that they can be transmitted over a network or written to persistent storage. Deserialization is the reverse process: turning a byte stream back into a series of structured objects.
Desirable properties of an RPC serialization format
1. Compact: makes efficient use of network bandwidth and storage space.
2. Fast: serialization and deserialization impose as little overhead as possible.
3. Extensible: the protocol can evolve to meet new requirements.
4. Interoperable: clients and servers can be implemented in different languages.
Hadoop uses Writables, which are compact and fast, but not extensible or interoperable.
The Writable interface
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
package com.bigdata.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

public final class WritableHelper {

    // Serialize a Writable into a byte array.
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

    // Populate a Writable from a byte array and return the bytes unchanged.
    public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(in);
        writable.readFields(dataIn);
        dataIn.close();
        return bytes;
    }

    public static void main(String[] args) throws IOException {
        IntWritable writable = new IntWritable();
        writable.set(163);
        byte[] bytes = serialize(writable);
        System.out.println(bytes.length + "," + Bytes.toInt(bytes));
        deserialize(writable, bytes);
        System.out.println(bytes.length + "," + Bytes.toInt(bytes));
    }
}
WritableComparable and comparators
package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}
Hadoop optimizes comparisons: a RawComparator compares records directly on their serialized byte representations, without deserializing them first.
package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It does two things:
1. It provides a default implementation of the raw compare() method that deserializes the objects and then compares them.
2. It acts as a factory for RawComparator instances.
package com.bigdata.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;

public final class WritableHelper {

    // Serialize a Writable into a byte array.
    public static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

    // Populate a Writable from a byte array and return the bytes unchanged.
    public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream dataIn = new DataInputStream(in);
        writable.readFields(dataIn);
        dataIn.close();
        return bytes;
    }

    public static void main(String[] args) throws IOException {
        IntWritable writable = new IntWritable();
        writable.set(163);
        byte[] bytes = serialize(writable);
        System.out.println(bytes.length + "," + Bytes.toInt(bytes));
        deserialize(writable, bytes);
        System.out.println(bytes.length + "," + Bytes.toInt(bytes));

        // Compare two IntWritables, first as objects, then on their raw serialized bytes.
        RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class);
        IntWritable w1 = new IntWritable(163);
        IntWritable w2 = new IntWritable(67);
        int result = comparator.compare(w1, w2);
        System.out.println(result);
        byte[] b1 = serialize(w1);
        byte[] b2 = serialize(w2);
        result = comparator.compare(b1, 0, b1.length, b2, 0, b2.length);
        System.out.println(result);
    }
}
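To make the "no deserialization" point above concrete, here is a minimal sketch of an optimized raw comparator. The class name RawIntComparator is hypothetical and purely illustrative — IntWritable already ships with an equivalent comparator — but it shows the pattern of subclassing WritableComparator and overriding the byte-level compare().

package com.bigdata.io;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical example: orders serialized IntWritables by decoding the 4-byte
// big-endian int directly from the byte arrays, never building IntWritable objects.
public class RawIntComparator extends WritableComparator {

    protected RawIntComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int left = readInt(b1, s1);   // readInt() is a static helper on WritableComparator
        int right = readInt(b2, s2);
        return (left < right) ? -1 : ((left == right) ? 0 : 1);
    }

    // Registering a comparator makes WritableComparator.get(IntWritable.class) return it;
    // left commented out here because IntWritable already registers its own.
    // static { WritableComparator.define(IntWritable.class, new RawIntComparator()); }
}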
Writable classes
ArrayWritable
TwoDArrayWritable
MapWritable
SortedMapWritable
BooleanWritable
ByteWritable
IntWritable
VIntWritable
FloatWritable
LongWritable
VLongWritable
DoubleWritable
NullWritable
Text
BytesWritable
MD5Hash
ObjectWritable
GenericWritable
| Java primitive | Writable implementation | Serialized size (bytes) |
| --- | --- | --- |
| boolean | BooleanWritable | 1 |
| byte | ByteWritable | 1 |
| short | ShortWritable | 2 |
| int | IntWritable | 4 |
| int | VIntWritable | 1-5 |
| float | FloatWritable | 4 |
| long | LongWritable | 8 |
| long | VLongWritable | 1-9 |
| double | DoubleWritable | 8 |
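The variable-length types only pay for the bytes they actually need. As a rough illustration of the size ranges in the table, the following self-contained sketch (a hypothetical VarLengthDemo class, not part of the original code) serializes an IntWritable and a VIntWritable and prints the number of bytes each produces.

package com.bigdata.io;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.Writable;

// Hypothetical demo: compares the serialized size of fixed- and variable-length ints.
public class VarLengthDemo {

    static byte[] serialize(Writable writable) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(out);
        writable.write(dataOut);
        dataOut.close();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serialize(new IntWritable(100)).length);               // always 4 bytes
        System.out.println(serialize(new VIntWritable(100)).length);              // 1 byte for a small value
        System.out.println(serialize(new VIntWritable(Integer.MAX_VALUE)).length); // 5 bytes for a large value
    }
}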
Text is similar to Java's String and can hold up to 2 GB of data.
It supports charAt() and find(), the latter being similar to String's indexOf() method.
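A small sketch of the Text API just mentioned (the TextDemo class is illustrative, not from the original post). Note that charAt() and find() work with byte offsets into the UTF-8 encoding, and charAt() returns a Unicode code point rather than a char — two of the ways Text differs from String.

package com.bigdata.io;

import org.apache.hadoop.io.Text;

// Hypothetical demo: basic Text operations, analogous to String but byte-offset based.
public class TextDemo {

    public static void main(String[] args) {
        Text t = new Text("hadoop");
        System.out.println(t.getLength()); // 6 (length in UTF-8 bytes)
        System.out.println(t.charAt(2));   // 100, the Unicode code point of 'd'
        System.out.println(t.find("do"));  // 2, like String.indexOf("do")
        System.out.println(t.find("xyz")); // -1 when not found

        // Unlike String, Text is mutable: the same object can be reused.
        t.set("serialization");
        System.out.println(t.toString());
    }
}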
Serialization frameworks
Avro
Avro schemas are usually written in JSON.
Avro data types and schemas
Avro primitive types
| Type | Description | Schema |
| --- | --- | --- |
| null | The absence of a value | "null" |
| boolean | A binary value | "boolean" |
| int | 32-bit signed integer | "int" |
| long | 64-bit signed integer | "long" |
| float | Single-precision (32-bit) IEEE 754 floating-point number | "float" |
| double | Double-precision (64-bit) IEEE 754 floating-point number | "double" |
| bytes | Sequence of 8-bit unsigned bytes | "bytes" |
| string | Sequence of Unicode characters | "string" |
Avro complex types
| Type | Description | Schema example |
| --- | --- | --- |
| array | An ordered collection of objects. All objects in a particular array must have the same schema. | {"type": "array", "items": "long"} |
| map | An unordered collection of key-value pairs. Keys must be strings; values may be any type, although within a particular map all values must have the same schema. | {"type": "map", "values": "string"} |
| record | A collection of named fields of any type. | {"type": "record", "name": "WeatherRecord", "doc": "A weather reading.", "fields": [{"name": "year", "type": "int"}, {"name": "temperature", "type": "int"}, {"name": "stationId", "type": "string"}]} |
| enum | A set of named values. | {"type": "enum", "name": "Cutlery", "doc": "An eating utensil.", "symbols": ["KNIFE", "FORK", "SPOON"]} |
| fixed | A fixed number of 8-bit unsigned bytes. | {"type": "fixed", "name": "Md5Hash", "size": 16} |
| union | A union of schemas. A union is represented by a JSON array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union. | ["null", "string", {"type": "map", "values": "string"}] |
package com.bigdata.io.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class StringPair {

    public static void main(String[] args) throws IOException {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(StringPair.class.getResourceAsStream("/StringPair.avsc"));

        // Create an instance of an Avro record using the generic API.
        GenericRecord datum = new GenericData.Record(schema);
        datum.put("left", "L");
        datum.put("right", "R");

        // Serialize the record to an output stream.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(datum, encoder);
        encoder.flush();
        out.close();

        // Read the record back from the serialized bytes.
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord result = reader.read(null, decoder);
        String r1 = result.get("left").toString();
        String r2 = result.get("right").toString();
        System.out.println(r1 + "," + r2);
    }
}
{ "type": "record", "name": "StringPair", "doc": "A pair of strings.", "fields": [ {"name":"left", "type": "string"}, {"name":"right", "type": "string"} ] }
Avro data files
An Avro data file starts with a header containing metadata (the Avro schema and a sync marker), followed by a series of blocks of serialized Avro objects. Blocks are separated by the sync marker, so Avro data files are splittable, which makes them well suited to MapReduce processing.
Avro read/write example
package com.bigdata.io.avro;

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

public class AvroWriteToFile {

    public static void main(String[] args) throws IOException {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroWriteToFile.class.getResourceAsStream("/StringPair.avsc"));

        GenericRecord datum = new GenericData.Record(schema);
        datum.put("left", "L");
        datum.put("right", "R");

        // Write two records to an Avro data file.
        File file = new File("data.avro");
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(datum);
        datum.put("left", "is left");
        datum.put("right", "is right");
        dataFileWriter.append(datum);
        dataFileWriter.close();

        // Read the records back: first with the iterator, then again after seeking to the start.
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(file, reader);
        GenericRecord result = null;
        for (GenericRecord record : fileReader) {
            System.out.println(record.get("left") + "," + record.get("right"));
        }
        fileReader.sync(0);
        System.out.println(fileReader.getBlockCount());
        while (fileReader.hasNext()) {
            result = fileReader.next();
            System.out.println(result.get("left") + "," + result.get("right"));
        }
        fileReader.close();
    }
}
When adding a description field to StringPair.avsc, be sure to give it a default value. With the default in place, data written with the original schema can still be read using the new schema, and data written with the new schema works as well.
{ "type": "record", "name": "StringPair", "doc": "A pair of strings.", "fields": [ {"name":"left", "type": "string"}, {"name":"right", "type": "string"}, {"name": "description", "type": ["null", "string"], "default": null} ] }