`

mapreduce的二次排序

阅读更多
mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。

这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>

1 首先说一下工作原理:

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。

在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

2  二次排序 就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序 的结果 。例如

输入文件

20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8

输出:(注意需要分割线)


------------------------------------------------
1       2
------------------------------------------------
3       4
------------------------------------------------
5       6
------------------------------------------------
7       8
7       82
------------------------------------------------
12      211
------------------------------------------------
20      21
20      53
20      522
------------------------------------------------
31      42
------------------------------------------------
40      511
------------------------------------------------
50      51
50      52
50      53
50      53
50      54
50      62
50      512
50      522
------------------------------------------------
60      51
60      52
60      53
60      56
60      56
60      57
60      57
60      61
------------------------------------------------
63      61
------------------------------------------------
70      54
70      55
70      56
70      57
70      58
70      58
------------------------------------------------
71      55
71      56
------------------------------------------------
73      57
------------------------------------------------
74      58
------------------------------------------------
203     21
------------------------------------------------
530     54
------------------------------------------------
730     54
------------------------------------------------
740     58

3  具体步骤:


1 自定义key。

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法
//反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException
       
//序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out)

//key的比较
public int compareTo(IntPair o)
       
另外新定义的类应该重写的两个方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
public boolean equals(Object right)

2 由于key是自定义的,所以还需要自定义一下类:

2.1 分区函数类。这是key的第一次比较。
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>

在job中设置使用setPartitionerClasss

2.2 key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。
public static class KeyComparator extends WritableComparator
必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
另一种方法是 实现接口RawComparator。
在job中设置使用setSortComparatorClass。

2.3 分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。
public static class GroupingComparator extends WritableComparator
同key比较函数类,必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
同key比较函数类,分组函数类另一种方法是实现接口RawComparator。
在job中设置使用setGroupingComparatorClass。

另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。

4 代码。这个例子中没有使用key比较函数类,而是使用key的实现的compareTo方法

view plaincopy to clipboardprint?
package secondarySort; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.StringTokenizer; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
 
public class SecondarySort { 
    //自己定义的key类应该实现WritableComparable接口 
    public static class IntPair implements WritableComparable<IntPair> { 
        int first; 
        int second; 
        /**
         * Set the left and right values.
         */ 
        public void set(int left, int right) { 
            first = left; 
            second = right; 
        } 
        public int getFirst() { 
            return first; 
        } 
        public int getSecond() { 
            return second; 
        } 
        @Override 
        //反序列化,从流中的二进制转换成IntPair 
        public void readFields(DataInput in) throws IOException { 
            // TODO Auto-generated method stub 
            first = in.readInt(); 
            second = in.readInt(); 
        } 
        @Override 
        //序列化,将IntPair转化成使用流传送的二进制 
        public void write(DataOutput out) throws IOException { 
            // TODO Auto-generated method stub 
            out.writeInt(first); 
            out.writeInt(second); 
        } 
        @Override 
        //key的比较 
        public int compareTo(IntPair o) { 
            // TODO Auto-generated method stub 
            if (first != o.first) { 
                return first < o.first ? -1 : 1; 
            } else if (second != o.second) { 
                return second < o.second ? -1 : 1; 
            } else { 
                return 0; 
            } 
        } 
         
        //新定义类应该重写的两个方法 
        @Override 
        //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) 
        public int hashCode() { 
            return first * 157 + second; 
        } 
        @Override 
        public boolean equals(Object right) { 
            if (right == null) 
                return false; 
            if (this == right) 
                return true; 
            if (right instanceof IntPair) { 
                IntPair r = (IntPair) right; 
                return r.first == first && r.second == second; 
            } else { 
                return false; 
            } 
        } 
    } 
     /**
       * 分区函数类。根据first确定Partition。
       */ 
      public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ 
        @Override 
        public int getPartition(IntPair key, IntWritable value,  
                                int numPartitions) { 
          return Math.abs(key.getFirst() * 127) % numPartitions; 
        } 
      } 
       
      /**
       * 分组函数类。只要first相同就属于同一个组。
       */ 
    /*//第一种方法,实现接口RawComparator
    public static class GroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
        @Override
        //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
            // TODO Auto-generated method stub
             return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
                     b2, s2, Integer.SIZE/8);
        }
    }*/ 
    //第二种方法,继承WritableComparator 
    public static class GroupingComparator extends WritableComparator { 
          protected GroupingComparator() { 
            super(IntPair.class, true); 
          } 
          @Override 
          //Compare two WritableComparables. 
          public int compare(WritableComparable w1, WritableComparable w2) { 
            IntPair ip1 = (IntPair) w1; 
            IntPair ip2 = (IntPair) w2; 
            int l = ip1.getFirst(); 
            int r = ip2.getFirst(); 
            return l == r ? 0 : (l < r ? -1 : 1); 
          } 
        } 
     
         
    // 自定义map 
    public static class Map extends 
            Mapper<LongWritable, Text, IntPair, IntWritable> { 
        private final IntPair intkey = new IntPair(); 
        private final IntWritable intvalue = new IntWritable(); 
        public void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException { 
            String line = value.toString(); 
            StringTokenizer tokenizer = new StringTokenizer(line); 
            int left = 0; 
            int right = 0; 
            if (tokenizer.hasMoreTokens()) { 
                left = Integer.parseInt(tokenizer.nextToken()); 
                if (tokenizer.hasMoreTokens()) 
                    right = Integer.parseInt(tokenizer.nextToken()); 
                intkey.set(left, right); 
                intvalue.set(right); 
                context.write(intkey, intvalue); 
            } 
        } 
    } 
    // 自定义reduce 
    // 
    public static class Reduce extends 
            Reducer<IntPair, IntWritable, Text, IntWritable> { 
        private final Text left = new Text(); 
        private static final Text SEPARATOR =  
              new Text("------------------------------------------------"); 
        public void reduce(IntPair key, Iterable<IntWritable> values, 
                Context context) throws IOException, InterruptedException { 
            context.write(SEPARATOR, null); 
            left.set(Integer.toString(key.getFirst())); 
            for (IntWritable val : values) { 
                context.write(left, val); 
            } 
        } 
    } 
    /**
     * @param args
     */ 
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 
        // TODO Auto-generated method stub 
        // 读取hadoop配置 
        Configuration conf = new Configuration(); 
        // 实例化一道作业 
        Job job = new Job(conf, "secondarysort"); 
        job.setJarByClass(SecondarySort.class); 
        // Mapper类型 
        job.setMapperClass(Map.class); 
        // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用 
        //job.setCombinerClass(Reduce.class); 
        // Reducer类型 
        job.setReducerClass(Reduce.class); 
        // 分区函数 
        job.setPartitionerClass(FirstPartitioner.class); 
        // 分组函数 
        job.setGroupingComparatorClass(GroupingComparator.class); 
         
        // map 输出Key的类型 
        job.setMapOutputKeyClass(IntPair.class); 
        // map输出Value的类型 
        job.setMapOutputValueClass(IntWritable.class); 
        // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat 
        job.setOutputKeyClass(Text.class); 
        // rduce输出Value的类型 
        job.setOutputValueClass(IntWritable.class); 
         
        // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。 
        job.setInputFormatClass(TextInputFormat.class); 
        // 提供一个RecordWriter的实现,负责数据输出。 
        job.setOutputFormatClass(TextOutputFormat.class); 
         
        // 输入hdfs路径 
        FileInputFormat.setInputPaths(job, new Path(args[0])); 
        // 输出hdfs路径 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
        // 提交job 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
分享到:
评论
2 楼 xingwang.ye 2014-06-18  
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{                                    
         @Override                                                                                                      
        public int getPartition(IntPair key, IntWritable value,                                                         
                                int numPartitions) {                                                                    
          return Math.abs(key.getFirst() * 127) % numPartitions;                                                        
        }                                                                                                               
      }

---------------
请教一下,这儿的127是什么意思?

------------------------
         public int hashCode() {                                                                                        
            return first * 157 + second;                                                                                
        } 

----------
157又是何意?
1 楼 xingwang.ye 2014-06-18  
老兄你哪儿有hadoop全排序的例子么?
就和你的blog中“mapreduce的二次排序 ”那样风格的,
我自己仿照着hadoop example的sort(用到TotalOrderPartitioner这个partition)的写了一个,输入数据是我自己随机生成的10000个0-9999的随机数,结果总是报错,郁闷,奔溃,理解不够,水平不到家,求教

相关推荐

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    mapreduce二次排序

    当我们需要对数据进行复杂的排序需求时,例如“年份升序,按照年份聚合,气温降序”,MapReduce的二次排序(Secondary Sort)机制就显得尤为重要。这个概念主要解决的是在MapReduce默认排序规则基础上,进行更精细化...

    MapReduce模型--二次排序

    MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...

    mapreduce secondarysort

    ### MapReduce二次排序详解 #### 一、MapReduce二次排序概念及应用场景 **MapReduce**是一种分布式计算模型,主要用于处理大规模数据集。其中,二次排序(Secondary Sort)是指在一个键值对列表中,首先根据主键...

    大数据学习资料全排序二次排序

    "大数据学习资料全排序二次排序"这个主题,显然关注的是如何有效地对大规模数据进行排序,尤其是涉及到二次排序的概念。二次排序通常是指在第一次排序的基础上,根据另一个或多个字段进行第二次排序,以满足更复杂的...

    hadoop 二次排序 原理

    首先,二次排序是在MapReduce框架内进行的一种特殊排序方式,它遵循两个主要步骤:第一字段排序和相同第一字段下的第二字段排序。这种排序模式确保了在处理大量数据时,具有相同第一字段的记录会聚集在一起,然后再...

    hadoop分区二次排序示例.zip

    在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...

    hadoop 二次排序 插入数据库

    二次排序(Secondary Sort)是Hadoop MapReduce中的一个重要概念,它允许用户自定义数据的最终排序方式,以满足更复杂的排序需求。这篇博客文章(虽然链接无法直接访问,但我们可以根据常规知识来解释这个概念)可能...

    MapReduce的小应用

    - 二次排序仅适用于一步到位的MapReduce任务,不支持在Map阶段结束后对keys的任意值进行修改。 - 不可同时使用`org.apache.hadoop.mapred`和`org.apache.hadoop.mapreduce`包,这可能导致不便。 - MapReduce中间结果...

    云应用系统开发第二次项目(mapreduce)

    15. 使用 MapReduce 实现二次排序:使用 MapReduce 来实现数据的二次排序,以便对数据进行复杂的排序处理。 该项目旨在掌握 MapReduce 编程模型的基本概念和应用,并涵盖了 MapReduce 的高级应用,旨在提高学生对云...

    大数据MapReduce和YARN二次开发.pdf

    大数据MapReduce和YARN二次开发 大数据MapReduce和YARN二次开发是大数据处理技术的重要组成部分,本文档将详细介绍MapReduce的过程、搭建开发环境、运行程序和MR开发接口介绍。 MapReduce的过程 MapReduce是...

    拓思爱诺大数据-第二次作业MapReduce编程

    在“拓思爱诺大数据-第二次作业MapReduce编程”中,你将学习到如何使用MapReduce解决实际问题。首先,我们来看Hadoop的wordcount程序,这是一个经典的MapReduce示例,用于统计文本中单词出现的频率。在Map阶段,程序...

    java大数据作业_5Mapreduce、数据挖掘

    【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...

    大数据框架(HADOOP、HIVE、HBASE)优化和简历项目编写(视频+讲义+笔记)

    03_MapReduce 二次排序回顾及Reduce Join实现详解 04_MapReduce 中Map Join实现思路及伪代码详解 05_Hive重点知识回顾总结及小表与大表关联时MapJoin优化 06_Hive中大表与大表关联时SMB Join优化 07_Hive中高级...

    大数据mapreduce案例

    除了基本的MapReduce模型,还有一些优化策略可以提高性能,例如Combiner(本地化Reduce)、Partitioner(自定义分区)和Secondary Sort(二次排序)等。这些技巧可以在不改变最终结果的情况下,减少数据传输量,提高...

    google mapreduce

    #### 二、MapReduce的基本编程模式 MapReduce的基本编程模式涉及两个主要步骤: 1. **Map函数**:用户自定义的Map函数接收输入的键值对,并生成一系列中间的键值对。这些中间键值对会被MapReduce框架根据键进行...

    云计算 mapreduce - <Data-Intensive[1].Text.Processing.With.MapReduce>

    探讨了如何设计有效的MapReduce算法,包括局部聚合、配对与条纹化、相对频率计算、二次排序、关系连接等技术。 - **第4章:用于文本检索的倒排索引** 讨论了如何构建和优化倒排索引,包括不同的实现方法和技术...

    Mapreduce原理

    #### 二、MapReduce原理 **1. 数据处理模型** - **Map阶段**:数据处理的第一步是将原始数据切分为多个块,这些块被称为“split”。Map函数接收这些split作为输入,并对其进行处理,将数据转换为键值对形式。Map...

    mapreduce详细流程

    - **排序过程**:在MapReduce的整个过程中,经历了多次排序操作,包括: - Map端对分区号的快速排序。 - Map端对每个分区内部键的快速排序。 - Map任务结束后对文件进行归并排序。 - Reduce端对复制过来的Map...

Global site tag (gtag.js) - Google Analytics