`

Hadoop MapReduce数据流程(上)

阅读更多

本文不涉及MapReduce的原理介绍,只是从源代码的层面讲讲我对Hadoop的MapReduce的执行过程、数据流的一点理解。


首先贴上一张来之于Yahoo Hadoop 教程 的图片

Detailed Hadoop MapReduce data flow

 

由上图可以看出,在进入Map之前,InputFormat把存储在HDFS的文件进行读取和分割,形成和任务相关的InputSplits,然后RecordReader负责读取这些Splits,并把读取出来的内容作为Map函数的输入参数。下面我就从代码执行的角度来看,数据是如何一步步从HDFS的file到Map函数的。在Yahoo Hadoop 教程 中已经详细讲解了这一过程。但我作为一个细节控,更想从源代码的级别去理清这一过程,这样我才觉得踏实,才觉得自己真真切切地掌握了这个知识点,因此我仔细阅读了这部分的源代码,写篇博客记录下来,以便以后自己查看。

首先,在Mapper类的run方法中,map函数被循环调用:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

     ...................................
    /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  } 

          在run方法中,每调用一次context.nextKeyValue(),就执行一遍map方法,而此处的context实际上是实现了Context接口的MapContextImpl(这一点可以在MultithreadedMapper的run方法看出来),其nextKeyValue,getCurrentKey,getCurrentValue方法为:

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }
  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }
       上述代码中的实际上是由reader来完成nextKeyValue的工作,reader是RecordReader实例,RecordReader就是用来读取各个task的splits,产生map函数的输入参数。实现RecordReader接口的类由很多,那此处的reader到底是那个类的实例呢?我们到创建context的地方去看一看。
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (inputFormat.createRecordReader(split, taskContext), reporter);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    ..............

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);
    上面代码中的input是一个NewTrackingRecordReader实例,而NewTrackingRecordReader则是对inputFormat.createRecordReader(split, taskContext), reporter)返回的RecordReader对象的封装,inputFormat是InputFormat类的实例,InputFormat类定义了如何分割可读取文件,
public abstract class InputFormat<K, V> {

 
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  

  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}
    读取文件主要是通过其创建的RecordReader来完成的。Hadoop自带了好几种输入格式,关于输入格式的具体描述可以参考此处Yahoo Hadoop 教程 。JobContextImpl中包括了InputFormat的get和set方法,默认的实现是TextInputFormat ---读取文件的行,行的偏移量为key,行的内容为value。我们可以通过重写InputFormat中的isSplitable和createRecordReader来实现自定义的InputFormat,并通过JobContextImpl中的set方法来在map中采用自己的输入格式。
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?,?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?,?>>) 
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }
 因为读物文件是通过RecordReader完成的,因此接下来看看TextInputFormat中的 RecordReader是什么?
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
         ......................
  }

}
    可见,TextInputFormat中,创建的RecordReader为LineRecordReader,”textinputformat.record.delimiter“指的是读取一行的数据的终止符号,即遇到“ textinputformat.record.delimiter ”所包含的字符时,该一行的读取结束。可以通过Configuration的set()方法来设置自定义的终止符,如果没有设置 textinputformat.record.delimiter,那么Hadoop就采用以CR,LF或者CRLF作为终止符,这一点可以查看LineReader的readDefaultLine方法 。查看LineRecordReader的实现就知道为什么上面说 TextInputFormat是以行的偏移量为key,行的内容为value了。来看看其中的几个主要的方法:
    public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
                 ......
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
 

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    if (isCompressedInput()) {
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        if (null == this.recordDelimiterBytes){
          in = new LineReader(cIn, job);
        } else {
          in = new LineReader(cIn, job, this.recordDelimiterBytes);
        }

        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        if (null == this.recordDelimiterBytes) {
          in = new LineReader(codec.createInputStream(fileIn, decompressor),
              job);
        } else {
          in = new LineReader(codec.createInputStream(fileIn,
              decompressor), job, this.recordDelimiterBytes);
        }
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      if (null == this.recordDelimiterBytes){
        in = new LineReader(fileIn, job);
      } else {
        in = new LineReader(fileIn, job, this.recordDelimiterBytes);
      }

      filePosition = fileIn;
    }

  }
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      inputByteCounter.increment(newSize);
      if (newSize < maxLineLength) {
        break;
      }

    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }
  首先在initialize方法里,根据传入的FileSplit来获取到当前读取文件的path,起始位置,并以此创建真正的文件读取流in,我们可以看见在nextKeyValue方法里,就是由in来读取文件,更新key和value的值。
  至此,Hadoop如何把文件数据读取出来,并以何种方式传给Map函数,就一目了然了,同时也更加理解了Yahoo Hadoop 教程 里面提到的譬如FileInputFormat的默认实现,TextInputFormat是如何实现Key-Value组合等等内容。最大的好处在于,如果我要实现一些自定义的东西,我应该如何去修改代码,如何去在合适的地方嵌入自定义的东西。



 

 

  • 大小: 54.5 KB
0
0
分享到:
评论
3 楼 austincao 2012-07-23  
看来我也要潜下心来,多看看代码了,谢谢LZ分享这么好的文章,学习了~
2 楼 waytofall 2011-06-05  
waytofall 写道
兄弟你这源码能发一份吗?
waytofall916@gmail.com
hadoop包里源码和jar的版本竟然不匹配啊……

哦,好吧,我看错了……
原来在另一个mapreduce包里……
嗯,文章写得不错,赞钻研精神~
(新版和旧版很坑爹)
1 楼 waytofall 2011-06-05  
兄弟你这源码能发一份吗?
waytofall916@gmail.com
hadoop包里源码和jar的版本竟然不匹配啊……

相关推荐

    Hadoop MapReduce实战手册(完整版)

    总之,《Hadoop MapReduce实战手册》全面覆盖了MapReduce的基本概念、工作流程、编程模型以及在大数据处理中的实际应用,是学习和理解大数据处理技术的理想读物。通过深入阅读,读者可以提升在大数据环境下的编程和...

    Hadoop MapReduce Cookbook 源码

    《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...

    Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载

    **Hadoop MapReduce V2** 是Hadoop生态系统中的一个关键组件,用于处理大规模数据集。相较于V1版本,V2版本在架构上进行了重大改进,引入了**YARN(Yet Another Resource Negotiator)**来分离资源管理和任务调度/...

    hadoop mapreduce编程实战

    Hadoop MapReduce 是大数据处理的核心组件之一,它提供了一个编程模型和软件框架,用于大规模数据处理。下面是 Hadoop MapReduce 编程实战的知识点总结: MapReduce 编程基础 MapReduce 是一个编程模型,用于处理...

    Hadoop MapReduce v2 Cookbook.pdf

    Hadoop MapReduce是Apache Hadoop框架的核心组件之一,它设计用于分布式处理大规模数据集,而v2的引入主要是为了解决v1在资源管理和效率上的局限性。 MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。...

    Hadoop MapReduce教程.pdf

    从给定的文件信息来看,文档标题为"Hadoop MapReduce教程.pdf",描述与标题相同,标签为"Hadoop Map Reduce",部分内容虽然无法完全解析,但可以推断出与Hadoop MapReduce的基本概念、操作流程以及相关的编程模型...

    Hadoop MapReduce.pdf

    ### Hadoop MapReduce知识点概述 #### 一、Hadoop MapReduce简介 ...对于初学者来说,理解MapReduce的工作流程以及其实现方式是非常重要的,这有助于更好地利用Hadoop平台进行复杂的数据分析任务。

    mapred.zip_hadoop_hadoop mapreduce_mapReduce

    在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许用户编写并运行处理大量数据的程序。这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及...

    Hadoop_MapReduce教程

    - Hadoop MapReduce 能够智能地将计算任务调度到存储数据的节点上,减少网络传输延迟。 - 这种设计使得 MapReduce 能够有效地利用集群内的网络资源。 **2. 容错机制** - 当某个任务失败时,JobTracker 会自动...

    (源码)基于Hadoop MapReduce的分布式数据处理系统.zip

    # 基于Hadoop MapReduce的分布式数据处理系统 ## 项目简介 本项目是一个基于Hadoop MapReduce框架的分布式数据处理系统,旨在处理大规模数据集并实现高效的数据分析和处理。项目涵盖了从数据输入、处理、输出到...

    基于Hadoop MapReduce的招聘信息数据分析项目代码+数据集.rar

    Hadoop MapReduce是Apache Hadoop框架中的核心组件,它允许开发者并行处理大规模数据集,尤其适合处理分布式存储在HDFS(Hadoop Distributed File System)上的数据。 MapReduce的工作流程主要分为两个阶段:Map...

    基于Hadoop MapReduce的高校考研分数线统计分析项目代码+数据集.rar

    该项目是关于利用Hadoop ...总的来说,"基于Hadoop MapReduce的高校考研分数线统计分析项目"是一个结合理论与实践的优秀案例,涵盖了大数据处理的基本流程,对于提升数据分析技能、理解和应用MapReduce具有很高的价值。

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    在大数据处理领域,Hadoop 是一个至关...通过实际操作,我们可以深入了解大数据处理的流程,并学习如何利用 Hadoop 解决实际问题。在人工智能的背景下,这种能力尤为重要,因为高效的数据处理是许多 AI 应用的基础。

    云计算技术实验报告三运行Hadoop MapReduce程序

    通过这次实验,参与者不仅掌握了Hadoop MR程序的开发流程,还深化了对Hadoop MapReduce工作流程的理解,包括Map阶段的数据分区、排序和Shuffle过程,以及Reduce阶段的聚合计算。同时,对于如何打包和执行jar文件也有...

    Hadoop Mapreduce Cookbook(英文版)

    《Hadoop MapReduce Cookbook》是一本专为大数据处理和分析领域的专业人士编写的指南,它深入浅出地介绍了如何使用Hadoop MapReduce框架解决实际问题。MapReduce是Hadoop生态系统中的核心组件,它允许用户在分布式...

    Hadoop应用系列2--MapReduce原理浅析(上)

    在IT行业中,分布式计算系统是处理大规模数据的关键技术之一,而Hadoop作为开源的分布式计算框架,其核心组件MapReduce则是实现这一目标的重要工具。本文将深入浅出地解析MapReduce的工作原理,帮助读者理解这一强大...

    基于Hadoop的简单网络爬虫,Hadoop MapReduce.zip

    本压缩包"基于Hadoop的简单网络爬虫,Hadoop MapReduce.zip"提供了一个利用Hadoop MapReduce实现的简单网络爬虫实例,旨在帮助我们理解如何在分布式环境中进行数据抓取和处理。 标题中的“基于Hadoop的简单网络爬虫...

Global site tag (gtag.js) - Google Analytics