An analysis of the readLine method of LineReader in Hadoop


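By default, Hadoop reads one record at a time with the readLine method of LineReader. The source code shows exactly how the method works; because the logic is fairly intricate, I have added some comments: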

    /**
     * Read one line from the InputStream into the given Text. A line can be
     * terminated by one of the following: '\n' (LF) , '\r' (CR), or '\r\n'
     * (CR+LF). EOF also terminates an otherwise unterminated line.
     * 
     * @param str the object to store the given line (without newline)
     * @param maxLineLength the maximum number of bytes to store into str; the
     *        rest of the line is silently discarded.
     * @param maxBytesToConsume the maximum number of bytes to consume in this
     *        call. This is only a hint, because if the line crosses this
     *        threshold, we allow it to happen. It can overshoot potentially by
     *        as much as one buffer length.
     * 
     * @return the number of bytes read including the (longest) newline found.
     * 
     * @throws IOException if the underlying stream throws
     */
    public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException
    {
        /*
         * We're reading data from in, but the head of the stream may be already
         * buffered in buffer, so we have several cases:
         * 
         * Annotation: the buffer is 64 KB by default. When the next line is
         * requested, some or all of it may already sit in the buffer, and a
         * line is usually far shorter than 64 KB, hence the three cases
         * listed below. In case 1 the same buffer is refilled from the
         * stream (there is no second buffer). In case 2, "unambiguously
         * terminated" means the line ends in \n, or in \r followed by a
         * byte other than \n that is already in the buffer. In case 3 the
         * flag prevCharCR records that the last byte of the buffer was \r,
         * so the decision is postponed until the next byte has been read.
         * 
         * 1. No newline characters are in the buffer, 
         * so we need to copy everything and read
         * another buffer from the stream. 
         * 
         * 2. An unambiguously terminated line
         * is in buffer, so we just copy to str. 
         * 
         * 3. Ambiguously terminated line
         * is in buffer, i.e. buffer ends in CR. In this case we copy everything
         * up to CR to str, but we also need to see what follows CR: if it's LF,
         * then we need consume LF as well, so next call to readLine will read
         * from after that. We use a flag prevCharCR to signal if previous
         * character was CR and, if it happens to be at the end of the buffer,
         * delay consuming it until we have a chance to look at the char that
         * follows.
         */
        
        // Clear any data already held in str
        str.clear();
        int txtLength = 0; // tracks str.getLength(), as an optimization
        int newlineLength = 0; // length of terminating newline
        boolean prevCharCR = false; // true if prev char was CR
        long bytesConsumed = 0;
        
        do
        {
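            // bufferPosn holds the current position in the buffer, i.e. where
            // the previous read stopped. If the previous read consumed the
            // byte at position 400, this read starts at 401.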
            int startPosn = bufferPosn; // starting from where we left off the
                                        // last time
            
            // If the current position has reached the end of the buffer, no
            // line terminator was found in the data buffered so far, so read
            // more data into the buffer and keep searching
            if (bufferPosn >= bufferLength)
            {
                // Reset bufferPosn so that scanning restarts at 0
                startPosn = bufferPosn = 0;
                
                // Was the last byte of the previous buffer \r?
                if (prevCharCR)
                    ++bytesConsumed; // account for CR from previous read
                
                // Read from the input stream in into buffer; bufferLength is
                // the number of bytes actually read
                bufferLength = in.read(buffer);
                
                // Nothing was read: leave the loop
                if (bufferLength <= 0)
                    break; // EOF
            }
            
            // Scan forward from bufferPosn
            for (; bufferPosn < bufferLength; ++bufferPosn)
            { 
                // search for newline
                // first check for the line terminator \n (LF)
                if (buffer[bufferPosn] == LF)
                {
                    
                    /***********************************
                     *     ‹p›          ‹p›
                     *      |            |
                     *      0080abcdk\r\ngabld008924\r\n
                     * 
                     ***********************************/
                    // \n found: check whether the previous byte was \r. If it
                    // was, newlineLength = 2, otherwise newlineLength = 1
                    newlineLength = (prevCharCR) ? 2 : 1;
                    ++bufferPosn; // at next invocation proceed from following
                                  // byte
                    break;
                }
                
                /***********************************
                 *     ‹p›        ‹p›
                 *      |          |
                 *      0080abcdk\rgabld008924\r\n
                 * 
                 ***********************************/
                // On reaching the character g, the previous byte was \r, so
                // newlineLength = 1
                if (prevCharCR)
                { 
                    // CR + notLF, we are at notLF
                    newlineLength = 1;
                    break;
                }
                
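                // When \r is read, prevCharCR becomes true. The next iteration
                // then hits one of the checks above (newlineLength = 2 if the
                // next byte is \n, otherwise 1) and breaks out of the loop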
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            
            // Number of bytes scanned in this pass
            int readLength = bufferPosn - startPosn;
            
            // Handle case 3: the scan reached the very end of the buffer and
            // the last byte is \r
            /***********************************
             *     ‹p›       ‹p›
             *      |       |←|
             *      0080abcdk\r
             * 
             ***********************************/
            if (prevCharCR && newlineLength == 0)
                --readLength; // CR at the end of the buffer
            
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            
            // If the line would exceed maxLineLength, the maximum number of
            // bytes to store in str, clamp appendLength to
            // maxLineLength - txtLength
            if (appendLength > maxLineLength - txtLength)
            {
                appendLength = maxLineLength - txtLength;
            }
            
            if (appendLength > 0)
            {
                // Append appendLength bytes of buffer, starting at startPosn,
                // to str
                str.append(buffer, startPosn, appendLength);
                
                // Track how many bytes have been stored so far; one line may
                // be appended in several pieces across buffer refills
                txtLength += appendLength;
            }
        }
        // Keep looping while no line terminator has been found
        // (newlineLength == 0) and the maxBytesToConsume hint is not reached
        while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

        // The line is too long to report as an int: throw an exception
        if (bytesConsumed > (long) Integer.MAX_VALUE)
            throw new IOException("Too many bytes before newline: "
                    + bytesConsumed);
        return (int) bytesConsumed;
    }
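
To watch the three cases from the comment in action, here is a minimal, self-contained sketch of the same scanning loop. It is not the Hadoop class: `MiniLineReader`, `readAll` and the `ByteArrayOutputStream` standing in for `Text` are names I made up for illustration, and the `maxLineLength` / `maxBytesToConsume` handling is left out. The buffer is deliberately tiny (3 bytes) so that a `\r` lands exactly at the end of a buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for LineReader, for illustration only.
class MiniLineReader {
    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0; // number of valid bytes in buffer
    private int bufferPosn = 0;   // next byte of buffer to look at

    MiniLineReader(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    /** Stores one line (without terminator) in line; returns bytes consumed, 0 at EOF. */
    int readLine(ByteArrayOutputStream line) throws IOException {
        line.reset();
        int newlineLength = 0;
        boolean prevCharCR = false;
        int bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                // Buffer exhausted: refill the same buffer from the stream.
                startPosn = bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed; // the CR held back at the end of the last buffer
                }
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    break; // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == '\n') {
                    newlineLength = prevCharCR ? 2 : 1; // CR+LF or plain LF
                    ++bufferPosn;
                    break;
                }
                if (prevCharCR) {
                    newlineLength = 1; // CR followed by something other than LF
                    break;
                }
                prevCharCR = (buffer[bufferPosn] == '\r');
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength; // CR at the end of the buffer: decide after the refill
            }
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > 0) {
                line.write(buffer, startPosn, appendLength);
            }
        } while (newlineLength == 0);
        return bytesConsumed;
    }

    /** Reads every line of input and returns entries of the form "line:bytesConsumed". */
    static List<String> readAll(String input, int bufferSize) {
        MiniLineReader reader = new MiniLineReader(
                new ByteArrayInputStream(input.getBytes(StandardCharsets.US_ASCII)), bufferSize);
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        List<String> result = new ArrayList<>();
        try {
            int consumed;
            while ((consumed = reader.readLine(line)) > 0) {
                result.add(line.toString() + ":" + consumed);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // "ab\r" fills the first 3-byte buffer, so the \r is the ambiguous case.
        System.out.println(readAll("ab\r\ncd\re", 3)); // prints [ab:4, cd:3, e:1]
    }
}
```

With a 3-byte buffer the first line is read as "ab" plus a held-back `\r`; only after the refill does the reader see the `\n` and count the terminator as 2 bytes, giving 4 bytes consumed in total.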

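
The truncation rule is easy to overlook: bytes beyond maxLineLength are not stored in str, but they still count toward the return value. The small sketch below isolates that arithmetic from the method above; the class name `AppendClamp` and its helper are hypothetical names used only for this example.

```java
// Hypothetical helper that mirrors the clamp on appendLength in readLine.
final class AppendClamp {
    /** Number of bytes of the current chunk that end up in str. */
    static int appendLength(int readLength, int newlineLength,
                            int maxLineLength, int txtLength) {
        int appendLength = readLength - newlineLength;
        if (appendLength > maxLineLength - txtLength) {
            appendLength = maxLineLength - txtLength; // room left in str
        }
        return Math.max(appendLength, 0); // readLine appends only when > 0
    }

    public static void main(String[] args) {
        // "hello world\n" is 12 bytes including the 1-byte terminator.
        // With maxLineLength = 5 only "hello" is stored.
        System.out.println(appendLength(12, 1, 5, 0)); // prints 5
    }
}
```

In that example readLine would still return 12, because bytesConsumed is increased by readLength before the clamp is applied.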
 
