`
younglibin
  • 浏览: 1216353 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

mapreduce中map是怎么做的?参数又是怎么解析传递给map方法的?没有ifelse判断数据又是如何累加的

阅读更多

1.首先介绍一下wordcount 早mapreduce框架中的 对应关系

大家都知道 mapreduce 分为 map 和reduce 两个部分,那么在wordcount例子中,很显然 对文件word 计数部分为map,对 word 数量累计部分为 reduce;

大家都明白  map接受一个参数,经过map处理后,将处理结果作为reduce的入参分发给reduce,然后在reduce中统计了word 的数量,最终输出到输出结果;

但是初看遇到的问题:

一、map的输入参数是个 Text之类的 对象,并不是 file对象

二、reduce中并没有if-else之类的判断语句 ,来说明 这个word 数量  加  一次,那个word 加一次。那么这个判断到底只是在 map中已经区分了 还是在reduce的时候才判断的

三、map过程到底做了什么,reduce过程到底做了什么?为什么它能够做到多个map多个reduce?

 

一、

1. 怎么将 文件参数 传递 到 job中呢?

在 client 我们调用了FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

实际上 addInputPath 做了以下的事情(将文件路径加载到了conf中)

public static void addInputPath(Job job, 

                                  Path path) throws IOException {

    Configuration conf = job.getConfiguration();

    path = path.getFileSystem(conf).makeQualified(path);

    String dirStr = StringUtils.escapeString(path.toString());

    String dirs = conf.get(INPUT_DIR);

    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);

  }

 

我们再来看看 FileInputFormat 是做什么用的, FileInputFormat 实现了 InputFormat 接口 ,这个接口是hadoop用来接收客户端输入参数的。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。

 

我们会看到 在 InputFormat 接口中 有getSplits方法,也就是说分片操作实际上实在 map之前 就已经做好了

 List<InputSplit>getSplits(JobContext job) 

          Generate the list of files and make them into FileSplits.

具体实现参考  FileInputFormat  getSplits 方法:

上面是FileInputFormat的getSplits()方法,它首先得到分片的最小值minSize和最大值maxSize,它们会被用来计算分片大小。可以通过设置mapred.min.split.size和mapred.max.split.size来设置。splits链表用来存储计算得到的输入分片,files则存储作为由listStatus()获取的输入文件列表。然后对于每个输入文件,判断是否可以分割,通过computeSplitSize计算出分片大小splitSize,计算方法是:Math.max(minSize, Math.min(maxSize, blockSize));也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize。然后我们根据这个splitSize计算出每个文件的inputSplits集合,然后加入分片列表splits中。注意到我们生成InputSplit的时候按上面说的使用文件路径,分片起始位置,分片大小和存放这个文件的hosts列表来创建。最后我们还设置了输入文件数量:mapreduce.input.num.files。

 

 

 

 

二、计算出来的分片有时怎么传递给  map呢 ?对于单词数量如何累加?

我们使用了  就是InputFormat中的另一个方法createRecordReader() 这个方法:

RecordReader:

  RecordReader是用来从一个输入分片中读取一个一个的K -V 对的抽象类,我们可以将其看作是在InputSplit上的迭代器。我们从API接口中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法,由它获取分片上的下一个K-V 对。

 

可以看到接口中有:

   public abstract   boolean nextKeyValue() throws IOException, InterruptedException;

  public abstract  KEYIN getCurrentKey() throws IOException, InterruptedException;

  public abstract   VALUEIN getCurrentValue() throws IOException, InterruptedException;

  public abstract float getProgress() throws IOException, InterruptedException;

  public abstract void close() throws IOException;

 

 FileInputFormat<K,V>  

Direct Known Subclasses:

CombineFileInputFormat, KeyValueTextInputFormat, NLineInputFormat, SequenceFileInputFormat, TextInputFormat

 

对于 wordcount  测试用了 NLineInputFormat和 TextInputFormat  实现类 

 

在 InputFormat  构建一个    RecordReader 出来,然后调用RecordReader   initialize  的方法,初始化RecordReader 对象

 

那么  到底 Map是怎么调用  的呢? 通过前边我们 已经将 文件分片了,并且将文件分片的内容存放到了RecordReader中,

 

下面继续看看这些RecordReader是如何被MapReduce框架使用的

 

终于 说道 Map了 ,我么如果要实现Map  那么 一定要继承 Mapper这个类

 public abstract class Context

    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  }

  protected void setup(Context context) throws IOException, InterruptedException

  protected void map(KEYIN key, VALUEIN value, Context context) throws IOException,InterruptedException {  }

  protected void cleanup(Context context  ) throws IOException, InterruptedException {  }

  public void run(Context context) throws IOException, InterruptedException {  }

 

 

 我们写MapReduce程序的时候,我们写的mapper都要继承这个Mapper.class,通常我们会重写map()方法,map()每次接受一个K-V对,然后我们对这个K-V对进行处理,再分发出处理后的数据。我们也可能重写setup()以对这个map task进行一些预处理,比如创建一个List之类的;我们也可能重写cleanup()方法对做一些处理后的工作,当然我们也可能在cleanup()中写出K-V对。举个例子就是:InputSplit的数据是一些整数,然后我们要在mapper中算出它们的和。我们就可以在先设置个sum属性,然后map()函数处理一个K-V对就是将其加到sum上,最后在cleanup()函数中调用context.write(key,value);

  最后我们看看Mapper.class中的run()方法,它相当于map task的驱动,我们可以看到run()方法首先调用setup()进行初始操作,然后对每个context.nextKeyValue()获取的K-V对,就调用map()函数进行处理,最后调用cleanup()做最后的处理。事实上,从context.nextKeyValue()就是使用了相应的RecordReader来获取K-V对的。

 

 我们看看Mapper.class中的Context类,它继承与MapContext,使用了一个RecordReader进行构造。下面我们再看这个MapContext。

 

public MapContextImpl(Configuration conf, TaskAttemptID taskid,

                        RecordReader<KEYIN,VALUEIN> reader,

                        RecordWriter<KEYOUT,VALUEOUT> writer,

                        OutputCommitter committer,

                        StatusReporter reporter,

                        InputSplit split) {

    super(conf, taskid, writer, committer, reporter);

    this.reader = reader;

    this.split = split;

  }

 

 

RecordReader  看来是在这里构造出来了, 那么 是谁调用这个方法,将这个承载着关键数据信息的 RecordReader 传过来了 ?

 

我们可以想象 这里 应该被框架调用的可能性比较大了,那么mapreduce 框架是怎么分别来调用map和reduce呢?  

还以为分析完map就完事了,才发现这里仅仅是做了mapreduce 框架调用前的一些准备工作,

 

还是继续分析 下 mapreduce 框架调用吧:

参考: http://www.cppblog.com/javenstudio/articles/43073.html

1.在 job提交 任务之后 首先由jobtrack 分发任务, 

 

在 任务分发完成之后  ,执行 task的时候,这时 调用了 maptask 中的  runNewMapper

 

在这个方法中调用了 MapContextImpl,  至此  这个map 和框架就可以联系起来了

 

 

 

map 过程参考:

http://www.360doc.com/content/12/0827/09/9318309_232551564.shtml

 

 

 

 

 

 

分享到:
评论

相关推荐

    使用python实现mapreduce(wordcount).doc

    在大数据处理领域,Java 通常是首选语言,但考虑到Python在数据挖掘和深度学习中的便利性,我们可以使用Python来实现MapReduce。本篇文章将探讨如何在Hadoop平台上利用Python实现WordCount,一个经典的MapReduce示例...

    在Hadoop的MapReduce任务中使用C程序的三种方法

    在Hadoop的MapReduce任务中,有时需要使用C或C++等非Java语言,这是因为开发团队可能更熟悉这些语言,或者已有代码库是用这些语言编写的。为此,Hadoop提供了三种方法来实现这一目标:Hadoop Streaming、Hadoop ...

    Hadoop大数据实训,求最高温度最低温度实验报告

    在本实验"MapReduce编程训练(三)"中,我们主要关注的是利用Hadoop MapReduce框架处理大数据,特别是针对全球气温记录的统计分析。实验的主要目标是实现以下几个关键知识点: 1. **自定义数据类型(Custom Data ...

    python MapReduce的wordcount

    ### Python 实现 MapReduce 的 WordCount 示例详解 #### 一、引言 MapReduce 是 Hadoop 生态系统中的一种...这种方法不仅有助于理解 MapReduce 的基本概念,还能够帮助开发者快速上手使用 Python 进行大数据处理任务。

    第二章 分布式文件系统HDFS+MapReduce(代码实现检查文件是否存在&WordCount统计).docx

    MapReduce简化了大规模数据处理的复杂性,使得开发人员可以专注于数据处理逻辑,而无需关注底层分布式系统的细节。 【MapReduce工作流程】 1. **Map阶段**:原始数据被分片(split),每个分片由一个Map任务处理。...

    现有student.txt和student-score.txt 将两个文件上传到hdfs上 使用Map/Reduce框架完成下面

    根据给定文件中的标题、描述、标签以及部分内容,可以总结出以下相关知识点: ### HDFS基本概念 **HDFS(Hadoop Distributed File System)**是Hadoop生态系统中的分布式文件系统,用于存储大量的数据集。HDFS的...

    04-大数据技术之高频面试题9.0.5.pdf

    - Shuffle是MapReduce过程中数据从Map任务向Reduce任务传输的过程。 - 优化方法包括:增加combiner的使用、调整shuffle buffer size、使用压缩等技术。 #### 1.2.9 Yarn 工作机制 - YARN全称为Yet Another Resource...

    Spark 编程基础(Scala 版)-机房上机实验指南

    - **控制结构**:提供if-else语句、for循环等基本控制结构。 - **函数式编程**:支持高阶函数、模式匹配等函数式编程特性,这些特性对于编写高效、可维护的Spark程序至关重要。 #### Spark与Scala的集成 - **环境...

    JAVA2核心技术第1卷[1].基础知识7th_1

    - 大数据处理:如Hadoop生态系统中的MapReduce等。 ### 二、Java基础语法 1. **变量与数据类型**: - 基本数据类型:如int、double、char等。 - 引用数据类型:如类、数组等。 2. **流程控制**: - 条件语句...

    云计算 a plus b java

    通过MapReduce编程模型来处理数据,将输入的数据进行拆分、处理和汇总,最终输出结果。 ### 二、核心概念 #### 1. MapReduce MapReduce是一种分布式计算框架,用于处理大规模数据集。它包括两个阶段:Map阶段和...

    java期末复习资料(大数据)

    总结上述内容,我们可以看到这些Java编程示例涵盖了基本的输入输出、条件判断(if-else和switch-case)、循环以及简单的数据处理。这些是Java编程的基础,也是大数据分析的起点。在大数据领域,Java提供了Hadoop和...

    算法题.zip

    - 基本语法:如变量、数据类型、控制流(if-else,for,while等)、函数、类和对象等。 - 标准库:如os、sys、re、json等模块的使用。 - 高级特性:如装饰器、生成器、上下文管理器、元编程等。 - 异常处理:...

    大数据人工智能量化投资平台.pdf

    - if...else语句:条件判断。 - switch语句:多分支选择。 - for、while循环:循环执行。 - **IDEA与Eclipse** - IntelliJ IDEA和Eclipse都是常用的Java集成开发环境(IDE)。 - 支持代码编写、调试、构建等...

    架构脑图.pdf

    - **流程控制语句**:如if-else、switch-case、for循环等,用于控制程序的执行流程。 - **IDEA与Eclipse**:常用的Java集成开发环境(IDE),提供代码编辑、调试等功能。 - **数组**:用于存储同类型数据的集合。 - ...

    用Hadoop进行分布式数据处理,第3部分:应用程序开发

    2. **Map任务**:每个map任务负责处理分配给它的数据分片,并将处理后的结果输出为一组键值对的中间集合。 3. **Shuffle和Sort**:map任务完成后,中间结果会被重新排序和分组,确保相同键的值会被同一个reduce任务...

    python成功运行hadoop

    它通过分布式文件系统(HDFS)来存储数据,并利用MapReduce编程模型进行并行计算。本文将详细介绍如何使用Python脚本来实现Hadoop MapReduce任务的基本流程。 #### 编写Map和Reduce程序 **1. Mapper Program ...

    秒针系统:面试问题

    ### 秒针系统的面试问题解析 #### 1. 查找Linux上文件大小超过100K的命令 在Linux环境中,查找文件大小超过特定阈值的命令可以通过`find`结合`du`命令来实现。具体命令如下: ```bash find /path/to/search -type...

    spark-tut-2016-intro.pdf

    Spark利用内存中的数据缓存技术来加速数据处理过程,这与传统的基于磁盘存储的MapReduce相比,具有显著的优势。Spark可以在内存中保留数据集,减少磁盘I/O操作,从而大大提高了处理速度。 ### Spark与Impala的比较 ...

    hive udaf 实现按位取与或

    在大数据处理领域,Hive作为一个基于Hadoop的数据仓库工具,被广泛用于结构化数据的查询、分析和处理。用户自定义聚合函数(User-Defined Aggregate Functions, UDAGGs 或简称 UDAFs)是Hive提供的扩展功能,允许...

Global site tag (gtag.js) - Google Analytics