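During an HBase get/scan, the call chain is:
StoreFileScanner.next()
which calls next() on ScannerV2 (an HFileScanner), a static nested class of HFileReaderV2,
which calls HFileReaderV2.readBlock():
1. Wrap the request in a BlockCacheKey
2. Try to get the block from the block cache
3. On a miss, read the block from the HFile
4. Put the block into the block cache
HFileReaderV2.readBlock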
    /**
     * Read in a file block.
     * @param dataBlockOffset offset to read.
     * @param onDiskBlockSize size of the block
     * @param cacheBlock
     * @param pread Use positional read instead of seek+read (positional is
     *          better doing random reads whereas seek+read is better scanning).
     * @param isCompaction is this block being read as part of a compaction
     * @param expectedBlockType the block type we are expecting to read with this
     *          read operation, or null to read whatever block type is available
     *          and avoid checking (that might reduce caching efficiency of
     *          encoded data blocks)
     * @return Block wrapped in a ByteBuffer.
     * @throws IOException
     */
    @Override
    public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
        final boolean cacheBlock, boolean pread, final boolean isCompaction,
        BlockType expectedBlockType) throws IOException {
      if (dataBlockIndexReader == null) {
        throw new IOException("Block index not loaded");
      }
      if (dataBlockOffset < 0 || dataBlockOffset >= trailer.getLoadOnOpenDataOffset()) {
        throw new IOException("Requested block is out of range: " + dataBlockOffset
            + ", lastDataBlockOffset: " + trailer.getLastDataBlockOffset());
      }
      // For any given block from any given file, synchronize reads for said
      // block.
      // Without a cache, this synchronizing is needless overhead, but really
      // the other choice is to duplicate work (which the cache would prevent you
      // from doing).

      // Step 1: wrap the request in a BlockCacheKey; it is used for both the
      // in-memory data and the index.
      BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset,
          dataBlockEncoder.getDataBlockEncoding(), expectedBlockType);
      boolean useLock = false;
      IdLock.Entry lockEntry = null;
      TraceScope traceScope = Trace.startSpan("HFileReaderV2.readBlock");
      try {
        while (true) {
          if (useLock) {
            // Acquire the lock for this block.
            lockEntry = offsetLock.getLockEntry(dataBlockOffset);
          }

          // Step 2: check cache for block. If found return.
          if (cacheConf.isBlockCacheEnabled()) { // is the block cache enabled for this table?
            // Try and get the block from the block cache. If the useLock variable is true then this
            // is the second time through the loop and it should not be counted as a block cache miss.
            // The lookup goes to the BlockCache implementation (BucketCache, LruBlockCache).
            HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
                cacheBlock, useLock);
            if (cachedBlock != null) {
              validateBlockType(cachedBlock, expectedBlockType);
              if (cachedBlock.getBlockType().isData()) {
                HFile.dataBlockReadCnt.incrementAndGet();
                // Validate encoding type for data blocks. We include encoding
                // type in the cache key, and we expect it to match on a cache hit.
                if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
                  throw new IOException("Cached block under key " + cacheKey + " "
                      + "has wrong encoding: " + cachedBlock.getDataBlockEncoding()
                      + " (expected: " + dataBlockEncoder.getDataBlockEncoding() + ")");
                }
              }
              return cachedBlock;
            }
            // Carry on, please load.
          }
          if (!useLock) {
            // check cache again with lock
            useLock = true;
            continue;
          }
          if (Trace.isTracing()) {
            traceScope.getSpan().addTimelineAnnotation("blockCacheMiss");
          }

          // Step 3: load block from filesystem. The block was not found in the cache,
          // or the cache step was skipped, so use the block data on HDFS. Reading the
          // HFile requires holding the block lock.
          long startTimeNs = System.nanoTime();
          HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1,
              pread);
          validateBlockType(hfileBlock, expectedBlockType);
          final long delta = System.nanoTime() - startTimeNs;
          HFile.offerReadLatency(delta, pread);

          // Step 4: cache the block if necessary. If block caching is enabled for the
          // table and the client also asked for caching, put the block into the cache.
          if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
            cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
          }
          if (hfileBlock.getBlockType().isData()) {
            HFile.dataBlockReadCnt.incrementAndGet();
          }
          return hfileBlock;
        }
      } finally {
        traceScope.close();
        if (lockEntry != null) {
          offsetLock.releaseLockEntry(lockEntry);
        }
      }
    }
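The control flow of readBlock (check the cache without a lock, then take a per-offset lock, check again, and only then read from disk) can be sketched in isolation. This is a minimal sketch, not HBase code: `LockedBlockLoader`, its `disk` function, and the lock map are hypothetical stand-ins, and unlike IdLock the sketch never releases its lock objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

/** Hypothetical stand-in for the readBlock flow; not the HBase implementation. */
final class LockedBlockLoader {
    private final ConcurrentHashMap<Long, byte[]> cache = new ConcurrentHashMap<>();
    // One lock object per block offset (assumption: never cleaned up in this sketch).
    private final ConcurrentHashMap<Long, Object> locks = new ConcurrentHashMap<>();
    private final LongFunction<byte[]> disk;
    final AtomicInteger diskReads = new AtomicInteger();

    LockedBlockLoader(LongFunction<byte[]> disk) {
        this.disk = disk;
    }

    byte[] readBlock(long offset, boolean cacheBlock) {
        // First pass: lock-free cache lookup.
        byte[] hit = cache.get(offset);
        if (hit != null) {
            return hit;
        }
        // Second pass: take the per-offset lock and look again, so that
        // concurrent readers of the same block do not all go to disk.
        Object lock = locks.computeIfAbsent(offset, k -> new Object());
        synchronized (lock) {
            hit = cache.get(offset);
            if (hit != null) {
                return hit;
            }
            byte[] block = disk.apply(offset);
            diskReads.incrementAndGet();
            if (cacheBlock) {
                cache.put(offset, block);
            }
            return block;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockedBlockLoader loader = new LockedBlockLoader(off -> new byte[] {(byte) off});
        Thread[] readers = new Thread[8];
        for (int i = 0; i < readers.length; i++) {
            readers[i] = new Thread(() -> loader.readBlock(42L, true));
            readers[i].start();
        }
        for (Thread t : readers) {
            t.join();
        }
        System.out.println("disk reads: " + loader.diskReads.get());
    }
}
```

With caching enabled, readers that lose the race find the block on their second lookup, which is the duplicated work the source comment says the lock is there to avoid.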
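Step 2 in detail, taking the BucketCache class as the example.
LRUBucketCache also decorates this class (apparently a patch from a young engineer at Alibaba; impressive work).
How BucketCache fetches a block
1. Acquire the lock for the block in the block cache (TODO: this should become a read-write lock)
2. Read the data from the ByteBufferArray
3. Record the hit and update the access counter of the current BucketEntry (block)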
    /**
     * Get the buffer of the block with the specified key.
     * @param key block's cache key
     * @param caching true if the caller caches blocks on cache misses
     * @param repeat Whether this is a repeat lookup for the same block
     * @return buffer of specified cache key, or null if not in cache
     */
    @Override
    public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat) {
      if (!cacheEnabled) return null;
      RAMQueueEntry re = ramCache.get(key);
      if (re != null) {
        // Hit in ramCache: this block has not been written to the block cache yet.
        cacheStats.hit(caching);
        re.access(accessCount.incrementAndGet());
        return re.getData();
      }
      // Fetch the data from the block cache.
      BucketEntry bucketEntry = backingMap.get(key);
      if (bucketEntry != null) {
        long start = System.nanoTime();
        IdLock.Entry lockEntry = null;
        try {
          // Acquire the lock for this block in the block cache.
          lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
          if (bucketEntry.equals(backingMap.get(key))) {
            int len = bucketEntry.getLength(); // data length
            ByteBuffer bb = ByteBuffer.allocate(len);
            // Read the data from the ByteBufferArray.
            int lenRead = ioEngine.read(bb, bucketEntry.offset());
            if (lenRead != len) {
              throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");
            }
            Cacheable cachedBlock = bucketEntry.deserializerReference(
                deserialiserMap).deserialize(bb, true);
            long timeTaken = System.nanoTime() - start;
            cacheStats.hit(caching);
            cacheStats.ioHit(timeTaken);
            // Update the access counter of the current BucketEntry (block).
            bucketEntry.access(accessCount.incrementAndGet());
            if (this.ioErrorStartTime > 0) {
              ioErrorStartTime = -1;
            }
            return cachedBlock;
          }
        } catch (IOException ioex) {
          LOG.error("Failed reading block " + key + " from bucket cache", ioex);
          checkIOErrorIsTolerated();
        } finally {
          if (lockEntry != null) {
            offsetLock.releaseLockEntry(lockEntry);
          }
        }
      }
      if (!repeat) cacheStats.miss(caching);
      return null;
    }
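The lookup order in getBlock (pending entries in ramCache first, then the index in backingMap, re-validated under a lock before the bytes are copied out) can be illustrated with a small standalone model. This is a sketch under assumptions: `TwoTierLookup`, `Entry`, and `putFlushed` are hypothetical names, a single `ByteBuffer` stands in for the IOEngine, and one coarse lock replaces the per-offset IdLock.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-tier lookup modelled on getBlock; not the HBase implementation. */
final class TwoTierLookup {
    /** Index entry: where a flushed block lives inside the backing buffer. */
    static final class Entry {
        final int offset;
        final int length;

        Entry(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    final ConcurrentHashMap<String, byte[]> ramCache = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Entry> backingMap = new ConcurrentHashMap<>();
    private final ByteBuffer store = ByteBuffer.allocate(1024);

    /** Test helper: place bytes in the store and publish their index entry. */
    void putFlushed(String key, int offset, byte[] data) {
        synchronized (store) {
            ByteBuffer w = store.duplicate();
            w.position(offset);
            w.put(data);
        }
        backingMap.put(key, new Entry(offset, data.length));
    }

    byte[] getBlock(String key) {
        // Tier 1: blocks that are queued but not yet written to the store.
        byte[] pending = ramCache.get(key);
        if (pending != null) {
            return pending;
        }
        // Tier 2: the index of blocks already written to the store.
        Entry e = backingMap.get(key);
        if (e == null) {
            return null;
        }
        synchronized (store) {
            // Re-check under the lock: the entry may have been evicted or
            // replaced between the map lookup and acquiring the lock.
            if (e != backingMap.get(key)) {
                return null;
            }
            byte[] out = new byte[e.length];
            ByteBuffer r = store.duplicate();
            r.position(e.offset);
            r.get(out);
            return out;
        }
    }

    public static void main(String[] args) {
        TwoTierLookup cache = new TwoTierLookup();
        cache.ramCache.put("pending", new byte[] {1});
        cache.putFlushed("flushed", 16, new byte[] {2, 3});
        System.out.println(cache.getBlock("pending").length);
        System.out.println(cache.getBlock("flushed").length);
        System.out.println(cache.getBlock("missing"));
    }
}
```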
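The cacheBlock call in step 4, again for BucketCache.
The method is simple:
the in-memory block is first written to ramCache and then offered to one of the queues in ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues, selected by the hash of the cache key.
Writer threads then process these queues.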
    /**
     * Cache the block to ramCache
     * @param cacheKey block's cache key
     * @param cachedItem block buffer
     * @param inMemory if block is in-memory
     * @param wait if true, blocking wait when queue is full
     */
    public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
        boolean inMemory, boolean wait) {
      if (!cacheEnabled) return;
      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) return;
      /*
       * Stuff the entry into the RAM cache so it can get drained to the
       * persistent store
       */
      // Wrap the block in a RAMQueueEntry.
      RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
          accessCount.incrementAndGet(), inMemory);
      ramCache.put(cacheKey, re);
      // Pick a writer queue from the hash of the cache key.
      int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
      BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
      boolean successfulAddition = bq.offer(re);
      if (!successfulAddition && wait) {
        synchronized (cacheWaitSignals[queueNum]) {
          try {
            cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
        }
        successfulAddition = bq.offer(re);
      }
      if (!successfulAddition) {
        ramCache.remove(cacheKey);
        failedBlockAdditions.incrementAndGet();
      } else {
        this.blockNumber.incrementAndGet();
        this.heapSize.addAndGet(cachedItem.heapSize());
        blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
      }
    }
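The queue selection `(cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size()` is deterministic, not random: the mask clears the sign bit so that the remainder is never negative, and the same key always lands on the same queue. A small sketch of just that expression follows; `WriterQueuePicker` and the sample key string are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper isolating the queue-selection arithmetic. */
final class WriterQueuePicker {
    /** Maps any int hash, including negative ones, to an index in [0, queueCount). */
    static int queueIndex(int hash, int queueCount) {
        // Clearing the sign bit avoids the negative results that a plain
        // hash % queueCount gives for negative hashes in Java.
        return (hash & 0x7FFFFFFF) % queueCount;
    }

    public static void main(String[] args) {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            queues.add(new ArrayBlockingQueue<>(1)); // capacity 1 to show a failed offer
        }
        String key = "hfile-a@4096"; // made-up cache key
        int idx = queueIndex(key.hashCode(), queues.size());
        boolean first = queues.get(idx).offer(key);
        boolean second = queues.get(idx).offer(key); // queue is full, offer returns false
        System.out.println("queue " + idx + ", first=" + first + ", second=" + second);
        System.out.println("plain remainder of -1 % 3: " + (-1 % 3));
        System.out.println("masked index for -1: " + queueIndex(-1, 3));
    }
}
```

A non-blocking `offer` that returns false on a full queue is what lets cacheBlockWithWait give up and remove the entry from ramCache instead of stalling the read path.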
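Several WriterThread instances take entries off the queues and write the data from ramCache into the ByteBufferArray (Alibaba reportedly puts this on fast disks; I could not find that in the code).
They also wrap the index of the data in the ByteBufferArray as a BucketEntry and put it into backingMap, which is the actual block cache.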
    /**
     * 1. Loop over the entries taken from ramCache, build the index entry for each,
     *    and write the data into the ByteBufferArray.
     * 2. Put the index entries into backingMap.
     * 3. Remove the entries from ramCache.
     * Flush the entries in ramCache to IOEngine and add bucket entry to
     * backingMap
     * @param entries
     * @throws InterruptedException
     */
    private void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
      // Index entries that will be written to backingMap.
      BucketEntry[] bucketEntries = new BucketEntry[entries.size()];
      // The in-memory data held in ramCache.
      RAMQueueEntry[] ramEntries = new RAMQueueEntry[entries.size()];
      int done = 0;
      while (entries.size() > 0 && cacheEnabled) {
        // Keep going in case we throw...
        RAMQueueEntry ramEntry = null;
        try {
          ramEntry = entries.remove(entries.size() - 1); // take the next entry
          if (ramEntry == null) {
            LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
            continue;
          }
          // Store the data in the ByteBufferArray and get back its index entry.
          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
              bucketAllocator, deserialiserMap, realCacheSize);
          ramEntries[done] = ramEntry;
          bucketEntries[done++] = bucketEntry; // index of the data in the ByteBufferArray
          if (ioErrorStartTime > 0) {
            ioErrorStartTime = -1;
          }
        } catch (BucketAllocatorException fle) {
          LOG.warn("Failed allocating for block "
              + (ramEntry == null ? "" : ramEntry.getKey()), fle);
        } catch (CacheFullException cfe) {
          if (!freeInProgress) {
            freeSpace();
          } else {
            Thread.sleep(50);
          }
        } catch (IOException ioex) {
          LOG.error("Failed writing to bucket cache", ioex);
          checkIOErrorIsTolerated();
        }
      }

      // Make sure that the data pages we have written are on the media before
      // we update the map.
      try {
        ioEngine.sync();
      } catch (IOException ioex) {
        LOG.error("Faild syncing IO engine", ioex);
        checkIOErrorIsTolerated();
        // Since we failed sync, free the blocks in bucket allocator
        for (int i = 0; i < done; ++i) {
          if (bucketEntries[i] != null) {
            bucketAllocator.freeBlock(bucketEntries[i].offset());
          }
        }
        done = 0;
      }

      for (int i = 0; i < done; ++i) {
        if (bucketEntries[i] != null) {
          // Publish the index entry in backingMap.
          backingMap.put(ramEntries[i].getKey(), bucketEntries[i]);
        }
        RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
        if (ramCacheEntry != null) {
          heapSize.addAndGet(-1 * ramEntries[i].getData().heapSize());
        }
      }

      if (bucketAllocator.getUsedSize() > acceptableSize()) {
        freeSpace();
      }
    }
    } // end of the enclosing class
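The ordering in doDrain matters: the bytes are written first, and only afterwards is the index entry published in backingMap and the entry removed from ramCache, so a reader never follows an index entry to unwritten data. The sketch below models that two-phase drain with hypothetical names (`DrainSketch`, a bump allocator in `nextFree`, an `int[]{offset, length}` index entry). It is single-threaded, skips the sync step, and simply drops blocks when the store is full, which the real code handles by freeing space.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical model of the ramCache -> store -> backingMap hand-off. */
final class DrainSketch {
    final ConcurrentHashMap<String, byte[]> ramCache = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, int[]> backingMap = new ConcurrentHashMap<>();
    final BlockingQueue<String> writerQueue = new LinkedBlockingQueue<>();
    private final ByteBuffer store = ByteBuffer.allocate(4096);
    private int nextFree = 0; // bump allocator, used only by the draining thread

    void cacheBlock(String key, byte[] data) {
        if (backingMap.containsKey(key) || ramCache.containsKey(key)) {
            return;
        }
        ramCache.put(key, data);
        if (!writerQueue.offer(key)) {
            ramCache.remove(key); // could not queue it, so do not keep it pending
        }
    }

    /** Drains everything currently queued; returns how many blocks were flushed. */
    int drainOnce() {
        List<String> batch = new ArrayList<>();
        writerQueue.drainTo(batch);
        List<String> keys = new ArrayList<>();
        List<int[]> entries = new ArrayList<>();
        // Phase 1: copy the bytes into the store and remember where they went.
        for (String key : batch) {
            byte[] data = ramCache.get(key);
            if (data == null) {
                continue;
            }
            if (nextFree + data.length > store.capacity()) {
                ramCache.remove(key); // simplification: drop instead of evicting
                continue;
            }
            ByteBuffer w = store.duplicate();
            w.position(nextFree);
            w.put(data);
            keys.add(key);
            entries.add(new int[] {nextFree, data.length});
            nextFree += data.length;
        }
        // Phase 2: publish the index entries, then retire the pending copies.
        for (int i = 0; i < keys.size(); i++) {
            backingMap.put(keys.get(i), entries.get(i));
            ramCache.remove(keys.get(i));
        }
        return keys.size();
    }

    byte[] read(String key) {
        byte[] pending = ramCache.get(key);
        if (pending != null) {
            return pending;
        }
        int[] e = backingMap.get(key);
        if (e == null) {
            return null;
        }
        byte[] out = new byte[e[1]];
        ByteBuffer r = store.duplicate();
        r.position(e[0]);
        r.get(out);
        return out;
    }

    public static void main(String[] args) {
        DrainSketch cache = new DrainSketch();
        cache.cacheBlock("k1", new byte[] {1, 2, 3});
        cache.cacheBlock("k2", new byte[] {4});
        System.out.println("pending before drain: " + cache.ramCache.size());
        System.out.println("flushed: " + cache.drainOnce());
        System.out.println("pending after drain: " + cache.ramCache.size());
    }
}
```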
The writeToCache method follows.
    /**
     * Store the data in the ByteBufferArray and return its index entry.
     * I see nothing here that involves fast disks.
     * @param ioEngine
     * @param bucketAllocator
     * @param deserialiserMap
     * @param realCacheSize
     * @return
     * @throws CacheFullException
     * @throws IOException
     * @throws BucketAllocatorException
     */
    public BucketEntry writeToCache(final IOEngine ioEngine,
        final BucketAllocator bucketAllocator,
        final UniqueIndexMap<Integer> deserialiserMap,
        final AtomicLong realCacheSize) throws CacheFullException, IOException,
        BucketAllocatorException {
      int len = data.getSerializedLength();
      // This cacheable thing can't be serialized...
      if (len == 0) return null;
      long offset = bucketAllocator.allocateBlock(len);
      BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
      // Record the data's deserializer reference in the BucketEntry.
      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
      try {
        // Write the data into the ByteBufferArray.
        if (data instanceof HFileBlock) {
          // Get the ByteBuffer that holds the block data.
          ByteBuffer sliceBuf = ((HFileBlock) data).getBufferReadOnlyWithHeader();
          sliceBuf.rewind();
          assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
          ByteBuffer extraInfoBuffer = ByteBuffer.allocate(HFileBlock.EXTRA_SERIALIZATION_SPACE);
          ((HFileBlock) data).serializeExtraInfo(extraInfoBuffer);
          ioEngine.write(sliceBuf, offset); // write the block data
          ioEngine.write(extraInfoBuffer, offset + len - HFileBlock.EXTRA_SERIALIZATION_SPACE);
        } else {
          ByteBuffer bb = ByteBuffer.allocate(len);
          data.serialize(bb);
          ioEngine.write(bb, offset);
        }
      } catch (IOException ioe) {
        // free it in bucket allocator
        bucketAllocator.freeBlock(offset);
        throw ioe;
      }
      realCacheSize.addAndGet(len);
      return bucketEntry;
    }
    } // end of the enclosing class
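For an HFileBlock, writeToCache issues two writes into one allocation: the block bytes at `offset`, and the extra serialization info at `offset + len - EXTRA_SERIALIZATION_SPACE`. The layout can be sketched with a plain `ByteBuffer`. Everything here is illustrative: `TrailerLayout` is a made-up class, and the 4-byte trailer holding a single int is an assumption, not the real content of the extra info.

```java
import java.nio.ByteBuffer;

/** Hypothetical payload-plus-trailer layout inside one allocated region. */
final class TrailerLayout {
    static final int EXTRA = Integer.BYTES; // assumed trailer size for this sketch

    /** Writes payload at offset and the trailer at the end of the region; returns the region length. */
    static int write(ByteBuffer store, int offset, byte[] payload, int extraInfo) {
        int len = payload.length + EXTRA;
        ByteBuffer w = store.duplicate(); // shares content, has its own position
        w.position(offset);
        w.put(payload);
        // Second write: the trailer goes at offset + len - EXTRA.
        w.putInt(offset + len - EXTRA, extraInfo);
        return len;
    }

    static byte[] readPayload(ByteBuffer store, int offset, int len) {
        byte[] out = new byte[len - EXTRA];
        ByteBuffer r = store.duplicate();
        r.position(offset);
        r.get(out);
        return out;
    }

    static int readExtra(ByteBuffer store, int offset, int len) {
        return store.getInt(offset + len - EXTRA);
    }

    public static void main(String[] args) {
        ByteBuffer store = ByteBuffer.allocate(64);
        int offset = 8;
        int len = write(store, offset, new byte[] {10, 20, 30, 40, 50}, 777);
        System.out.println("region length: " + len);
        System.out.println("payload bytes: " + readPayload(store, offset, len).length);
        System.out.println("trailer value: " + readExtra(store, offset, len));
    }
}
```

Keeping the index entry as just (offset, length) works because the reader can recover both parts from the one region, which is what the `assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE` line in the source checks.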