`
yo8237233
  • 浏览: 63060 次
  • 来自: 深圳
社区版块
存档分类
最新评论

MapReduce二次排序

 
阅读更多
     默认情况下,Map 输出的结果会对 Key 进行默认的排序,但是有时候需要对 Key 排序的同时再对 Value 进行排序,这时候就要用到二次排序了。下面让我们来介绍一下什么是二次排序。

二次排序原理
        我们把二次排序主要分为以下几个阶段。
Map 起始阶段
        在Map阶段,使用 job.setInputFormatClass() 定义的 InputFormat ,将输入的数据集分割成小数据块 split,同时 InputFormat 提供一个 RecordReader的实现。本课程中使用的是 TextInputFormat,它提供的 RecordReader 会将文本的行号作为 Key,这一行的文本作为 Value。这就是自定义 Mapper 的输入是 < LongWritable,Text> 的原因。 然后调用自定义 Mapper 的map方法,将一个个< LongWritable,Text>键值对输入给 Mapper 的 map方法。
Map 最后阶段
        在 Map 阶段的最后,会先调用 job.setPartitionerClass() 对这个 Mapper 的输出结果进行分区,每个分区映射到一个Reducer。每个分区内又调用 job.setSortComparatorClass() 设置的 Key 比较函数类排序。 可以看到,这本身就是一个二次排序。如果没有通过 job.setSortComparatorClass() 设置 Key 比较函数类,则使用 Key 实现的 compareTo() 方法。我们既可以使用 IntPair 实现的 compareTo() 方法,也可以专门定义 Key 比较函数类。
Reduce 阶段
        在 Reduce 阶段,reduce() 方法接受所有映射到这个 Reduce 的 map 输出后,也是会调用 job.setSortComparatorClass()方法设置的 Key 比较函数类,对所有数据进行排序。然后开始构造一个 Key 对应的 Value 迭代器。 这时就要用到分组,使用 job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个 Key 相同,它们就属于同一组,它们的 Value 放在一个 Value 迭代器,而这个迭代器的 Key 使用属于同一个组的所有Key的第一个Key。最后就是进入 Reducer 的 reduce() 方法,reduce() 方法的输入是所有的 Key 和它的 Value 迭代器,同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。

接下来我们通过数据示例,可以很直观的了解二次排序的原理。

输入文件sort.txt(下载)内容为:

40  20
40  10
40  30
40  5
30  30
30  20
30  10
30  40
50  20
50  50
50  10
50  60
        输出文件的内容(从小到大排序)如下:

30  10
30  20
30  30
30  40
==============================
40  5
40  10
40  20
40  30
============================== 
50  10
50  20
50  50
50  60

二次排序的具体流程
        在 MapReduce 中,所有的 Key 是需要被比较和排序的,而且是二次,先根据 Partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类 IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。

代码实现
        Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面这个示例对 example 包中的二次排序源码的改进。 我们按照以下几步完成二次排序:

        第一步:自定义IntPair类,将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable 接口并重写其方法。
/**
* 自己定义的key类应该实现WritableComparable接口
*/
public  class IntPair implements WritableComparable<IntPair>{
	int first;//第一个成员变量
	int second;//第二个成员变量
	public void set(int left, int right){
		first = left;
		second = right;
	}
	public int getFirst(){
		return first;
	}
	public int getSecond(){
		return second;
	}
	@Override
	//反序列化,从流中的二进制转换成IntPair
	public void readFields(DataInput in) throws IOException{
		first = in.readInt();
		second = in.readInt();
	}
	@Override
	//序列化,将IntPair转化成使用流传送的二进制
	public void write(DataOutput out) throws IOException{
		out.writeInt(first);
		out.writeInt(second);
	}
	@Override
	//key的比较
	public int compareTo(IntPair o)
	{
		// TODO Auto-generated method stub
		if (first != o.first){
			return first < o.first ? -1 : 1;
		}else if (second != o.second){
			return second < o.second ? -1 : 1;
		}else{
			return 0;
		}
	}
	
	@Override
	public int hashCode(){
		return first * 157 + second;
	}
	@Override
	public boolean equals(Object right){
		if (right == null)
			return false;
		if (this == right)
			return true;
		if (right instanceof IntPair){
			IntPair r = (IntPair) right;
			return r.first == first && r.second == second;
		}else{
			return false;
		}
	}
}


第二步:自定义分区函数类FirstPartitioner,根据 IntPair 中的first实现分区。

第三步:自定义 SortComparator 实现 IntPair 类中的first和second排序。本次中没有使用这种方法,而是使用 IntPair 中的compareTo()方法实现的。

第四步:自定义 GroupingComparator 类,实现分区内的数据分组。
/**
*继承WritableComparator
*/
public static class GroupingComparator extends WritableComparator{
        protected GroupingComparator(){
            super(IntPair.class, true);
        }
        @Override
        //Compare two WritableComparables.
        public int compare(WritableComparable w1, WritableComparable w2){
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
}


  第五步:编写 MapReduce 主程序实现二次排序。
public class SecondarySort{
    // 自定义map
    public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()){
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }
    // 自定义reduce
    public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{
        private final Text left = new Text();      
        public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException{
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values){
                context.write(left, val);
            }
        }
    }
    /**
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();

        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));//输入路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径

        job.setMapperClass(Map.class);// Mapper
        job.setReducerClass(Reduce.class);// Reducer
        
        job.setPartitionerClass(FirstPartitioner.class);// 分区函数
        //job.setSortComparatorClass(KeyComparator.Class);//本课程并没有自定义SortComparator,而是使用IntPair自带的排序
        job.setGroupingComparatorClass(GroupingComparator.class);// 分组函数


        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
       
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
分享到:
评论

相关推荐

    mapreduce二次排序

    当我们需要对数据进行复杂的排序需求时,例如“年份升序,按照年份聚合,气温降序”,MapReduce的二次排序(Secondary Sort)机制就显得尤为重要。这个概念主要解决的是在MapReduce默认排序规则基础上,进行更精细化...

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...

    mapreduce secondarysort

    ### MapReduce二次排序详解 #### 一、MapReduce二次排序概念及应用场景 **MapReduce**是一种分布式计算模型,主要用于处理大规模数据集。其中,二次排序(Secondary Sort)是指在一个键值对列表中,首先根据主键...

    大数据学习资料全排序二次排序

    "大数据学习资料全排序二次排序"这个主题,显然关注的是如何有效地对大规模数据进行排序,尤其是涉及到二次排序的概念。二次排序通常是指在第一次排序的基础上,根据另一个或多个字段进行第二次排序,以满足更复杂的...

    hadoop 二次排序 原理

    首先,二次排序是在MapReduce框架内进行的一种特殊排序方式,它遵循两个主要步骤:第一字段排序和相同第一字段下的第二字段排序。这种排序模式确保了在处理大量数据时,具有相同第一字段的记录会聚集在一起,然后再...

    hadoop分区二次排序示例.zip

    在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...

    hadoop 二次排序 插入数据库

    二次排序(Secondary Sort)是Hadoop MapReduce中的一个重要概念,它允许用户自定义数据的最终排序方式,以满足更复杂的排序需求。这篇博客文章(虽然链接无法直接访问,但我们可以根据常规知识来解释这个概念)可能...

    MapReduce的小应用

    - 二次排序仅适用于一步到位的MapReduce任务,不支持在Map阶段结束后对keys的任意值进行修改。 - 不可同时使用`org.apache.hadoop.mapred`和`org.apache.hadoop.mapreduce`包,这可能导致不便。 - MapReduce中间结果...

    云应用系统开发第二次项目(mapreduce)

    15. 使用 MapReduce 实现二次排序:使用 MapReduce 来实现数据的二次排序,以便对数据进行复杂的排序处理。 该项目旨在掌握 MapReduce 编程模型的基本概念和应用,并涵盖了 MapReduce 的高级应用,旨在提高学生对云...

    大数据MapReduce和YARN二次开发.pdf

    大数据MapReduce和YARN二次开发 大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是...

    拓思爱诺大数据-第二次作业MapReduce编程

    在“拓思爱诺大数据-第二次作业MapReduce编程”中,你将学习到如何使用MapReduce解决实际问题。首先,我们来看Hadoop的wordcount程序,这是一个经典的MapReduce示例,用于统计文本中单词出现的频率。在Map阶段,程序...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    大数据框架(HADOOP、HIVE、HBASE)优化和简历项目编写(视频+讲义+笔记)

    03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...

    大数据mapreduce案例

    除了基本的MapReduce模型,还有一些优化策略可以提高性能,例如Combiner(本地化Reduce)、Partitioner(自定义分区)和Secondary Sort(二次排序)等。这些技巧可以在不改变最终结果的情况下,减少数据传输量,提高...

    Data-Intensive Text Processing with MapReduce

    MapReduce算法的设计涉及到多种技术,包括但不限于局部聚合、对和条纹、相对频率计算、二次排序、关系连接等。 - **局部聚合**:通过在Mapper端使用Combiners来减少Reduce端的通信开销,从而提高整体性能。这种方法...

    google mapreduce

    #### 二、MapReduce的基本编程模式 MapReduce的基本编程模式涉及两个主要步骤: 1. **Map函数**:用户自定义的Map函数接收输入的键值对,并生成一系列中间的键值对。这些中间键值对会被MapReduce框架根据键进行...

    云计算 mapreduce - <Data-Intensive[1].Text.Processing.With.MapReduce>

    探讨了如何设计有效的MapReduce算法,包括局部聚合、配对与条纹化、相对频率计算、二次排序、关系连接等技术。 - **第4章:用于文本检索的倒排索引** 讨论了如何构建和优化倒排索引,包括不同的实现方法和技术...

    Mapreduce原理

    #### 二、MapReduce原理 **1. 数据处理模型** - **Map阶段**:数据处理的第一步是将原始数据切分为多个块,这些块被称为“split”。Map函数接收这些split作为输入,并对其进行处理,将数据转换为键值对形式。Map...

    mapreduce详细流程

    - **排序过程**:在MapReduce的整个过程中,经历了多次排序操作,包括: - Map端对分区号的快速排序。 - Map端对每个分区内部键的快速排序。 - Map任务结束后对文件进行归并排序。 - Reduce端对复制过来的Map...

Global site tag (gtag.js) - Google Analytics