`

Hadoop MapReduce中如何处理跨行Block和UnputSplit

阅读更多

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit

      1. block是hdfs存储文件的单位(默认是64M);
      2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

 

  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);      
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkLocations.length-1].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

  从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize

对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

 

    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }

 

 其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

 

  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF.
   */
  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR)
          ++bytesConsumed; //account for CR from previous read
        bufferLength = in.read(buffer);
        if (bufferLength <= 0)
          break; // EOF
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0)
        --readLength; //CR at the end of the buffer
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   //①

    if (bytesConsumed > (long)Integer.MAX_VALUE)
      throw new IOException("Too many bytes before newline: " + bytesConsumed);    
    return (int)bytesConsumed;
  }

  我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:

1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?

2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?

为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

 

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {         //②
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      inputByteCounter.increment(newSize);
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

 

    通过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。

     再来看LineRecordReader的initialize方法:

    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;

    如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

 

此次,前面提到的两个问题就回到完了。。。。。

 

 

 

 

 

 

 

 

2
0
分享到:
评论
3 楼 fibers 2012-08-01  
上面问题有点错误,更正一下
BlockLocation :
private String[] hosts; //hostnames of datanodes
private String[] names; //hostname:portNumber of datanodes
private String[] topologyPaths; // full path name in network topology
private long offset;  //offset of the of the block in the file
private long length;

假如文件:100M, splitsize:50M, block:64M,
那么刚开始BlockLocation[]的数据应该是(names,topologypaths就不列了)
[hostA, 0, 64M],[hostB, 64M, 34M]
最后计算完的inputsplit里的数据应该是
[path, 0, 50M, hostA],[path, 50M, 50M, hostB]

可是50M-64M应该是在A里的。。

这是为什么?谢谢

2 楼 fibers 2012-08-01  
提个问题。。。这段代码我也看过。。。当初就有一个挺纳闷的事情。。也是关于跨block的。

源代码是这样子的。 hadoop-0.20.1的源码:
for (FileStatus file: listStatus(job)) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }
       if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
          blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }
分成的InputSplit里应该是记录了, [Path, 起始位置,split的长度,所在的Host].
但如果我的文件是100M , splitsize是 50M, 当前Block是64M, 那么1M-50M的文件是在一个block A里,host A, 50M-64M也是在block A里, host A里,64M-100M在block B, host B里。。。那么按如上代码FileSplit记录的应该是[path, 0, 50M, hostA][path, 50M, 50M, hostA]

1 楼 austincao 2012-07-23  
分析很到位!学习了~

相关推荐

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount ...通过理解和实践 Hadoop MapReduce 的 WordCount 示例,开发者可以快速掌握 MapReduce 的基本工作原理,为进一步学习和应用大数据处理技术打下坚实基础。

    Hadoop MapReduce实现tfidf源码

    本篇文章将详细讲解如何利用Hadoop MapReduce实现TF-IDF(Term Frequency-Inverse Document Frequency)算法,这是一种在信息检索和文本挖掘中用于评估一个词在文档中的重要性的统计方法。 首先,我们要理解TF-IDF...

    大数据 hadoop mapreduce 词频统计

    在这个过程中,Hadoop MapReduce通过并行化处理和容错机制,能够高效地处理大规模数据,即使在硬件故障的情况下也能确保数据完整性。同时,MapReduce的编程模型相对简单,使得开发者能够专注于业务逻辑,而不是底层...

    Hadoop MapReduce实战手册(完整版)

    MapReduce是Hadoop生态系统中的核心组件之一,用于处理和生成大规模数据集。该书旨在帮助读者理解并掌握如何使用MapReduce解决实际的大数据问题。 MapReduce的核心理念是将复杂的分布式计算任务分解为两个主要阶段...

    Hadoop MapReduce Cookbook 源码

    Map阶段负责数据的拆分和处理,将原始输入数据分解为键值对,并发送到各个工作节点进行并行处理。Reduce阶段则负责整合Map阶段的结果,执行聚合操作,最终生成所需的输出。这一过程确保了数据处理的高效性和可扩展性...

    python hadoop mapreduce 相似用户|mapreduce.rar

    总之,这个项目旨在展示如何用Python和Hadoop MapReduce解决社交网络中的相似用户分析问题。虽然代码可能不够精致,但它提供了一个起点,让人们了解如何在实际问题中应用这两个工具。对于想要进一步学习大数据处理的...

    Hadoop MapReduce v2 Cookbook, 2nd Edition-Packt Publishing(2015) 高清完整版PDF下载

    **Hadoop MapReduce V2** 是Hadoop生态系统中的一个关键组件,用于处理大规模数据集。相较于V1版本,V2版本在架构上进行了重大改进,引入了**YARN(Yet Another Resource Negotiator)**来分离资源管理和任务调度/...

    10.Hadoop MapReduce教程1

    Hadoop MapReduce 是 Hadoop 生态系统中的一部分,它是一种可靠的、可扩展的并行处理框架,用于处理大规模数据集。MapReduce 是一种编程模型,它将计算任务分解为两个阶段:Map 阶段和 Reduce 阶段。 Map 阶段的...

    基于Apriori算法的频繁项集Hadoop mapreduce

    而Hadoop MapReduce则是一个分布式计算框架,能够处理和存储海量数据。现在我们详细探讨这两个概念以及它们如何协同工作。 首先,让我们理解Apriori算法的基本原理。Apriori算法由Raghu Ramakrishnan和Gehrke在1994...

    hadoop mapreduce编程实战

    Hadoop MapReduce 是大数据处理的核心组件之一,它提供了一个编程模型和软件框架,用于大规模数据处理。下面是 Hadoop MapReduce 编程实战的知识点总结: MapReduce 编程基础 MapReduce 是一个编程模型,用于处理...

    Hadoop MapReduce.md

    结论: 本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: ...2、有运行效率问题,MapReduce 需要将中间产生的数据保存到硬盘中,因此会有读写数据延迟问题。 3、不支持实时处理,它原始的设计就是以批处理为主。

    Hadoop MapReduce v2 Cookbook.pdf

    Hadoop MapReduce是Apache Hadoop框架的核心组件之一,它设计用于分布式处理大规模数据集,而v2的引入主要是为了解决v1在资源管理和效率上的局限性。 MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段。...

    hadoop mapreduce helloworld 能调试

    在大数据处理领域,Hadoop MapReduce 是一个至关重要的框架,它允许开发者编写分布式应用程序来处理海量数据。...在实际操作中,不断实践和理解 Hadoop 生态系统将使你能够更好地驾驭这个强大的大数据处理工具。

    Hadoop MapReduce v2 Cookbook (第二版)

    Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing

    Hadoop mapreduce 实现KMeans

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架,它允许高效地处理海量数据。KMeans 是一种常见的无监督机器学习算法,用于聚类分析,将数据集中的对象按照相似性分组成不同的簇。现在我们来...

    Hadoop MapReduce教程.pdf

    从给定的文件信息来看,文档标题为"Hadoop MapReduce教程.pdf",描述与标题相同,标签为"Hadoop Map Reduce...通过合理设计Map和Reduce函数,可以充分发挥Hadoop MapReduce的性能优势,解决实际问题中的数据处理难题。

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip

    基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python源码+项目说明).zip基于Hadoop Mapreduce 实现酒店评价文本情感分析(python...

    Hadoop mapreduce 实现MR_DesicionTreeBuilder 决策树

    Hadoop MapReduce 是 Apache Hadoop 的核心组件之一,它提供了一种处理和存储大规模数据集的并行计算模型。Map 阶段将输入数据分割成小块,并在各个节点上并行运行映射任务。Reduce 阶段则负责收集映射阶段的结果,...

    hadoop mapreduce 例子项目,运行了单机wordcount

    Hadoop MapReduce是一种分布式计算框架,它允许在大型数据集上进行并行处理。这个例子项目是关于在单机环境中运行WordCount程序的,这是一个经典的MapReduce示例,用于统计文本文件中每个单词出现的次数。 首先,让...

Global site tag (gtag.js) - Google Analytics