在之前的Blog [http://flyfoxs.iteye.com/blog/2110463] 中讨论了, hadoop在文件切割时,可能会把一个行数据切割成无意义的2块. 如果不做特别处理,这会造成数据的失真及处理错误. 经人指点,发现这个BUG不存在.
Hadoop在分割文件后,后期读取中会通过一些规则来保证不会出现把一行数据分割成2行. 下面对这个后期处理机制(LineRecordReader)做一个分析:
1)数据分割是由JobClient完成,不是在hadoop集群完成.(并且这个是一个粗分,具体精确的还是依赖Mapper依赖如下规则)
2)数据的分割是由JobClient完成,但是Mapper在处理的时候,不是严格按照这个来处理,
除了第一个Split,其他的Split都是从第一个换行符开始读取
Split的结束是下一个Split的换行符,(太霸道了,除了最后一个,几乎每一都要跨越Split)
3)针对超长行,有一个理论上的Bug,就是如果有行超过了你限制的长度,那么这一行会有部分数据会被抛弃. 但是这个Bug是理论上的,因为默认值为 Integer.MAX_VALUE .
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
下面的代码可以看出LineRecordReader读取最后一行的时候,并不是严格按照Split的结束而结束. 而是必须要读取到下一个Split的换行符.
代码比较复杂已经添加了注释,如果有不明白的欢迎提问.
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { /* We're reading data from in, but the head of the stream may be * already buffered in buffer, so we have several cases: * 1. No newline characters are in the buffer, so we need to copy * everything and read another buffer from the stream. * 2. An unambiguously terminated line is in buffer, so we just * copy to str. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends * in CR. In this case we copy everything up to CR to str, but * we also need to see what follows CR: if it's LF, then we * need consume LF as well, so next call to readLine will read * from after that. * We use a flag prevCharCR to signal if previous character was CR * and, if it happens to be at the end of the buffer, delay * consuming it until we have a chance to look at the char that * follows. */ str.clear(); int txtLength = 0; //tracks str.getLength(), as an optimization int newlineLength = 0; //length of terminating newline boolean prevCharCR = false; //true of prev char was CR long bytesConsumed = 0; do { //bufferPosn记录了当前Buffer读取到哪个位置,这样当下一次循环时 int startPosn = bufferPosn; //starting from where we left off the last time //如果Buffer里面的数据已经处理完毕,则对Buffer清空,重新再从IO流读取数据 if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; if (prevCharCR) ++bytesConsumed; //account for CR from previous read //从IO中读取数据处理,只有处理完毕(bufferPosn >= bufferLength)才会再次读取 //bufferLength记录了从IO中读取了多少个字节的数据 bufferLength = in.read(buffer); if (bufferLength <= 0) break; // EOF } //在For循环总寻找断行符, 兼容MAC, Windows, Linux 多种平台的换行符 for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline //判断当前字符是否是'\n' if (buffer[bufferPosn] == LF) { //如果是'\r\n'来区分一行, newlineLength=2, 如果是'\n'则newlineLength=1 newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } //如果是\r来区分一行,则newlineLength=1 if (prevCharCR) { //CR + notLF, we are at notLF newlineLength = 1; break; } //判断当前字符是否是'\r', 等待下一个循环来组合判断真正的换行符 prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; //Buffer最后一个字节就是'\r' if (prevCharCR && newlineLength == 0) --readLength; //CR at the end of the buffer bytesConsumed += readLength; //appendLength:在本轮循环中从Buffer中读取的负载长度,去除了换行符 int appendLength = readLength - newlineLength; //txtLength:记录了最终返回的str的长度 if (appendLength > maxLineLength - txtLength) { //如果添加后,字符串长度超过了一行长度的上限,那么超过的将不会被添加到str appendLength = maxLineLength - txtLength; } //将当前Buffer中,指定区间的字符添加到返回值(str) if (appendLength > 0) { str.append(buffer, startPosn, appendLength); txtLength += appendLength; } //如果在buffer里面没有读取到换行符,并且已经读取的字节数没有超过预定大小,则继续从IO流读取下一批数据 } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed); return (int)bytesConsumed; }
下面的代码可以看出LineRecordReader是如何来判读是否需要忽略第一行
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { //只有文件的第一行不能忽略第一行 skipFirstLine = true; --start; fileIn.seek(start); } in = new LineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; }
参考文献:
http://blog.csdn.net/bluishglc/article/details/9380087
http://blog.csdn.net/wanghai__/article/details/6583364
相关推荐
在当今的大数据时代,MapReduce作为处理海量数据的核心技术之一,被广泛应用于各类大数据处理框架中。本文将对MapReduce的基本概念、编程模型、框架原理、相关组件以及入门编程实例等进行详细介绍。 MapReduce是一...
"大数据平台-MapReduce介绍.pdf" 大数据平台中的MapReduce是由Google公司的Jeffrey Dean和Sanjay Ghemawat开发的一个针对大规模群组中的海量数据处理的分布式编程模型。MapReduce实现了两个功能:Map函数应用于集合...
【实验报告模板 - 大数据应用-实验六】 在大数据领域,Hive 是一个至关重要的组件,它在 Hadoop 生态系统中扮演着数据仓库的角色。本次实验的主要目的是让学生深入理解 Hive 在数据处理中的功能,并熟悉其基本操作...
在大数据领域,Hadoop是一个关键的开源框架,用于存储和处理海量数据。Hadoop-3.1.3是Hadoop的稳定版本,提供了许多增强的功能和优化,使其更适合大规模分布式计算环境。在这个针对Linux系统的安装包中,我们将探讨...
通过编写WordCount程序,学生们能够直观地感受到Flink的流式处理能力,为后续更深入的大数据学习打下了坚实基础。实验过程流畅,按照老师提供的文档进行,使学习过程更为高效,激发了学生对大数据领域的探索兴趣。
6.1 Hadoop概述 6.2 HDFS 6.2.1 HDFS文件系统的原型GFS 6.2.2 HDFS文件的基本结构 6.2.3 HDFS的存储过程 6.3 MapReduce编程框架 6.3.1 MapReduce的发展历史 ...6.5.7 在Hadoop系统上运行测试程序WordCount
大数据工程师面试题和答案 大数据工程师面试题和答案是一份涵盖了大数据工程师面试中的常见问题和答案的文件。该文件涵盖了 HDFS、MapReduce、YARN 等大数据技术领域的知识点,同时也涉及到大数据工程师面试中的...
7.1 概述 7.2 MapReduce体系结构 7.3 MapReduce工作流程 7.4 实例分析:WordCount 7.5 MapReduce的具体应用 7.6 MapReduce编程实践
在大数据课程中,Scala编程基础是不可或缺的一部分,其中重点讲解了Scala的数据结构。本节主要关注数组和元组,以及集合和相关的计算函数。 首先,Scala中的数组有两种类型:定长数组和变长数组。定长数组使用`new ...
"大数据HelloWorld-Flink实现WordCount"是初学者入门Flink的典型示例,旨在介绍如何使用Flink处理数据并计算词频。在这个场景中,我们将讨论如何在本地环境中设置Flink、创建和运行一个简单的WordCount程序。 首先...
### Spark 下实现 WordCount #### 一、简介 在大数据处理领域,Apache Spark 是一个非常流行的框架,它能够高效地处理大规模数据集。WordCount 是一个经典的示例程序,用于统计文本文件中每个单词出现的次数。本篇...
1. HSDF获取文件:需要计算的源文本存在于HDFS系统上 2. Input(文件输入):HDFS中的文件都是以块(Block)为单位存储 3. Split
手写代码章节详细介绍了常见的算法和数据结构的实现,如冒泡排序、二分查找、快速排序、归并排序、二叉树以及基于Scala的Spark-WordCount实现。这些算法是面试中考察算法能力的常见问题,掌握这些算法对于通过技术...
【大数据入门笔记系列】第五节 SpringBoot集成hadoop开发环境(复杂版的WordCount)前言环境清单创建SpringBoot项目创建包创建yml添加集群主机名映射hadoop配置文件环境变量HADOOP_HOME编写代码添加hadoop依赖jar包...
《大数据技术深度解析》 大数据,作为信息技术领域的重要分支,正逐步引领科技发展的潮流。这份“大数据-课程讲义.zip”文件,包含了丰富的学习资源,旨在深入探讨大数据的基础概念、核心技术以及实际应用。以下是...
从给出的文件信息中,我们可以看出,这些内容是关于华为大数据认证HCIP-Big Data Developer H13-723考试题库的知识点,接下来我将对这些知识点进行详细解析。 1. HBase数据写入接口类:在HBase中写入数据时,我们...
大数据实验报告 Hadoop 编程实现 wordcount 单词统计程序附源码 本实验报告旨在介绍使用 Hadoop 编程实现 wordcount 单词统计程序的步骤和代码实现。实验的目的在于熟悉 Hadoop 虚拟机的安装与环境的配置,初步理解...
实验2的目的是在Hadoop平台上部署WordCount程序,以此来理解和体验云计算的基础应用。这个实验主要涉及以下几个关键知识点: 1. **Linux系统基础**:实验要求学生具备Linux系统的使用知识,包括基本的命令行操作、...
【标题】"cs-1660-gcp-wordcount-hw" 指的是一项针对计算机科学课程(可能编号为CS-1660)的作业,该作业涉及到在Google Cloud Platform (GCP)上实现一个词频统计(WordCount)程序。在大数据处理和云计算领域,...
云计算与大数据 MapReduce实验 Wordcount实验中所需数据包 WordCount.jar 不需要封装,centos7 linux hadoop实验上传所需