
HBase BlockCache: BucketCache source code analysis


    

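When HBase serves a get/scan:

  StoreFileScanner.next()

      calls next() on ScannerV2 (an HFileScanner), a static inner class of HFileReaderV2,

          which calls HFileReaderV2.readBlock():

  1. Build a BlockCacheKey

  2. Try to get the block from the block cache

  3. On a miss, read the block from the HFile

  4. Put the block into the block cache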

  

 HFileReaderV2.readBlock

 /**
   * Read in a file block.
   * @param dataBlockOffset offset to read.
   * @param onDiskBlockSize size of the block
   * @param cacheBlock
   * @param pread Use positional read instead of seek+read (positional is
   *          better doing random reads whereas seek+read is better scanning).
   * @param isCompaction is this block being read as part of a compaction
   * @param expectedBlockType the block type we are expecting to read with this
   *          read operation, or null to read whatever block type is available
   *          and avoid checking (that might reduce caching efficiency of
   *          encoded data blocks)
   * @return Block wrapped in a ByteBuffer.
   * @throws IOException
   */
  @Override
  public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
      final boolean cacheBlock, boolean pread, final boolean isCompaction,
      BlockType expectedBlockType)
      throws IOException {
    if (dataBlockIndexReader == null) {
      throw new IOException("Block index not loaded");
    }
    if (dataBlockOffset < 0
        || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
      throw new IOException("Requested block is out of range: "
          + dataBlockOffset + ", lastDataBlockOffset: "
          + trailer.getLastDataBlockOffset());
    }
    // For any given block from any given file, synchronize reads for said
    // block.
    // Without a cache, this synchronizing is needless overhead, but really
    // the other choice is to duplicate work (which the cache would prevent you
    // from doing).
    
    //@1@@@@@@@@@@@@@@@@@@@@
    BlockCacheKey cacheKey =
        new BlockCacheKey(name, dataBlockOffset,
            dataBlockEncoder.getDataBlockEncoding(),
            expectedBlockType); // wrap into a BlockCacheKey; both data and index blocks need one

    boolean useLock = false;
    IdLock.Entry lockEntry = null;
    TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
    try {
      while (true) {
        if (useLock) {
          lockEntry = offsetLock.getLockEntry(dataBlockOffset); // acquire the per-block lock
        }

        //@2@@@@@@@@@@@@@@@@@@@@
        // Check cache for block. If found return.
        if (cacheConf.isBlockCacheEnabled()) { // is the block cache enabled for this table?
          // Try and get the block from the block cache. If the useLock variable is true then this
          // is the second time through the loop and it should not be counted as a block cache miss.
          HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
              cacheBlock, useLock); // look up the block in the BlockCache implementation (BucketCache, LruBlockCache)
          if (cachedBlock != null) {
            validateBlockType(cachedBlock, expectedBlockType);
            if (cachedBlock.getBlockType().isData()) {
              HFile.dataBlockReadCnt.incrementAndGet();

              // Validate encoding type for data blocks. We include encoding
              // type in the cache key, and we expect it to match on a cache hit.
              if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
                throw new IOException("Cached block under key " + cacheKey + " "
                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
                  + dataBlockEncoder.getDataBlockEncoding() + ")");
              }
            }
            return cachedBlock;
          }
          // Carry on, please load.
        }
        if (!useLock) {
          // check cache again with lock
          useLock = true;
          continue;
        }
        if (Trace.isTracing()) {
          traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
        }
        //@3@@@@@@@@@@@@@@@@@@@@
        // Load block from filesystem. Either the cache lookup missed or the cache step was
        // skipped, so read the block data from HDFS. The block lock must be held while reading the HFile.
        long startTimeNs = System.nanoTime();
        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
            pread);
        validateBlockType(hfileBlock, expectedBlockType);

        final long delta = System.nanoTime() - startTimeNs;
        HFile.offerReadLatency(delta, pread);

        // Cache the block if necessary
        //@4@@@@@@@@@@@@@@@@@@@@
        if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
          // The table has block caching enabled and the client also requested caching, so put this block into the block cache
          cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
        }

        if (hfileBlock.getBlockType().isData()) {
          HFile.dataBlockReadCnt.incrementAndGet();
        }

        return hfileBlock;
      }
    } finally {
      traceScope.close();
      if (lockEntry != null) {
        offsetLock.releaseLockEntry(lockEntry);
      }
    }
  }
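
  The control flow of readBlock can be hard to see through the tracing and validation code. Below is a minimal sketch of the same pattern: a lock-free cache check, then a second check under a per-offset lock, then a load and an optional cache fill. It is not the HBase implementation. ReadThroughSketch, its maps, and the loader function are hypothetical stand-ins, and the block offset plays the role of the BlockCacheKey.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

// Hypothetical stand-in for the readBlock flow; none of these are HBase classes.
final class ReadThroughSketch {
  private final Map<Long, byte[]> cache = new ConcurrentHashMap<>();
  private final Map<Long, Object> locks = new ConcurrentHashMap<>();
  private final LongFunction<byte[]> loader; // plays the role of fsBlockReader.readBlockData
  final AtomicInteger loads = new AtomicInteger();

  ReadThroughSketch(LongFunction<byte[]> loader) {
    this.loader = loader;
  }

  byte[] readBlock(long offset, boolean cacheBlock) {
    // First pass: cache lookup without the lock (step 2).
    byte[] hit = cache.get(offset);
    if (hit != null) return hit;

    // Second pass: take the per-offset lock and check again, so that
    // concurrent readers of the same block do not all go to the file.
    Object lock = locks.computeIfAbsent(offset, k -> new Object());
    synchronized (lock) {
      hit = cache.get(offset);
      if (hit != null) return hit;
      byte[] block = loader.apply(offset);      // step 3: load from the file
      loads.incrementAndGet();
      if (cacheBlock) cache.put(offset, block); // step 4: fill the cache
      return block;
    }
  }

  public static void main(String[] args) {
    ReadThroughSketch r = new ReadThroughSketch(off -> new byte[] {(byte) off});
    r.readBlock(7L, true);
    r.readBlock(7L, true);
    System.out.println("loads=" + r.loads.get());
  }
}
```

  With cacheBlock set to true, the second read of the same offset is served from the map and the loader runs only once. With cacheBlock set to false, every read goes to the loader.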

 

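  Step 2 in detail, using the BucketCache class as the example.

  LRUBucketCache also decorates this class (apparently the patch came from a young engineer at Alibaba; impressive).

  

  How BucketCache fetches a block:

  1. Acquire the lock for the block in the block cache (TODO: this should become a read-write lock)

  2. Read the data from the ByteBufferArray

  3. Update the access count of the current BucketEntry (the block)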

 /**
   * Get the buffer of the block with the specified key.
   * @param key block's cache key
   * @param caching true if the caller caches blocks on cache misses
   * @param repeat Whether this is a repeat lookup for the same block
   * @return buffer of specified cache key, or null if not in cache
   */
  @Override
  public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
    if (!cacheEnabled)
      return null;
    RAMQueueEntry re = ramCache.get(key);
    if (re != null) { // hit in ramCache: this block has not yet been written to the bucket cache
      cacheStats.hit(caching);
      re.access(accessCount.incrementAndGet());
      return re.getData();
    }
    // look up the block in the bucket cache
    BucketEntry bucketEntry = backingMap.get(key);
    if(bucketEntry!=null) {
      long start = System.nanoTime();
      IdLock.Entry lockEntry = null;
      try {
        lockEntry = offsetLock.getLockEntry(bucketEntry.offset()); // acquire the lock for this block in the bucket cache
        if (bucketEntry.equals(backingMap.get(key))) {
          int len = bucketEntry.getLength(); // data length
          ByteBuffer bb = ByteBuffer.allocate(len);
          int lenRead = ioEngine.read(bb, bucketEntry.offset()); // read the data from the ByteBufferArray
          if (lenRead != len) {
            throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
          }
          Cacheable cachedBlock = bucketEntry.deserializerReference(
              deserialiserMap).deserialize(bb, true);
          long timeTaken = System.nanoTime() - start;
          cacheStats.hit(caching);
          cacheStats.ioHit(timeTaken);
          bucketEntry.access(accessCount.incrementAndGet()); // record the access on this BucketEntry (block)
          if (this.ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
          return cachedBlock;
        }
      } catch (IOException ioex) {
        LOG.error("Failed reading block " + key + " from bucket cache", ioex);
        checkIOErrorIsTolerated();
      } finally {
        if (lockEntry != null) {
          offsetLock.releaseLockEntry(lockEntry);
        }
      }
    }
    if(!repeat)cacheStats.miss(caching);
    return null;
  }
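
  The lookup order in getBlock (pending writes first, then the index map, then a re-check under the lock before reading) can be isolated in a small model. The sketch below assumes a single heap ByteBuffer in place of the IOEngine and locks on the entry object rather than on its offset. TwoTierLookupSketch, Entry, and putFlushed are hypothetical names, not HBase APIs.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a two-tier lookup; not the HBase BucketCache.
final class TwoTierLookupSketch {
  // Index entry: where the block lives in the backing store.
  static final class Entry {
    final int offset;
    final int length;
    volatile long lastAccess;
    Entry(int offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  final Map<String, byte[]> ramCache = new ConcurrentHashMap<>();  // blocks not yet flushed
  final Map<String, Entry> backingMap = new ConcurrentHashMap<>(); // key -> index entry
  final ByteBuffer store = ByteBuffer.allocate(1024);              // stands in for the IOEngine
  private final AtomicLong accessCount = new AtomicLong();

  // Test helper: place data in the store and publish its index entry.
  void putFlushed(String key, int offset, byte[] data) {
    ByteBuffer view = store.duplicate();
    view.position(offset);
    view.put(data);
    backingMap.put(key, new Entry(offset, data.length));
  }

  byte[] getBlock(String key) {
    byte[] pending = ramCache.get(key);
    if (pending != null) return pending;         // hit in the write-pending tier
    Entry e = backingMap.get(key);
    if (e == null) return null;                  // miss
    synchronized (e) {                           // the original locks on the entry's offset
      if (e != backingMap.get(key)) return null; // entry was evicted or replaced meanwhile
      byte[] out = new byte[e.length];
      ByteBuffer view = store.duplicate();       // shares content, has its own position
      view.position(e.offset);
      view.get(out);
      e.lastAccess = accessCount.incrementAndGet();
      return out;
    }
  }
}
```

  The re-check inside the lock matters because an eviction may free the entry's space between the first map lookup and the read. Without it, the reader could return bytes that now belong to a different block.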

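  Now for cacheBlock from step 4, again using BucketCache.

  The method is simple.

  The block is first written to ramCache and then offered to one of the queues in ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues. The queue is chosen by the cache key's hash code, not at random.

  Writer threads then process these queues.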

 /**
   * Cache the block to ramCache
   * @param cacheKey block's cache key
   * @param cachedItem block buffer
   * @param inMemory if block is in-memory
   * @param wait if true, blocking wait when queue is full
   */
  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
      boolean inMemory, boolean wait) {
    if (!cacheEnabled)
      return;

    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
      return;

    /*
     * Stuff the entry into the RAM cache so it can get drained to the
     * persistent store
     */
    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
        accessCount.incrementAndGet(), inMemory); // wrap the block in a RAMQueueEntry
    ramCache.put(cacheKey, re);
    int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size(); // pick a writer queue by the key's hash
    BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
    boolean successfulAddition = bq.offer(re);
    if (!successfulAddition && wait) {
      synchronized (cacheWaitSignals[queueNum]) {
        try {
          cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
      successfulAddition = bq.offer(re);
    }
    if (!successfulAddition) {
        ramCache.remove(cacheKey);
        failedBlockAdditions.incrementAndGet();
    } else {
      this.blockNumber.incrementAndGet();
      this.heapSize.addAndGet(cachedItem.heapSize());
      blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
    }
  }
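
  The queue selection in cacheBlockWithWait is worth a closer look: masking with 0x7FFFFFFF clears the sign bit, so a negative hashCode cannot produce a negative index. The sketch below shows the same expression together with a non-blocking offer to a bounded queue. WriterQueueSketch and its String entries are hypothetical simplifications of the RAMQueueEntry queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of hashing cache keys onto a fixed set of writer queues.
final class WriterQueueSketch {
  final List<BlockingQueue<String>> writerQueues = new ArrayList<>();

  WriterQueueSketch(int queues, int capacity) {
    for (int i = 0; i < queues; i++) {
      writerQueues.add(new ArrayBlockingQueue<>(capacity));
    }
  }

  // Clear the sign bit first so that a negative hashCode still maps to a valid index.
  int queueIndex(Object key) {
    return (key.hashCode() & 0x7FFFFFFF) % writerQueues.size();
  }

  // Non-blocking enqueue: returns false when the chosen queue is full,
  // which is the case the original handles by optionally waiting and retrying once.
  boolean enqueue(String key) {
    return writerQueues.get(queueIndex(key)).offer(key);
  }
}
```

  Because the index depends only on the key's hash, the same key always lands on the same queue, so one writer thread sees every write for a given block.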

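  Several WriterThread threads take entries off the queues and write the ramCache data into the ByteBufferArray (supposedly onto the fast disks Alibaba mentioned, though I could not find that in this code).

                        They also wrap the index of the data in the ByteBufferArray as a BucketEntry and put it into backingMap, which is the actual block cache.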

 

 /**
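     * 1. Loop over the entries taken from ramCache, build an index entry for each, and write the data into the ByteBufferArray
     * 2. Put the index entries into backingMap
     * 3. Remove the entries from ramCache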
     * Flush the entries in ramCache to IOEngine and add bucket entry to
     * backingMap
     * @param entries
     * @throws InterruptedException
     */
    private void doDrain(List<RAMQueueEntry> entries)
        throws InterruptedException {
      // index entries (BucketEntry) to be published to backingMap
      BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
      // the in-memory data taken from ramCache
      RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
      int done = 0;
      while (entries.size() > 0 && cacheEnabled) {
        // Keep going in case we throw...
        RAMQueueEntry ramEntry = null;
        try {
          ramEntry = entries.remove(entries.size() - 1); // take the next entry
          if (ramEntry == null) {
            LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
            continue;
          }
          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
              bucketAllocator, deserialiserMap, realCacheSize); // writes the data into the ByteBufferArray and returns its index entry
          ramEntries[done] = ramEntry;
          bucketEntries[done++] = bucketEntry; // index of the data in the ByteBufferArray
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocating for block "
              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
        } catch (CacheFullException cfe) {
          if (!freeInProgress) {
            freeSpace();
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure that the data pages we have written are on the media before
      // we update the map.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Faild syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
        for (int i = 0; i < done; ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
          }
        }
        done = 0;
      }

      for (int i = 0; i < done; ++i) {
        if (bucketEntries[i] != null) { // publish the index entry to backingMap
          backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
        }
        RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
        }
      }

      if (bucketAllocator.getUsedSize() > acceptableSize()) {
        freeSpace();
      }
    }
  }
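
  The ordering in doDrain is the important part: write the data, then publish the index entry, and only then remove the block from ramCache, so a reader can always find the block in one of the two tiers. Below is a minimal single-threaded sketch of one drain pass under simplified assumptions: a heap ByteBuffer as the store, a naive bump allocator in place of BucketAllocator, and no sync or eviction. DrainSketch and its members are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of one writer-thread pass; not the HBase WriterThread.
final class DrainSketch {
  final Map<String, byte[]> ramCache = new ConcurrentHashMap<>();
  final Map<String, int[]> backingMap = new ConcurrentHashMap<>(); // key -> {offset, length}
  final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  final ByteBuffer store = ByteBuffer.allocate(1024);              // stands in for the IOEngine
  private int nextOffset = 0;                                      // naive bump allocator

  // Stage a block: visible to readers via ramCache, queued for the writer.
  void cache(String key, byte[] data) {
    if (ramCache.putIfAbsent(key, data) == null) {
      queue.offer(key);
    }
  }

  // Drain everything currently queued; returns the number of blocks flushed.
  int drainOnce() {
    List<String> batch = new ArrayList<>();
    queue.drainTo(batch);
    int flushed = 0;
    for (String key : batch) {
      byte[] data = ramCache.get(key);
      if (data == null) continue;
      if (nextOffset + data.length > store.capacity()) {
        ramCache.remove(key); // store is full: drop the block (the original frees space instead)
        continue;
      }
      ByteBuffer view = store.duplicate();
      view.position(nextOffset);
      view.put(data);                                            // 1. write the data
      backingMap.put(key, new int[] {nextOffset, data.length});  // 2. publish the index entry
      ramCache.remove(key);                                      // 3. drop it from ramCache
      nextOffset += data.length;
      flushed++;
    }
    return flushed;
  }
}
```

  Reversing steps 2 and 3 would open a window in which the block is in neither map and a concurrent read would count as a miss.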
  

  The writeToCache method follows.

/**
     * Writes the data into the ByteBufferArray and returns its index entry.
     * I saw nothing here that involves a fast disk.
     * @param ioEngine
     * @param bucketAllocator
     * @param deserialiserMap
     * @param realCacheSize
     * @return
     * @throws CacheFullException
     * @throws IOException
     * @throws BucketAllocatorException
     */
    public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime,
          inMemory);
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); // record a reference to the data's deserializer in the BucketEntry
      try { // write the data into the ByteBufferArray
        if (data instanceof HFileBlock) {
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader(); // get the block's data buffer
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          ioEngine.write(sliceBuf, offset); // write the data into the ByteBufferArray
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // free it in bucket allocator
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }
      
      realCacheSize.addAndGet(len);
      return bucketEntry;
    }
  }
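
  For an HFileBlock, writeToCache issues two writes into one allocated region: the block bytes at offset, and the extra serialization info in the last EXTRA_SERIALIZATION_SPACE bytes, at offset + len - EXTRA_SERIALIZATION_SPACE. The sketch below reproduces only that offset arithmetic. The trailer size of 8 bytes and the single long used as extra info are assumptions made for illustration and are not HBase's actual values. BlockLayoutSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical model of the "payload + trailer" layout inside one allocated region.
final class BlockLayoutSketch {
  static final int EXTRA = 8; // assumed trailer size for this sketch only

  // Writes the payload at [offset, offset + len - EXTRA) and the trailer in the
  // last EXTRA bytes of the region. Returns the total serialized length.
  static int write(ByteBuffer store, int offset, byte[] payload, long extraInfo) {
    int len = payload.length + EXTRA;
    ByteBuffer view = store.duplicate(); // shares content, has its own position
    view.position(offset);
    view.put(payload);
    view.position(offset + len - EXTRA);
    view.putLong(extraInfo);
    return len;
  }

  // Reads the trailer back using the same arithmetic.
  static long readExtra(ByteBuffer store, int offset, int len) {
    return store.getLong(offset + len - EXTRA);
  }
}
```

  Keeping the trailer at a position derived from offset and len means the reader needs nothing beyond the index entry to locate both parts.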

 

 

 

 

 

 

 

 
