`
m635674608
  • 浏览: 5027253 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

利用hadoop mapreduce 做数据排序

 
阅读更多
  我们的需求是想统计一个文件中用IK分词后每个词出现的次数,然后按照出现的次数降序排列。也就是高频词统计。

由于hadoop在reduce之后就不能对结果做什么了,所以只能分为两个job完成,第一个job统计次数,第二个job对第一个job的结果排序。 第一个job的就是hadoop最简单的例子countwords,我要说的是用hadoop对结果排序。 假设第一个job的结果输出如下:

part-r-0000文件内容:

a    5
b    4
c    74
d    78
e    1
r    64
f    4

要做的就是按照每个词出现的次数降序排列。

<!-- lang: lua -->
**********************************分割线*****************************************

首先可能会出现这样的问题:

1.可能上一个job为多个reduce,也就是会产生多个结果文件,因为一个reduce就会生成一个结果文件,结果存放在上一个job输出目录下类似part-r-00的文件里。

2.需要排序的文件内容很大,所以需要考虑多个reduce的情况。

<!-- lang: lua -->
*********************************分割线*******************************

怎么去设计mapreduce

1.在map阶段按照行读取文本,然后调用map方法时把上一个job的结果颠倒,也就是map后结果应该是这样的

<!-- lang: java -->
5    a
4    b
74    c
................
.........................
4    f

2.然后map后,hadoop会对结果进行分组,这时结果就会变成 <!-- lang: lua -->

(5:a)
(4:b,f)
(74:c)

3.因为hadoop对数据分组后默认是按照key升序排序的,所以需要自定义排序函数将分组数据降序排序。

4.然后按照reduce数目的大小自定义分区函数,让结果形成多个区间,比如我认为大于50的应该在一个区间,一共3个reduce,那么最后的 数据应该是三个区间,大于50的直接分到第一个分区0,25到50之间的分到第二个分区1,小于25的分到第三个分区2.因为分区数和reduce数是相 同的,所以不同的分区对应不同的reduce,因为分区是从0开始的,数据分区到分区0的会被分到第一个reduce处理,分区是1的会分到第2个 reduce处理,依次类推。并且reduce对应着输出文件,所以,第一个reduce生成的文件就会是part-r-0000,第二个reduce对 应的生成文件就会是part-r-0001,依次类推,所以reduce处理时只需要把key和value再倒过来直接输出。这样最后就会形成数目最大的 字符串就会在第一个生成文件里,排好序的数据就是按照文件命名的顺序存放的。

**其实就是利用了hadoop分组的特点,会把key相同的字符串放到一个组里,然后我们把分组的数据用自己定义的排序函数按照key排序后,再按照分区函数分到不同的reduce,固然会是第一个reduce结果文件里面是最大数字的已排序集合,也就是说需要排好序的数据时只需要依次遍历reduce的结果文件part-r-0000,part-r0001,part-r-0002...。当然,如果只有一个reduce,那就正好是一个排好序的结果文件。**

代码如下:

<!-- lang: lua -->
*******************************分割线*****************************************

map:

<!-- lang: java -->
/**
 * 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。
 * 
 * @author zhangdonghao
 * 
 */
    public class SortIntValueMapper extends
    Mapper<LongWritable, Text, IntWritable, Text> {
private final static IntWritable wordCount = new IntWritable(1);

private Text word = new Text();

public SortIntValueMapper() {
    super();
}

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken().trim());
        wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));
        context.write(wordCount, word);
    }
}
}

reudce:

<!-- lang: java -->
/**
 * 把key和value颠倒过来输出
 * @author zhangdonghao
 * 
 */
    public class SortIntValueReduce extends
    Reducer<IntWritable, Text, Text, IntWritable> {

private Text result = new Text();

@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    for (Text val : values) {
        result.set(val.toString());
        context.write(result, key);
    }

}
}

Partitioner:

<!-- lang: java -->
/**
 * 按照key的大小来划分区间,当然,key 是 int值
 * 
 * @author zhangdonghao
 * 
 */
public class KeySectionPartitioner<K, V> extends Partitioner<K, V> {

public KeySectionPartitioner() {
}

@Override
public int getPartition(K key, V value, int numReduceTasks) {
    /**
     * int值的hashcode还是自己本身的数值
     */
    //这里我认为大于maxValue的就应该在第一个分区
    int maxValue = 50;
    int keySection = 0;
    // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0
    if (numReduceTasks > 1 && key.hashCode() < maxValue) {
        int sectionValue = maxValue / (numReduceTasks - 1);
        int count = 0;
        while ((key.hashCode() - sectionValue * count) > sectionValue) {
            count++;
        }
        keySection = numReduceTasks - 1 - count;
    }
    return keySection;
}

}

Comparator:

<!-- lang: java -->
/**
 * int的key按照降序排列
 * 
 * @author zhangdonghao
 * 
 */
public class IntKeyDescComparator extends WritableComparator {

protected IntKeyDescComparator() {
    super(IntWritable.class, true);

}

@Override
public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b);
}

}

job的关键设置:

<!-- lang: java -->
            /**
     * 这里是map输出的key和value类型
     */
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(SortIntValueMapper.class);
    // job.setCombinerClass(WordCountReduce.class);
    job.setReducerClass(SortIntValueReduce.class);
    // key按照降序排列
    job.setSortComparatorClass(IntKeyAscComparator.class);

    job.setPartitionerClass(KeySectionPartitioner.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
            /**
              *这里可以放输入目录数组,也就是可以把上一个job所有的结果都放进去
             **/
            FileInputFormat.setInputPaths(job, inputPath);

    FileOutputFormat.setOutputPath(job,outputPath);



http://my.oschina.net/132722/blog/168022
分享到:
评论

相关推荐

    Hadoop MapReduce实现tfidf源码

    在大数据处理领域,Hadoop MapReduce是一种广泛应用的分布式计算框架,它使得在大规模数据集上进行并行计算成为可能。本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document ...

    大数据 hadoop mapreduce 词频统计

    Hadoop的核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,这两个组件共同为大数据处理提供了强大的支持。 MapReduce是一种分布式计算模型,由Google提出,Hadoop对其进行了实现。在MapReduce中,...

    Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载

    本书通过一系列实用案例展示了如何利用Hadoop MapReduce V2解决实际问题。例如: - **日志分析**:通过MapReduce对大量日志数据进行分析,提取有价值的信息。 - **推荐系统构建**:使用MapReduce进行数据预处理和...

    Java操作Hadoop Mapreduce基本实践源码

    在大数据处理领域,Hadoop MapReduce是一个至关重要的组件,它为海量数据的并行处理提供了分布式计算框架。本文将深入探讨如何使用...通过深入学习和实践,开发者可以利用Hadoop MapReduce解决大数据处理中的各种问题。

    基于Apriori算法的频繁项集Hadoop mapreduce

    总结起来,"基于Apriori算法的频繁项集Hadoop mapreduce"是一个利用大数据处理框架Hadoop的MapReduce模型,高效地执行经典数据挖掘算法Apriori的过程,旨在发现大规模数据集中的频繁项集和关联规则,为商业智能、...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    总结起来,这个项目涵盖了 Hadoop 分布式系统的基础知识,包括 MapReduce 模型、数据存储(HDFS)、资源管理(YARN)以及数据处理和分析。通过实际操作,我们可以深入了解大数据处理的流程,并学习如何利用 Hadoop ...

    Hadoop MapReduce v2 Cookbook.pdf

    《Hadoop MapReduce v2 Cookbook》是一本针对大数据处理领域的重要参考书籍,专注于介绍Hadoop MapReduce的最新版本——v2(也称为YARN,Yet Another Resource Negotiator)。Hadoop MapReduce是Apache Hadoop框架的...

    Hadoop MapReduce教程.pdf

    社交媒体平台利用Hadoop处理用户动态,优化推荐算法;科研机构使用Hadoop进行基因组数据分析,加速科学研究进程。 #### 总结 Hadoop MapReduce作为大数据处理领域的重要工具,其强大之处在于能够高效处理大规模...

    007_hadoop中MapReduce应用案例_1_数据去重

    在IT行业中,Hadoop MapReduce是一种...这个案例涵盖了数据处理的关键步骤,包括数据分片、并行处理、中间结果的聚合以及最终的数据去重,对于理解Hadoop MapReduce的工作原理及其在大数据处理中的应用具有很高的价值。

    Hadoop mapreduce 实现MR_DesicionTreeBuilder 决策树

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的计算框架,尤其在处理大规模数据集时。决策树(Decision Tree)是一种流行的机器学习算法,常用于分类和回归问题。本项目结合了两者,实现了一个名为 MR_...

    mapred.zip_hadoop_hadoop mapreduce_mapReduce

    在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许用户编写并运行处理大量数据的程序。...通过深入研究文档、源码和测试样例,我们可以更好地利用MapReduce解决实际的大规模数据处理问题。

    Hadoop MapReduce.pdf

    ### Hadoop MapReduce知识点概述 #### 一、Hadoop MapReduce简介 ...对于初学者来说,理解MapReduce的工作流程以及其实现方式是非常重要的,这有助于更好地利用Hadoop平台进行复杂的数据分析任务。

    Hadoop_MapReduce教程

    尽管 Hadoop MapReduce 在大规模数据处理方面表现出色,但它也有一些局限性: - **不适合低延迟交互式查询**:由于 MapReduce 的工作方式,不适合需要低延迟响应的应用场景。 - **不适合流式数据处理**:虽然可以...

    基于Hadoop MapReduce的电商网站商品数据分析.rar

    本项目将深入探讨如何利用Hadoop MapReduce对电商网站的商品数据进行有效的分析。 首先,我们来理解Map阶段。在这个阶段,原始数据被分割成多个小块(通常由HDFS完成),然后这些数据块被分发到集群的不同节点上。...

    基于Hadoop MapReduce的高校考研分数线统计分析项目代码+数据集.rar

    该项目是关于利用Hadoop MapReduce框架对高校考研分数线进行统计分析的一个实例。MapReduce是一种分布式计算模型,由Google提出,广泛应用于大规模数据处理。在这个项目中,我们将深入探讨如何运用MapReduce来处理...

    Hadoop mapreduce 实现MatrixMultiply矩阵相乘

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的并行计算框架,用于处理和存储海量数据。本主题将深入探讨如何使用Hadoop MapReduce来实现MatrixMultiply,即矩阵相乘,这是一个基础且重要的数学运算,尤其在...

    Python中Hadoop MapReduce的一个简单示例.zip

    这个名为"Python中Hadoop MapReduce的一个简单示例.zip"的压缩包,显然是为了帮助用户理解如何在Python环境下利用Hadoop MapReduce框架进行数据处理。我们将详细探讨MapReduce的基本概念、工作原理以及Python在...

    基于Hadoop MapReduce的短视频主播数据分析项目代码+数据集.rar

    该项目是关于利用Hadoop MapReduce对短视频主播的数据进行分析,以获取有价值的信息并进行业务洞察。MapReduce是一种分布式计算框架,常用于处理海量数据,而Hadoop是它的一个开源实现。在这个项目中,我们将深入...

    Hadoop Mapreduce Cookbook(英文版)

    《Hadoop MapReduce Cookbook》是一本专为大数据处理和分析领域的专业人士编写的指南,它深入浅出地介绍了如何使用Hadoop MapReduce框架解决实际问题。MapReduce是Hadoop生态系统中的核心组件,它允许用户在分布式...

Global site tag (gtag.js) - Google Analytics