`
zqhxuyuan
  • 浏览: 32425 次
  • 性别: Icon_minigender_1
  • 来自: 福建
社区版块
存档分类
最新评论

MapReduce源码注释-MapTask.MapOutputBuffer.Buffer

阅读更多
    public class Buffer extends OutputStream {
      private final byte[] scratch = new byte[1];
      public synchronized void write(int v) throws IOException {
        scratch[0] = (byte)v;
        write(scratch, 0, 1);
      }
      /**Attempt to write a sequence of bytes to the collection buffer. This method will block if the spill thread is running and it cannot write. */
      public synchronized void write(byte b[], int off, int len) throws IOException {
        boolean buffull = false;		// 缓冲区连续写是否写满标志
        boolean wrap = false;		// 缓冲区非连续情况下有足够写空间标志
        spillLock.lock();
        try {
          do {
            // sufficient buffer space? 是否有足够的缓冲区
            // 连续的空间, 在还没开始写入len个字节到kvbuffer时, 要判断如果写入len个字节后, 会不会使缓冲区满, 如果满需要spill
            // 1. bufstart=bufend<bufindex 正常写入缓冲区, 由于还没有开始spill, bufstart=bufend. 连续区间, 所以bufstart=bufend<bufindex
            // 2. bufstart < bufend < bufindex 正在溢写(bufstart<bufend=bufindex)的同时又有数据写入缓冲区(bufstart<bufend<bufindex)
            if (bufstart <= bufend && bufend <= bufindex) {  // bufindex不会在bufend前面. 即bufindex不会写完缓冲区后,再绕到bufend前面
              buffull = bufindex + len > bufvoid;	// 一有数据写入kvbuffer,bufindex就后移. 判断再写入len个字节后,是否到达缓冲区尾部
              wrap = (bufvoid - bufindex) + bufstart > len; // 缓冲区中非连续性区间包括两部分, buindex剩余的和bufstart之前的
            } else {	// 非连续的空间(中间部分为即将写入的数据): 基本条件为bufindex < bufstart. 
              // bufindex <= bufstart <= bufend 写入缓冲区尾部后不够, 需要写到缓冲区头部bufindex<bufstart=bufend
              // bufend <= bufindex <= bufstart	溢写bufend=bufindex<bufstart,同时又有数据写入bufend<bufindex<bufstart
              wrap = false;  // 非连续情况下总设置为false,一旦缓冲区即将满就必须溢写. 这样在复制到kvbuffer时,保证不会执行if(buffull)的逻辑
              buffull = bufindex + len > bufstart;	// bufindex现在已经重头开始了,再写入len,不能超过bufstart.如果超过则说明缓冲区写满了
            }
            if (kvstart == kvend) {		// spill thread not running 溢写线程还没运行. 只要kvstart!=kvend,才开始将缓冲区数据写入磁盘
              if (kvend != kvindex) {	// we have records we can spill 还没溢写时, 没有赋值kvend=kvindex. 数组kvoffsets中有记录
            	// bufindex>bufend, 连续的空间, 判断是否需要溢写. [bufend,bufindex)之间的为已经写入的数据,判断是否超过80%的限制
                final boolean bufsoftlimit = (bufindex > bufend) ? bufindex - bufend > softBufferLimit
                // 非连续空间分成两段,bufvoid-bufend后面一段, bufindex为前面一段. 两者之和即为已写入的数据bufvoid-bufend+bufindex>limit
                  : bufend - bufindex < bufvoid - softBufferLimit;
                if (bufsoftlimit || (buffull && !wrap)) {
                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
                  startSpill();			// 缓冲区中已使用的内存利用率超过80%, 或者在写入之前缓冲区已经满了, 则立即spill
                }
              } else if (buffull && !wrap) {  // kvoffsets数组中没有记录, 但是kvbuffer空间还是不够当前写入的数据
                // We have no buffered records, and this record is too large to write into kvbuffer. We must spill it directly from collect
                final int size = ((bufend <= bufindex) ? bufindex - bufend : (bufvoid - bufend) + bufindex) + len;  // 已使用+即将写入的len
                bufstart = bufend = bufindex = bufmark = 0;
                kvstart = kvend = kvindex = 0;
                bufvoid = kvbuffer.length;
                throw new MapBufferTooSmallException(size + " bytes");
              }
            }
            if (buffull && !wrap) {		// 如果空间不足同时spill在运行, 等待spillDone
              try {
                while (kvstart != kvend) {
                  reporter.progress();
                  spillDone.await();
                }
              } catch (InterruptedException e) {throw (IOException)new IOException("Buffer interrupted while waiting for the writer").initCause(e);}
            }
          } while (buffull && !wrap);
        } finally {
          spillLock.unlock();
        }
        // here, we know that we have sufficient space to write 缓冲区有足够的空间用来写入数据
        // 前面判断buffull有两种情况即连续性和非连续性. 非连续性一定不会执行下面的if语句. 
        // 因为非连续性设置wrap=false,如果缓冲区满的话, 一定会执行spill溢写. 
        // 否则如果非连续性执行下面的buffull逻辑. gaplen=bufvoid-bufindex就有问题.
        if (buffull) {
          final int gaplen = bufvoid - bufindex;   // 写到缓冲区尾部
          System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
          len -= gaplen;	// 剩余部分len=len-gaplen会写到缓冲区头部
          off += gaplen;	// 要写入数据的起始位置也要相应变化
          bufindex = 0;	// 从缓冲区头部开始写
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;	// 写入len个字节, 缓冲区的bufindex就后移len个字节
      }
    }

 

BlockBuffer.reset

      protected synchronized void reset() throws IOException {
        // spillLock unnecessary; If spill wraps, then
        // bufindex < bufstart < bufend so contention is impossible
        // a stale value for bufstart does not affect correctness, since
        // we can only get false negatives that force the more conservative path
    	// 写入key时,发生跨界现象. 即写入某个key时,缓冲区尾部剩余空间不足以容纳整个key值,因此需要将key值分开存储,其中一部分存到缓冲区末尾,->key上半部分①
    	// 另一部分存到缓冲区头部->key下半部分②. 由于key是排序的关键字,需要保证连续性. 因此需要将跨界的key值重新存储到缓冲区的头部位置.
    	// 发生跨界的key在缓冲区末尾的长度=bufvoid-bufmark. 其中bufmark为最后(上一次)写入的一个完整的key/value的结束位置 
        int headbytelen = bufvoid - bufmark;  	
        // 将尾部key插入到头部之后, bufvoid要设置为bufmark. 那么bufvoid开始,长度为headbytelen的就是key的尾部部分了
        bufvoid = bufmark;
        // 缓冲区前半段有足够的空间容纳整个key值. 即将尾部的key插入到头部后, 不会使得这一个key超过bufstart
        if (bufindex + headbytelen < bufstart) {
          // bufindex为当前缓冲区的位置. 不管写入key或者value, bufindex都表示下一个可写的初始位置
          // [0, bufindex]在调整之前的头部① -> [headbytelen, headbytelen+bufindex] 即将原先头部后移headbytelen用来准备第二次拷贝
          System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
          // [bufvoid, bufvoid+headbytelen]调整之前的尾部② -> [0, headbytelen] 将尾部②插入到原先的头部①
          System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
          bufindex += headbytelen;  // bufindex也要跟随移动到原先头部的下一个位置
        } else {   // 缓冲区前半段没有足够的空间容纳整个key值. 在将key值移动到缓冲区开始位置时触发一次spill操作
          byte[] keytmp = new byte[bufindex];	// bufindex为在未调整之前的缓冲区头部
          System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);  // 将原先的头部②复制到临时缓冲区中
          bufindex = 0;  	// 重置缓冲区
          out.write(kvbuffer, bufmark, headbytelen);	// 首先将[bufmark, bufmark+headbytelen]即尾部①写入输出流
          out.write(keytmp);// 然后将缓冲区头部,即key的下半部分也写入输出流	
        }
      }

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics