`
hugh.wangp
  • 浏览: 293434 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

MapReduce的排序和二次排序

阅读更多

 

自己学习排序和二次排序的知识整理如下。
1.Hadoop的序列化格式介绍:Writable
2.Hadoop的key排序逻辑
3.全排序
4.如何自定义自己的Writable类型
5.如何实现二次排序


1.Hadoop的序列化格式介绍:Writable
要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式。更多的可能是要关注他的Subinterfaces:WritableComparable<T>。他是继承Writable和Comparable<T>接口,继而WritableComparable<T>的实现除了具有序列化特性,更重要的是具有了比较的特性,而比较的特性在MapReduce里是很重要的,因为MR中有个基于键的排序过程,所以可以作为键的类型必须具有Comparable<T>的特性。
除了WritableComparable接口外,还有一个接口RawComparaotor。
WritableComparable和RawComparator两个接口的区别是:
WritableComparable是需要把数据流反序列化为对象后,然后做对象之间的比较,而RawComparator是直接比较数据流的数据,不需要数据流反序列化成对象,省去了新建对象的开销。

2.Hadoop的key排序逻辑
Hadoop本身Key的数据类型的排序逻辑其实就是依赖于Hadoop本身的继承与WritableComparable<T>的基本数据类型和其他类型(相关类型可参考《Hadoop权威指南》第二版的90页)的compareTo方法的定义。
Key排序的规则:
1.如果调用jobconf的setOutputKeyComparatorClass()设置mapred.output.key.comparator.class
2.否则,使用key已经登记的comparator
3.否则,实现接口WritableComparable的compareTo()函数来操作
例如IntWritable的比较算法如下:
public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable)o).value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }
 
可以修改compareTo来实现自己所需的比较算法。

虽然我们知道是compareTo这个方法实现Key的排序,但其实我们在使用Hadoop的基本数据类型时不需要关注这个排序如何实现,因为Hadoop的框架会自动调用compareTo这个方法实现key的排序。但是这个排序只是局限在map或者reduce内部。针对于map与map,reduce与reduce之间的排序compareTo就管不着了,虽然这种情况不常出现,但是确实存在这种问题的,而且确实有适用场景,比如说全排序。

3.全排序
这里就需要关注Partition这个阶段,Partition阶段是针对每个Reduce,需要创建一个分区,然后把Map的输出结果映射到特定的分区中。这个分区中可能会有N个Key对应的数据,但是一个Key的所有数据只能在一个分区中。在实现全排序的过程中,如果只有一个reduce,也就是只有一个Partition,那么所有Map的输出都会经过一个Partition到一个reduce里,在一个reduce里可以根据compareTo(也可以采用其他比较算法)来排序,实现全排序。但是这种情况就让MapReduce失去了分布式计算的光环。
所以全排序的大概思路为:确保Partition之间是有序的就OK了,即保证Partition1的最大值小于Partition2的最小值就OK了,即便这样做也还是有个问题:Partition的分布不均,可能导致某些Partition处理的数据量远大于其他Partition处理的数据量。而实现全排序的核心步骤为:取样和Partition。
先“取样”,保证Partition得更均匀: 
1) 对Math.min(10, splits.length)个split(输入分片)进行随机取样,对每个split取10000个样,总共10万个样
2) 10万个样排序,根据reducer的数量(n),取出间隔平均的n-1个样
3) 将这个n-1个样写入partitionFile(_partition.lst,是一个SequenceFile),key是取的样,值是nullValue
4) 将partitionFile写入DistributedCache 
整个全排序的详细介绍可参照:http://www.iteye.com/topic/709986

4.如何自定义自己的Writable类型
自定义自己的Writable类型的场景应该很简单:Hadoop自带的数据类型要么在功能上不能满足需求,要么在性能上满足需求,毕竟Hadoop还在发展,不是所有情况都考虑的,但是他提供了自主的框架实现我们想要的功能。
定义自己的Writable类型需要实现:
a.重载构造函数
b.实现set和get方法
c.实现接口的方法:write()、readFields()、compareTo()
d.(可选)相当于JAVA构造的对象,重写java.lang.Object的hashCode()、equals()、toString()。Partition阶段默认的hashpartitioner会根据hashCode()来选择分区,如果不要对自定义类型做key进行分区,hashCode()可不实现
具体例子可参考hadoop的基本类型IntWritable的实现
public class IntWritable implements WritableComparable {
  private int value;

  public IntWritable() {}

  public IntWritable(int value) { set(value); }

  /** Set the value of this IntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this IntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(value);
  }

  /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  public boolean equals(Object o) {
    if (!(o instanceof IntWritable))
      return false;
    IntWritable other = (IntWritable)o;
    return this.value == other.value;
  }

  public int hashCode() {
    return value;
  }

  /** Compares two IntWritables. */
  public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((IntWritable)o).value;
    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  }

  public String toString() {
    return Integer.toString(value);
  }
}
 

5.如何实现二次排序
二次排序的工作原理涉及到如下几方面:
a.创建key的数据类型,key要包括两次排序的元素
b.setPartitionerClass(Class<? extends Partitioner> theClass)
hadoop0.20.0以后的函数为setPartitionerClass
c.setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函数为setSortComparatorClass
d.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函数为setGroupingComparatorClass

根据hadoop自己提供的example:org.apache.hadoop.examplesSecondarySort来说明二次排序具体是如何实现的.

SecondarySort实现IntPair、FirstPartitioner、FirstGroupingComparator、MapClass、Reduce这几个内部类,然后在main函数中调用。先说明下main函数中有哪些地方和普通的MR代码不同。
不同点是多了这两个set:
job.setPartitionerClass(FirstPartitioner.class);
设置自定义的Partition操作,在此是调用我们自定义的内部类FirstPartitioner
job.setGroupingComparatorClass(FirstGroupingComparator.class);
设置哪些value进入哪些key的迭代器中,在此是调用自定义的内部类FirstGroupingComparator

具体的操作逻辑为:

a.定义一个作为key的类型IntPair,在IntPair中有两个变量first、second,SecondarySort就是在对first排序后再对second再排序处理
b.定义分区函数类FirstPartitioner,Key的第一次排序。在FirstPartitioner实现如何处理key的first,把key对应的数据划分到不同的分区中。这样key中first相同的value会被放在同一个reduce中,在reduce中再做第二次排序 
c(代码没有实现,其实内部是有处理).key比较函数类,key的第二次排序,是继承WritableComparator的一个比较器。setSortComparatorClass可以实现。
为什么没有使用setSortComparatorClass()是因为hadoop对key排序的规则(参看2.Hadoop的key排序逻辑)决定的。由于我们在IntPair中已经定义了compareTo()函数。
d.定义分组函数类FirstGroupingComparator,保证只要key的的第一部分相同,value就进入key的value迭代器中。
0
0
分享到:
评论
1 楼 mojunbin 2012-12-02  
呵呵,整体思路都讲到了哦。

相关推荐

    mapreduce二次排序

    当我们需要对数据进行复杂的排序需求时,例如“年份升序,按照年份聚合,气温降序”,MapReduce的二次排序(Secondary Sort)机制就显得尤为重要。这个概念主要解决的是在MapReduce默认排序规则基础上,进行更精细化...

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的...在设计二次排序逻辑时,需要仔细考虑数据的业务需求和处理性能,选择合适的分区、排序和分组策略。同时,自定义Key类和比较器也是实现二次排序不可或缺的部分。

    大数据MapReduce和YARN二次开发.pdf

    大数据MapReduce和YARN二次开发 大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是...

    大数据学习资料全排序二次排序

    "大数据学习资料全排序二次排序"这个主题,显然关注的是如何有效地对大规模数据进行排序,尤其是涉及到二次排序的概念。二次排序通常是指在第一次排序的基础上,根据另一个或多个字段进行第二次排序,以满足更复杂的...

    hadoop 二次排序 原理

    首先,二次排序是在MapReduce框架内进行的一种特殊排序方式,它遵循两个主要步骤:第一字段排序和相同第一字段下的第二字段排序。这种排序模式确保了在处理大量数据时,具有相同第一字段的记录会聚集在一起,然后再...

    hadoop 二次排序 插入数据库

    综上所述,"hadoop 二次排序 插入数据库"这个主题涵盖了Hadoop MapReduce中的高级排序技巧和数据持久化策略,对于那些需要处理复杂排序逻辑并存储处理结果的开发者来说,这是一个非常实用的话题。遗憾的是,由于博客...

    hadoop分区二次排序示例.zip

    在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...

    云应用系统开发第二次项目(mapreduce)

    15. 使用 MapReduce 实现二次排序:使用 MapReduce 来实现数据的二次排序,以便对数据进行复杂的排序处理。 该项目旨在掌握 MapReduce 编程模型的基本概念和应用,并涵盖了 MapReduce 的高级应用,旨在提高学生对云...

    拓思爱诺大数据-第二次作业MapReduce编程

    在“拓思爱诺大数据-第二次作业MapReduce编程”中,你将学习到如何使用MapReduce解决实际问题。首先,我们来看Hadoop的wordcount程序,这是一个经典的MapReduce示例,用于统计文本中单词出现的频率。在Map阶段,程序...

    MapReduce的小应用

    - 二次排序仅适用于一步到位的MapReduce任务,不支持在Map阶段结束后对keys的任意值进行修改。 - 不可同时使用`org.apache.hadoop.mapred`和`org.apache.hadoop.mapreduce`包,这可能导致不便。 - MapReduce中间结果...

    mapreduce secondarysort

    ### MapReduce二次排序详解 #### 一、MapReduce二次排序概念及应用场景 **MapReduce**是一种分布式计算模型,主要用于处理大规模数据集。其中,二次排序(Secondary Sort)是指在一个键值对列表中,首先根据主键...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    2018最新BAT大数据面试题.docx

    - Spill前进行二次排序,首先是根据partition排序,其次是根据key排序。 - 使用Combiner(如果设置)来预处理数据,减少写入磁盘的数据量。 - 最终生成一个排序好的文件供Reduce阶段使用。 2. **Reduce端Shuffle...

    大数据mapreduce案例

    除了基本的MapReduce模型,还有一些优化策略可以提高性能,例如Combiner(本地化Reduce)、Partitioner(自定义分区)和Secondary Sort(二次排序)等。这些技巧可以在不改变最终结果的情况下,减少数据传输量,提高...

    Mapreduce原理

    在这个阶段,来自不同Map任务的结果会被按照键进行排序和分组,然后分配给各个Reduce任务。这一过程涉及到数据的重新分布,确保具有相同键的数据被发送到相同的Reduce任务。 - **Reduce阶段**:Reduce任务接收到...

    google mapreduce

    这些中间键值对会被MapReduce框架根据键进行排序和分组。 2. **Reduce函数**:Reduce函数处理的是经过排序和分组后的中间键值对,即对于每个唯一的键,Reduce函数都会被调用来处理所有与此键相关的值。 #### 三、...

    mapreduce详细流程

    - **排序过程**:在MapReduce的整个过程中,经历了多次排序操作,包括: - Map端对分区号的快速排序。 - Map端对每个分区内部键的快速排序。 - Map任务结束后对文件进行归并排序。 - Reduce端对复制过来的Map...

    MapReduceV2笔记

    在二次排序案例中,需要掌握二次排序的原理和实现方法,以及如何优化排序过程。在Reduce端join案例中,要了解如何通过MapReduce实现不同数据集的合并。在Map端join案例中,需要掌握Map端join的原理和优化技巧。这些...

Global site tag (gtag.js) - Google Analytics