`

Hadoop源码解析之: TextInputFormat如何处理跨split的行

 
阅读更多
 

Hadoop源码解析之: TextInputFormat如何处理跨split的行

 1767人阅读 评论(0) 收藏 举报

我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:

  •  对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  •  针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。

最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):

 

1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:

  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

 

[java] view plaincopy
 
  1. /** 
  2.        * Read one line from the InputStream into the given Text.  A line 
  3.        * can be terminated by one of the following: '\n' (LF) , '\r' (CR), 
  4.        * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated 
  5.        * line. 
  6.        * 
  7.        * @param str the object to store the given line (without newline) 
  8.        * @param maxLineLength the maximum number of bytes to store into str; 
  9.        *  the rest of the line is silently discarded. 
  10.        * @param maxBytesToConsume the maximum number of bytes to consume 
  11.        *  in this call.  This is only a hint, because if the line cross 
  12.        *  this threshold, we allow it to happen.  It can overshoot 
  13.        *  potentially by as much as one buffer length. 
  14.        * 
  15.        * @return the number of bytes read including the (longest) newline 
  16.        * found. 
  17.        * 
  18.        * @throws IOException if the underlying stream throws 
  19.        */  
  20.       public int readLine(Text str, int maxLineLength,  
  21.                           int maxBytesToConsume) throws IOException {  
  22.         /* We're reading data from in, but the head of the stream may be 
  23.          * already buffered in buffer, so we have several cases: 
  24.          * 1. No newline characters are in the buffer, so we need to copy 
  25.          *    everything and read another buffer from the stream. 
  26.          * 2. An unambiguously terminated line is in buffer, so we just 
  27.          *    copy to str. 
  28.          * 3. Ambiguously terminated line is in buffer, i.e. buffer ends 
  29.          *    in CR.  In this case we copy everything up to CR to str, but 
  30.          *    we also need to see what follows CR: if it's LF, then we 
  31.          *    need consume LF as well, so next call to readLine will read 
  32.          *    from after that. 
  33.          * We use a flag prevCharCR to signal if previous character was CR 
  34.          * and, if it happens to be at the end of the buffer, delay 
  35.          * consuming it until we have a chance to look at the char that 
  36.          * follows. 
  37.          */  
  38.         str.clear();  
  39.         int txtLength = 0//tracks str.getLength(), as an optimization  
  40.         int newlineLength = 0//length of terminating newline  
  41.         boolean prevCharCR = false//true of prev char was CR  
  42.         long bytesConsumed = 0;  
  43.         do {  
  44.           int startPosn = bufferPosn; //starting from where we left off the last time  
  45.           //如果buffer中的数据读完了,先加载一批数据到buffer里  
  46.           if (bufferPosn >= bufferLength) {  
  47.             startPosn = bufferPosn = 0;  
  48.             if (prevCharCR)  
  49.               ++bytesConsumed; //account for CR from previous read  
  50.             bufferLength = in.read(buffer);  
  51.             if (bufferLength <= 0)  
  52.               break// EOF  
  53.           }  
  54.           //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:  
  55.           //UNIX: '\n'  (LF)  
  56.           //Mac:  '\r'  (CR)  
  57.           //Windows: '\r\n'  (CR)(LF)  
  58.           //为了准确判断一行的结尾,程序的判定逻辑是:  
  59.           //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个  
  60.           //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”  
  61.           //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。  
  62.           //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。  
  63.           //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。  
  64.           for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline  
  65.             if (buffer[bufferPosn] == LF) {  
  66.               newlineLength = (prevCharCR) ? 2 : 1;  
  67.               ++bufferPosn; // at next invocation proceed from following byte  
  68.               break;  
  69.             }  
  70.             if (prevCharCR) { //CR + notLF, we are at notLF  
  71.               newlineLength = 1;  
  72.               break;  
  73.             }  
  74.             prevCharCR = (buffer[bufferPosn] == CR);  
  75.           }  
  76.           int readLength = bufferPosn - startPosn;  
  77.           if (prevCharCR && newlineLength == 0)  
  78.             --readLength; //CR at the end of the buffer  
  79.           bytesConsumed += readLength;  
  80.           int appendLength = readLength - newlineLength;  
  81.           if (appendLength > maxLineLength - txtLength) {  
  82.             appendLength = maxLineLength - txtLength;  
  83.           }  
  84.           if (appendLength > 0) {  
  85.             str.append(buffer, startPosn, appendLength);  
  86.             txtLength += appendLength;         
  87.           }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。  
  88.           //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)  
  89.           //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:  
  90.           //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。  
  91.           //所以不会出现“断行”的问题!  
  92.         } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);  
  93.       
  94.         if (bytesConsumed > (long)Integer.MAX_VALUE)  
  95.           throw new IOException("Too many bytes before newline: " + bytesConsumed);      
  96.         return (int)bytesConsumed;  
  97.       }  

 

2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!

 

[java] view plaincopy
 
  1. // If this is not the first split, we always throw away first record  
  2.     // because we always (except the last split) read one extra line in  
  3.     // next() method.  
  4.     if (start != 0) {  
  5.       start += in.readLine(new Text(), 0, maxBytesToConsume(start));  
  6.     }  

相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!

 

 

[java] view plaincopy
 
  1. // We always read one extra line, which lies outside the upper  
  2. // split limit i.e. (end - 1)  
  3. while (getFilePosition() <= end) {  

小结:

至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.

分享到:
评论

相关推荐

    MapReduce源码流程.pdf

    3.2.4. `LineRecordReader.initialize()`确保单词不会跨split边界,例如通过确保每个split从行的开头开始读取。 3.3. 在map阶段,`mapper.run()`中的`mapContext`提供了上下文信息,使得我们可以访问和控制map的...

    分布式电源接入配电网的技术挑战与解决方案:风光互补无功补偿及PSO优化

    内容概要:本文探讨了分布式电源(DG)接入配电网所带来的技术挑战及其解决方案。首先介绍了DG接入对配电网潮流分布和电压稳定性的影响,随后详细讨论了风光互补无功补偿技术的应用,旨在稳定电压和提高电能质量。接着,文章阐述了粒子群算法(PSO)在电气互联和故障点位定位中的应用,展示了其在优化电网拓扑结构和快速准确定位故障方面的优势。最后,通过Simulink建模和仿真实验,验证了所提出的方法和技术的有效性。 适合人群:从事电力系统研究、分布式电源集成、智能电网优化的专业人士,以及对相关技术感兴趣的工程技术人员。 使用场景及目标:适用于分布式电源接入配电网的设计与优化,特别是在解决电压波动、无功补偿不足和故障定位不准等问题时。目标是提升配电网的稳定性和效率,确保电力系统的可靠运行。 其他说明:文中提供了多个Matlab和Python代码示例,用于具体实现风光互补无功补偿、粒子群优化算法以及Simulink仿真模型,便于读者理解和实践。

    基于博途V15的1500系列PLC六层电梯SCL编程与梯形图实现

    内容概要:本文详细介绍了使用博途V15软件和1500系列PLC实现单部六层电梯控制系统的SCL编程方法及其梯形图实现。主要内容涵盖电梯的基本控制逻辑,如楼层升降、平层停靠、呼叫响应等。文中通过具体代码示例展示了如何定义关键变量、处理楼层呼叫信号、实现电梯运行和平层停靠逻辑。此外,还讨论了状态机的设计、方向决策算法以及开关门控制等重要环节。文章强调了SCL语言在处理复杂逻辑方面的优势,并对比了梯形图在故障诊断时的直观性。 适合人群:对工业自动化控制感兴趣的技术人员,尤其是熟悉西门子PLC编程的工程师。 使用场景及目标:适用于需要深入了解电梯控制系统编程原理和技术实现的人群。目标是帮助读者掌握SCL语言和梯形图在电梯控制中的应用,提高编程技能。 其他说明:文章提供了完整的代码片段和详细的解释,有助于读者理解和实践。同时提醒读者关注实际应用中的细节问题,如安全保护机制、信号防抖处理等。

    电力电子领域LLC谐振变换器的MATLAB/Simulink仿真及软开关实现

    内容概要:本文详细介绍了如何使用MATLAB/Simulink对全桥和半桥LLC谐振变换器进行仿真,涵盖驱动配置、谐振参数计算、软开关验证以及闭环控制等方面。首先,文章讲解了半桥LLC的基本配置,包括PWM生成、死区时间和谐振参数的设定。接着,讨论了全桥LLC的扩展及其相对于半桥的优势,如更宽的增益范围和更好的输入电压适应性。然后,深入探讨了软开关的验证方法,强调了ZVS(零电压开关)的重要性和实现方式。最后,介绍了闭环控制的设计思路,包括PID控制器的应用和参数调整技巧。 适合人群:从事电力电子设计的研究人员和技术工程师,尤其是那些希望深入了解LLC谐振变换器仿真和优化的人群。 使用场景及目标:适用于需要进行LLC谐振变换器仿真的项目,旨在帮助工程师掌握从基本配置到高级控制的完整流程,确保高效稳定的电源转换系统设计。 其他说明:文中提供了大量MATLAB代码片段,便于读者理解和实践。此外,还给出了许多实用的调试建议和注意事项,有助于避免常见错误并提高仿真成功率。

    居民健康监测系统 2025免费JAVA微信小程序毕设

    2025免费微信小程序毕业设计成品,包括源码+数据库+往届论文资料,附带启动教程和安装包。 启动教程:https://www.bilibili.com/video/BV1BfB2YYEnS 讲解视频:https://www.bilibili.com/video/BV1BVKMeZEYr 技术栈:Uniapp+Vue.js+SpringBoot+MySQL。 开发工具:Idea+VSCode+微信开发者工具。

    宿舍管理系统 2025免费JAVA微信小程序毕设

    2025免费微信小程序毕业设计成品,包括源码+数据库+往届论文资料,附带启动教程和安装包。 启动教程:https://www.bilibili.com/video/BV1BfB2YYEnS 讲解视频:https://www.bilibili.com/video/BV1BVKMeZEYr 技术栈:Uniapp+Vue.js+SpringBoot+MySQL。 开发工具:Idea+VSCode+微信开发者工具。

    电力系统中同步发电机短路与电弧仿真的关键技术及其实现

    内容概要:本文详细介绍了同步发电机短路仿真和电弧仿真的重要性及其具体实现方法。首先讨论了同步发电机短路仿真的核心基础——派克变换,展示了如何利用Python进行派克变换的代码实现,并解释了短路电流的计算方法,包括次暂态电流、暂态电流和稳态电流。接着,文章探讨了电弧仿真的物理特性和数学模型,特别是经典的Mayr电弧模型,并给出了Matlab代码示例。此外,还提到了电弧在不同环境条件下的特性研究,如气压、湿度等因素对电弧的影响。最后,文章强调了这两种仿真在电力系统动态分析中的应用场景,特别是在评估短路故障对发电机及周边设备的影响方面的作用。 适合人群:从事电力系统研究的专业人士、电气工程师、高校师生及相关领域的研究人员。 使用场景及目标:适用于需要深入了解同步发电机短路和电弧仿真原理的研究人员和技术人员,旨在提高电力系统的安全性、可靠性,优化保护措施的设计。 其他说明:文中不仅提供了理论知识,还附带了具体的代码实现,便于读者理解和实践。同时,文章指出了仿真过程中可能出现的问题及解决方案,如数值稳定性问题和接口时序处理等。

    学生选课系统 2025免费JAVA微信小程序毕设

    2025免费微信小程序毕业设计成品,包括源码+数据库+往届论文资料,附带启动教程和安装包。 启动教程:https://www.bilibili.com/video/BV1BfB2YYEnS 讲解视频:https://www.bilibili.com/video/BV1BVKMeZEYr 技术栈:Uniapp+Vue.js+SpringBoot+MySQL。 开发工具:Idea+VSCode+微信开发者工具。

    基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明

    基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明,个人经导师指导并认可通过的高分设计项目,评审分99分,代码完整确保可以运行,小白也可以亲自搞定,主要针对计算机相关专业的正在做大作业的学生和需要项目实战练习的学习者,可作为毕业设计、课程设计、期末大作业。 基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模拟浏览器行为的小红书关键词搜索和笔记爬取源码+文档说明基于Selenium模

    医笙小程序系统 2025免费JAVA微信小程序毕设

    2025免费微信小程序毕业设计成品,包括源码+数据库+往届论文资料,附带启动教程和安装包。 启动教程:https://www.bilibili.com/video/BV1BfB2YYEnS 讲解视频:https://www.bilibili.com/video/BV1BVKMeZEYr 技术栈:Uniapp+Vue.js+SpringBoot+MySQL。 开发工具:Idea+VSCode+微信开发者工具。

    工业自动化中高速追剪飞锯系统的维伦通触摸屏与台达PLC程序解析

    内容概要:本文深入探讨了高速追剪飞锯系统的实现细节,特别是维伦通触摸屏和台达PLC之间的协同工作。触摸屏作为人机交互界面,允许操作员设置如切割长度、运行速度等参数,并通过与PLC寄存器的关联实现数据传输。台达PLC则负责执行复杂的电子凸轮追剪算法,确保切割过程的高精度和稳定性。文中还介绍了关键的PLC指令,如MC_GearIn和CAM_GEN,以及它们在速度同步和位置控制中的应用。此外,文章揭示了一些调试技巧和潜在问题,如数据类型对齐、补偿算法和参数调整方法。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些对PLC编程和人机界面设计感兴趣的人。 使用场景及目标:适用于需要理解和优化高速追剪飞锯系统的场合,旨在提高生产效率和产品质量。通过学习本文,读者可以掌握如何设置和调试此类系统,从而减少故障率并提升性能。 其他说明:文章不仅提供了理论知识,还包括了许多实用的操作建议和经验分享,有助于读者更好地应对实际工作中遇到的技术挑战。

    高速永磁同步电机Maxwell仿真:50000-100000rpm转速区间的电磁与机械设计挑战

    内容概要:本文详细探讨了高速永磁同步电机(HSPMSM)在50000-100000rpm转速范围内的设计与仿真挑战。首先介绍了高速电机的应用背景及其面临的离心力和电磁损耗等问题。接着,通过具体实例展示了如何利用Maxwell软件进行电机的几何建模、材料设置、边界条件与激励设置,并进行了详细的模拟结果分析。文中特别强调了在极端转速条件下,如10万转时,电机内部的物理现象以及相应的优化措施,如采用碳纤维护套增强机械强度、调整损耗计算模型以提高精度等。 适合人群:从事电机设计与仿真的工程师和技术研究人员,尤其是对高速永磁同步电机感兴趣的从业者。 使用场景及目标:适用于希望深入了解高速永磁同步电机设计原理及仿真技巧的人群,旨在帮助他们掌握Maxwell软件的具体应用方法,解决实际工程中遇到的技术难题,如高转速下的电磁兼容性和机械可靠性问题。 其他说明:文章不仅提供了理论指导,还包括大量实用的操作步骤和代码示例,有助于读者快速上手并应用于实际工作中。此外,文中提到的一些特殊处理方式(如碳纤维护套的应用),为解决特定工况下的技术瓶颈提供了新思路。

    浪潮英信服务器 SA5212M5 用户手册

    浪潮英信服务器 SA5212M5 用户手册

    COMSOL仿真中放电电极击穿空气的电场分布与击穿电压计算

    内容概要:本文详细介绍了如何使用COMSOL进行放电电极击穿空气的仿真。首先构建了一个针尖电极和球头圆柱电极组成的模型,设置了静电和电流耦合的物理场,并进行了网格优化。通过参数化扫描和MATLAB脚本,计算不同间隙距离下的击穿电压,并利用Paschen曲线进行验证。同时探讨了电场强度在尖端的集中现象及其对击穿的影响,提出了改进网格质量和求解器设置的方法。最后,通过电场矢量图和电势分布图展示了仿真的结果。 适合人群:从事电磁场仿真、电气工程、等离子体物理等相关领域的研究人员和技术人员。 使用场景及目标:适用于需要精确计算电极间击穿电压和电场分布的研究项目,帮助设计高压设备和评估电极结构的安全性和可靠性。 其他说明:文中提供了详细的建模步骤和代码片段,便于读者复现实验结果。同时强调了网格质量、边界条件和求解器设置对仿真准确性的重要影响。

    家居项目后端资源采用ssm架构

    家居项目后端资源采用ssm架构

    互联网大厂面试题合集:并发编程面试题-重点.pdf

    整理一线大厂面试题合集

    牵牛花铅笔素材儿童教学课件模板.pptx

    牵牛花铅笔素材儿童教学课件模板

    我的日记 2025/4/19

    2024年的记录。

    互联网大厂面试题合集:Linux操作系统面试题.pdf

    整理一线大厂面试题合集

    Apollo 7.0行为预测模块升级:轨迹交互与评估器设计详解及其应用

    内容概要:本文详细解析了Apollo 7.0行为预测模块的关键升级点,主要包括新增的Inter-TNT模式、VECTORNET_EVALUATOR以及JOINTLY_PREDICTION_PLANNING_EVALUATOR。这些组件通过引入轨迹交互模拟、动态归一化、联合预测规划等创新机制,显著提高了障碍物轨迹预测的准确性和场景适应性。特别是在处理复杂交通场景如高速公路变道、十字路口交汇时表现出色。此外,文中还介绍了增量式特征更新机制的应用,有效减少了CPU占用,提升了系统的实时性能。 适用人群:适用于对自动驾驶技术感兴趣的开发者、研究人员和技术爱好者,尤其是那些希望深入了解Apollo平台行为预测模块工作原理的人群。 使用场景及目标:①帮助读者理解Apollo 7.0行为预测模块的技术细节;②指导开发者如何利用这些新技术提升自动驾驶系统的预测精度;③为研究者提供有价值的参考资料,促进相关领域的进一步探索。 其他说明:文章不仅提供了详细的代码解读,还包括了实际应用场景中的效果对比,使读者能够全面掌握新旧版本之间的差异。同时,附带的思维导图有助于快速理清各个子模块之间的调用关系和数据流向。

Global site tag (gtag.js) - Google Analytics