之前在网上看到了一篇使用MapReduce实现二次排序的博客,自己尝试实现了,并测试成功,代码有所改动。链接如下:
http://blog.csdn.net/zyj8170/article/details/7530728
所谓的二次排序:对Key和Val都进行排序(比如升序),并输出。对Key的自动排序,MapReduce可以替我们解决,但是同时对Val进行排序,则需要其他的做法。
做法一:对每个Key的所有Val,添加到ArrayList,使用Collections.sort方法进行排序,虽然能够实现,但是存在隐患,如果key的val值很多,那么对每个Key的ArrayList消耗的内存就很大,效率不高;
做法二:定制MapReduce的IO类型,定制GroupingComparator类,实现二次排序,通过定制,将默认的MR执行的方式改成自定义的,效率比较高,易于扩展;
现在开始实现做法二。
一、准备待排序的数据(数据跟原博客一样),见第三部分的结果对比:
二、代码如下
package com; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.StringTokenizer; public class SecondSort extends Configured implements Tool { //自定义的类型,(参见本博客的MapReduce定制类型) static class IntPair implements WritableComparable<IntPair> { private int a; private int b; public IntPair() { a = 0; b = 0; } public int getA() { return a; } public void setA(int a) { this.a = a; } public int getB() { return b; } public void setB(int b) { this.b = b; } public void set(int a, int b) { this.a = a; this.b = b; } @Override public int compareTo(IntPair o) { if (this.a == o.a) { if (this.b == o.b) return 0; else return this.b > o.b ? 1 : -1; } else return this.a > o.a ? 1 : -1; } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(a); dataOutput.writeInt(b); } @Override public void readFields(DataInput dataInput) throws IOException { a = dataInput.readInt(); b = dataInput.readInt(); } } static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer strTok = new StringTokenizer(value.toString()); int a = Integer.parseInt(strTok.nextToken()); int b = Integer.parseInt(strTok.nextToken()); IntPair mykey = new IntPair(); mykey.set(a, b); context.write(mykey, new IntWritable(b)); } } static class MyKeyGroupComparator extends WritableComparator { MyKeyGroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; if (ip1.a == ip2.a) return 0; else return ip1.a > ip2.a ? 1 : -1; } } static class Reduce extends Reducer<IntPair, IntWritable, IntWritable, IntWritable> { @Override protected void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { IntWritable myKey = new IntWritable(key.getA()); //显式的分隔分组,便于查看 context.write(new IntWritable(999999999), null); for(IntWritable i :values){ context.write(myKey, i); } } } @Override public int run(String[] strings) throws Exception { //path是HDFS的路径字符串 String path = "/my/inputTest/Test_SecondSort.txt"; if (strings.length != 1) { System.out.println("input:" + path); System.out.print("arg:<out>"); return 1; } Configuration conf = getConf(); Job job = new Job(conf, "SecondSort"); job.setJarByClass(SecondSort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setGroupingComparatorClass(MyKeyGroupComparator.class); job.setMapOutputKeyClass(IntPair.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(1); FileInputFormat.addInputPath(job, new Path(path)); FileOutputFormat.setOutputPath(job, new Path(strings[0])); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int rst = ToolRunner.run(conf, new SecondSort(), args); System.exit(rst); } }
三、见结果对比表,结果为结果数据1(999999999是Group分隔符):
四、如果将Group的标准从根据Inpair的a的值判断改成b的值判断,Group部分代码修改如下(其余不变):
static class MyKeyGroupComparator extends WritableComparator { MyKeyGroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { IntPair ip1 = (IntPair) a; IntPair ip2 = (IntPair) b; // if (ip1.a == ip2.a) // return 0; // else // return ip1.a > ip2.a ? 1 : -1; if (ip1.b == ip2.b) return 0; else return ip1.b > ip2.b ? 1 : -1; } }
结果见结果对比表的结果数据2.
五、结果对比表:
原始数据 | 结果数据1 | 结果数据2 | 结果说明 |
20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58 1 2 3 4 5 6 7 82 203 21 50 512 50 522 50 53 530 54 40 511 20 53 20 522 60 56 60 57 740 58 63 61 730 54 71 55 71 56 73 57 74 58 12 211 31 42 50 62 7 8 |
999999999 1 2 999999999 3 4 999999999 7 8 7 82 999999999 12 211 999999999 20 21 20 53 20 522 999999999 31 42 999999999 40 511 999999999 50 51 50 52 50 53 50 54 …… |
999999999 1 2 999999999 3 4 999999999 7 8 999999999 7 82 999999999 12 211 999999999 20 21 999999999 20 53 999999999 20 522 999999999 31 42 999999999 40 511 999999999 50 51 999999999 50 52 999999999 50 53 999999999 50 54 …… |
结果1中,完全符合我们的要求,实现了二次排序,并将key为20的全部分到同一个分组中。
结果2中,将key为20的数据,由于b的值各不相同,因此又将其分成了三个Group;同时,对于黄色高亮的数据,b的值同样是53,却没有分到同一个分组。原因见GroupingComparator的作用。 |
六、GroupingComparator的作用
Job的API解释:
翻译之后,Group的作用是:在一个reduce调用的时候,通过这个Comparator来控制Key的分组。
但是还是不太清晰,有待进一步了解。
相关推荐
当我们需要对数据进行复杂的排序需求时,例如“年份升序,按照年份聚合,气温降序”,MapReduce的二次排序(Secondary Sort)机制就显得尤为重要。这个概念主要解决的是在MapReduce默认排序规则基础上,进行更精细化...
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
MapReduce模型中的二次排序是大数据处理中一项重要的技术,它通过多层排序功能来优化数据处理性能。二次排序的核心思想在于对Key(键)进行分层排序,而不能对Value(值)进行排序。这主要是因为MapReduce框架设计时...
基于MapReduce实现决策树算法的知识点 基于MapReduce实现决策树算法是一种使用MapReduce框架来实现决策树算法的方法。在这个方法中,主要使用Mapper和Reducer来实现决策树算法的计算。下面是基于MapReduce实现决策...
利用采样器实现mapreduce任务输出全排序大数据-MapReduce
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...
Hadoop mapreduce 实现InvertedIndexer倒排索引,能用。
在IT领域,尤其是在大数据处理和社交网络分析中,"MapReduce实现二度好友推荐算法"是一种常见的技术应用。MapReduce是Google提出的一种分布式计算模型,主要用于处理和生成大规模数据集。在这个场景下,我们利用...
5. 实际案例展示如何在Hadoop中实现二次排序 6. 图形化解释帮助理解数据处理流程 通过对这些内容的学习,读者可以深入理解大数据排序的复杂性和Hadoop在其中的角色,进一步提升大数据处理的能力。
本次实验,在 Hadoop 平台上,使用 MapReduce 实现了数据的全局排序。将详细阐述了实现所需环境及过程。用阿里云服务器安装, OS: Ubuntu20.04 LTS . Hadoop 支持用三种模式启动:单机模式、伪分布式模式、分布式...
【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...
在Hadoop MapReduce的早期版本(0.20.0之前),二次排序通常通过设置`setPartitionerClass`、`setOutputKeyComparatorClass`和`setOutputValueGroupingComparator`来实现。而在0.20.0及之后的版本,这些设置被替换为...
本文将详细探讨在 MapReduce2 中如何实现自定义排序和分组,以满足特定的数据处理需求。 首先,了解 MapReduce 的工作流程是必要的。Map 阶段将输入数据分割成多个块,并在各个节点上并行处理。Reduce 阶段则负责...
《使用MapReduce实现KMeans算法详解》 KMeans算法是一种广泛应用的无监督学习方法,用于数据聚类。在大数据处理的背景下,传统的单机实现往往无法应对海量数据,因此,结合分布式计算框架MapReduce实现KMeans算法就...
本话题将深入探讨如何使用Hadoop MapReduce实现两个矩阵相乘的算法,这在数据分析、机器学习以及高性能计算中有着重要应用。 首先,理解矩阵相乘的基本原理至关重要。矩阵相乘不是简单的元素对元素相乘,而是对应...
在这个“hadoop分区二次排序示例.zip”压缩包中,我们重点探讨的是如何在Hadoop MapReduce中实现特定的排序逻辑,即二次排序和分区策略。 首先,我们需要理解什么是二次排序。在标准的MapReduce流程中,数据经过map...
在这个特定的案例中,我们不仅实现了基本的WordCount功能,还扩展了MapReduce的能力,通过自定义分区和自定义排序来优化数据处理流程。 首先,基础的`WordCount`实现,通常包含以下四个步骤: 1. **Map阶段**:...