`
heipark
  • 浏览: 2094760 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop二次排序关键点和出现时机(也叫辅助排序、Secondary Sort)

阅读更多

前言

    Hadoop二次排序简单说就是先根据字段A分组排序,然后在对组内根据字段B排序。Hadoop二次排序在面试的时候出现频率也是比较高的。今天花了点时间通过源码深入学习了一下。(后面内容以Hadoop自带实例——SecondarySort讲解,见本文附录)

    Hadoop默认是根据reduce key排序,通过Hadoop二次排序可以实现根据value值(需要将其放入复合key中)进行排序,排序后做可以取TOP值。比如可以做,每个网站访问量最大前10个URL等分析。

 

关键点

1、Partitioner

    它的作用是决定数据分区,说白了就是决定map输出key-value由哪个reduce处理,每个map task输出的key-value都会执行Partitioner的getPartition()方法,用于返回当前key-value由哪个reduce处理。

    本例中Partitioner基于map函数输出IntPair(first, second)第一个元素,即first,进行求余运算,所以得到的结果是first相同的key-value会发送到同一reduce

 

2、IntPair的compareTo()方法

    IntPair是map输出的key,它的compareTo()方法决定map输出排序规则。IntPair的实现规则是:先按照first排序,相同first按照second排序(所谓的二次排序其实在这里就实现了)。结果如下:

-------------

1982 6
1984 3
1984 4
1984 5
1984 5
1988 10

-------------

运行时机:

  1. map函数,从缓冲区spill key-value到本地磁盘
  2. reduce函数。获取逻辑:如果有GroupingComparator就返回,否则返回map输出比较函数。

3、GroupingComparator

实现逻辑

    根据IntPair的first字段进行排序

 

运行时机:

 

ReduceTask.run() ->

    // copy、sort完成之后

    RawComparator comparator = job.getOutputValueGroupingComparator(); // 这里获取comparator

    runNewReducer(job, umbilical, reporter, rIter, comparator,  keyClass, valueClass);

 

runNewReducer() ->

    ....

    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter,
                                               reduceInputValueCounter,
                                               trackedRW, committer,
                                               reporter, comparator, keyClass,
                                               valueClass);

    reducer.run(reducerContext); // reducerContext拥有comparator

 

reducer.run() ->

    while (context.nextKey()) { <-

    ...

    }

 

context.nextKey() ->     

    while (hasMore && nextKeyIsSame) {
      nextKeyValue(); <- ①
    }

    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue(); <- ②
    } else {
      return false;
    }

 


nextKeyValue() ->

    ....

    if (hasMore) {
      next = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                         currentRawKey.getLength(),
                                         next.getData(),
                                         next.getPosition(),
                                         next.getLength() - next.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }

    ....

 

       可以看到GroupingComparator在reduce函数内被调用,用于迭代读取reduce输入文件过程中,判断key是否发生变化。那它有什么作用呢?要会回答这个问题,不如先问问,如果没有GroupingComparator结果会如何?

        如果在Job提交时不设置GroupingComparator,那comparator将使用conf中"mapred.output.key.comparator.class"对应的类,如果没有设置"mapred.output.key.comparator.class",则根据map输出key从WritableComparator获取注册的comparator(IntPair通过" WritableComparator.define(IntPair.class, new Comparator());"注册)。本例中,如果不设置GroupingComparator,就会使用IntPair的内嵌类Comparator的compareTo()方法判断,即先比较first,再比较second。这样在迭代读取reduce输入数据的时候,会发生这样的情况:first相同,second不同,comparator会认为两条记录不一致,从而变更key值,继续迭代,这样就无法将相同first的数据聚合到一个迭代中进行处理的,即相同first通过second进行排序

 

其它

 

附录

下图是我整理的流程,更易于理解^_^

 

public class SecondarySort {
 
  /**
   * Define a pair of integers that are writable.
   * They are serialized in a byte comparable format.
   */
  public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;
    
    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers. 
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    /** A Comparator that compares serialized IntPair. */ 
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1,
                         byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static {                                        // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }
  
  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once
   * for each value of the first part.
   */
  public static class FirstGroupingComparator 
                implements RawComparator<IntPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                                             b2, s2, Integer.SIZE/8);
    }

    @Override
    public int compare(IntPair o1, IntPair o2) {
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  /**
   * Read two integers from each line and generate a key, value pair
   * as ((left, right), right).
   */
  public static class MapClass 
         extends Mapper<LongWritable, Text, IntPair, IntWritable> {
    
    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();
    
    @Override
    public void map(LongWritable inKey, Text inValue, 
                    Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }
  
  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce 
         extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR = 
      new Text("------------------------------------------------");
    private final Text first = new Text();
    
    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for(IntWritable value: values) {
        context.write(first, value);
      }
    }
  }
  
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}

 

 

 

--end

 

  • 大小: 147.7 KB
分享到:
评论

相关推荐

    Hadoop 官方文档(中文版)

    - 配置文件详解:如core-site.xml, hdfs-site.xml, mapred-site.xml等,这些配置文件是搭建和管理Hadoop集群的关键。 - NameNode和DataNode:NameNode是HDFS的元数据管理节点,DataNode则是存储数据的节点,理解...

    hadoop 1.2.1核心源码

    这个压缩包文件“hadoop 1.2.1核心源码”包含了Hadoop项目的核心组件,让我们深入探讨一下其中涉及的关键知识点。 1. **Hadoop架构**:Hadoop的核心由两个主要部分组成:HDFS(Hadoop Distributed File System)和...

    Hadoop和hive大数据面试题

    在大数据领域,Hadoop和Hive是两个至关重要的技术组件,它们在处理大规模数据存储和分析方面发挥着关键作用。本篇文章将详细探讨Hadoop和Hive的相关面试知识点,帮助求职者更好地准备大数据领域的面试。 首先,我们...

    Hadoop Real-World Solutions Cookbook 源代码

    6. **Chap 7 - 高级MapReduce技术**:涵盖如使用Secondary Sort进行复杂排序,或者使用New API(MapReduce v2, YARN)提升性能和资源管理。 7. **Chap 8 - Hadoop优化与故障排除**:这部分内容可能涉及Hadoop集群的...

    实战hadoop中的源码

    Hadoop作为开源的大数据处理框架,其源码解析对于理解其工作原理、优化系统性能以及进行二次开发至关重要。本篇将深入探讨Hadoop源码中的关键知识点。 1. **Hadoop架构**:Hadoop主要由HDFS(Hadoop Distributed ...

    Hadoop集群测试报告.pdf

    - **HADOOP-12-152**: 包含DataNode(负责数据存储)、Secondary NameNode(辅助NameNode进行检查点操作)、HiveGateway、WebHCat(提供RESTful API来执行Hive和Pig任务)、Hue(提供图形界面)、ImpalaDaemon(用于...

    Hadoop权威指南(中文版)(带书签)

    这本书中文版的出现,使得更多的中国读者能够无障碍地学习和理解Hadoop技术,而带书签的特性则方便了读者快速定位和复习相关章节。 Hadoop是Apache基金会的一个开源项目,其核心由两个主要组件构成:Hadoop ...

    Hadoop权威指南(中文版-带目录索引)

    1. **Hadoop架构**:介绍Hadoop的整体设计,包括NameNode、DataNode、Secondary NameNode等节点的角色和功能,以及它们如何协同工作以提供数据存储和访问。 2. **HDFS原理**:详细解析HDFS的数据存储机制,如数据块...

    Data Algorithms: Recipes for Scaling Up with Hadoop and Spark pdf

    书中还讨论了Secondary Sort(二级排序)问题,提供了Hadoop和Spark解决方案的详细案例分析,包括输入、预期输出、map()和reduce()函数的介绍,以及实际代码示例的运行结果。 书中对于Secondary Sort问题进行了深入...

    hadoop分享.pptx

    为此,引入了Secondary NameNode来定期合并fsimage和editlog,以防止NameNode成为单点故障。 3. HDFS数据完整性 - 通过CRC32校验和确保数据的完整性,客户端在写入数据块前计算校验和,并在读取时进行验证,发现...

    hadoop源码分析

    Hadoop是大数据处理领域的重要工具,它以分布式计算模型为基础,为海量数据处理提供了高效、可靠的解决方案。Hadoop源码分析对于理解...这不仅有助于解决现有问题,也为未来可能出现的新需求提供了理论基础和实践指导。

    Hadoop源代码分析完整版.rar

    中间结果通过shuffle和sort阶段进行排序和分组,最后由reduce函数处理。 4. **Hadoop源码解析** Hadoop源代码中,`org.apache.hadoop.mapreduce`包是MapReduce的核心,包含Job、Task、Mapper和Reducer等关键类。`...

    Hadoop实战手册—实验需要的材料

    书中的实验部分涵盖了Hadoop生态系统中的多个关键组件,通过这些章节的代码和材料,读者可以亲自动手实践,从而加深对Hadoop的理解。 在提供的9120OS_Code.zip压缩包中,包含了书中各个章节的源代码和相关材料,...

    Hadoop大数据技术实验(实训)计划书(任务书)

    在这个计划书中,我们将涵盖以下几个关键知识点: 1. **Hadoop集群搭建**:Hadoop集群的搭建是实现大数据处理的第一步。这涉及到配置各个节点,安装Java运行环境,下载并安装Hadoop,设置环境变量,以及配置Hadoop...

    基于Hadoop平台的通信数据分布式查询算法的设计与实现

    这可能包括优化Mapper和Reducer的逻辑,以及使用Hadoop的Secondary Sort等技术来改善数据局部性。 4. **索引构建**:为了加速查询,论文可能会讨论如何在分布式环境中构建和维护索引,如Bloom Filter或Bitmap索引,...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    分布式大数据处理架构.pptx

    ### 分布式大数据处理架构知识点解析 ...以上内容详细介绍了分布式大数据处理架构的核心知识点,涵盖了Hadoop生态系统的关键组件和技术细节,旨在帮助读者深入理解分布式数据处理的核心原理和技术实现。

    HadoopDesignPatterns

    6. **Secondary Sort模式**:在某些情况下,简单的键值排序不足以满足需求,Secondary Sort允许自定义排序规则,以满足更复杂的业务逻辑。 7. **Map-only作业**:如果不需要Reduce阶段,可以使用Map-only作业,这会...

Global site tag (gtag.js) - Google Analytics