import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.partition.*;
public class WordCount {
public static class MyMapper extends Mapper<Object,Text, Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value,Context context) throws IOException,InterruptedException{
StringTokenizer st=new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
String str=st.nextToken();
word.set(str);
context.write(word, one);
System.out.println(str+"="+one.get());
}
}
}
public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values ,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values)
{
sum+=val.get();
}
System.out.println(key+"="+sum);
result.set(sum);
context.write(key,result);
}
}
public static void main(String [] args) throws Exception
{
Configuration conf= new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"wordcount" );
/**InputFormat类的作用是将输入的数据分割成splits,并将splits进一步拆成<K,V>
*可以通过setInputFormatClass()方法进行设置
*默认为TextInputFormat.class,默认情况可以不写
**/
job.setInputFormatClass(TextInputFormat.class);
/**
*Mapper类的作用是实现map函数,将splits作为输入生成一个结果
*可以通过setMapperClass()方法进行设置
*默认为Mapper.class,默认情况可以不写,此时输入即输出
*/
job.setMapperClass(MyMapper.class);
/**
* 设置Mapper输出的key的类型
*/
job.setMapOutputKeyClass(Text.class);
/**
* 设置Mapper输出的value的类型
*/
job.setMapOutputValueClass(IntWritable.class);
/**
* Combiner类的作用是实现combine函数,将mapper的输出作为输入,合并具有形同key值得键值对
* 可以通过setCombinerClass()方法进行设置
* 默认为null,默认情况不写,此时输入即输出
*/
job.setCombinerClass(MyReducer.class);
/**
* Partitioner类的作用是实现getPartition函数,用于在洗牌过程中将由Mapper输入的结果分成R份,每份交给一个Reducer
* 可以通过setPartitionerClass()方法进行设置
* 默认为HashPartitioner.class,默认情况可以不写,此时输入即输出
*/
job.setPartitionerClass(HashPartitioner.class);
/**
* Reducer类的作用是实现reduce函数,将有combiner的输出作为输入,得到最终结果
* 可以通过setReducerClass()方法进行设置
* 默认为Reducer.class,默认情况可以不写,此时输入即输出
*/
job.setReducerClass(MyReducer.class);
/**
* OutputFormat类,负责输出最终结果
* 可以通过setOutputFormatClass()方法进行设置
* 默认TextOutputFormat.class,默认情况可以不写,此时输入即输出
*/
//job.setOutputFormatClass(TextOutputFormat.class);
/**
* 设置reduce()输出的key的类型
*/
job.setOutputKeyClass(Text.class);
/**
* 设置reduce()输出的value的类型
*/
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.out.println("运行啦");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
我们先来看main函数,以便了解hadoop的MapReduce的过程,假设输入为两个文件
InputFormat类将输入文件划分成多个splits,同时将这些splits转化为<K,V>的形式,如下,可以发现,当使用默认的TextInputFormat进行处理的时候,K为偏移量,V为每行的文本。
接着map()方法
对以上结果进行处理,根据MyMapper中的map方法,得到以下结果:
接下来Mapper框架对以上结果进行处理,根据key值进行排序,并合并成集合,得到以下结果:
接下来combine类对以上结果进行处理(实际上combine是一个本地的reducer,所以用MyReducer给他复制,见文章最后)得到结果如下:
接下来Reducer框架对其进行处理,结果如下:
接下来reduce()方法进行处理,结果如下:
以上是wordcount程序执行的全过程,通过wordcount的代码,我们了解了MapReduce框架的执行流程,如下
InputFormat>>map()方法>>Mapper框架>>Combiner类>>Partitioner类>>Reducer框架>>reduce()方法
以上的每个步骤,不设置具体的类时都会有个默认的类,除了InputFormat类以外,其他类的默认类都是输入即输出,但是InputFormat的默认类是输出为<K,V>
因此,对于wordcount程序来说,如果都采用默认类的话,输出应该为
下面来说说Combiner类,本质上是一个本地的Reducer类。其设计初衷是在本地将需要reduce操作的数据合并,以减少不必要的通信代价,以提高hadoop的运行性能。
但值得注意的是,并不是所有的mapreduce程序都可以将reduce过程移植到本地进行combine,这需要在逻辑上考虑这种移植是否可行!要想 进行本地reduce(combine),一个必要的条件是,reduce的输入输出格式必须一样!比如,对于wordcount程序,combine是 可行的,combine的输出(实际上是reduce的输出)与reduce的输入吻合。因此我们才可以有 job.setCombinerClass(MyReducer.class);
实际上,在Combiner和Reducer之间还有一个Partitioner类,该类用于在shuffle过程中将中间值分成R份,每份由一个Reducer负责。通过使用job.setPartitionerClass()来进行设置,默认使用HashPartitioner类。
相关推荐
Hadoop的分布式文件系统(HDFS)会自动处理文件的分发,MapReduce框架将根据配置运行Map和Reduce任务。 6. **Hadoop 2.2版本的特性**: Hadoop 2.2是YARN(Yet Another Resource Negotiator)引入的版本,它将资源...
接下来,MapReduce框架会对这些键值对进行分区和排序,确保所有相同键的值会被传递到同一个Reduce任务。在WordCount中,这一步对于汇总每个单词的总数至关重要。 Reduce阶段接收到Map阶段处理后的键值对,对每个...
在实际运行时,这些脚本会与Hadoop的MapReduce框架结合,通过Hadoop的命令行工具提交作业进行执行。 总结来说,"mapreducewordcounter-master"项目是通过Python实现的MapReduce word count应用,它展示了如何在...
WordCount是MapReduce中的一个经典示例,它用于统计文本中各个单词出现的次数,简单明了地展示了MapReduce的核心理念和工作流程。 在Hadoop环境中,MapReduce通过两个主要阶段来完成任务:Map阶段和Reduce阶段。...
程序运行时,MapReduce框架会启动JVM执行任务,显示任务相关信息,包括Job ID、输入输出文件数量、map和reduce任务的数量及处理记录。 3. 查看和解析结果 运行完成后,结果会存储在指定的输出目录下,通常包含多个...
此外,随着Spark等新型大数据处理框架的出现,虽然它们仍然可以利用MapReduce的思想,但提供了更高效和灵活的计算模型,如DAG(有向无环图)执行模型。 总结,MapReduce WordCount是大数据处理的基础,通过它我们...
MapReduce是一种分布式计算框架,由Apache Hadoop项目开发,主要用于处理和存储大量数据。这个框架在人工智能(AI)领域有着广泛的应用,特别是在大数据分析中。Hadoop是实现MapReduce的核心库,同时也是一种分布式...
* MapReduce 程序的执行流程 * MapReduce 程序的优化方法 MapReduce 项目实践 在实践中,我们可以使用 MapReduce 来解决各种大数据处理问题。以下是一些 MapReduce 项目实践: * WordCount 程序编写及代码分析 * ...
文档中提到MapReduce框架具有一些用户界面,用于处理任务的有效载荷(payload)、配置作业(job configuration)、执行任务和管理环境(task execution & environment)、提交和监控作业(job submission & ...
1.4WordCount处理过程描述了WordCount程序在MapReduce框架下完整的执行流程。输入数据经过Map阶段分解为多个键值对,其中键是单词,值是出现的次数(通常是1)。在Reduce阶段,相同键的键值对被归约(合并),最终...
总结,MapReduce在处理大数据时提供了强大的能力,而WordCount案例则展示了其基本工作流程。通过Java实现,我们可以灵活地设计和部署MapReduce作业,无论是本地测试还是远程集群执行。理解并熟练掌握这些知识,对于...
MapReduce框架中还有两个重要的组件是Combiner和Partitioner。Combiner用于局部地对Map输出的结果进行合并,以减少数据的传输量并提高效率。Partitioner负责将Map输出的中间数据按照键(Key)进行分区,确保具有相同...
在Hadoop生态系统中,MapReduce是一种分布式计算框架,主要用于处理和生成大数据集。"006_hadoop中MapReduce详解_3"可能是指一个系列教程的第三部分,着重讲解MapReduce的核心概念、工作原理以及实际应用。在这个...
除了基本的Map和Reduce任务,MapReduce框架还支持其他组件,如Combiner(本地化减少)、Partitioner(决定哪些键值对发送到哪个Reducer)和OutputFormat(定义结果的格式)。此外,可以使用自定义逻辑来优化性能,...
### MapReduce计算模式详解 #### 一、MapReduce简单概述 MapReduce是一种高效的大数据处理技术,它由Google提出并在Hadoop中...通过WordCount程序的具体实现,我们可以更深入地理解MapReduce的工作原理和内部流程。
MapReduce框架会自动处理数据的并行化、容错、负载均衡和数据本地性优化等问题。 尽管MapReduce在离线大规模数据处理上表现出色,但它也存在一些不擅长的方面。例如,MapReduce不适用于要求毫秒级或秒级返回结果的...
尽管wordcount看起来简单,但它背后的分布式计算逻辑对于理解和应用更复杂的MapReduce算法至关重要。通过学习和实践wordcount,开发者可以更好地掌握Hadoop生态系统的运作方式,为进一步探索大数据分析和处理打下...
`Hadoop WordCount`的工作流程分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,输入数据(通常是文本文件)被分割成多个块,每个块由一个Map任务处理。Map任务读取数据,对每一行进行分词,然后将每个单词与计数...