Secondary sort allows some records to arrive at a reducer ahead of other records. It requires an understanding of both data arrangement and data flow (partitioning, sorting and grouping) and of how these steps are integrated into MapReduce. The three steps fit into the data flow as follows.
The partitioner is invoked as part of the map output collection process and determines which reducer receives each map output record. The sorting RawComparator sorts the map outputs within their respective partitions and is used on both the map and reduce sides. Finally, the grouping RawComparator determines the group boundaries across the sorted records. Grouping happens before reduce(...) is called; each call receives one group of values associated with a key emitted by the Mapper.
The default behavior in MapReduce is for all three functions to operate on the entire key emitted by map functions.
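The three steps can be imitated in plain Java to make the data flow concrete. The following is only a sketch under stated assumptions: it uses no Hadoop classes, the class name ShuffleSketch and its methods are made up for illustration, and each record is a {lastName, firstName} pair standing in for a map output key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of partitioning, sorting and grouping (not Hadoop code).
final class ShuffleSketch {

    // Step 1, partitioning: choose a reducer from the last name only, so every
    // record with the same last name lands in the same partition.
    static int partition(String[] key, int numPartitions) {
        return (key[0].hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Step 2, sorting: order records inside a partition by last name, then first name.
    static final Comparator<String[]> SORT =
            Comparator.<String[], String>comparing(k -> k[0]).thenComparing(k -> k[1]);

    // Step 3, grouping: the input is sorted, so records with the same last name
    // are adjacent; collecting them per last name gives one entry per reduce call.
    static Map<String, List<String>> group(List<String[]> sorted) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] key : sorted) {
            groups.computeIfAbsent(key[0], k -> new ArrayList<>()).add(key[1]);
        }
        return groups;
    }

    // Runs all three steps and returns, per partition, the groups a reducer would see.
    static List<Map<String, List<String>>> shuffle(List<String[]> records, int numPartitions) {
        List<List<String[]>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String[] record : records) {
            partitions.get(partition(record, numPartitions)).add(record);
        }
        List<Map<String, List<String>>> result = new ArrayList<>();
        for (List<String[]> p : partitions) {
            p.sort(SORT);
            result.add(group(p));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"Smith", "John"},
                new String[] {"Jones", "Zoe"},
                new String[] {"Smith", "Anne"},
                new String[] {"Jones", "Bob"});
        List<Map<String, List<String>>> reducers = shuffle(records, 2);
        for (int i = 0; i < reducers.size(); i++) {
            System.out.println("reducer " + i + ": " + reducers.get(i));
        }
    }
}
```

Which reducer a given last name ends up on depends on its hash code, but within any reducer the first names of one last name always arrive in sorted order.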
Suppose you want to order a group of people's names with secondary sort: the primary sort orders people by last name, and the secondary sort orders them by first name. For secondary sort to work, you also have to write partitioner, sort comparator and grouping comparator classes. First of all, you have to create a composite output key, which will be emitted by the Mapper. The composite key contains two parts:
- The natural key, which is the key to use for joining purposes.
- The secondary key, which is the key to use to order all of the values sent to the reducer for the natural key.
Now let's go through the composite key, partitioner, sort comparator and grouping comparator classes (again, you can find the source code in the book Hadoop in Practice). First, the composite key:

    package com.manning.hip.ch4.sort.secondary;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.*;

    public class Person implements WritableComparable<Person> {

        private String firstName;
        private String lastName;

        // Hadoop needs a no-argument constructor to instantiate the key.
        public Person() {
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        // Fields must be read in the same order in which write(...) emits them.
        @Override
        public void readFields(DataInput in) throws IOException {
            this.lastName = in.readUTF();
            this.firstName = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(lastName);
            out.writeUTF(firstName);
        }

        // Natural order: last name first, then first name.
        @Override
        public int compareTo(Person other) {
            int cmp = this.lastName.compareTo(other.lastName);
            if (cmp != 0) {
                return cmp;
            }
            return this.firstName.compareTo(other.firstName);
        }

        public void set(String lastName, String firstName) {
            this.lastName = lastName;
            this.firstName = firstName;
        }
    }
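The write and readFields methods of the key have to agree on the field order. As a rough illustration, the sketch below round-trips the two name fields through java.io streams only; NameRoundTrip and its methods are hypothetical helpers, not part of Hadoop or of the book's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-alone sketch that mirrors the write/readFields pair of the Person key.
final class NameRoundTrip {

    // Serializes the two fields in a fixed order: last name, then first name.
    static byte[] write(String lastName, String firstName) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(lastName);
            out.writeUTF(firstName);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in the same order in which they were written.
    static String[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String lastName = in.readUTF();
            String firstName = in.readUTF();
            return new String[] {lastName, firstName};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] name = read(write("Smith", "Anne"));
        System.out.println(name[0] + ", " + name[1]); // prints "Smith, Anne"
    }
}
```

If the read order were swapped, the last name and first name would silently trade places, which would in turn break the sort order.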
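Now let's see which parts of the composite key the partitioner, sort comparator and grouping comparator each use: the partitioner and the grouping comparator look only at the last name, while the sort comparator looks at the last name and then the first name.
The following code shows the partitioner implementation: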
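
    package com.manning.hip.ch4.sort.secondary;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class PersonNamePartitioner extends Partitioner<Person, Text> {

        @Override
        public int getPartition(Person key, Text value, int numPartitions) {
            // Partition on the last name only, so that all first names for a
            // given last name reach the same reducer. The sign bit is masked
            // off instead of calling Math.abs, because Math.abs returns a
            // negative number for Integer.MIN_VALUE.
            int hash = key.getLastName().hashCode() * 127;
            return (hash & Integer.MAX_VALUE) % numPartitions;
        }
    }
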
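A partitioner has to return an index in the range [0, numPartitions). As a side note, the sketch below compares two common ways of turning a hash code into such an index; PartitionIndexSketch and its two methods are illustrative names only, and no Hadoop classes are involved.

```java
// Stand-alone comparison of two ways to derive a partition index from a hash.
final class PartitionIndexSketch {

    // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so this variant
    // can return a negative index for that one hash value.
    static int withAbs(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Clearing the sign bit always yields a non-negative number, so the
    // result is always in the range [0, numPartitions).
    static int withMask(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println(withAbs(hash, 3));  // prints -2
        System.out.println(withMask(hash, 3)); // prints 0
    }
}
```

Multiplying a hash of Integer.MIN_VALUE by an odd constant such as 127 overflows back to Integer.MIN_VALUE, so the multiplication does not avoid the edge case.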
The sort comparator implementation follows. It compares the last name first, then the first name:

    package com.manning.hip.ch4.sort.secondary;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class PersonComparator extends WritableComparator {

        protected PersonComparator() {
            // true: let WritableComparator create Person instances for deserialization.
            super(Person.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            Person p1 = (Person) w1;
            Person p2 = (Person) w2;

            // Primary order: last name.
            int cmp = p1.getLastName().compareTo(p2.getLastName());
            if (cmp != 0) {
                return cmp;
            }

            // Secondary order: first name.
            return p1.getFirstName().compareTo(p2.getFirstName());
        }
    }

Grouping occurs when the reduce phase is streaming map output records from local disk. At this stage, all records are already in secondary-sort order, and the grouping comparator needs to bundle together the records that share the same natural key, the person's last name in this case.

    package com.manning.hip.ch4.sort.secondary;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class PersonNameComparator extends WritableComparator {

        protected PersonNameComparator() {
            super(Person.class, true);
        }

        @Override
        public int compare(WritableComparable o1, WritableComparable o2) {
            Person p1 = (Person) o1;
            Person p2 = (Person) o2;

            // Only the last name decides whether two keys belong to the same group.
            return p1.getLastName().compareTo(p2.getLastName());
        }
    }

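To see why the grouping comparator looks at the last name only, it can help to count the groups a reducer would see under each choice. The sketch below is a plain-Java illustration under that assumption; GroupingSketch, its comparators and sampleKeys are hypothetical names, and keys are {lastName, firstName} pairs rather than Person objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of group boundaries (no Hadoop classes).
final class GroupingSketch {

    // Default behavior: group on the entire key (last name, then first name).
    static final Comparator<String[]> WHOLE_KEY =
            Comparator.<String[], String>comparing(k -> k[0]).thenComparing(k -> k[1]);

    // Secondary-sort behavior: group on the last name only.
    static final Comparator<String[]> LAST_NAME_ONLY = Comparator.comparing(k -> k[0]);

    // A new group starts whenever the grouping comparator reports that the
    // current key differs from the previous one.
    static int countGroups(List<String[]> sortedKeys, Comparator<String[]> grouping) {
        int groups = 0;
        String[] previous = null;
        for (String[] key : sortedKeys) {
            if (previous == null || grouping.compare(previous, key) != 0) {
                groups++;
            }
            previous = key;
        }
        return groups;
    }

    // Four sample keys, already sorted by last name and then first name.
    static List<String[]> sampleKeys() {
        List<String[]> keys = new ArrayList<>(Arrays.asList(
                new String[] {"Smith", "John"},
                new String[] {"Jones", "Zoe"},
                new String[] {"Smith", "Anne"},
                new String[] {"Jones", "Bob"}));
        keys.sort(WHOLE_KEY);
        return keys;
    }

    public static void main(String[] args) {
        List<String[]> keys = sampleKeys();
        System.out.println(countGroups(keys, WHOLE_KEY));      // prints 4
        System.out.println(countGroups(keys, LAST_NAME_ONLY)); // prints 2
    }
}
```

Grouping on the whole key gives one reduce call per distinct person, whereas grouping on the last name gives one call per family with the first names already in order.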
Finally the MapReduce driver class:

    package com.manning.hip.ch4.sort.secondary;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public final class SortMapReduce {

        public static void main(String... args) throws Exception {
            runSortJob(args[0], args[1]);
        }

        public static void runSortJob(String input, String output) throws Exception {
            Configuration conf = new Configuration();

            // Deprecated in Hadoop 2.x and later, where Job.getInstance(conf) is preferred.
            Job job = new Job(conf);
            job.setJarByClass(SortMapReduce.class);

            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(KeyValueTextInputFormat.class);

            job.setMapOutputKeyClass(Person.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // The three classes that together implement secondary sort.
            job.setPartitionerClass(PersonNamePartitioner.class);
            job.setSortComparatorClass(PersonComparator.class);
            job.setGroupingComparatorClass(PersonNameComparator.class);

            Path outputPath = new Path(output);

            FileInputFormat.setInputPaths(job, input);
            FileOutputFormat.setOutputPath(job, outputPath);

            // Remove any previous output so the job does not fail on an existing directory.
            outputPath.getFileSystem(conf).delete(outputPath, true);

            job.waitForCompletion(true);
        }

        public static class Map extends Mapper<Text, Text, Person, Text> {

            private Person outputKey = new Person();

            @Override
            protected void map(Text lastName, Text firstName, Context context)
                    throws IOException, InterruptedException {
                // Emit the composite key; the first name is also kept as the value.
                outputKey.set(lastName.toString(), firstName.toString());
                context.write(outputKey, firstName);
            }
        }

        public static class Reduce extends Reducer<Person, Text, Text, Text> {

            Text lastName = new Text();

            @Override
            public void reduce(Person key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // One call per last name; values arrive sorted by first name.
                lastName.set(key.getLastName());
                for (Text firstName : values) {
                    context.write(lastName, firstName);
                }
            }
        }
    }
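The driver reads its input with KeyValueTextInputFormat, so each input line is expected to hold a last name and a first name. The sketch below imitates, in plain Java, how such a line is split, assuming the default tab separator; InputLineSketch and split are illustrative names and not Hadoop APIs.

```java
// Stand-alone imitation of how a tab-separated input line maps to the
// (lastName, firstName) pair that the mapper receives.
final class InputLineSketch {

    // Splits on the first tab: the text before it is the key, the rest is the value.
    // A line without a tab becomes the key, paired with an empty value.
    static String[] split(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, tab), line.substring(tab + 1)};
    }

    public static void main(String[] args) {
        String[] kv = split("Smith\tAnne");
        System.out.println(kv[0] + " / " + kv[1]); // prints "Smith / Anne"
    }
}
```

With input prepared this way, the job would be expected to write each last name followed by its first names in ascending order.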