`
qindongliang1922
  • 浏览: 2182250 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117452
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125854
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59864
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71251
社区版块
存档分类
最新评论

如何使用hadoop对海量数据进行统计并排序

阅读更多
不得不说,Hadoop确实是处理海量离线数据的利器,当然,凡是一个东西有优点必定也有缺点,hadoop的缺点也很多,比如对流式计算,实时计算,DAG具有依赖关系的计算,支持都不友好,所以,由此诞生了很多新的分布式计算框架,Storm,Spark,Tez,impala,drill,等等,他们都是针对特定问题提出一种解决方案,新框架的的兴起,并不意味者他们就可以替代hadoop,一手独大,HDFS和MapReduce依旧是很优秀的,特别是对离线海量数据的处理。

hadoop在如下的几种应用场景里,用的还是非常广泛的,1,搜索引擎建索引,2,topK热关键词统计,3,海量日志的数据分析等等。
散仙,今天的这个例子的场景要对几亿的单词或短语做统计和并按词频排序,当然这些需求都是类似WordCount问题,如果你把Hadoop自带的WordCount的例子,给搞懂了,基本上做一些IP,热词的统计与分析就很很容易了,WordCount问题,确实是一个非常具有代表性的例子。


下面进入正题,先来分析下散仙这个例子的需求,总共需要二步来完成,第一步就是对短语的统计,第二步就是对结果集的排序。所以如果使用MapReduce来完成的话,就得需要2个作业来完成这件事情,第一个作业来统计词频,第二个来负责进行排序,当然这两者之间是有依赖关系的,第二个作业的执行,需要依赖第一个作业的结果,这就是典型的M,R,R的问题并且作业之间具有依赖关系,这种问题使用MapReduce来完成,效率可能有点低,如果使用支持DAG作业的Tez来做这件事情,那么就很简单了。不过本篇散仙,要演示的例子还是基于MapReduce来完成的,有兴趣的朋友,可以研究一下使用Tez。


对于第一个作业,我们只需要改写wordcount的例子,即可,因为散仙的需求里面涉及短语的统计,所以定义的格式为,短语和短语之间使用分号隔开,(默认的格式是按单词统计的,以空格为分割符)在map时只需要,按分号打散成数组,进行处理即可,测试的数据内容如下:



map里面的核心代码如下:
  /**
     * 统计词频的map端
     * 代码
     * 
     * **/
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
   
    	String [] data=value.toString().split(";");//按每行的分号拆分短语
    	for(String s:data){
    		if(s.trim().length()>0){//忽略空字符
    		word.set(s);
    		context.write(word, one);
    		}
    	}
 
    }


reduce端的核心代码如下:

/**
     * reduce端的
     * 代码
     * **/
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();//累加词频
      }
      result.set(sum);
      context.write(new Text(key.toString()+"::"), result);//为方便短语排序,以双冒号分隔符间隔
    }
  }

main函数里面的代码如下:
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

运行结果,如下所示:
a good student::	1
good student::	3
patient::	2
patient a::	1


下面,散仙来分析下排序作业的代码,如上图所示hadoop默认的排序,是基于key排序的,如果是字符类型的则基于字典表排序,如果是数值类型的则基于数字大小排序,两种方式都是按默认的升序排列的,如果想要降序输出,就需要我们自己写个排序组件了,散仙会在下面的代码给出例子,因为我们是要基于词频排序的,所以需要反转K,V来实现对词频的排序,map端代码如下:
/**
		 * 排序作业
		 * map的实现
		 * 
		 * **/
		@Override
		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
			String s[]=value.toString().split("::");//按两个冒号拆分每行数据
			word.set(s[0]);//
			one.set(Integer.parseInt(s[1].trim()));//
			context.write(one, word);//注意,此部分,需要反转K,V顺序
		}

reduce端代码如下:
/***
  * 
  * 排序作业的
  * reduce代码
  * **/		
		@Override
		protected void reduce(IntWritable arg0, Iterable<Text> arg1, Context arg2)
				throws IOException, InterruptedException {
			for(Text t:arg1){
				result.set(t.toString());
				 arg2.write(result, arg0);
			}
		}



下面,我们再来看下排序组件的代码:

/***
 * 按词频降序排序
 * 的类
 * 
 * **/
	public static class DescSort extends  WritableComparator{

		 public DescSort() {
			 super(IntWritable.class,true);//注册排序组件
		}
		 @Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return -super.compare(arg0, arg1, arg2, arg3, arg4, arg5);//注意使用负号来完成降序
		}
		 
		 @Override
		public int compare(Object a, Object b) {
	 
			return   -super.compare(a, b);//注意使用负号来完成降序
		}
		
	}


main方法里面的实现代码如下所示:

public static void main(String[] args) throws Exception{
		
		
		
		  Configuration conf = new Configuration();
		    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    if (otherArgs.length != 2) {
		        System.err.println("Usage: wordcount <in> <out>");
		        System.exit(2);
		      }
		   Job job=new Job(conf, "sort");
		   job.setOutputKeyClass(IntWritable.class);
		   job.setOutputValueClass(Text.class);
		   job.setMapperClass(SortIntValueMapper.class);
		   job.setReducerClass(SortIntValueReducer.class)	;
		   job.setSortComparatorClass(DescSort.class);//加入排序组件
		   job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
		   job.setOutputFormatClass(TextOutputFormat.class);
		   FileInputFormat.setInputPaths(job, new Path(args[0]));
		   FileOutputFormat.setOutputPath(job, new Path(args[1]));
		 
		   System.exit(job.waitForCompletion(true) ? 0 : 1);
	}


输出结果,如下所示:
good student	3
patient	2
a good student	1
patient a	1


至此,我们可以成功实现,统计并排序的业务,当然这种类型的需求非常多而且常见,如对某个海量日志IP的分析,散仙上面的例子使用的只是测试的数据,而真实数据是对几亿或几十亿的短语构建语料库使用,配置集群方面,可以根据自己的需求,配置集群的节点个数以及map,reduce的个数,而代码,只需要我们写好,提交给hadoop集群执行即可。

最后在简单总结一下,数据处理过程中,格式是需要提前定制好的,也就是说你得很清楚的你的格式代表什么意思,另外一点,关于hadoop的中文编码问题,这个是内部固定的UTF-8格式,如果你是GBK的文件编码,则需要自己单独在map或reduce过程中处理一下,否则输出的结果可能是乱码,最好的方法就是统一成UTF-8格式,否则,很容易出现一些编码问题的。
  • 大小: 9.2 KB
  • 大小: 6.8 KB
1
1
分享到:
评论

相关推荐

    基于Hadoop的地震数据分析统计.rar

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,它为海量数据的存储和处理提供了高效、可靠的解决方案。本文将深入探讨如何利用Hadoop进行地震数据分析统计,以揭示地壳活动的潜在规律,预防和减轻自然灾害的...

    基于Hadoop豆瓣电影数据分析实验报告

    在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本实验旨在利用Hadoop进行豆瓣电影数据的分析,...

    大数据 hadoop mapreduce 词频统计

    大数据处理是现代信息技术领域的一个重要概念,它涉及到海量数据的存储、管理和分析。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储大规模数据集。Hadoop的核心组件包括HDFS(Hadoop Distributed...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    在大数据处理领域,Hadoop 是一个至关重要的框架,它提供了分布式存储和计算的能力,使得海量数据的处理变得可能。在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨...

    Hadoop之外卖订单数据分析系统

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,它为海量数据的存储和处理提供了高效、可靠的解决方案。本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将...

    Hadoop中单词统计案例运行的代码

    4. **分区与排序**:MapReduce框架会根据键进行分区(Partitioning)并排序,相同键的键值对会被送到同一个Reducer。 **Reduce阶段**: 1. **Shuffle**:在Reduce之前,框架会对相同键的键值对进行排序(Shuffle)...

    Hadoop应用案例分析:雅虎、eBay、百度、Facebook.pdf

    存储海量的数据并对其进行处理,而这正是Hadoop 的强项。如Facebook 使用Hadoop 存储 内部的日志拷贝,以及数据挖掘和日志统计;Yahoo !利用Hadoop 支持广告系统并处理网页 搜索;Twitter 则使用Hadoop 存储微博...

    词频统计,利用Hadoop中mappereduce进行单词的计数

    本主题聚焦于如何利用Hadoop的MapReduce模型进行词频统计,以《哈姆雷特》为例,展示如何在海量文本数据中高效地计算每个单词的出现次数。 【描述】:“对哈姆雷特进行词频统计,利用大数据的Hadoop框架进行计算,...

    基于Hadoop的高性能海量数据处理平台研究.pdf

    文章中提到的海量数据处理指的是对大规模数据集进行存储、管理和分析的过程。随着技术的发展,数据量成几何倍数增长,这导致传统数据处理方法无法满足需求。海量数据处理技术成为发掘数据潜在价值的关键。而高性能...

    词频统计(基于hadoop集群,python实现)

    在IT领域,大数据处理是一项关键任务,而Hadoop是一个广泛使用的开源框架,专门设计用于处理和存储海量数据。本教程将深入探讨如何利用Hadoop集群和Python实现词频统计,这是一个经典的WordCount示例,适合初学者...

    基于Hadoop 平台的数据分析方案的设计

    本文提出的基于Hadoop平台的数据分析方案,主要以国内某搜索引擎的上千万条用户搜索日志为数据源,通过分布式存储和计算框架进行数据统计分析。研究团队设计了相应的Map/Reduce程序,并给出设计思路与实例。研究中...

    海量数据处理方法

    海量数据处理方法 海量数据处理是指基于海量数据上的存储、处理、操作,解决方案...在回答这些问题时,需要对海量数据处理的定义、方法和策略有深入的理解,并且需要能够根据实际情况选择合适的数据结构和解决方法。

    陌陌聊天数据实现FineBI数据分析报表

    1. **Hadoop**:这是一个开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。Hadoop的核心组件包括HDFS(分布式文件系统)和MapReduce(分布式计算模型)。陌陌聊天数据可以被存储在HDFS上,便于分布式...

    Hadoop进行分布式计算的入门资料

    它通过分布式存储和并行处理,使得企业能够高效地管理和分析海量数据。这篇入门资料将引导我们了解如何利用Hadoop进行分布式计算。 一、Hadoop概述 Hadoop的核心组件包括HDFS(Hadoop Distributed File System)和...

    Hadoop学习总结

    Mapper接收输入记录,进行处理(如词频统计),并将结果输出为键值对(, value&gt;)。 2. **Reduce阶段**:Reducer从所有Mapper中收集相同键的值,将它们组合并处理,最终生成新的键值对作为输出。这一步骤通常用于...

    海量数据处理

    海量数据处理是指在合理的时间内,对大规模数据集进行高效存储、管理和分析的技术过程。这种处理方式不仅涉及到数据的收集、清洗和存储,更重要的是通过各种算法和技术来实现数据分析和挖掘。 #### 二、海量数据...

    hadoop实验+作业.zip

    Hadoop是由Apache软件基金会开发的一个开源框架,它允许在大规模集群上存储和处理海量数据。其核心由两个主要组件构成:HDFS(Hadoop Distributed File System)和MapReduce。 1. HDFS:HDFS是一种分布式文件系统,...

    hadoop的应用

    这些系统利用Hadoop进行大规模数据抓取、预处理和排序。 7. **基因组学研究** 生物科学领域,尤其是基因组学,也受益于Hadoop。研究人员可以使用Hadoop处理大规模的基因序列数据,加速疾病诊断和新药物的研发。 8...

Global site tag (gtag.js) - Google Analytics