前言
Hadoop二次排序简单说就是先根据字段A分组排序,然后在对组内根据字段B排序。Hadoop二次排序在面试的时候出现频率也是比较高的。今天花了点时间通过源码深入学习了一下。(后面内容以Hadoop自带实例——SecondarySort讲解,见本文附录)
Hadoop默认是根据reduce key排序,通过Hadoop二次排序可以实现根据value值(需要将其放入复合key中)进行排序,排序后做可以取TOP值。比如可以做,每个网站访问量最大前10个URL等分析。
关键点
1、Partitioner
它的作用是决定数据分区,说白了就是决定map输出key-value由哪个reduce处理,每个map task输出的key-value都会执行Partitioner的getPartition()方法,用于返回当前key-value由哪个reduce处理。
本例中Partitioner基于map函数输出IntPair(first, second)第一个元素,即first,进行求余运算,所以得到的结果是first相同的key-value会发送到同一reduce。
2、IntPair的compareTo()方法
IntPair是map输出的key,它的compareTo()方法决定map输出排序规则。IntPair的实现规则是:先按照first排序,相同first按照second排序(所谓的二次排序其实在这里就实现了)。结果如下:
-------------
1982 6
1984 3
1984 4
1984 5
1984 5
1988 10
-------------
运行时机:
- map函数,从缓冲区spill key-value到本地磁盘
- reduce函数。获取逻辑:如果有GroupingComparator就返回,否则返回map输出比较函数。
3、GroupingComparator
实现逻辑
根据IntPair的first字段进行排序
运行时机:
ReduceTask.run() ->
// copy、sort完成之后
RawComparator comparator = job.getOutputValueGroupingComparator(); // 这里获取comparator
runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
runNewReducer() ->
....
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW, committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext); // reducerContext拥有comparator
reducer.run() ->
while (context.nextKey()) { <-
...
}
context.nextKey() ->
while (hasMore && nextKeyIsSame) {
nextKeyValue(); <- ①
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue(); <- ②
} else {
return false;
}
nextKeyValue() ->
....
if (hasMore) {
next = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
next.getData(),
next.getPosition(),
next.getLength() - next.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
....
可以看到GroupingComparator在reduce函数内被调用,用于迭代读取reduce输入文件过程中,判断key是否发生变化。那它有什么作用呢?要会回答这个问题,不如先问问,如果没有GroupingComparator结果会如何?
如果在Job提交时不设置GroupingComparator,那comparator将使用conf中"mapred.output.key.comparator.class"对应的类,如果没有设置"mapred.output.key.comparator.class",则根据map输出key从WritableComparator获取注册的comparator(IntPair通过" WritableComparator.define(IntPair.class, new Comparator());"注册)。本例中,如果不设置GroupingComparator,就会使用IntPair的内嵌类Comparator的compareTo()方法判断,即先比较first,再比较second。这样在迭代读取reduce输入数据的时候,会发生这样的情况:first相同,second不同,comparator会认为两条记录不一致,从而变更key值,继续迭代,这样就无法将相同first的数据聚合到一个迭代中进行处理的,即相同first通过second进行排序。
其它
- pig可以通过内嵌foreach方式实现二次排序功能
- SQL中需要使用子查询实现该功能,见:http://heipark.iteye.com/blog/1776101
附录
下图是我整理的流程,更易于理解^_^
public class SecondarySort { /** * Define a pair of integers that are writable. * They are serialized in a byte comparable format. */ public static class IntPair implements WritableComparable<IntPair> { private int first = 0; private int second = 0; /** * Set the left and right values. */ public void set(int left, int right) { first = left; second = right; } public int getFirst() { return first; } public int getSecond() { return second; } /** * Read the two integers. * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1 */ @Override public void readFields(DataInput in) throws IOException { first = in.readInt() + Integer.MIN_VALUE; second = in.readInt() + Integer.MIN_VALUE; } @Override public void write(DataOutput out) throws IOException { out.writeInt(first - Integer.MIN_VALUE); out.writeInt(second - Integer.MIN_VALUE); } @Override public int hashCode() { return first * 157 + second; } @Override public boolean equals(Object right) { if (right instanceof IntPair) { IntPair r = (IntPair) right; return r.first == first && r.second == second; } else { return false; } } /** A Comparator that compares serialized IntPair. */ public static class Comparator extends WritableComparator { public Comparator() { super(IntPair.class); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return compareBytes(b1, s1, l1, b2, s2, l2); } } static { // register this comparator WritableComparator.define(IntPair.class, new Comparator()); } @Override public int compareTo(IntPair o) { if (first != o.first) { return first < o.first ? -1 : 1; } else if (second != o.second) { return second < o.second ? -1 : 1; } else { return 0; } } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{ @Override public int getPartition(IntPair key, IntWritable value, int numPartitions) { return Math.abs(key.getFirst() * 127) % numPartitions; } } /** * Compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ public static class FirstGroupingComparator implements RawComparator<IntPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8); } @Override public int compare(IntPair o1, IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair * as ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> { private final IntPair key = new IntPair(); private final IntWritable value = new IntWritable(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(inValue.toString()); int left = 0; int right = 0; if (itr.hasMoreTokens()) { left = Integer.parseInt(itr.nextToken()); if (itr.hasMoreTokens()) { right = Integer.parseInt(itr.nextToken()); } key.set(left, right); value.set(right); context.write(key, value); } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> { private static final Text SEPARATOR = new Text("------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(IntPair key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { context.write(SEPARATOR, null); first.set(Integer.toString(key.getFirst())); for(IntWritable value: values) { context.write(first, value); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: secondarysort <in> <out>"); System.exit(2); } Job job = new Job(conf, "secondary sort"); job.setJarByClass(SecondarySort.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); // group and partition by the first int in the pair job.setPartitionerClass(FirstPartitioner.class); job.setGroupingComparatorClass(FirstGroupingComparator.class); // the map output is IntPair, IntWritable job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); // the reduce output is Text, IntWritable job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
--end
相关推荐
- 配置文件详解:如core-site.xml, hdfs-site.xml, mapred-site.xml等,这些配置文件是搭建和管理Hadoop集群的关键。 - NameNode和DataNode:NameNode是HDFS的元数据管理节点,DataNode则是存储数据的节点,理解...
这个压缩包文件“hadoop 1.2.1核心源码”包含了Hadoop项目的核心组件,让我们深入探讨一下其中涉及的关键知识点。 1. **Hadoop架构**:Hadoop的核心由两个主要部分组成:HDFS(Hadoop Distributed File System)和...
在大数据领域,Hadoop和Hive是两个至关重要的技术组件,它们在处理大规模数据存储和分析方面发挥着关键作用。本篇文章将详细探讨Hadoop和Hive的相关面试知识点,帮助求职者更好地准备大数据领域的面试。 首先,我们...
6. **Chap 7 - 高级MapReduce技术**:涵盖如使用Secondary Sort进行复杂排序,或者使用New API(MapReduce v2, YARN)提升性能和资源管理。 7. **Chap 8 - Hadoop优化与故障排除**:这部分内容可能涉及Hadoop集群的...
Hadoop作为开源的大数据处理框架,其源码解析对于理解其工作原理、优化系统性能以及进行二次开发至关重要。本篇将深入探讨Hadoop源码中的关键知识点。 1. **Hadoop架构**:Hadoop主要由HDFS(Hadoop Distributed ...
- **HADOOP-12-152**: 包含DataNode(负责数据存储)、Secondary NameNode(辅助NameNode进行检查点操作)、HiveGateway、WebHCat(提供RESTful API来执行Hive和Pig任务)、Hue(提供图形界面)、ImpalaDaemon(用于...
这本书中文版的出现,使得更多的中国读者能够无障碍地学习和理解Hadoop技术,而带书签的特性则方便了读者快速定位和复习相关章节。 Hadoop是Apache基金会的一个开源项目,其核心由两个主要组件构成:Hadoop ...
1. **Hadoop架构**:介绍Hadoop的整体设计,包括NameNode、DataNode、Secondary NameNode等节点的角色和功能,以及它们如何协同工作以提供数据存储和访问。 2. **HDFS原理**:详细解析HDFS的数据存储机制,如数据块...
书中还讨论了Secondary Sort(二级排序)问题,提供了Hadoop和Spark解决方案的详细案例分析,包括输入、预期输出、map()和reduce()函数的介绍,以及实际代码示例的运行结果。 书中对于Secondary Sort问题进行了深入...
为此,引入了Secondary NameNode来定期合并fsimage和editlog,以防止NameNode成为单点故障。 3. HDFS数据完整性 - 通过CRC32校验和确保数据的完整性,客户端在写入数据块前计算校验和,并在读取时进行验证,发现...
Hadoop是大数据处理领域的重要工具,它以分布式计算模型为基础,为海量数据处理提供了高效、可靠的解决方案。Hadoop源码分析对于理解...这不仅有助于解决现有问题,也为未来可能出现的新需求提供了理论基础和实践指导。
中间结果通过shuffle和sort阶段进行排序和分组,最后由reduce函数处理。 4. **Hadoop源码解析** Hadoop源代码中,`org.apache.hadoop.mapreduce`包是MapReduce的核心,包含Job、Task、Mapper和Reducer等关键类。`...
书中的实验部分涵盖了Hadoop生态系统中的多个关键组件,通过这些章节的代码和材料,读者可以亲自动手实践,从而加深对Hadoop的理解。 在提供的9120OS_Code.zip压缩包中,包含了书中各个章节的源代码和相关材料,...
在这个计划书中,我们将涵盖以下几个关键知识点: 1. **Hadoop集群搭建**:Hadoop集群的搭建是实现大数据处理的第一步。这涉及到配置各个节点,安装Java运行环境,下载并安装Hadoop,设置环境变量,以及配置Hadoop...
这可能包括优化Mapper和Reducer的逻辑,以及使用Hadoop的Secondary Sort等技术来改善数据局部性。 4. **索引构建**:为了加速查询,论文可能会讨论如何在分布式环境中构建和维护索引,如Bloom Filter或Bitmap索引,...
【Java大数据作业_5Mapreduce、数据挖掘】的课后作业涵盖了多个MapReduce和大数据处理的关键知识点,包括日志分析、Job执行模式、HBase的相关类、容量调度配置、MapReduce流程以及二次排序算法。下面将对这些内容...
### 分布式大数据处理架构知识点解析 ...以上内容详细介绍了分布式大数据处理架构的核心知识点,涵盖了Hadoop生态系统的关键组件和技术细节,旨在帮助读者深入理解分布式数据处理的核心原理和技术实现。
6. **Secondary Sort模式**:在某些情况下,简单的键值排序不足以满足需求,Secondary Sort允许自定义排序规则,以满足更复杂的业务逻辑。 7. **Map-only作业**:如果不需要Reduce阶段,可以使用Map-only作业,这会...