2013-09-03 10:50:51
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package whut;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
//自定义组合键策略 //java基本类型数据 public class TextInt implements WritableComparable{
//直接利用java的基本数据类型
private String firstKey;
private int secondKey;
//必须要有一个默认的构造函数
public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this .firstKey = firstKey;
}
public int getSecondKey() {
return secondKey;
}
public void setSecondKey( int secondKey) {
this .secondKey = secondKey;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(firstKey);
out.writeInt(secondKey);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
firstKey=in.readUTF();
secondKey=in.readInt();
}
//map的键的比较就是根据这个方法来进行的
@Override
public int compareTo(Object o) {
// TODO Auto-generated method stub
TextInt ti=(TextInt)o;
//利用这个来控制升序或降序
//this本对象写在前面代表是升序
//this本对象写在后面代表是降序
return this .getFirstKey().compareTo(ti.getFirstKey());
}
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组 public class TextComparator extends WritableComparator {
//必须要调用父类的构造器
protected TextComparator() {
super (TextInt. class , true ); //注册comparator
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextInt ti1=(TextInt)a;
TextInt ti2=(TextInt)b;
return ti1.getFirstKey().compareTo(ti2.getFirstKey());
}
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序 public class TextIntComparator extends WritableComparator {
public TextIntComparator()
{
super (TextInt. class , true );
}
//这里可以进行排序的方式管理
//必须保证是同一个分组的
//a与b进行比较
//如果a在前b在后,则会产生升序
//如果a在后b在前,则会产生降序
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextInt ti1=(TextInt)a;
TextInt ti2=(TextInt)b;
//首先要保证是同一个组内,同一个组的标识就是第一个字段相同
if (!ti1.getFirstKey().equals(ti2.getFirstKey()))
return ti1.getFirstKey().compareTo(ti2.getFirstKey());
else
return ti2.getSecondKey()-ti1.getSecondKey(); //0,-1,1
}
} |
1
2
3
4
5
6
7
8
9
10
11
|
package whut;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//参数为map的输出类型 public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
@Override
public int getPartition(TextInt key, IntWritable value, int numPartitions) {
// TODO Auto-generated method stub
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
}
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
package whut;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//需要对数据进行分组以及组内排序的时候 public class SortMain extends Configured implements Tool{
//这里设置输入文格式为KeyValueTextInputFormat
//name1 5
//默认输入格式都是Text,Text
public static class GroupMapper extends
Mapper<Text, Text, TextInt, IntWritable> {
public IntWritable second= new IntWritable();
public TextInt tx= new TextInt();
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
String lineKey=key.toString();
String lineValue=value.toString();
int lineInt=Integer.parseInt(lineValue);
tx.setFirstKey(lineKey);
tx.setSecondKey(lineInt);
second.set(lineInt);
context.write(tx, second);
}
}
//设置reduce
public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
{
@Override
protected void reduce(TextInt key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
StringBuffer sb= new StringBuffer();
for (IntWritable val:values)
{
sb.append(val+ "," );
}
if (sb.length()> 0 )
{
sb.deleteCharAt(sb.length()- 1 );
}
context.write( new Text(key.getFirstKey()), new Text(sb.toString()));
}
}
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf=getConf();
Job job= new Job(conf, "SecondarySort" );
job.setJarByClass(SortMain. class );
// 设置输入文件的路径,已经上传在HDFS
FileInputFormat.addInputPath(job, new Path(args[ 0 ]));
// 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
FileOutputFormat.setOutputPath(job, new Path(args[ 1 ]));
job.setMapperClass(GroupMapper. class );
job.setReducerClass(GroupReduce. class );
//设置分区方法
job.setPartitionerClass(KeyPartitioner. class );
//下面这两个都是针对map端的
//设置分组的策略,哪些key可以放置到一组中
/*************关键点**********/
//key 的第一次排序在 sortandspill 这是map端发生的 job.setSortComparatorClass( TextIntComparator. class );
//这里就可以设置对组内如何排序的方法
//key的第二次排序 这是在reduce端之前 sort and shuffle中
job.setGroupingComparatorClass( TextComparator. class );
//设置key如何进行排序在传递给reducer之前.
//设置输入文件格式
job.setInputFormatClass(KeyValueTextInputFormat. class );
//使用默认的输出格式即TextInputFormat
//设置map的输出key和value类型
job.setMapOutputKeyClass(TextInt. class );
job.setMapOutputValueClass(IntWritable. class );
//设置reduce的输出key和value类型
//job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text. class );
job.setOutputValueClass(Text. class );
job.waitForCompletion( true );
int exitCode=job.isSuccessful()? 0 : 1 ;
return exitCode;
}
public static void main(String[] args) throws Exception
{
int exitCode=ToolRunner.run( new SortMain(), args);
System.exit(exitCode);
}
} |
相关推荐
本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 MapReduce 的工作流程是必要的。Map 阶段将输入数据分割成多个块,并在各个节点上并行处理。Reduce 阶段则负责...
利用采样器实现mapreduce任务输出全排序大数据-MapReduce
以文件`test.txt`为例,假设它包含历年气温数据,我们可以读取这些数据,通过MapReduce的二次排序处理,最后得到的结果将是按照年份升序排列,每一年内的气温则按照降序排列。`secondarySort`可能是一个示例程序或...
这是 MapReduce 的多路径输入输出示例代码。有关大数据的相关文章可以阅读我的专栏:《大数据之Hadoop》 http://blog.csdn.net/column/details/bumblebee-hadoop.html
2. **Shuffle阶段**:MapReduce框架按照键(这里是单词)对输出进行分区和排序,将相同键的记录聚集在一起。 3. **Reduce阶段**:Reducer接收到所有相同键的记录,将它们的值(这里是计数1)求和,生成 `(word, ...
分布式文件系统实例——MapReduce排序 MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。它将大型任务分解为可并行处理的小任务,通过“Map”阶段进行数据预处理,然后在“Reduce”...
MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...
在Hadoop MapReduce框架中,OutputFormat扮演着至关重要的角色,它是定义如何将Mapper和Reducer产生的中间结果转化为最终输出格式的规范。MapReduce之OutputFormat数据输出主要涉及到以下几个方面: 1. **...
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
MapReduce模型的核心思想是将任务分解为两个阶段:Map(映射)阶段和Reduce(归约)阶段,其输入和输出均为键值对(key-value pair)。 在MapReduce模型中,Map阶段通常处理输入文件中的数据,将输入数据集拆分为...
实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...
Hadoop内置计数器主要分为几组,包括MapReduce任务计数器组、文件系统计数器组、文件输入格式计数器组、文件输出格式计数器组、作业计数器组等。这些计数器能够提供任务执行过程的详细信息,如物理内存占用、虚拟...
【大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出】 MapReduce是Hadoop框架的核心部分,主要用于处理大规模数据的分布式计算。在默认情况下,MapReduce任务的输出是一个单一的文件,由TextOutputFormat...
**基于MapReduce的网页排序算法** 网页排序是搜索引擎优化中的一个重要环节,旨在确定网页在搜索结果中的排列顺序。其中,PageRank是Google最早使用的网页重要性算法,它通过计算网页之间的链接关系来评估其重要性...
Mapper的输出数据经过分区和排序后,准备进入Reduce阶段。 **Reduce阶段**: Reduce阶段对Map阶段产生的中间结果进行聚合和汇总。ReduceTask进程会调用用户定义的Reducer类,Reducer接收已分区且排序过的键值对,...
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
在MapReduce编程模型中,开发者通常需要处理一系列关键任务,包括数据序列化、排序、分区、分组以及计算TopN值。以下将详细介绍这些概念及其在Hadoop环境中的实现。 一、自定义序列化 在MapReduce中,数据通常以...
排序确保了相同键的值会被分到一起,而分组则将相同键的值聚合成一组,以便Reduce函数处理。案例可能包含关于如何配置和实现这些功能的代码示例。 **分区(Partitioning)**: 在某些情况下,我们可能希望控制中间...
在这个场景中,我们将使用MapReduce来实现一个特定的应用,即对文本文档中的单词进行计数,并按照频率进行排序,最后输出出现频率最高的前三个单词。 **Map阶段** 在Map阶段,输入的数据被分割成多个块,每个块由一...