版权声明:本文为博主原创文章,未经博主允许不得转载。
我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:
- 对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
- 针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。
最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):
1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:
- 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
- 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
- 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止
- /**
- * Read one line from the InputStream into the given Text. A line
- * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
- * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
- * line.
- *
- * @param str the object to store the given line (without newline)
- * @param maxLineLength the maximum number of bytes to store into str;
- * the rest of the line is silently discarded.
- * @param maxBytesToConsume the maximum number of bytes to consume
- * in this call. This is only a hint, because if the line cross
- * this threshold, we allow it to happen. It can overshoot
- * potentially by as much as one buffer length.
- *
- * @return the number of bytes read including the (longest) newline
- * found.
- *
- * @throws IOException if the underlying stream throws
- */
- public int readLine(Text str, int maxLineLength,
- int maxBytesToConsume) throws IOException {
- /* We're reading data from in, but the head of the stream may be
- * already buffered in buffer, so we have several cases:
- * 1. No newline characters are in the buffer, so we need to copy
- * everything and read another buffer from the stream.
- * 2. An unambiguously terminated line is in buffer, so we just
- * copy to str.
- * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
- * in CR. In this case we copy everything up to CR to str, but
- * we also need to see what follows CR: if it's LF, then we
- * need consume LF as well, so next call to readLine will read
- * from after that.
- * We use a flag prevCharCR to signal if previous character was CR
- * and, if it happens to be at the end of the buffer, delay
- * consuming it until we have a chance to look at the char that
- * follows.
- */
- str.clear();
- int txtLength = 0; //tracks str.getLength(), as an optimization
- int newlineLength = 0; //length of terminating newline
- boolean prevCharCR = false; //true of prev char was CR
- long bytesConsumed = 0;
- do {
- int startPosn = bufferPosn; //starting from where we left off the last time
- //如果buffer中的数据读完了,先加载一批数据到buffer里
- if (bufferPosn >= bufferLength) {
- startPosn = bufferPosn = 0;
- if (prevCharCR)
- ++bytesConsumed; //account for CR from previous read
- bufferLength = in.read(buffer);
- if (bufferLength <= 0)
- break; // EOF
- }
- //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:
- //UNIX: '\n' (LF)
- //Mac: '\r' (CR)
- //Windows: '\r\n' (CR)(LF)
- //为了准确判断一行的结尾,程序的判定逻辑是:
- //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个
- //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”
- //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。
- //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。
- //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。
- for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
- if (buffer[bufferPosn] == LF) {
- newlineLength = (prevCharCR) ? 2 : 1;
- ++bufferPosn; // at next invocation proceed from following byte
- break;
- }
- if (prevCharCR) { //CR + notLF, we are at notLF
- newlineLength = 1;
- break;
- }
- prevCharCR = (buffer[bufferPosn] == CR);
- }
- int readLength = bufferPosn - startPosn;
- if (prevCharCR && newlineLength == 0)
- --readLength; //CR at the end of the buffer
- bytesConsumed += readLength;
- int appendLength = readLength - newlineLength;
- if (appendLength > maxLineLength - txtLength) {
- appendLength = maxLineLength - txtLength;
- }
- if (appendLength > 0) {
- str.append(buffer, startPosn, appendLength);
- txtLength += appendLength;
- }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
- //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)
- //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:
- //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
- //所以不会出现“断行”的问题!
- } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
- if (bytesConsumed > (long)Integer.MAX_VALUE)
- throw new IOException("Too many bytes before newline: " + bytesConsumed);
- return (int)bytesConsumed;
- }
2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:
在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!
- // If this is not the first split, we always throw away first record
- // because we always (except the last split) read one extra line in
- // next() method.
- if (start != 0) {
- start += in.readLine(new Text(), 0, maxBytesToConsume(start));
- }
相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!
- // We always read one extra line, which lies outside the upper
- // split limit i.e. (end - 1)
- while (getFilePosition() <= end) {
小结:
至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.
相关推荐
综上所述,InputFormat是Hadoop MapReduce编程模型中的核心组件之一,它通过定义数据的分片和读取机制,允许开发者以灵活的方式处理各种格式的数据。理解InputFormat的设计和实现,对于有效地使用Hadoop进行大规模...
在大数据处理领域,Apache Flink 和 Apache Hadoop 是两个不可或缺的重要组件。Flink作为一个实时流处理框架,以其高效的事件驱动和状态管理能力著称;而Hadoop则以其分布式存储和计算能力为基础,为大数据处理提供...
《Hadoop源码分析——MapReduce深度解析》 Hadoop,作为云计算领域的核心组件,以其分布式存储和计算能力,为大数据处理提供了强大的支持。MapReduce是Hadoop的主要计算框架,其设计思想源于Google的论文,旨在解决...
- **Hadoop API**:学习如何使用Hadoop API开发MapReduce程序,理解和使用InputFormat、OutputFormat、Mapper、Reducer等关键类。 - **本地模式**:开发者可以在单机上运行Hadoop,进行快速测试和调试,无需真实...
在大数据处理领域,Hadoop MapReduce 是一个至关重要的组件,它为海量数据的分布式计算提供了框架。本资源包“大数据-hadoop-mapreduce代码”显然包含了与MapReduce编程相关的实例或示例代码,对于理解并应用Hadoop ...
2. **MapReduce**:源代码在`hadoop-mapreduce-project`目录下,包含JobTracker(在YARN中已替换为ResourceManager)和TaskTracker(现在是NodeManager)的实现,以及MapReduce作业的生命周期管理。重点可以关注...
Hadoop 代码使用方式 ...hadoop jar hadoop-mapreduce-custom-inputformat-1.0-SNAPSHOT.jar org.apache.hadoop.mapreduce.sample.SmallFileWordCount -Dmapreduce.input.fileinputformat.split.maxsize=10
2. MapReduce组件:MapReduce的源码主要位于`hadoop-mapreduce-project`目录。包括Mapper、Reducer、Partitioner、Combiner等类的实现,以及作业调度和任务执行的逻辑。通过阅读这部分源码,我们可以理解如何将...
在大数据处理领域,Hadoop MapReduce 是一个至关重要的组件,尤其在处理海量数据时,它提供了分布式计算的能力。Hadoop 2.8.0 是一个稳定版本,包含了多个改进和优化,增强了系统的性能和稳定性。在这个"Day04"的...
3. **Hadoop API**:熟悉Hadoop MapReduce的编程模型,包括Mapper、Reducer类,InputFormat和OutputFormat接口,以及JobConf配置。 4. **YARN API**:如果插件要与YARN交互,那么理解ApplicationMaster和Container...
总结,Hadoop-LZO是Hadoop生态中的一个重要组件,它利用LZO压缩算法提高了大数据处理的效率。理解并掌握Hadoop-LZO的原理和使用方法,对于优化Hadoop集群的性能和资源利用率具有重要意义。在实际项目中,根据具体...
MapReduce是Apache Hadoop框架的核心组件之一,主要用于处理和生成大规模数据集。本文将深入浅出地探讨MapReduce的工作原理,帮助读者理解其核心概念和流程。 一、MapReduce架构概述 MapReduce是一种编程模型,...
对于开发者来说,理解Hadoop的基本原理和编程模型非常重要,包括MapReduce的map和reduce阶段、InputFormat和OutputFormat接口、RecordReader和RecordWriter的概念,以及如何编写自定义分区器和Combiner。此外,学习...
在Hadoop MapReduce框架中,InputFormat是处理输入数据的核心组件。它负责将原始数据分割成逻辑上的键值对(key-value pairs),然后为每个分区分配一个或多个这些键值对给Mapper。默认情况下,Hadoop支持如...
标题 "hadoop-mapreduce-anagram" 指的是一个基于Hadoop MapReduce的项目,它展示了如何使用MapReduce解决一个问题:查找和分析字母重新排列(anagram)的问题。MapReduce是一种分布式计算框架,由Google提出,现在...
它主要由两个核心组件构成:Hadoop Distributed File System (HDFS)和MapReduce。HDFS提供了高容错性的分布式存储,而MapReduce则为大规模数据集的并行处理提供了编程模型。 2. **Hadoop 2.7.6版本亮点** Hadoop ...
MapReduce是Hadoop框架的核心组件之一,用于处理和生成大数据集。它遵循“分而治之”的策略,将大规模的数据处理任务分解为一系列可并行执行的子任务,分别是Map阶段和Reduce阶段。 **MapTask运行流程:** 1. **...
Hadoop MapReduce是一种分布式计算框架,是Apache Hadoop生态系统的核心组成部分,主要用于处理和存储大规模数据集。这个项目是一个学习Hadoop MapReduce的实践项目,利用Maven构建,无需单独安装Hadoop环境,只需在...
2. **理解MapReduce编程模型**:在使用插件前,熟悉MapReduce的基本概念,如Mapper、Reducer、InputFormat和OutputFormat等,有助于编写出高效的Hadoop程序。 3. **合理组织项目结构**:将相关的源代码、配置文件和...
在大数据处理领域,Hadoop是不可或缺的核心技术之一。作为一个开源框架,Hadoop为海量数据的存储、处理和分析提供了高效且可扩展的解决方案。本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题...