WordCount是hadoop的一个入门程序,其地位相当与java中的HelloWorld,它使用map/reduce对文本中出现的单词进行计数。
程序分三个部分:
Map类 Reduce类 和主函数
Map类:Map类继承Mapreducebase类并实现了Mapper接口,其中 MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情)。Map在实现Mapper接口之后重写其map方法,将输入的<k1/v1>进行处理生成<k2/v2>作为reduce输入。
Reduce类:同样继承MapReduceBase类。实现Reducer接口,重写reduce()方法,将输入的<k2/v2>进行处理生成<k3/v3>输出。
主函数 :主函数是程序的入口,生成JobConf对象向hadoop框架描述map/reduce执行的工作。
源码详细分析:一、Map类, Map类实现了Mapper接口并对map()方法进行了重写。
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> //Mapper<k1,v1,k2,v2> k v 表示输入和输出的数据类型,其中k1应为Object或者LongWritable ,因为map在读取文本时是一行一行读取的。 { private final static IntWritable one = new IntWritable(1);//将每个单词的数量都设成1. private Text word = new Text(); //Text一个变量相当于String,用来存储传来的文本的值 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException //具体的map函数在每个节点上执行,将<k1/v1>处理成<k2/v2>用output.collect()输出给 Reduce。 { String line = value.toString(); //将Text类型转换成String StringTokenizer tokenizer = new StringTokenizer(line); //再转换成StringTokenizer while (tokenizer.hasMoreTokens()) //当文本有分割符时 { word.set(tokenizer.nextToken()); //返回从当前位置到下一个分割符的字符串 output.collect(word, one); } } } 比如一行文本: welcome to lanjie laijie lead to success Map进行处理之后的结果是: welcome 1 to 1 lanjie 1 lanjie 1 lead 1 to 1 success 1
二、Reduce类
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> //<Text, IntWritable, Text, IntWritable>字段对应 着<k2,v2,k3,v3> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException //Text key, Iterator<IntWritable> values 对应从map传来的<k2/v2>, OutputCollector<Text, IntWritable> output用来保存结果 { int sum = 0; while (values.hasNext()) { sum += values.next().get(); //计算每个key的数量,得到总数保存在sum中 } output.collect(key, new IntWritable(sum)); //将数据写到文件中 } } 比如:welcome to lanjie laijie lead to success 输出 lanjie 2 lead 1 success 1 to 2 welcome 1 系统默认排序为升序。
三、主函数
//程序的入口,即住函数 public static void main(String[] args) throws Exception { String path1="hdfs://master:9000/xiong/test";//要统计的文件 String path2="hdfs://master:9000/xiong/result";//保存结果的文件 /** * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 */ JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); //设置一个用户定义的job名称 conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类 conf.setOutputValueClass(IntWritable.class); //为job输出设置value类 conf.setMapperClass(Map.class); //为job设置Mapper类 conf.setCombinerClass(Reduce.class); //为job设置Combiner类 conf.setReducerClass(Reduce.class); //为job设置Reduce类 conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类 conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类 /** * InputFormat描述map-reduce中对job的输入定义 * setInputPaths():为map-reduce job设置路径数组作为输入列表 * setInputPath():为map-reduce job设置路径数组作为输出列表 */ FileInputFormat.setInputPaths(conf, new Path(path1)); FileOutputFormat.setOutputPath(conf, new Path(path2)); JobClient.runJob(conf); //运行一个job } }
具体完整代码如下:
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount{ public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } /** * 这是一个实现reduce的类 * @author hadoop * */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); //将数据写到文件中 } } //程序的入口,即住函数 public static void main(String[] args) throws Exception { String path1="hdfs://master:9000/xiong/test";//要统计的文件 String path2="hdfs://master:9000/xiong/result";//保存结果的文件 /** * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 */ JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); //设置一个用户定义的job名称 conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类 conf.setOutputValueClass(IntWritable.class); //为job输出设置value类 conf.setMapperClass(Map.class); //为job设置Mapper类 conf.setCombinerClass(Reduce.class); //为job设置Combiner类 conf.setReducerClass(Reduce.class); //为job设置Reduce类 conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类 conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类 /** * InputFormat描述map-reduce中对job的输入定义 * setInputPaths():为map-reduce job设置路径数组作为输入列表 * setInputPath():为map-reduce job设置路径数组作为输出列表 */ FileInputFormat.setInputPaths(conf, new Path(path1)); FileOutputFormat.setOutputPath(conf, new Path(path2)); JobClient.runJob(conf); //运行一个job } }
相关推荐
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算...1、Hadoop示例程序WordCount详解及实例2、hadoop学习笔记:mapreduce框架详解3、hadoop示例程序wo
### Hadoop 安装及详细学习笔记 #### Hadoop 概述 Hadoop 是一个能够对大量数据进行分布式处理的软件框架,它旨在提供高扩展性、可靠性和高效性,适用于处理PB级别的数据集。Hadoop 的核心组件包括 HDFS(Hadoop ...
Wordcount是Hadoop中的一个经典示例程序,用于统计文本文件中单词出现的次数。这个例子展示了如何使用MapReduce处理大规模数据。在HDFS中,我们可以通过编程接口进行文件操作,如上传、下载、删除等。 以下是一些...
- **WordCount 示例**: WordCount 是Hadoop中最经典的示例程序之一,用于演示如何通过MapReduce框架处理大量文本数据,实现单词计数的功能。 - **Map阶段**: 将输入的文本行进行分割,并为每个单词创建键值对(...
【尚硅谷大数据技术之Hadoop】是一门深入探讨大数据处理技术的课程,主要聚焦于开源框架Hadoop。Hadoop是Apache软件基金会开发的一个分布式计算项目,它为大规模数据集(大于1TB)提供了高容错性的分布式存储和计算...
内容概要: mr执行笔记; mapreduce框架的规范; wc流程.xls; wordcount的伪代码; yarn提交job的源码流程; YARN中提交job的详细流程; 打开流的关键代码; 打开流的调用流程; 日志格式;
Spark笔记1.docx Spark 是什么? Spark 是一个基于内存的统一分析引擎,用于大规模数据处理,包括离线计算、实时计算和快速查询(交互式查询)。它具有快、易用和通用等特点,可以进行离线计算、交互式查询、实时...
在标签中提到“笔记”,通常代表着课程资料或者学习过程中记录的关键点。这里的笔记可能包括了分布式计算基础、使用Python进行编程时的重要函数、类库、设计模式和实际案例的代码示例。学生可以通过阅读这些笔记,...
5. MapReduce编程:学习MapReduce编程模型,编写并运行一个简单的WordCount程序,统计文本中的单词频数。 四、实验设备与环境 实验所需硬件主要包括一台笔记本电脑,软件环境包括VMware Workstation 16 Pro、...
找工作流程是大数据面试宝典的开篇,包括学习技能、编写简历、投递简历、预约面试时间、面试、等待回复、拿到offer、入职以及准备必备资料和签订合同九个步骤。这个流程帮助求职者了解从开始求职到成功入职整个过程...
Spark Core学习 对最近在看的赵星老师Spark视频中关于SparkCore的几个案例进行总结。 目录1.WordCountWordCount 执行流程详解2.统计最受欢迎老师topN1. 方法一:普通方法,不设置分组/分区2. 方法二:设置分组和过滤...