`
ghost_face
  • 浏览: 54408 次
社区版块
存档分类
最新评论

MapReduce实现二次排序及GroupingComparator的误区

 
阅读更多

之前在网上看到了一篇使用MapReduce实现二次排序的博客,自己尝试实现了,并测试成功,代码有所改动。链接如下:

http://blog.csdn.net/zyj8170/article/details/7530728

 

所谓的二次排序:对Key和Val都进行排序(比如升序),并输出。对Key的自动排序,MapReduce可以替我们解决,但是同时对Val进行排序,则需要其他的做法。

做法一:对每个Key的所有Val,添加到ArrayList,使用Collections.sort方法进行排序,虽然能够实现,但是存在隐患,如果key的val值很多,那么对每个Key的ArrayList消耗的内存就很大,效率不高;

做法二:定制MapReduce的IO类型,定制GroupingComparator类,实现二次排序,通过定制,将默认的MR执行的方式改成自定义的,效率比较高,易于扩展;

 

现在开始实现做法二。

一、准备待排序的数据(数据跟原博客一样),见第三部分的结果对比:

二、代码如下

 

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

public class SecondSort extends Configured implements Tool {

//自定义的类型,(参见本博客的MapReduce定制类型)
    static class IntPair implements WritableComparable<IntPair> {
        private int a;
        private int b;

        public IntPair() {
            a = 0;
            b = 0;
        }

        public int getA() {
            return a;
        }

        public void setA(int a) {
            this.a = a;
        }

        public int getB() {
            return b;
        }

        public void setB(int b) {
            this.b = b;
        }

        public void set(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public int compareTo(IntPair o) {
            if (this.a == o.a) {
                if (this.b == o.b)
                    return 0;
                else
                    return this.b > o.b ? 1 : -1;
            } else
                return this.a > o.a ? 1 : -1;
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(a);
            dataOutput.writeInt(b);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            a = dataInput.readInt();
            b = dataInput.readInt();
        }
    }

    static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer strTok = new StringTokenizer(value.toString());
            int a = Integer.parseInt(strTok.nextToken());
            int b = Integer.parseInt(strTok.nextToken());
            IntPair mykey = new IntPair();
            mykey.set(a, b);
            context.write(mykey, new IntWritable(b));
        }
    }

    static class MyKeyGroupComparator extends WritableComparator {
        MyKeyGroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            if (ip1.a == ip2.a)
                return 0;
            else
                return ip1.a > ip2.a ? 1 : -1;
        }
    }

    static class Reduce extends Reducer<IntPair, IntWritable, IntWritable, IntWritable> {
        @Override
        protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            IntWritable myKey = new IntWritable(key.getA());
          //显式的分隔分组,便于查看
            context.write(new IntWritable(999999999), null);
            for(IntWritable i :values){
                context.write(myKey, i);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        //path是HDFS的路径字符串
        String path = "/my/inputTest/Test_SecondSort.txt";
        if (strings.length != 1) {
            System.out.println("input:" + path);
            System.out.print("arg:<out>");
            return 1;
        }
        Configuration conf = getConf();
        Job job = new Job(conf, "SecondSort");
        job.setJarByClass(SecondSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setGroupingComparatorClass(MyKeyGroupComparator.class);

        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(strings[0]));

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int rst = ToolRunner.run(conf, new SecondSort(), args);
        System.exit(rst);
    }
}

 三、见结果对比表,结果为结果数据1(999999999是Group分隔符):

 

 四、如果将Group的标准从根据Inpair的a的值判断改成b的值判断,Group部分代码修改如下(其余不变):

    static class MyKeyGroupComparator extends WritableComparator {
        MyKeyGroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
//            if (ip1.a == ip2.a)
//                return 0;
//            else
//                return ip1.a > ip2.a ? 1 : -1;
            if (ip1.b == ip2.b)
                return 0;
            else
                return ip1.b > ip2.b ? 1 : -1;
        }
    }

 结果见结果对比表的结果数据2.

五、结果对比表:

原始数据 结果数据1 结果数据2 结果说明

20 21 

50 51 

50 52 

50 53 

50 54 

60 51 

60 53 

60 52 

60 56 

60 57 

70 58 

60 61 

70 54 

70 55 

70 56 

70 57 

70 58 

1 2 

3 4 

5 6 

7 82 

203 21 

50 512 

50 522 

50 53 

530 54 

40 511 

20 53 

20 522 

60 56 

60 57 

740 58 

63 61 

730 54 

71 55 

71 56 

73 57 

74 58 

12 211 

31 42 

50 62 

7 8

999999999

1 2

999999999

3 4

999999999

7 8

7 82

999999999

12 211

999999999

20 21

20 53

20 522

999999999

31 42

999999999

40 511

999999999

50 51

50 52

50 53

50 54

……

999999999

1 2

999999999

3 4

999999999

7 8

999999999

7 82

999999999

12 211

999999999

20 21

999999999

20 53

999999999

20 522

999999999

31 42

999999999

40 511

999999999

50  51

999999999

50  52

999999999

50  53

999999999

50  54

……

结果1中,完全符合我们的要求,实现了二次排序,并将key为20的全部分到同一个分组中。

 

结果2中,将key为20的数据,由于b的值各不相同,因此又将其分成了三个Group;同时,对于黄色高亮的数据,b的值同样是53,却没有分到同一个分组。原因见GroupingComparator的作用。

 六、GroupingComparator的作用

Job的API解释:

翻译之后,Group的作用是:在一个reduce调用的时候,通过这个Comparator来控制Key的分组。

但是还是不太清晰,有待进一步了解。

 

 

  • 大小: 7.2 KB
0
7
分享到:
评论

相关推荐

    mapreduce二次排序

    当我们需要对数据进行复杂的排序需求时,例如“年份升序,按照年份聚合,气温降序”,MapReduce的二次排序(Secondary Sort)机制就显得尤为重要。这个概念主要解决的是在MapReduce默认排序规则基础上,进行更精细化...

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...

    基于MapReduce实现决策树算法

    基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    Hadoop MapReduce实现tfidf源码

    本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...

    Hadoop mapreduce 实现InvertedIndexer倒排索引

    Hadoop mapreduce 实现InvertedIndexer倒排索引,能用。

    MapReduce实现二度好友推荐算法

    在IT领域,尤其是在大数据处理和社交网络分析中,"MapReduce实现二度好友推荐算法"是一种常见的技术应用。MapReduce是Google提出的一种分布式计算模型,主要用于处理和生成大规模数据集。在这个场景下,我们利用...

    大数据学习资料全排序二次排序

    5. 实际案例展示如何在Hadoop中实现二次排序 6. 图形化解释帮助理解数据处理流程 通过对这些内容的学习,读者可以深入理解大数据排序的复杂性和Hadoop在其中的角色,进一步提升大数据处理的能力。

    使用Java MapReduce实现数据全局排序【100012685】

    本次实验,在 Hadoop 平台上,使用 MapReduce 实现了数据的全局排序。将详细阐述了实现所需环境及过程。用阿里云服务器安装, OS: Ubuntu20.04 LTS . Hadoop 支持用三种模式启动:单机模式、伪分布式模式、分布式...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    hadoop 二次排序 原理

    在Hadoop MapReduce的早期版本(0.20.0之前),二次排序通常通过设置`setPartitionerClass`、`setOutputKeyComparatorClass`和`setOutputValueGroupingComparator`来实现。而在0.20.0及之后的版本,这些设置被替换为...

    MapReduce2中自定义排序分组

    本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 MapReduce 的工作流程是必要的。Map 阶段将输入数据分割成多个块,并在各个节点上并行处理。Reduce 阶段则负责...

    用MapReduce实现KMeans算法

    《使用MapReduce实现KMeans算法详解》 KMeans算法是一种广泛应用的无监督学习方法,用于数据聚类。在大数据处理的背景下,传统的单机实现往往无法应对海量数据,因此,结合分布式计算框架MapReduce实现KMeans算法就...

    MapReduce实现矩阵相乘算法

    本话题将深入探讨如何使用Hadoop MapReduce实现两个矩阵相乘的算法,这在数据分析、机器学习以及高性能计算中有着重要应用。 首先,理解矩阵相乘的基本原理至关重要。矩阵相乘不是简单的元素对元素相乘,而是对应...

    hadoop分区二次排序示例.zip

    在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...

    mapreduce wc单词计数 自定义分区 自定义排序实现

    在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...

Global site tag (gtag.js) - Google Analytics