`
zljpp
  • 浏览: 260207 次
社区版块
存档分类
最新评论

Hadoop实现Secondary Sort

阅读更多

在hadoop中每个reduce的输入的key都是有序的,而value则是无序的。而且同一个job运行多次,由于map完成顺序不同,reduce收到的value顺序是不固定的。那如何才能实现reduce收到有序的value呢?这就需要Secondary Sort。

Secondary Sort要解决的问题:reduce收到的value有序。

这里举一个场景,来说明Secondary Sort是如何实现的。假设我们有若干公司若干部门的人数,数据样例如下:

 

 

公司名   部门的人数

Taobao 52
Taobao 31
Taobao 67
Alipay 10
Alipay 36
Alipay 29
B2B 120
B2B 72
Aliyun 13
Aliyun 32
Aliyun 3

我们想知道每个公司的最大部门(人数最多)的人数。即希望先按公司名group,然后对group内的人数降序排列,最后取每个group的第一个即可。

由于reduce收到的value是无序的,所以要对value进行排序,首先需要将value封装到key里面。即需要自定义key的类型,代码如下:

 

[java] view plaincopy
  1. <span style="font-family:Microsoft YaHei;font-size:18px;">import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.IntWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.io.WritableComparable;  
  8.   
  9. public class MyKey implements WritableComparable<MyKey> {  
  10.   public final Text first;  
  11.   public final IntWritable second;  
  12.   
  13.   public MyKey() {  
  14.     first = new Text();  
  15.     second = new IntWritable();  
  16.   }  
  17.   
  18.   public MyKey(Text first, IntWritable second) {  
  19.     this.first = first;  
  20.     this.second = second;  
  21.   }  
  22.   
  23.   @Override  
  24.   public void write(DataOutput out) throws IOException {  
  25.     first.write(out);  
  26.     second.write(out);  
  27.   }  
  28.   
  29.   @Override  
  30.   public void readFields(DataInput in) throws IOException {  
  31.     first.readFields(in);  
  32.     second.readFields(in);  
  33.   }  
  34.   
  35.   @Override  
  36.   public String toString() {  
  37.     return first + "\t" + second;  
  38.   }  
  39.   
  40.   @Override  
  41.   public int compareTo(MyKey tp) {  
  42.     int cmp = first.compareTo(tp.first);  
  43.     if (cmp != 0) {  
  44.       return cmp;  
  45.     }  
  46.     return -second.compareTo(tp.second);  
  47.   }  
  48. }</span>  

这里新定义的类型为MyKey封装了一个Text和一个IntWritable,依次存放公司名和部门人数。Hadoop要求key的类型必须实现Writable和Comparable,前者为了支持序列化和反序列化,后者为了实现基于比较的排序。需要注意的是compareTo()方法中先按first即公司名升序排列,后按second即部门人数降序排列。另外toString()方法的实现是为了定义输出格式,即公司名+tab+最大部门人数。

 

定义key后还不能满足需求。因为默认的HashPartitioner会将相同的key分配给同一个reduce,而我们希望的是first相同的key分给同一个reduce处理,默认的Partitioner显然保证不了这一点。这就需要我们自定义Partitioner,实现first相同的key分配给同一个reduce。实现代码如下:

 

[java] view plaincopy
  1. <span style="font-family:Microsoft YaHei;font-size:18px;">import org.apache.hadoop.io.NullWritable;  
  2. import org.apache.hadoop.mapred.JobConf;  
  3. import org.apache.hadoop.mapred.Partitioner;  
  4.   
  5.   
  6. public class MyPartitioner   
  7.   implements Partitioner<MyKey, NullWritable> {  
  8.   
  9.   @Override  
  10.   public void configure(JobConf job) {}  
  11.   
  12.   @Override  
  13.   public int getPartition(MyKey key, NullWritable value, int numPartitions) {  
  14.     return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;  
  15.   }  
  16. }  
  17. </span>  

MyPartitioner的getPartition()方法中,只考虑first,不考虑second,这样就满足了我们的需求。

 

实现到这一步后,reduce会获取到按first升序且按second降序排列的key序列。而我们希望first相同的key中,只获取第一个的second即可,其他数据可以忽略。这就需要数据执行reduce前按照key的first字段进行归并,即grouping。first相同的key归为一个group,将第一个key和所有的value(value为NullWritable类型,无需处理)传给reduce()方法。然后reduce将key输出即可实现目的。为了实现这样的grouping操纵,需要自定义归并比较器(ValueGroupingComparator),代码如下:

 

[java] view plaincopy
  1. <span style="font-family:Microsoft YaHei;font-size:18px;">import org.apache.hadoop.io.WritableComparable;  
  2. import org.apache.hadoop.io.WritableComparator;  
  3.   
  4. public class MyGroupComparator extends WritableComparator {  
  5.   protected MyGroupComparator() {  
  6.     super(MyKey.classtrue);  
  7.   }  
  8.     
  9.   @Override  
  10.   public int compare(WritableComparable w1, WritableComparable w2) {  
  11.     MyKey m1 = (MyKey) w1;  
  12.     MyKey m2 = (MyKey) w2;  
  13.     return m1.first.compareTo(m2.first);  
  14.   }  
  15.   
  16. }  
  17. </span>  

从MyGroupComparator代码中可以看出,compare中只比较firest而忽略second。

 

以上模块自定义好后,map和reduce实现会相当容易。map只需要将公司名和部门人数构造成一个MyKey对象即可。而reduce中将收到的key输出就ok了。实现SecondarySort的作业代码如下:

 

[java] view plaincopy
  1. <span style="font-family:Microsoft YaHei;font-size:18px;">import java.io.IOException;  
  2. import java.util.Iterator;  
  3.   
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.conf.Configured;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.*;  
  8. import org.apache.hadoop.mapred.*;  
  9. import org.apache.hadoop.util.Tool;  
  10. import org.apache.hadoop.util.ToolRunner;  
  11.   
  12. public class MySecondarySort extends Configured implements Tool{  
  13.     
  14.   public static class MyMap extends MapReduceBase  
  15.     implements Mapper<Text, Text, MyKey, NullWritable> {  
  16.       
  17.     private IntWritable num = new IntWritable();  
  18.       
  19.     @Override  
  20.     public void map(Text key, Text value,   
  21.                     OutputCollector<MyKey, NullWritable> output,   
  22.                     Reporter reporter) throws IOException {  
  23.   
  24.       num.set(Integer.parseInt(value.toString()));  
  25.       MyKey myKey = new MyKey(key, num);  
  26.       output.collect(myKey, NullWritable.get());  
  27.     }  
  28.   }  
  29.     
  30.   public static class MyReduce extends MapReduceBase  
  31.     implements Reducer<MyKey, NullWritable, MyKey, NullWritable> {  
  32.   
  33.     @Override    
  34.     public void reduce(MyKey key, Iterator<NullWritable> values,  
  35.                        OutputCollector<MyKey, NullWritable> output,   
  36.                        Reporter reporter) throws IOException {  
  37.       output.collect(key, NullWritable.get());  
  38.     }  
  39.   }  
  40.     
  41.     
  42.   @Override  
  43.   public int run(String[] args) throws Exception {  
  44.     JobConf conf = new JobConf(getConf(), MySecondarySort.class);   
  45.     conf.setJobName("wordcount");  
  46.    
  47.     conf.setOutputKeyClass(MyKey.class);  
  48.     conf.setOutputValueClass(NullWritable.class);  
  49.       
  50.     conf.setMapperClass(MyMap.class);          
  51.     conf.setReducerClass(MyReduce.class);  
  52.      
  53.     conf.setPartitionerClass(MyPartitioner.class);  
  54.     conf.setOutputValueGroupingComparator(MyGroupComparator.class);  
  55.       
  56.     conf.setInputFormat(KeyValueTextInputFormat.class);  
  57.     conf.set("key.value.separator.in.input.line"" ");  
  58.   
  59.     FileInputFormat.setInputPaths(conf, new Path(args[0]));  
  60.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));  
  61.     JobClient.runJob(conf);  
  62.     return 0;  
  63.   }  
  64.     
  65.   public static void main(String[] args) throws Exception {   
  66.     int res = ToolRunner.run(new Configuration(), new MySecondarySort(), args);  
  67.     System.exit(res);  
  68.   }  
  69. }  
  70. </span>  

注意由于输入格式是key+空格+value,这里采用KeyValueTextInputFormat,避免了map中做分割字符串操作。

 

对于输入如下内容的文件:

 

$ bin/hadoopfs -cat /liangly/list

Taobao 52

Taobao 31

Taobao 67

Alipay 10

Alipay 36

Alipay 29

B2B 120

B2B 72

Aliyun 13

Aliyun 32

Aliyun 3

执行上面实现的Job:

 

$ bin/hadoopjar job.jar MySecondarySort \

> -Dmapred.map.tasks=3 \

> -Dmapred.reduce.tasks=2 \

> /liangly/list \

/liangly/out

作业结束后输出如下:

 

$ bin/hadoopfs -cat /liangly/out/*

Alipay  36

Aliyun 32

B2B    120

Taobao  67

由于数据量很小,很容易确定已经达到了预期目的。

分享到:
评论

相关推荐

    Hadoop大数据期末考试重点

    21. **Mapper类**:Hadoop提供的Mapper类是实现Map阶段逻辑的基础类。 以上是对Hadoop大数据期末考试重点内容的详细解读,涵盖了Hadoop的分布式文件系统HDFS、MapReduce计算模型以及相关配置和操作细节,考生需要对...

    hadoop源代码存档

    2. hadoop-hdfs:实现了HDFS,包括NameNode(元数据管理)、DataNode(数据存储)和Secondary NameNode(元数据备份)的源码。 3. hadoop-mapreduce:实现了MapReduce计算框架,包括JobTracker(任务调度)、...

    数据算法--HadoopSpark大数据处理技巧.pdf

    这份“数据算法--HadoopSpark大数据处理技巧”文档显然探讨了如何利用这两个工具进行复杂的数据操作,具体涉及到Scala编程实现的两个重要算法:Secondary Sort(二级排序)和Common Friends(共同朋友计算)。...

    hadoop 1.2.1核心源码

    9. **MapReduce工作流程**:在Hadoop 1.2.1中,MapReduce包括Map阶段和Reduce阶段,中间数据通过Shuffle和Sort进行处理。通过源码,我们可以看到这些阶段如何协调工作,以及数据如何在节点之间流动。 10. **HDFS...

    Hadoop 官方文档(中文版)

    - MapReduce 模型:理解Map和Reduce两个主要阶段,以及Shuffle和Sort的过程。 - JobTracker与TaskTracker(旧版)/ResourceManager与NodeManager(YARN):了解任务调度和执行的逻辑。 - 自定义Mapper和Reducer:...

    Hadoop Real-World Solutions Cookbook 源代码

    6. **Chap 7 - 高级MapReduce技术**:涵盖如使用Secondary Sort进行复杂排序,或者使用New API(MapReduce v2, YARN)提升性能和资源管理。 7. **Chap 8 - Hadoop优化与故障排除**:这部分内容可能涉及Hadoop集群的...

    hadoop高级应用

    5. **数据处理优化**:在Hadoop中,可以通过优化MapReduce的Job配置,如调整Split大小,优化Reducer数量,使用Combiner减少网络传输,以及利用Secondary Sort等技术来提升处理效率。 6. **容错与稳定性**:Hadoop...

    Data Algorithms: Recipes for Scaling Up with Hadoop and Spark pdf

    5. Secondary Sort问题的定义、成因、解决方案以及Hadoop和Spark框架下的实现方法。 6. 如何编写MapReduce程序中的核心函数:map()和reduce(),及其在具体算法中的应用。 7. Hadoop新API在数据处理中的使用。 8. ...

    基于Hadoop平台的通信数据分布式查询算法的设计与实现

    这可能包括优化Mapper和Reducer的逻辑,以及使用Hadoop的Secondary Sort等技术来改善数据局部性。 4. **索引构建**:为了加速查询,论文可能会讨论如何在分布式环境中构建和维护索引,如Bloom Filter或Bitmap索引,...

    hadoop-2.7.1.tar.gz

    - HA(High Availability):在2.7.1版本中,HDFS支持NameNode的高可用性,通过使用Secondary NameNode和Quorum Journal Manager实现,确保了服务的连续性。 - HDFS Federation:允许多个独立的命名空间并行运行在...

    实战hadoop中的源码

    源码中包含了如检查点、备份NameNode和Secondary NameNode的功能实现,这些对于保证集群稳定性至关重要。 7. **Hadoop扩展性**:Hadoop支持YARN(Yet Another Resource Negotiator),提供更灵活的资源管理和调度。...

    hadoop 二次排序 原理

    本文将深入解析Hadoop的二次排序(Secondary Sort)原理,这是一个允许用户自定义排序规则以满足特定需求的功能。 首先,二次排序是在MapReduce框架内进行的一种特殊排序方式,它遵循两个主要步骤:第一字段排序和...

    Hadoop权威指南(中文版)(带书签)

    在《Hadoop权威指南》中,读者可以深入了解到Hadoop的架构原理,包括NameNode和DataNode的角色,以及Secondary NameNode的功能。书中详细解释了HDFS的写入、读取过程,如何处理数据块的复制和故障恢复,以及如何扩展...

    hadoop in action中文电子版

    本书首先会介绍Hadoop的基本架构,包括NameNode、DataNode、Secondary NameNode等组件的工作方式,以及HDFS的文件读写流程。接着,它会详细讲解MapReduce的工作原理,包括Mapper和Reducer阶段,以及中间结果的 ...

    hadoop-3.1.1-src.tar.gz

    Hadoop是Apache软件基金会开发...总之,“hadoop-3.1.1-src.tar.gz”是一个宝贵的资源,涵盖了Hadoop的核心技术、设计理念以及实现细节,对于开发者、研究者和数据工程师来说,是深入了解和掌握大数据处理的重要入口。

    hadoop学习总结(面试必备)

    2. Shuffle与Sort:Map阶段产生的中间结果依据键进行排序和分区,为Reduce阶段做准备。 3. Reduce阶段:Reduce函数将相同键的值组合,进行聚合计算,生成最终结果。 4. 并行处理:多个Map任务和Reduce任务可以并行...

    hadoop 2.7.2 源码

    在这个模块中,你可以看到Hadoop如何处理配置文件、实现高效的网络通信以及文件操作。 5. **MapReduce的Shuffle和Sort过程**:这是MapReduce中非常关键的一步,它对Map阶段的输出进行排序和分区,然后将数据传递给...

    Hadoop和hive大数据面试题

    1. Hadoop的架构:理解Hadoop的主节点(NameNode)和从节点(DataNode)的角色,以及Secondary NameNode的作用。 2. HDFS的工作原理:深入理解数据块的概念,以及副本策略如何确保数据冗余和容错性。 3. MapReduce的...

    Hadoop面试100题

    - **Shuffle & Sort**:中间结果的排序和分区,为Reducer输入做准备。 4. **Hadoop与Java的关系** - **编程接口**:Hadoop主要使用Java API开发,如`org.apache.hadoop.mapreduce`包下的类。 - **序列化**:...

    hadoop权威指南4书上项目源码等

    理解Hadoop的分布式环境,包括NameNode、DataNode、Secondary NameNode等节点的角色,以及它们如何协同工作以确保数据的可靠存储和访问。 2. **HDFS详解**:HDFS的文件块、副本策略、故障恢复机制是学习的重点。书...

Global site tag (gtag.js) - Google Analytics