`

Hadoop源码解析之: TextInputFormat如何处理跨split的行

 
阅读更多
 

Hadoop源码解析之: TextInputFormat如何处理跨split的行

 1767人阅读 评论(0) 收藏 举报

我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:

  •  对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  •  针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。

最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):

 

1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:

  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

 

[java] view plaincopy
 
  1. /** 
  2.        * Read one line from the InputStream into the given Text.  A line 
  3.        * can be terminated by one of the following: '\n' (LF) , '\r' (CR), 
  4.        * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated 
  5.        * line. 
  6.        * 
  7.        * @param str the object to store the given line (without newline) 
  8.        * @param maxLineLength the maximum number of bytes to store into str; 
  9.        *  the rest of the line is silently discarded. 
  10.        * @param maxBytesToConsume the maximum number of bytes to consume 
  11.        *  in this call.  This is only a hint, because if the line cross 
  12.        *  this threshold, we allow it to happen.  It can overshoot 
  13.        *  potentially by as much as one buffer length. 
  14.        * 
  15.        * @return the number of bytes read including the (longest) newline 
  16.        * found. 
  17.        * 
  18.        * @throws IOException if the underlying stream throws 
  19.        */  
  20.       public int readLine(Text str, int maxLineLength,  
  21.                           int maxBytesToConsume) throws IOException {  
  22.         /* We're reading data from in, but the head of the stream may be 
  23.          * already buffered in buffer, so we have several cases: 
  24.          * 1. No newline characters are in the buffer, so we need to copy 
  25.          *    everything and read another buffer from the stream. 
  26.          * 2. An unambiguously terminated line is in buffer, so we just 
  27.          *    copy to str. 
  28.          * 3. Ambiguously terminated line is in buffer, i.e. buffer ends 
  29.          *    in CR.  In this case we copy everything up to CR to str, but 
  30.          *    we also need to see what follows CR: if it's LF, then we 
  31.          *    need consume LF as well, so next call to readLine will read 
  32.          *    from after that. 
  33.          * We use a flag prevCharCR to signal if previous character was CR 
  34.          * and, if it happens to be at the end of the buffer, delay 
  35.          * consuming it until we have a chance to look at the char that 
  36.          * follows. 
  37.          */  
  38.         str.clear();  
  39.         int txtLength = 0//tracks str.getLength(), as an optimization  
  40.         int newlineLength = 0//length of terminating newline  
  41.         boolean prevCharCR = false//true of prev char was CR  
  42.         long bytesConsumed = 0;  
  43.         do {  
  44.           int startPosn = bufferPosn; //starting from where we left off the last time  
  45.           //如果buffer中的数据读完了,先加载一批数据到buffer里  
  46.           if (bufferPosn >= bufferLength) {  
  47.             startPosn = bufferPosn = 0;  
  48.             if (prevCharCR)  
  49.               ++bytesConsumed; //account for CR from previous read  
  50.             bufferLength = in.read(buffer);  
  51.             if (bufferLength <= 0)  
  52.               break// EOF  
  53.           }  
  54.           //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:  
  55.           //UNIX: '\n'  (LF)  
  56.           //Mac:  '\r'  (CR)  
  57.           //Windows: '\r\n'  (CR)(LF)  
  58.           //为了准确判断一行的结尾,程序的判定逻辑是:  
  59.           //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个  
  60.           //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”  
  61.           //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。  
  62.           //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。  
  63.           //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。  
  64.           for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline  
  65.             if (buffer[bufferPosn] == LF) {  
  66.               newlineLength = (prevCharCR) ? 2 : 1;  
  67.               ++bufferPosn; // at next invocation proceed from following byte  
  68.               break;  
  69.             }  
  70.             if (prevCharCR) { //CR + notLF, we are at notLF  
  71.               newlineLength = 1;  
  72.               break;  
  73.             }  
  74.             prevCharCR = (buffer[bufferPosn] == CR);  
  75.           }  
  76.           int readLength = bufferPosn - startPosn;  
  77.           if (prevCharCR && newlineLength == 0)  
  78.             --readLength; //CR at the end of the buffer  
  79.           bytesConsumed += readLength;  
  80.           int appendLength = readLength - newlineLength;  
  81.           if (appendLength > maxLineLength - txtLength) {  
  82.             appendLength = maxLineLength - txtLength;  
  83.           }  
  84.           if (appendLength > 0) {  
  85.             str.append(buffer, startPosn, appendLength);  
  86.             txtLength += appendLength;         
  87.           }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。  
  88.           //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)  
  89.           //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:  
  90.           //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。  
  91.           //所以不会出现“断行”的问题!  
  92.         } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);  
  93.       
  94.         if (bytesConsumed > (long)Integer.MAX_VALUE)  
  95.           throw new IOException("Too many bytes before newline: " + bytesConsumed);      
  96.         return (int)bytesConsumed;  
  97.       }  

 

2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!

 

[java] view plaincopy
 
  1. // If this is not the first split, we always throw away first record  
  2.     // because we always (except the last split) read one extra line in  
  3.     // next() method.  
  4.     if (start != 0) {  
  5.       start += in.readLine(new Text(), 0, maxBytesToConsume(start));  
  6.     }  

相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!

 

 

[java] view plaincopy
 
  1. // We always read one extra line, which lies outside the upper  
  2. // split limit i.e. (end - 1)  
  3. while (getFilePosition() <= end) {  

小结:

至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.

分享到:
评论

相关推荐

    hadoop 源码解析_yarn源码解析

    Hadoop 源码解析_Yarn 源码解析 Hadoop 是一个基于 Java 的大数据处理框架,Yarn 是 Hadoop 的资源管理器,负责资源分配、任务调度和集群管理。下面是 Yarn 源码解析的知识点: 1. MR 程序提交 MR(MapReduce)...

    Hadoop源码分析视频下载

    - 源码解析:深入Hadoop源码,研究如NameNode、DataNode、MapTask和ReduceTask等关键类的功能实现。 - 故障恢复和容错机制:探讨Hadoop如何处理硬件故障,保持数据完整性。 - 性能调优:学习如何通过调整参数和...

    Hadoop海量数据处理:技术详解与项目实战 pdf

    根据提供的文件信息,我们可以聚焦于标题和描述中的关键信息来生成相关的IT知识点,尤其是关于Hadoop海量数据处理的技术详解。 ### Hadoop海量数据处理:技术详解与项目实战 #### Hadoop简介 Hadoop是一个能够对...

    Hadoop源码分析(完整版)

    Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...

    Hadoop源代码分析(完整版).pdf

    HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。HDFS 的架构主要包括以下几个部分: * Namenode:负责管理文件系统的命名空间...

    实战hadoop中的源码

    【标题】"实战hadoop中的源码"涵盖了在大数据处理领域深入理解并应用Apache Hadoop的核心技术。Hadoop是开源的分布式计算框架,它允许在大规模集群上存储和处理海量数据。通过研究Hadoop的源码,开发者可以深入了解...

    王家林的“云计算分布式大数据Hadoop实战高手之路---从零开始”的第八讲Hadoop图文训练课程:Hadoop文件系统的操作

    Hadoop作为一个开源的分布式存储与计算平台,已经成为处理大数据的关键技术之一。王家林的“云计算分布式大数据Hadoop实战高手之路”系列教程,通过三个不同的学习阶段,帮助学习者从零基础开始,逐步成为Hadoop领域...

    hadoop 源码解析-DataNode

    Hadoop 源码解析 - DataNode Hadoop 作为一个大数据处理框架,其核心组件之一是分布式文件系统(HDFS),而 DataNode 是 HDFS 中的重要组件之一。DataNode 负责存储和管理数据块,提供数据访问服务。本文将对 ...

    hadoop2.7汇总:新增功能最新编译64位安装、源码包、API、eclipse插件下载

    - **JDK 7u67**:Hadoop 2.7 推荐使用的 JDK 版本之一,用于编译和运行 Hadoop 应用。 - **protobuf-2.5.0**:提供了序列化和反序列化数据的服务,是 Hadoop 生态系统中的一个重要组件。 - **Apache Ant 1.9.4**:...

    hadoop源码编译安装包及安装步骤

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分布式存储。本文将详细介绍如何编译和安装Hadoop源码,确保你能够按照提供的步骤顺利进行。 首先,我们需要了解Hadoop的基本概念。Hadoop是由...

    Hadoop源码的入门解析

    Hadoop是一个开源的分布式计算框架,源于Apache Lucene项目,主要负责大规模数据的分布式存储和处理。它由几个核心组件构成,包括...对于分布式计算和云计算领域的专业人士来说,掌握Hadoop源码解析是一项重要的技能。

    hadoop2.7.3源码包,hadoop2.7.3zip源码包

    Hadoop是大数据处理领域中的一个核心框架,主要由Apache软件基金会开发。Hadoop 2.7.3是其一个稳定...通过这个源码包,你可以探索Hadoop如何处理大数据,学习分布式系统的设计原则,以及如何利用Java实现这样的系统。

    hadoop权威指南4和源码

    总的来说,《Hadoop权威指南4》结合源码,是一套全面的学习资料,涵盖了从理论到实践的各个方面,对于想要深入理解Hadoop并利用其处理大数据的人员来说,具有极高的参考价值。通过这本书和源码的学习,你可以提升...

    Hadoop源码编译好的源码(eclipse可直接导入)

    Hadoop是大数据处理领域中的一个核心框架,由Apache软件基金会开发并维护。它主要由HDFS(Hadoop Distributed File System)和MapReduce两大部分组成,为海量数据的存储和计算提供了可靠的解决方案。本资源提供了...

    hadoop 2.5.2 源码

    Hadoop 2.5.2源码分析 Hadoop是一个开源框架,主要用于处理和存储大量数据,它由Apache软件基金会开发并维护。Hadoop 2.5.2是Hadoop发展过程中的一个重要版本,它引入了许多改进和优化,旨在提高系统的稳定性和性能...

    Hadoop源码包

    Hadoop是大数据处理领域的重要框架,它以分布式计算模型为基础,为海量数据处理提供了高效、可靠的解决方案。Hadoop-2.6.4源码包包含了Hadoop的核心组件和相关模块,是理解Hadoop工作原理、进行二次开发或优化的基础...

    hadoop 文档:Hadoop开发者下载

    Hadoop是大数据处理领域的一个核心框架,主要用于分布式存储和计算。这个文档集合应该是关于Hadoop开发者的下载资源,可能包含了源代码、开发工具和其他相关资料。由于没有具体的描述,我将根据一般Hadoop开发者的...

    Hadoop技术内幕:深入解析YARN架构设计与实现原理 高清完整中文版PDF下载

    《Hadoop技术内幕:深入解析YARN架构设计与实现原理》是一本专注于Hadoop生态系统中资源管理和调度核心组件——YARN(Yet Another Resource Negotiator)的专著。这本书全面介绍了YARN的架构、设计思想以及实际操作...

Global site tag (gtag.js) - Google Analytics