一、Hadoop序列化
1、序列化(Serialization)是指把结构化对象转化为字节流。
2、反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。
3、Java序列化(java.io.Serializable)
二、序列化格式特点:
1、紧凑:高效使用存储空间。
2、快速:读写数据的额外开销小
3、可扩展:可透明地读取老格式的数据
4、互操作:支持多语言的交互
三、Hadoop的序列化格式:Writable
四、Hadoop序列化的作用
1、序列化在分布式环境的两大作用:进程间通信,永久存储。
2、Hadoop节点间通信。
五、使用hadoop内置的序列化类(不使用自定义序列化类),实现流量统计的功能。
public class TrafficApp1 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration(), TrafficApp1.class.getSimpleName());
job.setJarByClass(TrafficApp1.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
Text k2 = new Text();
Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split("\t");
k2.set(splited[1]);
//将得到的数据拼接成String字符串,用于reduce输入使用
v2.set(splited[6]+"\t"+splited[7]+"\t"+ splited[8]+"\t"+ splited[9]);
context.write(k2, v2);
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text>{
Text v3 = new Text();
@Override
protected void reduce(Text k2, Iterable<Text> v2s,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
long t1 = 0L;
long t2 = 0L;
long t3 = 0L;
long t4 = 0L;
String[] splited = null;
for(Text v2 : v2s){
//将map输入的字符串分割解析并计算
splited = v2.toString().split("\t");
t1 += Long.parseLong(splited[0]);
t2 += Long.parseLong(splited[1]);
t3 += Long.parseLong(splited[2]);
t4 += Long.parseLong(splited[3]);
}
//输出格式化的字符串
v3.set(t1+"\t"+t2+"\t"+t3+"\t"+t4);
context.write(k2, v3);
}
}
}
六、自定义序列化类
public class TrafficApp {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration(), TrafficApp.class.getSimpleName());
job.setJarByClass(TrafficApp.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TrafficWritable.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TrafficWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{
Text k2 = new Text();
TrafficWritable v2 = new TrafficWritable();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split("\t");
k2.set(splited[1]);
v2.set(splited[6], splited[7], splited[8], splited[9]);
context.write(k2, v2);
}
}
public static class MyReduce extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{
TrafficWritable v3 = new TrafficWritable();
@Override
protected void reduce(Text k2, Iterable<TrafficWritable> v2s,
Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
throws IOException, InterruptedException {
long t1 = 0L;
long t2 = 0L;
long t3 = 0L;
long t4 = 0L;
for(TrafficWritable v2 : v2s){
t1 += v2.t1;
t2 += v2.t2;
t3 += v2.t3;
t4 += v2.t4;
}
v3.set(t1, t2, t3, t4);
context.write(k2, v3);
}
}
static class TrafficWritable implements Writable{
long t1;
long t2;
long t3;
long t4;
public TrafficWritable(){}
public void set(long t1,long t2,long t3,long t4){
this.t1 = t1;
this.t2 = t2;
this.t3 = t3;
this.t4 = t4;
}
public void set(String t1,String t2,String t3, String t4){
this.t1 = Long.parseLong(t1);
this.t2 = Long.parseLong(t2);
this.t3 = Long.parseLong(t3);
this.t4 = Long.parseLong(t4);
}
public void readFields(DataInput in) throws IOException {
this.t1 = in.readLong();
this.t2 = in.readLong();
this.t3 = in.readLong();
this.t4 = in.readLong();
}
public void write(DataOutput out) throws IOException {
out.writeLong(t1);
out.writeLong(t2);
out.writeLong(t3);
out.writeLong(t4);
}
@Override
public String toString() {
return this.t1+"\t"+t2+"\t"+t3+"\t"+t4+"\t";
}
}
}
分享到:
相关推荐
本篇文章将深入探讨Hadoop的序列化机制,以及如何自定义Bean对象实现序列化接口。 **2.1 序列化概述** 序列化是将内存中的对象转化为可存储或可传输的数据格式的过程,而反序列化则是相反的操作,将这些数据恢复为...
文档中提到了Hadoop I/O,包括压缩、序列化框架以及Avro和Sequence File等文件格式的介绍。序列化是数据结构转换成能够进行网络传输或存储到文件中的格式的过程。数据序列化之后,可以被压缩以减少存储空间的使用和...
### Hadoop vs. Spark - **数据处理速度**:Spark通常比Hadoop快,特别是在迭代算法和实时数据处理方面。 - **易用性**:Spark提供了更简洁和更现代的API。 - **内存使用**:Spark优化了内存使用,而Hadoop ...
一、自定义序列化 在MapReduce中,数据通常以键值对的形式传输,因此需要进行序列化和反序列化。Hadoop提供了默认的Writables接口,如IntWritable、Text等,但当处理自定义对象时,我们需要实现WritableComparable...
Hadoop的序列化方式不同于Java的标准序列化,它使用了一种称为`Writable`的自定义格式。`Writable`接口设计得更加紧凑且高效,旨在满足大数据处理场景的需求,它的主要特点是: 1. **紧凑**:`Writable`序列化格式...
1. **提供一个无参数的构造函数**:在反序列化过程中,Hadoop需要通过反射调用此构造函数来创建新实例。 2. **重写`write()`方法**:该方法负责将对象的数据写入`DataOutput`流,如`DataOutputStream`。在这个例子中...
此外,Hadoop I/O是Hadoop生态中对输入输出处理的重要部分,包括了数据格式化和序列化。Hadoop定义了Writable接口,用于实现自定义数据类型,以便MapReduce能处理复杂的数据结构。它也支持了Avro和SequenceFile等...
这个案例展示了如何在Hadoop环境中利用自定义序列化类处理和分析数据。通过实现`Writable`接口,我们可以创建自己的数据类型,使得MapReduce能够理解和处理这些自定义对象。在实际应用中,这样的灵活性对于处理复杂...
六、自定义类型序列化优化 对于性能敏感的应用,可以考虑使用更高效的序列化库,如Kryo或FastJavaSerialization,以减少数据转换的开销。Hadoop提供了一种称为`GenericWritable`的抽象类,可以帮助你快速实现自定义...
Hadoop提供了一套序列化接口,使用它可以通过自定义的数据类型与网络和磁盘上的存储系统进行交互。 5. 压缩:在Hadoop中,数据在存储之前和传输过程中可以进行压缩,以减少存储空间和提高网络传输效率。压缩分为几...
- **序列化机制**:为了满足Hadoop MapReduce和HDFS的通信需求,Hadoop采用了自定义的序列化机制而不是Java自带的序列化方式。这一机制主要体现在`org.apache.hadoop.io`包中的各类可序列化对象,它们实现了`...
总结来说,Hadoop的序列化机制主要基于`Writable`接口,通过自定义的序列化和反序列化方法实现数据的转换。`ObjectWritable`作为通用的载体,适应了RPC通信中不同类型的对象传输。`WritableFactories`则是保证`...
在Java和其他编程语言中,有多种序列化实现方法,包括Java自带的序列化、Writable接口(常见于Hadoop生态系统)以及Avro。下面将详细介绍这三种序列化方式。 1. **Java自带的序列化** Java内置的序列化机制是通过...
框架默认处理键值对的序列化和反序列化,但自定义序列化器可以按需求使用。 9. **应用示例**:Hadoop广泛应用于大数据处理场景,如Nutch搜索引擎的PageRank计算、QQ空间的日志分析(PV, UV统计)等。 10. **运行...
**自定义序列化和反序列化** 如果默认的序列化方式不能满足需求,可以重写`writeObject()`和`readObject()`方法来自定义序列化过程: ```java private void writeObject(ObjectOutputStream out) throws ...
源代码中可见,自定义序列化的实现非常直接,通常涉及两个主要方法:write(DataOutput out)和readFields(DataInput in)。例如,如果要实现一个自定义的可序列化类MyWritable,我们会编写write方法以写出对象的状态到...
02-hadoop中的序列化机制.avi 03-流量求和mr程序开发.avi 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码...