`

hadoop之MapReduce自定义二次排序流程实例详解

 
阅读更多

原博客地址:http://zengzhaozheng.blog.51cto.com/8219051/1379271

           hadoop之MapReduce自定义二次排序流程实例详解

一、概述

    MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的。在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的。本文将通过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map、reduce端的日志来验证所描述的处理流程的正确性。

二、需求描述

1、输入数据:

sort1    1

sort2    3

sort2    77

sort2    54

sort1    2

sort6    22

sort6    221

sort6    20

2、目标输出

sort1 1,2

sort2 3,54,77

sort6 20,22,221

三、解决思路

   1、首先,在思考解决问题思路时,我们先应该深刻的理解MapReduce处理数据的整个流程,这是最基础的,不然的话是不可能找到解决问题的思路的。我描述一下MapReduce处理数据的大概简单流程:首先,MapReduce框架通过getSplit方法实现对原始文件的切片之后,每一个切片对应着一个map task,inputSplit输入到Map函数进行处理,中间结果经过环形缓冲区的排序,然后分区、自定义二次排序(如果有的话)和合并,再通过shuffle操作将数据传输到reduce task端,reduce端也存在着缓冲区,数据也会在缓冲区和磁盘中进行合并排序等操作,然后对数据按照Key值进行分组,然后每处理完一个分组之后就会去调用一次reduce函数,最终输出结果。大概流程我画了一下,如下图:

wKioL1MoQZWh0GGNAAKQli7hniE185.jpg

2、具体解决思路

(1)Map端处理:

   根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录合并,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对Key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的Key值和其对应的数据组合成一个新的Key值,然后新的Key值对应的还是之前的数字。那么我们就可以将原始数据的map输出变成类似下面的数据结构:

{[sort1,1],1}

{[sort2,3],3}

{[sort2,77],77}

{[sort2,54],54}

{[sort1,2],2}

{[sort6,22],22}

{[sort6,221],221}

{[sort6,20],20}

那么我们只需要对[]里面的新key值进行排序就ok了。然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的传到同一个reduce中,而是想将新key中的第一个字段相同的才放到同一个reduce中进行分组合并,所以我们需要根据新key值中的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:

Partition1:{[sort1,1],1}、{[sort1,2],2}

Partition2:{[sort2,3],3}、{[sort2,77],77}、{[sort2,54],54}

Partition3:{[sort6,22],22}、{[sort6,221],221}、{[sort6,20],20}

 

分区操作完成之后,我调用自己的自定义排序器对新的Key值进行排序。

{[sort1,1],1}

{[sort1,2],2}

{[sort2,3],3}

{[sort2,54],54}

{[sort2,77],77}

{[sort6,20],20}

{[sort6,22],22}

{[sort6,221],221}

(2)Reduce端处理:

   经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端对按照组合键的第一个字段来进行分组,并且没处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理输出。最终的各个分组的数据结构变成类似下面的数据结构:

{[sort1,2],[1,2]}

{[sort2,77],[3,54,77]}

{[sort6,221],[20,22,221]}

看到了这个最终的分组,很可能会有人会怀疑:为什么分组过后的key会变成这样?其实是这样的,数据通过排序之后会在reduce端进行分组,而且进入到分组函数的数据是已经经过排序的,我们拿第一个分组输入来说:{[sort1,1],1}、{[sort1,2],2}。当这2组数依次进入到分组函数,我们自定义的分组函数将组合key的第一个值作为分组key,然后进行合并,之后分组后数据变成:{[sort1,?],[1,2]},这了的?是究竟应该是什么值,MapReduce框架在分组的时候因为需要合并所以按照进入分组函数的顺序最后一个进入的则会成为这个分组后key的一部分,即为{[sort1,2],[1,2]}。文章最后面也做了验证,情况reduce端的日志信息。

四、具体实现

1、自定义组合键

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义组合键
 * @author zenghzhaozheng
 */
public class CombinationKey implements WritableComparable<CombinationKey>{
    private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class);
    private Text firstKey;
    private IntWritable secondKey;
    public CombinationKey() {
        this.firstKey = new Text();
        this.secondKey = new IntWritable();
    }
    public Text getFirstKey() {
        return this.firstKey;
    }
    public void setFirstKey(Text firstKey) {
        this.firstKey = firstKey;
    }
    public IntWritable getSecondKey() {
        return this.secondKey;
    }
    public void setSecondKey(IntWritable secondKey) {
        this.secondKey = secondKey;
    }
    @Override
    public void readFields(DataInput dateInput) throws IOException {
        // TODO Auto-generated method stub
        this.firstKey.readFields(dateInput);
        this.secondKey.readFields(dateInput);
    }
    @Override
    public void write(DataOutput outPut) throws IOException {
        this.firstKey.write(outPut);
        this.secondKey.write(outPut);
    }
    /**
     * 自定义比较策略
     * 注意:该比较策略用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,
     * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)
     */
    @Override
    public int compareTo(CombinationKey combinationKey) {
        logger.info("-------CombinationKey flag-------");
        return this.firstKey.compareTo(combinationKey.getFirstKey());
    }
}

说明:在自定义组合键的时候,我们需要特别注意,一定要实现WritableComparable接口,并且实现compareTo方法的比较策略。这个用于mapreduce的第一次默认排序,也就是发生在map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整),但是其对我们最终的二次排序结果是没有影响的。我们二次排序的最终结果是由我们的自定义比较器决定的。

2、自定义分区器

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.mr.secondSort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义分区
 * @author zengzhaozheng
 */
public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{
    private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class);
    /**
     *  数据输入来源:map输出
     * @author zengzhaozheng
     * @param key map输出键值
     * @param value map输出value值
     * @param numPartitions 分区总数,即reduce task个数
     */
    @Override
    public int getPartition(CombinationKey key, IntWritable value,int numPartitions) {
        logger.info("--------enter DefinedPartition flag--------");
        /**
         * 注意:这里采用默认的hash分区实现方法
         * 根据组合键的第一个值作为分区
         * 这里需要说明一下,如果不自定义分区的话,mapreduce框架会根据默认的hash分区方法,
         * 将整个组合将相等的分到一个分区中,这样的话显然不是我们要的效果
         */
        logger.info("--------out DefinedPartition flag--------");
        /**
         * 此处的分区方法选择比较重要,其关系到是否会产生严重的数据倾斜问题
         * 采取什么样的分区方法要根据自己的数据分布情况来定,尽量将不同key的数据打散
         * 分散到各个不同的reduce进行处理,实现最大程度的分布式处理。
         */
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
    }
}

说明:具体说明看代码注释。

3、自定义比较器

 

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义二次排序策略
 * @author zengzhaoheng
 */
public class DefinedComparator extends WritableComparator {
    private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class);
    public DefinedComparator() {
        super(CombinationKey.class,true);
    }
    @Override
    public int compare(WritableComparable combinationKeyOne,
            WritableComparable CombinationKeyOther) {
        logger.info("---------enter DefinedComparator flag---------");
                                                                                                                                                                                             
        CombinationKey c1 = (CombinationKey) combinationKeyOne;
        CombinationKey c2 = (CombinationKey) CombinationKeyOther;
                                                                                                                                                                                             
        /**
         * 确保进行排序的数据在同一个区内,如果不在同一个区则按照组合键中第一个键排序
         * 另外,这个判断是可以调整最终输出的组合键第一个值的排序
         * 下面这种比较对第一个字段的排序是升序的,如果想降序这将c1和c2倒过来(假设1)
         */
        if(!c1.getFirstKey().equals(c2.getFirstKey())){
            logger.info("---------out DefinedComparator flag---------");
            return c1.getFirstKey().compareTo(c2.getFirstKey());
            }
        else{//按照组合键的第二个键的升序排序,将c1和c2倒过来则是按照数字的降序排序(假设2)
            logger.info("---------out DefinedComparator flag---------");
            return c1.getSecondKey().get()-c2.getSecondKey().get();//0,负数,正数
        }
        /**
         * (1)按照上面的这种实现最终的二次排序结果为:
         * sort1    1,2
         * sort2    3,54,77
         * sort6    20,22,221
         * (2)如果实现假设1,则最终的二次排序结果为:
         * sort6    20,22,221
         * sort2    3,54,77
         * sort1    1,2
         * (3)如果实现假设2,则最终的二次排序结果为:
         * sort1    2,1
         * sort2    77,54,3
         * sort6    221,22,20
         */
        }
}

说明:自定义比较器决定了我们二次排序的结果。自定义比较器需要继承WritableComparator类,并且重写compare方法实现自己的比较策略。具体的排序问题请看注释。

4、自定义分组策略

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 自定义分组策略
 * 将组合将中第一个值相同的分在一组
 * @author zengzhaozheng
 */
public class DefinedGroupSort extends WritableComparator{
    private static final Logger logger = LoggerFactory.getLogger(DefinedGroupSort.class);
    public DefinedGroupSort() {
        super(CombinationKey.class,true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        logger.info("-------enter DefinedGroupSort flag-------");
        CombinationKey ck1 = (CombinationKey)a;
        CombinationKey ck2 = (CombinationKey)b;
        logger.info("-------Grouping result:"+ck1.getFirstKey().
                compareTo(ck2.getFirstKey())+"-------");
        logger.info("-------out DefinedGroupSort flag-------");
        return ck1.getFirstKey().compareTo(ck2.getFirstKey());
    }
}

5、主体程序实现

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.mr;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zengzhaozheng
 *
 * 用途说明:二次排序mapreduce
 * 需求描述:
 * ---------------输入-----------------
 * sort1,1
 * sort2,3
 * sort2,77
 * sort2,54
 * sort1,2
 * sort6,22
 * sort6,221
 * sort6,20
 * ---------------目标输出---------------
 * sort1 1,2
 * sort2 3,54,77
 * sort6 20,22,221
 */
public class SecondSortMR extends Configured  implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(SecondSortMR.class);
    public static class SortMapper extends Mapper<Text, Text, CombinationKey, IntWritable> {
    //---------------------------------------------------------
        /**
         * 这里特殊要说明一下,为什么要将这些变量写在map函数外边。
         * 对于分布式的程序,我们一定要注意到内存的使用情况,对于mapreduce框架,
         * 每一行的原始记录的处理都要调用一次map函数,假设,此个map要处理1亿条输
         * 入记录,如果将这些变量都定义在map函数里边则会导致这4个变量的对象句柄编
         * 程非常多(极端情况下将产生4*1亿个句柄,当然java也是有自动的gc机制的,
         * 一定不会达到这么多,但是会浪费很多时间去GC),导致栈内存被浪费掉。我们将其写在map函数外边,
         * 顶多就只有4个对象句柄。
         */
        CombinationKey combinationKey = new CombinationKey();
        Text sortName = new Text();
        IntWritable score = new IntWritable();
        String[] inputString = null;
    //---------------------------------------------------------
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            logger.info("---------enter map function flag---------");
            //过滤非法记录
            if(key == null || value == null || key.toString().equals("")
                    || value.equals("")){
                return;
            }
            sortName.set(key.toString());
            score.set(Integer.parseInt(value.toString()));
            combinationKey.setFirstKey(sortName);
            combinationKey.setSecondKey(score);
            //map输出
            context.write(combinationKey, score);
            logger.info("---------out map function flag---------");
        }
    }
    public static class SortReducer extends
    Reducer<CombinationKey, IntWritable, Text, Text> {
        StringBuffer sb = new StringBuffer();
        Text sore = new Text();
        /**
         * 这里要注意一下reduce的调用时机和次数:reduce每处理一个分组的时候会调用一
         * 次reduce函数。也许有人会疑问,分组是什么?看个例子就明白了:
         * eg:
         * {{sort1,{1,2}},{sort2,{3,54,77}},{sort6,{20,22,221}}}
         * 这个数据结果是分组过后的数据结构,那么一个分组分别为{sort1,{1,2}}、
         * {sort2,{3,54,77}}、{sort6,{20,22,221}}
         */
        @Override
        protected void reduce(CombinationKey key,
                Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            sb.delete(0, sb.length());//先清除上一个组的数据
            Iterator<IntWritable> it = value.iterator();
                                                                                                                                                                                          
            while(it.hasNext()){
                sb.append(it.next()+",");
            }
            //去除最后一个逗号
            if(sb.length()>0){
                sb.deleteCharAt(sb.length()-1);
            }
            sore.set(sb.toString());
            context.write(key.getFirstKey(),sore);
            logger.info("---------enter reduce function flag---------");
            logger.info("reduce Input data:{["+key.getFirstKey()+","+
            key.getSecondKey()+"],["+sore+"]}");
            logger.info("---------out reduce function flag---------");
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf=getConf(); //获得配置文件对象
        Job job=new Job(conf,"SoreSort");
        job.setJarByClass(SecondSortMR.class);
                                                                                                                                                                                      
        FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
                                                                                                                                                                                                                                                                                                                           
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
                                                                                                                                                                                      
        job.setPartitionerClass(DefinedPartition.class); //设置自定义分区策略
                                                                                                                                                                                                                                                                                                                           
        job.setGroupingComparatorClass(DefinedGroupSort.class); //设置自定义分组策略
        job.setSortComparatorClass(DefinedComparator.class); //设置自定义二次排序策略
                                                                                                                                                                                     
        job.setInputFormatClass(KeyValueTextInputFormat.class); //设置文件输入格式
        job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
                                                                                                                                                                                      
        //设置map的输出key和value类型
        job.setMapOutputKeyClass(CombinationKey.class);
        job.setMapOutputValueClass(IntWritable.class);
                                                                                                                                                                                      
        //设置reduce的输出key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful()?0:1;
    }
                                                                                                                                                                                  
    public static void main(String[] args) {
        try {
            int returnCode =  ToolRunner.run(new SecondSortMR(),args);
            System.exit(returnCode);
        catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
                                                                                                                                                                                      
    }
}

6、运行最终结果

打jar包运行:

wKiom1MoOLmT7RfEAAfQcUTaxyA751.jpg

最终结果:

wKiom1MoOQrCuyMaAADPOme0878037.jpg

五、处理流程验证

看前面的代码,都知道我在各个组件上已经设置好了相应的标志,用于追踪整个MapReduce处理二次排序的处理流程。现在让我们分别看看Map端和Reduce端的日志情况。

   (1)Map端日志分析

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
2014-03-18 17:07:45,278 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2014-03-18 17:07:45,432 WARN org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Source name ugi already exists!
2014-03-18 17:07:45,501 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2014-03-18 17:07:45,506 INFO org.apache.hadoop.mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@69b01afa
2014-03-18 17:07:45,584 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
2014-03-18 17:07:45,618 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
2014-03-18 17:07:45,618 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
2014-03-18 17:07:45,626 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library not loaded
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,634 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,634 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,634 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,635 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,636 INFO com.mr.SecondSortMR: ---------enter map function flag---------
2014-03-18 17:07:45,637 INFO com.mr.DefinedPartition: --------enter DefinedPartition flag--------
2014-03-18 17:07:45,637 INFO com.mr.DefinedPartition: --------out DefinedPartition flag--------
2014-03-18 17:07:45,637 INFO com.mr.SecondSortMR: ---------out map function flag---------
2014-03-18 17:07:45,637 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,651 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------enter DefinedComparator flag---------
2014-03-18 17:07:45,652 INFO com.mr.DefinedComparator: ---------out DefinedComparator flag---------
2014-03-18 17:07:45,656 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
2014-03-18 17:07:45,661 INFO org.apache.hadoop.mapred.Task: Task:attempt_201312292019_13586_m_000000_0 is done. And is in the process of commiting
2014-03-18 17:07:48,494 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201312292019_13586_m_000000_0' done.
2014-03-18 17:07:48,526 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2014-03-18 17:07:48,548 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2014-03-18 17:07:48,548 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop for UID 1000 from the native implementation

从map端的日志,我们可以很容易的看出来每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序。

(2)Reduce端日志分析

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2014-03-18 17:07:51,266 INFO org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library
2014-03-18 17:07:51,418 WARN org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Source name ugi already exists!
2014-03-18 17:07:51,486 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2014-03-18 17:07:51,491 INFO org.apache.hadoop.mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@28bb494b
2014-03-18 17:07:51,537 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=195749472, MaxSingleShuffleLimit=48937368
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for merging on-disk files
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for merging in memory files
2014-03-18 17:07:51,542 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread waiting: Thread for merging on-disk files
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Need another 1 map output(s) where 0 is already in progress
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Thread started: Thread for polling Map Completion Events
2014-03-18 17:07:51,543 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
2014-03-18 17:07:56,544 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201312292019_13586_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left.
2014-03-18 17:07:57,553 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 1 files left.
2014-03-18 17:07:57,577 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-03-18 17:07:57,577 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
2014-03-18 17:07:57,583 INFO org.apache.hadoop.mapred.ReduceTask: Merged 1 segments, 130 bytes to disk to satisfy reduce memory limit
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 134 bytes from disk
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
2014-03-18 17:07:57,584 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2014-03-18 17:07:57,586 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------Grouping result:-1-------
2014-03-18 17:07:57,599 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: reduce Input data:{[sort1,2],[1,2]}
2014-03-18 17:07:57,600 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,600 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:-4-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: reduce Input data:{[sort2,77],[3,54,77]}
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------enter DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------Grouping result:0-------
2014-03-18 17:07:57,601 INFO com.mr.DefinedGroupSort: -------out DefinedGroupSort flag-------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------enter reduce function flag---------
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: reduce Input data:{[sort6,221],[20,22,221]}
2014-03-18 17:07:57,601 INFO com.mr.SecondSortMR: ---------out reduce function flag---------
2014-03-18 17:07:57,641 INFO org.apache.hadoop.mapred.Task: Task:attempt_201312292019_13586_r_000000_0 is done. And is in the process of commiting
2014-03-18 17:08:00,668 INFO org.apache.hadoop.mapred.Task: Task attempt_201312292019_13586_r_000000_0 is allowed to commit now
2014-03-18 17:08:00,682 INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Saved output of task 'attempt_201312292019_13586_r_000000_0' to /user/hadoop/z.zeng/output23
2014-03-18 17:08:03,593 INFO org.apache.hadoop.mapred.Task: Task 'attempt_201312292019_13586_r_000000_0' done.
2014-03-18 17:08:03,596 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2014-03-18 17:08:03,615 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2014-03-18 17:08:03,615 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop for UID 1000 from the native implementation

首先,我们看了Reduce端的日志,第一个信息我应该能够很容易的看出来的,就是分组和reduce函数处理都是在shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每处理完一个分组数据就会去调用一次的reduce函对这个分组来进行处理和输出。此外,说明一下分组函数的返回值问题,当返回值为0时候才会被分到同一个组当中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,也就是说有进行了n-1次比较。

   所以,中map端和reduce端的日志情况来看,MapReduce框架处理二次排序的总体流程正如我上面的图所画的,整一个流程是正确的。

、总结

    本文主要从MapReduce框架执行的流程,去分析了如何去实现二次排序,通过代码进行了实现,并且对整个流程进行了验证。另外,要吐槽一下,网络上有很多文章都记录了MapReudce处理二次排序问题,但是对MapReduce框架整个处理流程的描述错漏很多,而且他们最终的流程描述也没有证据可以支撑。所以,对于网络上的学习资源不能够完全依赖,要融入自己的思想,并且要重要的观点进行代码或者实践的验证。另外,今天在一个hadoop交流群上听到少部分人在讨论,有了hive我们就不用学习些MapReduce程序?对这这个问题我是这么认为:我不相信写不好MapReduce程序的程序员会写好hive语句,最起码的他们对整个执行流程是一无所知的,更不用说性能问题了,有可能连最常见的数据倾斜问题的弄不清楚。

 

分享到:
评论

相关推荐

    大数据Hadoop核心之MapReduce详解

    大数据Hadoop核心之MapReduce详解 MapReduce是Hadoop核心模块之一,作为一个分布式运算程序的编程框架,用于用户开发基于Hadoop的数据分析应用。MapReduce的核心功能是将用户编写的业务逻辑代码和自带默认组件整合...

    hadoop集群mapreduce例子共52页.pdf.z

    《Hadoop集群MapReduce实战详解》 MapReduce是Apache Hadoop框架的核心组件之一,它为大规模数据处理提供了分布式计算模型。本资料“hadoop集群mapreduce例子共52页.pdf”详细介绍了MapReduce的工作原理、架构及其...

    Hadoop从入门到上手企业开发

    060 MapReduce执行流程之Shuffle和排序流程以及Map端分析 061 MapReduce执行流程之Reduce端分析 062 MapReduce Shuffle过程讲解和Map Shuffle Phase讲解 063 Reduce Shuffle Phase讲解 064 源代码跟踪查看Map Task和...

    hadoop2.X配置详解和mapreduce详解

    在Hadoop 2.x版本中,为了解决单点故障(Single Point of Failure, SPOF)的问题,引入了High Availability(HA)特性,特别是针对NameNode的高可靠性...对于MapReduce,理解YARN的角色和作业执行流程也是至关重要的。

    Hadoop运行流程详解

    Hadoop运行流程详解 Hadoop是一个开源分布式计算框架,核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。本篇将详细阐述Hadoop中的MapReduce执行流程,包括其主要概念、数据结构和整体...

    hadoop MapReduce编程教程

    #### 四、WordCount实例详解 - **WordCount**:这是一个经典的MapReduce示例程序,用于统计文本文件中各个单词出现的频率。 - **映射阶段**:读取输入文本,将每一行文本按单词拆分,并为每个单词生成键值对(单词,...

    hadoop运行wordcount实例

    ### Hadoop运行WordCount实例详解 #### 一、Hadoop简介与WordCount程序的重要性 Hadoop 是一个由Apache基金会所开发的分布式系统基础架构。它能够处理非常庞大的数据集,并且能够在集群上运行,通过将大数据分割...

    HBase MapReduce完整实例

    《HBase MapReduce实战详解》 在大数据处理领域,HBase和MapReduce是两个不可或缺的重要组件。HBase作为分布式列式存储系统,适用于大规模数据的实时读写操作;而MapReduce则是Apache Hadoop的核心组件之一,用于...

    Hadoop数据处理框架MapReduce原理及开发

    ### Hadoop数据处理框架MapReduce原理及开发 #### 一、Hadoop生态系统概述 Hadoop是一个开源软件框架,主要用于分布式存储和处理大规模数据集。它由Apache软件基金会维护和发展,自2006年成立以来已经成为了大数据...

    hadoop eclips 的插件 和实例程序

    二、Hadoop实例程序的创建与运行 1. 新建项目:在Eclipse中,选择“File” -&gt; “New” -&gt; “Project”,然后在“Map/Reduce Project”选项中,为你的Hadoop项目命名并点击“Next”。 2. 创建Mapper和Reducer类:在...

    深入云计算:Hadoop应用开发实战详解 源代码

    1. **HDFS(Hadoop Distributed File System)**:HDFS的核心设计原则是“一次写入,多次读取”(Write Once, Read Many),确保数据的可靠性。它将大文件分割成多个块,并将这些块复制到多台机器上,提供冗余以防止...

    尚硅谷大数据技术之Hadoop

    【尚硅谷大数据技术之Hadoop】是一门深入探讨大数据处理技术的课程,主要聚焦于开源框架Hadoop。Hadoop是Apache软件基金会开发的一个分布式计算项目,它为大规模数据集(大于1TB)提供了高容错性的分布式存储和计算...

    MapReduce计算模式详解

    ### MapReduce计算模式详解 #### 一、MapReduce简单概述 MapReduce是一种高效的大数据处理技术,它由Google提出并在Hadoop中得到了广泛的应用。MapReduce不仅是一个计算平台,还是一个并行计算框架和并行程序设计...

    初学Hadoop之图解MapReduce与WordCount示例分析

    Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算...1、Hadoop示例程序WordCount详解及实例2、hadoop学习笔记:mapreduce框架详解3、hadoop示例程序wo

    详解Hadoop.2013

    Hadoop的设计借鉴了Google在大规模分布式系统方面的研究成果,其核心组件是HDFS(Hadoop Distributed File System)和MapReduce。HDFS是Google文件系统(GFS)的开源实现,而MapReduce则是Google的计算框架。 HDFS...

    Windows下使用Hadoop实例

    ### Windows下使用Hadoop实例详解 #### 一、概述 在Windows环境下搭建并运行Hadoop实例,需要经过一系列步骤,包括但不限于安装Cygwin、配置Java环境、安装配置SSH服务等。本文旨在详细介绍如何在Windows操作系统...

    深入云计算:Hadoop应用开发实战详解(修订版)源代码

    《深入云计算:Hadoop应用开发实战详解(修订版)源代码》是一本专注于Hadoop开发实践的书籍,其源代码提供了丰富的学习资源,适合初学者和有经验的开发者深入理解和掌握Hadoop技术栈。Hadoop作为大数据处理的核心...

    Hadoop的yarn详解

    Hadoop的YARN架构是Hadoop版本2.x引入的一个重要组件,它负责处理资源管理和作业调度,而核心的计算任务处理则交给了MapReduce、Tez、Spark等计算框架。YARN的出现是为了解决Hadoop早期版本中的可扩展性问题,它通过...

    MapReduce 编程模型

    ### MapReduce 编程模型详解 #### 一、引言:MapReduce——大规模数据处理的革新者 在当今数字化时代,大数据的处理已成为各行业关注的焦点。随着互联网的飞速发展,数据量呈指数级增长,传统的数据处理方法已无法...

    Hadoop官方教案

    第二课时:HDFS详解 HDFS是Hadoop的核心组成部分,它将大文件分块并存储在多台廉价服务器上,实现了高可用性和容错性。本课将深入解析HDFS的数据模型、命名空间、副本策略以及数据读写流程,帮助理解HDFS如何保证...

Global site tag (gtag.js) - Google Analytics