`
臻是二哥
  • 浏览: 188185 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
博客专栏
Group-logo
Java技术分享
浏览量:0
社区版块
存档分类
最新评论

从WordCount看MapReduce框架执行流程

阅读更多
代码如下:

import java.io.IOException;

import java.util.StringTokenizer;

 

import org.apache.hadoop.conf.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.util.*;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.Reducer.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.partition.*;

 

public class WordCount {

public static class MyMapper extends Mapper<Object,TextText,IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

 

public void map(Object key, Text value,Context context) throws IOException,InterruptedException{

StringTokenizer st=new StringTokenizer(value.toString());

while(st.hasMoreTokens()){

String str=st.nextToken();

word.set(str);

context.write(wordone);

System.out.println(str+"="+one.get());

}

}

}

 

public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

private IntWritable result = new IntWritable();

public void reduce(Text key,Iterable<IntWritable> values ,Context context) throws IOException,InterruptedException{

int sum=0;

for(IntWritable val:values)

{

sum+=val.get();

}

System.out.println(key+"="+sum);

result.set(sum);

context.write(key,result);

}

 

}

 

 

public static void main(String [] args) throws Exception

{

Configuration conf= new Configuration();

 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println("Usage: wordcount <in> <out>");

      System.exit(2);

    }

   Job job=new Job(conf,"wordcount" );

   /**InputFormat类的作用是将输入的数据分割成splits,并将splits进一步拆成<K,V>

   *可以通过setInputFormatClass()方法进行设置

   *默认为TextInputFormat.class,默认情况可以不写

   **/

   job.setInputFormatClass(TextInputFormat.class);

   /**

    *Mapper类的作用是实现map函数,将splits作为输入生成一个结果

    *可以通过setMapperClass()方法进行设置

    *默认为Mapper.class,默认情况可以不写,此时输入即输出

    */

   job.setMapperClass(MyMapper.class);

   /**

    * 设置Mapper输出的key的类型

    */

   job.setMapOutputKeyClass(Text.class);

   /**

    * 设置Mapper输出的value的类型

    */

   job.setMapOutputValueClass(IntWritable.class);

   /**

    * Combiner类的作用是实现combine函数,将mapper的输出作为输入,合并具有形同key值得键值对

    * 可以通过setCombinerClass()方法进行设置

    * 默认为null,默认情况不写,此时输入即输出

    */

   job.setCombinerClass(MyReducer.class);

   /**

    * Partitioner类的作用是实现getPartition函数,用于在洗牌过程中将由Mapper输入的结果分成R份,每份交给一个Reducer

    * 可以通过setPartitionerClass()方法进行设置

    * 默认为HashPartitioner.class,默认情况可以不写,此时输入即输出

    */

   job.setPartitionerClass(HashPartitioner.class);

   /**

    * Reducer类的作用是实现reduce函数,将有combiner的输出作为输入,得到最终结果

    * 可以通过setReducerClass()方法进行设置

    * 默认为Reducer.class,默认情况可以不写,此时输入即输出

    */

   job.setReducerClass(MyReducer.class);

   /**

    * OutputFormat类,负责输出最终结果

    * 可以通过setOutputFormatClass()方法进行设置

    * 默认TextOutputFormat.class,默认情况可以不写,此时输入即输出

    */

   //job.setOutputFormatClass(TextOutputFormat.class);

   /**

    * 设置reduce()输出的key的类型

    */

   job.setOutputKeyClass(Text.class);

   /**

    * 设置reduce()输出的value的类型

    */

   job.setOutputValueClass(IntWritable.class);

   

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.out.println("运行啦");

    System.exit(job.waitForCompletion(true) ? 0 : 1);

 

}

}

我们先来看main函数,以便了解hadoopMapReduce的过程,假设输入为两个文件



 

 

 

 

InputFormat类将输入文件划分成多个splits,同时将这些splits转化为<K,V>的形式,如下,可以发现,当使用默认的TextInputFormat进行处理的时候,K为偏移量,V为每行的文本。




 
  

接着map()方法

 

对以上结果进行处理,根据MyMapper中的map方法,得到以下结果:




 

 

接下来Mapper框架对以上结果进行处理,根据key值进行排序,并合并成集合,得到以下结果:



 

 

 

接下来combine类对以上结果进行处理(实际上combine是一个本地的reducer,所以用MyReducer给他复制,见文章最后)得到结果如下:



 

 

 

接下来Reducer框架对其进行处理,结果如下:


 
 
 
 

 

 

 

 

 

接下来reduce()方法进行处理,结果如下:




 
 
 

 

 

 

 

 

以上是wordcount程序执行的全过程,通过wordcount的代码,我们了解了MapReduce框架的执行流程,如下

InputFormat>>map()方法>>Mapper框架>>Combiner>>Partitioner>>Reducer框架>>reduce()方法

以上的每个步骤,不设置具体的类时都会有个默认的类,除了InputFormat类以外,其他类的默认类都是输入即输出,但是InputFormat的默认类是输出为<K,V>

因此,对于wordcount程序来说,如果都采用默认类的话,输出应该为    




 
                       

 

 

 

 

下面来说说Combiner类,本质上是一个本地的Reducer类。其设计初衷是在本地将需要reduce操作的数据合并,以减少不必要的通信代价,以提高hadoop的运行性能。 

但值得注意的是,并不是所有的mapreduce程序都可以将reduce过程移植到本地进行combine,这需要在逻辑上考虑这种移植是否可行!要想 进行本地reducecombine),一个必要的条件是,reduce的输入输出格式必须一样!比如,对于wordcount程序,combine是 可行的,combine的输出(实际上是reduce的输出)与reduce的输入吻合。因此我们才可以有 job.setCombinerClass(MyReducer.class);


实际上,在CombinerReducer之间还有一个Partitioner类,该类用于在shuffle过程中将中间值分成R份,每份由一个Reducer负责。通过使用job.setPartitionerClass()来进行设置,默认使用HashPartitioner类。

 

 

  • 大小: 3.3 KB
  • 大小: 4 KB
  • 大小: 6.3 KB
  • 大小: 5.8 KB
  • 大小: 5 KB
  • 大小: 3.6 KB
  • 大小: 2.5 KB
  • 大小: 2.7 KB
  • 大小: 10.9 KB
  • 大小: 3.3 KB
0
0
分享到:
评论

相关推荐

    hadoop 框架下 mapreduce源码例子 wordcount

    Hadoop的分布式文件系统(HDFS)会自动处理文件的分发,MapReduce框架将根据配置运行Map和Reduce任务。 6. **Hadoop 2.2版本的特性**: Hadoop 2.2是YARN(Yet Another Resource Negotiator)引入的版本,它将资源...

    mapreduce的wordCount案例

    接下来,MapReduce框架会对这些键值对进行分区和排序,确保所有相同键的值会被传递到同一个Reduce任务。在WordCount中,这一步对于汇总每个单词的总数至关重要。 Reduce阶段接收到Map阶段处理后的键值对,对每个...

    mapreducewordcounter-master_wordcount_mapReduce_

    在实际运行时,这些脚本会与Hadoop的MapReduce框架结合,通过Hadoop的命令行工具提交作业进行执行。 总结来说,"mapreducewordcounter-master"项目是通过Python实现的MapReduce word count应用,它展示了如何在...

    MapReduce之wordcount范例代码

    WordCount是MapReduce中的一个经典示例,它用于统计文本中各个单词出现的次数,简单明了地展示了MapReduce的核心理念和工作流程。 在Hadoop环境中,MapReduce通过两个主要阶段来完成任务:Map阶段和Reduce阶段。...

    MapReduce经典例子WordCount运行详解.pdf

    程序运行时,MapReduce框架会启动JVM执行任务,显示任务相关信息,包括Job ID、输入输出文件数量、map和reduce任务的数量及处理记录。 3. 查看和解析结果 运行完成后,结果会存储在指定的输出目录下,通常包含多个...

    MapReduce WordCount

    此外,随着Spark等新型大数据处理框架的出现,虽然它们仍然可以利用MapReduce的思想,但提供了更高效和灵活的计算模型,如DAG(有向无环图)执行模型。 总结,MapReduce WordCount是大数据处理的基础,通过它我们...

    用mapreduce计算框架实现了4个小demo wordcount、基于物品的推荐算法和基于用户的推荐算法

    MapReduce是一种分布式计算框架,由Apache Hadoop项目开发,主要用于处理和存储大量数据。这个框架在人工智能(AI)领域有着广泛的应用,特别是在大数据分析中。Hadoop是实现MapReduce的核心库,同时也是一种分布式...

    hadoop mapreduce编程实战

    * MapReduce 程序的执行流程 * MapReduce 程序的优化方法 MapReduce 项目实践 在实践中,我们可以使用 MapReduce 来解决各种大数据处理问题。以下是一些 MapReduce 项目实践: * WordCount 程序编写及代码分析 * ...

    hadoop mapreduce

    文档中提到MapReduce框架具有一些用户界面,用于处理任务的有效载荷(payload)、配置作业(job configuration)、执行任务和管理环境(task execution & environment)、提交和监控作业(job submission & ...

    Hadoop集群-WordCount运行详解.pdf

    1.4WordCount处理过程描述了WordCount程序在MapReduce框架下完整的执行流程。输入数据经过Map阶段分解为多个键值对,其中键是单词,值是出现的次数(通常是1)。在Reduce阶段,相同键的键值对被归约(合并),最终...

    MapReduce - WordCount案例 - 含各种部署方式源码

    总结,MapReduce在处理大数据时提供了强大的能力,而WordCount案例则展示了其基本工作流程。通过Java实现,我们可以灵活地设计和部署MapReduce作业,无论是本地测试还是远程集群执行。理解并熟练掌握这些知识,对于...

    云帆大数据----04 MapReduce入门编程、框架原理、

    MapReduce框架中还有两个重要的组件是Combiner和Partitioner。Combiner用于局部地对Map输出的结果进行合并,以减少数据的传输量并提高效率。Partitioner负责将Map输出的中间数据按照键(Key)进行分区,确保具有相同...

    006_hadoop中MapReduce详解_3

    在Hadoop生态系统中,MapReduce是一种分布式计算框架,主要用于处理和生成大数据集。"006_hadoop中MapReduce详解_3"可能是指一个系列教程的第三部分,着重讲解MapReduce的核心概念、工作原理以及实际应用。在这个...

    mapreduce案例数据

    除了基本的Map和Reduce任务,MapReduce框架还支持其他组件,如Combiner(本地化减少)、Partitioner(决定哪些键值对发送到哪个Reducer)和OutputFormat(定义结果的格式)。此外,可以使用自定义逻辑来优化性能,...

    MapReduce计算模式详解

    ### MapReduce计算模式详解 #### 一、MapReduce简单概述 MapReduce是一种高效的大数据处理技术,它由Google提出并在Hadoop中...通过WordCount程序的具体实现,我们可以更深入地理解MapReduce的工作原理和内部流程。

    MapReduce 2.0

    MapReduce框架会自动处理数据的并行化、容错、负载均衡和数据本地性优化等问题。 尽管MapReduce在离线大规模数据处理上表现出色,但它也存在一些不擅长的方面。例如,MapReduce不适用于要求毫秒级或秒级返回结果的...

    wordcount_demo

    尽管wordcount看起来简单,但它背后的分布式计算逻辑对于理解和应用更复杂的MapReduce算法至关重要。通过学习和实践wordcount,开发者可以更好地掌握Hadoop生态系统的运作方式,为进一步探索大数据分析和处理打下...

    WordCount2_hadoopwordcount_

    `Hadoop WordCount`的工作流程分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,输入数据(通常是文本文件)被分割成多个块,每个块由一个Map任务处理。Map任务读取数据,对每一行进行分词,然后将每个单词与计数...

Global site tag (gtag.js) - Google Analytics