`

MapReduce的自制Writable分组输出及组内排序

 
阅读更多
 MapReduce的自制Writable分组输出及组内排序
2013-09-03 10:50:51

输入文件格式如下:

name1    2

name3    4

name1    6

name1    1

name3    3

name1    0

要求输出的文件格式如下:

name1    0,1,2,6

name3    3,4

要求是按照第一列分组,name1与name3也是按照顺序排列的,组内升序排序

思路:

常规的输出,无法排序key所对应的多个值的顺序。为了排序组内中的值,需要将key与value放在同一个组。Job中有两个方法setGroupingComparatorClass和setSortComparatorClass,可以利用这两个方法来实现组内排序。但是这些排序都是基于key的,则就要将key和value定义成组合键。

但是必须要保证第一列相同的全部都放在同一个分区中,则就需要自定义分区,分区的时候只考虑第一列的值。由于partitioner仅仅能保证每一个reducer接受同一个name的所有记录,但是reducer仍然是通过键进行分组的分区,也就说该分区中还是按照键来分成不同的组,还需要分组只参考name值

先按照name分组,再在name中内部进行排序。

解决方法:

运用自定义组合键的策略,将name和1定义为一个组合键。在分区的时候只参考name的值,即继承partitioner。

 由于要按照name分组,则就需要定义分组策略,然后设置setGroupingComparatorClass。

setGroupingComparatorClass主要定义哪些key可以放置在一组,分组的时候会对组合键进行比较,由于这里只需要考虑组合键中的一个值,则定义实现一个WritableComparator,设置比较策略。

对于组内的排序,可以利用setSortComparatorClass来实现,

这个方法主要用于定义key如何进行排序在它们传递给reducer之前,

这里就可以来进行组内排序。

具体代码:

     Hadoop版本号:hadoop1.1.2

自定义组合键

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package whut;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
//自定义组合键策略
//java基本类型数据
public class TextInt implements WritableComparable{
    //直接利用java的基本数据类型
    private String firstKey;
    private int secondKey;
    //必须要有一个默认的构造函数
    public String getFirstKey() {
        return firstKey;
    }
    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }
                                                                                                                                                                          
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        firstKey=in.readUTF();
        secondKey=in.readInt();
    }
    //map的键的比较就是根据这个方法来进行的
    @Override
    public int compareTo(Object o) {
        // TODO Auto-generated method stub
        TextInt ti=(TextInt)o;
        //利用这个来控制升序或降序
        //this本对象写在前面代表是升序
        //this本对象写在后面代表是降序
        return this.getFirstKey().compareTo(ti.getFirstKey());
    }
}

分组策略

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//主要就是对于分组进行排序,分组只按照组建键中的一个值进行分组
public class TextComparator extends WritableComparator {
    //必须要调用父类的构造器
    protected TextComparator() {
        super(TextInt.class,true);//注册comparator
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextInt ti1=(TextInt)a;
        TextInt ti2=(TextInt)b;
        return ti1.getFirstKey().compareTo(ti2.getFirstKey());
    }
}

组内排序策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package whut;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分组内部进行排序,按照第二个字段进行排序
public class TextIntComparator extends WritableComparator {
    public TextIntComparator()
    {
        super(TextInt.class,true);
    }
    //这里可以进行排序的方式管理
    //必须保证是同一个分组的
    //a与b进行比较
    //如果a在前b在后,则会产生升序
    //如果a在后b在前,则会产生降序
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // TODO Auto-generated method stub
        TextInt ti1=(TextInt)a;
        TextInt ti2=(TextInt)b;
        //首先要保证是同一个组内,同一个组的标识就是第一个字段相同
        if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
           return ti1.getFirstKey().compareTo(ti2.getFirstKey());
        else
           return ti2.getSecondKey()-ti1.getSecondKey();//0,-1,1
    }
                                                                                                                                                          
}

分区策略

 

1
2
3
4
5
6
7
8
9
10
11
package whut;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//参数为map的输出类型
public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
    @Override
    public int getPartition(TextInt key, IntWritable value, int numPartitions) {
        // TODO Auto-generated method stub
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

MapReduce策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package whut;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//需要对数据进行分组以及组内排序的时候
public class SortMain extends Configured implements Tool{
    //这里设置输入文格式为KeyValueTextInputFormat
    //name1 5
    //默认输入格式都是Text,Text
    public static class GroupMapper extends
       Mapper<Text, Text, TextInt, IntWritable>  {
        public IntWritable second=new IntWritable();
        public TextInt tx=new TextInt();
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineKey=key.toString();
            String lineValue=value.toString();
            int lineInt=Integer.parseInt(lineValue);
            tx.setFirstKey(lineKey);
            tx.setSecondKey(lineInt);
            second.set(lineInt);
            context.write(tx, second);
        }
    }
    //设置reduce
    public static class GroupReduce extends Reducer<TextInt, IntWritable, Text, Text>
    {
        @Override
        protected void reduce(TextInt key, Iterable<IntWritable> values,
               Context context)
                throws IOException, InterruptedException {
            StringBuffer sb=new StringBuffer();
            for(IntWritable val:values)
            {
                sb.append(val+",");
            }
            if(sb.length()>0)
            {
                sb.deleteCharAt(sb.length()-1);
            }
            context.write(new Text(key.getFirstKey()), new Text(sb.toString()));
        }
    }
                                                                                                                                        
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf=getConf();
        Job job=new Job(conf,"SecondarySort");
        job.setJarByClass(SortMain.class);
        // 设置输入文件的路径,已经上传在HDFS
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 设置输出文件的路径,输出文件也存在HDFS中,但是输出目录不能已经存在
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
                                                                                                                                            
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReduce.class);
        //设置分区方法
        job.setPartitionerClass(KeyPartitioner.class);
                                                                                                                                            
        //下面这两个都是针对map端的
        //设置分组的策略,哪些key可以放置到一组中
    /*************关键点**********/
        //key 的第一次排序在 sortandspill 这是map端发生的
job.setSortComparatorClass(TextIntComparator.class);
//这里就可以设置对组内如何排序的方法
       //key的第二次排序 这是在reduce端之前 sort and shuffle中
        job.setGroupingComparatorClass(TextComparator.class);
        //设置key如何进行排序在传递给reducer之前.
        
     
        //设置输入文件格式
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //使用默认的输出格式即TextInputFormat
        //设置map的输出key和value类型
        job.setMapOutputKeyClass(TextInt.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reduce的输出key和value类型
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        int exitCode=job.isSuccessful()?0:1;
        return exitCode;
    }
                                                                                                                                        
    public static void main(String[] args)  throws Exception
    {
       int exitCode=ToolRunner.run(new SortMain(), args);
       System.exit(exitCode);
    }
}

注意事项

   1,设置分组排序按照升序还是降序是在自定义WritableComparable中的compareTo()方法实现的,具体升序或者降序的设置在代码中已经注释说明

   2,设置组内值进行升序还是降序的排序是在组内排序策略中的compare()方法注释说明的。

   3,这里同时最重要的一点是,将第二列即放在组合键中,又作为value,这样对于组合键排序也就相当于对于value进行排序了。

   4,在自定义组合键的时候,对于组合键中的数据的基本类型可以采用Java的基本类型也可以采用Hadoop的基本数据类型,对于Hadoop的基本数据类型一定要记得初始化new一个基本数据类型对象。对于组合键类,必须要有默认的构造方法。

本文出自 “在云端的追梦” 博客,请务必保留此出处http://computerdragon.blog.51cto.com/6235984/1287721

分享到:
评论

相关推荐

    MapReduce2中自定义排序分组

    本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 MapReduce 的工作流程是必要的。Map 阶段将输入数据分割成多个块,并在各个节点上并行处理。Reduce 阶段则负责...

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

    mapreduce二次排序

    以文件`test.txt`为例,假设它包含历年气温数据,我们可以读取这些数据,通过MapReduce的二次排序处理,最后得到的结果将是按照年份升序排列,每一年内的气温则按照降序排列。`secondarySort`可能是一个示例程序或...

    MapReduce多路径输入输出

    这是 MapReduce 的多路径输入输出示例代码。有关大数据的相关文章可以阅读我的专栏:《大数据之Hadoop》 http://blog.csdn.net/column/details/bumblebee-hadoop.html

    mapreduce wc单词计数 自定义分区 自定义排序实现

    2. **Shuffle阶段**:MapReduce框架按照键(这里是单词)对输出进行分区和排序,将相同键的记录聚集在一起。 3. **Reduce阶段**:Reducer接收到所有相同键的记录,将它们的值(这里是计数1)求和,生成 `(word, ...

    分布式文件系统实例-mapreduce-排序

    分布式文件系统实例——MapReduce排序 MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。它将大型任务分解为可并行处理的小任务,通过“Map”阶段进行数据预处理,然后在“Reduce”...

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...

    【MapReduce篇04】MapReduce之OutputFormat数据输出1

    在Hadoop MapReduce框架中,OutputFormat扮演着至关重要的角色,它是定义如何将Mapper和Reducer产生的中间结果转化为最终输出格式的规范。MapReduce之OutputFormat数据输出主要涉及到以下几个方面: 1. **...

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce类型及格式

    MapReduce模型的核心思想是将任务分解为两个阶段:Map(映射)阶段和Reduce(归约)阶段,其输入和输出均为键值对(key-value pair)。 在MapReduce模型中,Map阶段通常处理输入文件中的数据,将输入数据集拆分为...

    实验项目 MapReduce 编程

    实验项目“MapReduce 编程”旨在让学生深入理解并熟练运用MapReduce编程模型,这是大数据处理领域中的核心技术之一。实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 ...

    Hadoop MapReduce高级特性

    Hadoop内置计数器主要分为几组,包括MapReduce任务计数器组、文件系统计数器组、文件输入格式计数器组、文件输出格式计数器组、作业计数器组等。这些计数器能够提供任务执行过程的详细信息,如物理内存占用、虚拟...

    大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出 共9页.pdf

    【大数据与云计算培训学习资料 Hadoop的MapReduce中多文件输出】 MapReduce是Hadoop框架的核心部分,主要用于处理大规模数据的分布式计算。在默认情况下,MapReduce任务的输出是一个单一的文件,由TextOutputFormat...

    基于MapReduce的网页排序算法

    **基于MapReduce的网页排序算法** 网页排序是搜索引擎优化中的一个重要环节,旨在确定网页在搜索结果中的排列顺序。其中,PageRank是Google最早使用的网页重要性算法,它通过计算网页之间的链接关系来评估其重要性...

    MapReduce.docx

    Mapper的输出数据经过分区和排序后,准备进入Reduce阶段。 **Reduce阶段**: Reduce阶段对Map阶段产生的中间结果进行聚合和汇总。ReduceTask进程会调用用户定义的Reducer类,Reducer接收已分区且排序过的键值对,...

    Hadoop中MapReduce基本案例及代码(五)

    对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    16、MapReduce的基本用法示例-自定义序列化、排序、分区、分组和topN

    在MapReduce编程模型中,开发者通常需要处理一系列关键任务,包括数据序列化、排序、分区、分组以及计算TopN值。以下将详细介绍这些概念及其在Hadoop环境中的实现。 一、自定义序列化 在MapReduce中,数据通常以...

    mapreduce案例代码及案例涉及文件

    排序确保了相同键的值会被分到一起,而分组则将相同键的值聚合成一组,以便Reduce函数处理。案例可能包含关于如何配置和实现这些功能的代码示例。 **分区(Partitioning)**: 在某些情况下,我们可能希望控制中间...

    MapReduce实现单词计数并排序.zip_mapReduce_云计算_单词计数_统计单词_输出前三

    在这个场景中,我们将使用MapReduce来实现一个特定的应用,即对文本文档中的单词进行计数,并按照频率进行排序,最后输出出现频率最高的前三个单词。 **Map阶段** 在Map阶段,输入的数据被分割成多个块,每个块由一...

Global site tag (gtag.js) - Google Analytics