
MapReduce Algorithm - Secondary Sort

 

Secondary sort lets some records arrive at a reducer ahead of others; in other words, it controls the order of the values within a reduce group. Using it requires an understanding of both data arrangement and data flow (partitioning, sorting and grouping) and of how they are integrated into MapReduce, as the figure below shows:

The partitioner is invoked as part of the map output collection process and determines which reducer receives each map output record. The sorting RawComparator sorts the map outputs within their respective partitions and is used on both the map and reduce sides. Finally, the grouping RawComparator determines the group boundaries across the sorted records. Grouping happens before reduce(...) is called, and each call receives one group: a key emitted by the Mapper together with the values associated with it.

 

The default behavior in MapReduce is for all three functions to operate on the entire key emitted by map functions.
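To see why this default is not enough for secondary sort, here is a minimal plain-Java sketch (no Hadoop dependency) that counts how many reduce groups a sorted run of composite keys produces under two grouping rules. The `GroupingDemo` class, its `countGroups` helper and the sample names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration, not Hadoop code. All names are hypothetical.
class GroupingDemo {

    static final class Name {
        final String last;
        final String first;

        Name(String last, String first) {
            this.last = last;
            this.first = first;
        }
    }

    // Compares the whole composite key: last name, then first name.
    static final Comparator<Name> FULL_KEY = (a, b) -> {
        int cmp = a.last.compareTo(b.last);
        return cmp != 0 ? cmp : a.first.compareTo(b.first);
    };

    // Compares only the natural key (last name).
    static final Comparator<Name> NATURAL_KEY = (a, b) -> a.last.compareTo(b.last);

    // Sorts by the full key, then counts group boundaries under 'grouping'.
    static int countGroups(List<Name> keys, Comparator<Name> grouping) {
        List<Name> sorted = new ArrayList<>(keys);
        sorted.sort(FULL_KEY);
        int groups = 0;
        Name previous = null;
        for (Name current : sorted) {
            if (previous == null || grouping.compare(previous, current) != 0) {
                groups++; // a new group starts whenever the comparator reports a difference
            }
            previous = current;
        }
        return groups;
    }

    static List<Name> sample() {
        return Arrays.asList(
                new Name("Smith", "John"),
                new Name("Taylor", "Amy"),
                new Name("Smith", "Anne"),
                new Name("Smith", "Bob"));
    }

    public static void main(String[] args) {
        // Grouping on the entire key: one group per distinct (last, first) pair.
        System.out.println(countGroups(sample(), FULL_KEY));
        // Grouping on the natural key: one group per last name.
        System.out.println(countGroups(sample(), NATURAL_KEY));
    }
}
```

With the whole key used for grouping, every distinct name forms its own group, so a reducer never sees the first names of one family together. Grouping on the last name alone is what brings them into a single reduce call.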

 

Suppose you want to order a group of people's names with secondary sort: the primary sort orders by last name, and the secondary sort orders by first name. For secondary sort to work, you have to write partitioner, sort comparator and grouping comparator classes. First of all, you have to create a composite output key, which will be emitted by the Mapper. The composite key contains two parts:

  1. The natural key, which is the key to use for joining or grouping purposes (here, the last name).
  2. The secondary key, which is the key to use to order all of the values sent to the reducer for the natural key (here, the first name).

Now let's go through the composite key, partitioning, sorting and grouping classes (the source code comes from the book Hadoop in Practice). First, the composite key:
    package com.manning.hip.ch4.sort.secondary;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.*;
    
    public class Person implements WritableComparable<Person> {
    
        private String firstName;
        private String lastName;
    
        public Person() {
        }
    
        public String getFirstName() {
            return firstName;
        }
    
        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    
        public String getLastName() {
            return lastName;
        }
    
        public void setLastName(String lastName) {
            this.lastName = lastName;
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.lastName = in.readUTF();
            this.firstName = in.readUTF();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(lastName);
            out.writeUTF(firstName);
        }
    
        @Override
        public int compareTo(Person other) {
            int cmp = this.lastName.compareTo(other.lastName);
            if (cmp != 0) {
                return cmp;
            }
            return this.firstName.compareTo(other.firstName);
        }
    
        public void set(String lastName, String firstName) {
            this.lastName = lastName;
            this.firstName = firstName;
        }
    }
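The key travels between map and reduce tasks in serialized form, so `readFields` has to read the fields in exactly the order `write` emitted them. The following sketch checks that round trip with the JDK's `DataOutputStream` and `DataInputStream` only. `PersonKeyRoundTrip` is a hypothetical stand-in that mirrors the two methods above without implementing the Hadoop interface, so it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Person, written against java.io only.
class PersonKeyRoundTrip {
    String lastName;
    String firstName;

    void write(DataOutput out) throws IOException {
        out.writeUTF(lastName);
        out.writeUTF(firstName);
    }

    void readFields(DataInput in) throws IOException {
        // Same order as write(): last name first, then first name.
        this.lastName = in.readUTF();
        this.firstName = in.readUTF();
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static PersonKeyRoundTrip roundTrip(String lastName, String firstName) {
        PersonKeyRoundTrip original = new PersonKeyRoundTrip();
        original.lastName = lastName;
        original.firstName = firstName;
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            PersonKeyRoundTrip copy = new PersonKeyRoundTrip();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PersonKeyRoundTrip copy = roundTrip("Smith", "Anne");
        System.out.println(copy.lastName + ", " + copy.firstName);
    }
}
```

Note that `writeUTF` throws a `NullPointerException` when given `null`, so both fields need to be set before a key is written.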

Now let's see which parts of the key are used for partitioning, sorting and grouping:



 

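The following code shows the partitioner implementation. It partitions on the last name only, so that all records with the same last name reach the same reducer: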

package com.manning.hip.ch4.sort.secondary;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PersonNamePartitioner extends
        Partitioner<Person, Text> {

    @Override
    public int getPartition(Person key, Text value, int numPartitions) {
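        // Mask off the sign bit rather than calling Math.abs, which stays
        // negative when the hash is Integer.MIN_VALUE.
        return ((key.getLastName().hashCode() * 127) & Integer.MAX_VALUE) %
                numPartitions;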
    }
}
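A partitioner must return an index in the range [0, numPartitions). One edge case worth knowing with hash-based partitioning is that `Math.abs(Integer.MIN_VALUE)` is still negative in Java, so an `abs`-then-modulo expression can produce a negative index for that one hash value. The sketch below compares it with clearing the sign bit; the class and method names are hypothetical.

```java
// Plain-Java sketch; the class and method names are hypothetical.
class PartitionIndexDemo {

    // Clears the sign bit before taking the remainder, so the result
    // is always in the range [0, numPartitions).
    static int maskedPartition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // The Math.abs variant, shown for comparison.
    static int absPartition(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int worstCase = Integer.MIN_VALUE;
        System.out.println(absPartition(worstCase, 10));    // negative (-8)
        System.out.println(maskedPartition(worstCase, 10)); // 0
    }
}
```

Either variant sends equal last names to the same partition, which is the property secondary sort depends on. The masked form simply avoids the out-of-range value.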

The sort comparator implementation is as follows; it compares the last name first and then the first name:

package com.manning.hip.ch4.sort.secondary;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class PersonComparator extends WritableComparator {
    protected PersonComparator() {
        super(Person.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        Person p1 = (Person) w1;
        Person p2 = (Person) w2;

        int cmp = p1.getLastName().compareTo(p2.getLastName());
        if (cmp != 0) {
            return cmp;
        }

        return p1.getFirstName().compareTo(p2.getFirstName());
    }
}

Grouping occurs when the reduce phase is streaming map output records from local disk. At this stage all of the records are already in secondary-sort order, and the grouping comparator needs to bundle together the records with the same natural key, the person's last name in this case.

package com.manning.hip.ch4.sort.secondary;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class PersonNameComparator extends WritableComparator {

    protected PersonNameComparator() {
        super(Person.class, true);
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {

        Person p1 = (Person) o1;
        Person p2 = (Person) o2;

        return p1.getLastName().compareTo(p2.getLastName());

    }
}

Finally, the MapReduce driver class:

package com.manning.hip.ch4.sort.secondary;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public final class SortMapReduce {
    public static void main(String... args) throws Exception {
        runSortJob(args[0], args[1]);
    }

    public static void runSortJob(String input, String output)
            throws Exception {
        Configuration conf = new Configuration();

        Job job = new Job(conf);
        job.setJarByClass(SortMapReduce.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(PersonNamePartitioner.class);
        job.setSortComparatorClass(PersonComparator.class);
        job.setGroupingComparatorClass(PersonNameComparator.class);

        Path outputPath = new Path(output);

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, outputPath);

        outputPath.getFileSystem(conf).delete(outputPath, true);

        job.waitForCompletion(true);
    }

    public static class Map
            extends Mapper<Text, Text, Person, Text> {

        private Person outputKey = new Person();

        @Override
        protected void map(Text lastName, Text firstName, Context context)
                throws IOException, InterruptedException {
            outputKey.set(lastName.toString(), firstName.toString());
            context.write(outputKey, firstName);
        }
    }

    public static class Reduce
            extends Reducer<Person, Text, Text, Text> {

        Text lastName = new Text();

        @Override
        public void reduce(Person key, Iterable<Text> values,
                           Context context)
                throws IOException, InterruptedException {
            lastName.set(key.getLastName());
            for (Text firstName : values) {
                context.write(lastName, firstName);
            }
        }
    }
}
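To make the data flow of the job concrete, here is a small plain-Java simulation of the partition, sort, group and reduce steps on an in-memory list. It is only a sketch under simplifying assumptions: it does not use Hadoop, the class `SecondarySortSimulation` and its sample data are hypothetical, and it prints one line per reduce group rather than one line per value as the real reducer does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation, not Hadoop code. All names are hypothetical.
class SecondarySortSimulation {

    // records: each element is {lastName, firstName}, as the mapper would emit.
    // Returns one "lastName -> [firstNames]" line per reduce group.
    static List<String> run(List<String[]> records, int numReducers) {
        // 1. Partition on the natural key (last name) only.
        List<List<String[]>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String[] r : records) {
            int p = (r[0].hashCode() & Integer.MAX_VALUE) % numReducers;
            partitions.get(p).add(r);
        }

        List<String> output = new ArrayList<>();
        for (List<String[]> partition : partitions) {
            // 2. Sort each partition on the full composite key.
            partition.sort((a, b) -> {
                int cmp = a[0].compareTo(b[0]);
                return cmp != 0 ? cmp : a[1].compareTo(b[1]);
            });
            // 3. Group adjacent records by last name; each group is one reduce call.
            String currentLast = null;
            List<String> firstNames = new ArrayList<>();
            for (String[] r : partition) {
                if (currentLast != null && !currentLast.equals(r[0])) {
                    output.add(currentLast + " -> " + firstNames);
                    firstNames = new ArrayList<>();
                }
                currentLast = r[0];
                firstNames.add(r[1]);
            }
            if (currentLast != null) {
                output.add(currentLast + " -> " + firstNames);
            }
        }
        return output;
    }

    static List<String[]> sample() {
        return Arrays.asList(
                new String[] {"Smith", "John"},
                new String[] {"Taylor", "Amy"},
                new String[] {"Smith", "Anne"},
                new String[] {"Smith", "Bob"});
    }

    public static void main(String[] args) {
        for (String line : run(sample(), 1)) {
            System.out.println(line);
        }
    }
}
```

With a single reducer, the groups come out in last-name order and the first names inside each group are already sorted. With more reducers, the order of groups across reducers depends on the hash, but each last name still appears in exactly one group.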

 

 
