- 浏览: 137820 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
SINCE1978:
还没细看,似乎取材自一本书《scala与clojure设计模式 ...
Scala设计模式 -
HowKeyond:
补充,我代码中监听了session失效事件,并做了重连,但重连 ...
ZK Timeout再讨论 -
HowKeyond:
请问这一般是什么原因引起的呢?怎么解决?我被这个问题困扰几个星 ...
ZK Timeout再讨论 -
chenchao051:
耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 耶 ...
回答一位网友对Scala的提问 -
dogstar:
唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.唉.
回答一位网友对Scala的提问
先来看下LruBlockCache的构造,关键是看清每个参数的作用:
接下来我们还需要了解几个相关的类:
这个类代表了LruBlockCache中的一个条目,它里面有个非常关键的枚举:
通过以下代码可以更好的解释:
另一方面,因为是LRU算法的实现,该类也实现了一个比较器:
因为它实现了HeapSize这个接口,所以它能返回这个条目所占用的heap大小。
另一个关键的类是LruBlcokCache的内部类:
这个类的作用是把所有的block分到不同的priority bucket中,每个BlockPriority都会有自己的一个bucket
我们可以开始看将一个新的block加入缓存:
有必要来看一下这个清理线程,在初始化LruBlockCache的时候就已经将其启动:
看具体的evict方法:
/** * Configurable constructor. Use this constructor if not using defaults. * @param maxSize maximum size of this cache, in bytes * @param blockSize expected average size of blocks, in bytes * @param evictionThread whether to run evictions in a bg thread or not * @param mapInitialSize initial size of backing ConcurrentHashMap * @param mapLoadFactor initial load factor of backing ConcurrentHashMap * @param mapConcurrencyLevel initial concurrency factor for backing CHM * @param minFactor percentage of total size that eviction will evict until * @param acceptableFactor percentage of total size that triggers eviction * @param singleFactor percentage of total size for single -access blocks * @param multiFactor percentage of total size for multiple -access blocks * @param memoryFactor percentage of total size for in -memory blocks */ public LruBlockCache( long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor , float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) { if(singleFactor + multiFactor + memoryFactor != 1) { throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); } if(minFactor >= acceptableFactor) { throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); } if(minFactor >= 1.0f || acceptableFactor >= 1.0f) { throw new IllegalArgumentException("all factors must be < 1" ); } this. maxSize = maxSize; this. blockSize = blockSize; map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); this. minFactor = minFactor; this. acceptableFactor = acceptableFactor; this. singleFactor = singleFactor; this. multiFactor = multiFactor; this. memoryFactor = memoryFactor; this. stats = new CacheStats(); this. count = new AtomicLong(0); this. elements = new AtomicLong(0); this. overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this. size = new AtomicLong(this.overhead); if(evictionThread) { this. evictionThread = new EvictionThread(this); this. evictionThread.start(); // FindBugs SC_START_IN_CTOR } else { this. evictionThread = null ; } this. scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod , TimeUnit.SECONDS); }
接下来我们还需要了解几个相关的类:
public class CachedBlock implements HeapSize, Comparable<CachedBlock >
这个类代表了LruBlockCache中的一个条目,它里面有个非常关键的枚举:
static enum BlockPriority { /** * Accessed a single time (used for scan -resistance) */ SINGLE, /** * Accessed multiple times */ MULTI, /** * Block from in -memory store */ MEMORY };
通过以下代码可以更好的解释:
public CachedBlock(String blockName, ByteBuffer buf, long accessTime, boolean inMemory ) { this. blockName = blockName; this.buf = buf; this. accessTime = accessTime; this. size = ClassSize. align(blockName.length()) + ClassSize.align(buf.capacity()) + PER_BLOCK_OVERHEAD; //第一次缓存一个block时,假设inMemory为false(默认),那么会把这个CachedBlock的BlockPriority 设置为SINGLE, 否则为MEMORY。 if(inMemory ) { this. priority = BlockPriority. MEMORY; } else { this. priority = BlockPriority. SINGLE; } }
/** * Block has been accessed. Update its local access time. */ public void access(long accessTime) { this. accessTime = accessTime; // 当再次访问到时,假如此时CacheedBlock的BlockPriority的值是SINGLE,则把它变为MULTI if(this. priority == BlockPriority. SINGLE) { this. priority = BlockPriority. MULTI; } }
另一方面,因为是LRU算法的实现,该类也实现了一个比较器:
public int compareTo(CachedBlock that) { if(this. accessTime == that.accessTime ) return 0; return this.accessTime < that.accessTime ? 1 : -1; }
因为它实现了HeapSize这个接口,所以它能返回这个条目所占用的heap大小。
另一个关键的类是LruBlcokCache的内部类:
private class BlockBucket implements Comparable<BlockBucket >
这个类的作用是把所有的block分到不同的priority bucket中,每个BlockPriority都会有自己的一个bucket
我们可以开始看将一个新的block加入缓存:
public void cacheBlock(String blockName, ByteBuffer buf, boolean inMemory ) { //private final ConcurrentHashMap<String,CachedBlock> map, 维护了缓存映射 CachedBlock cb = map.get(blockName); //如果这个block已经被缓存了,那么就抛出一个运行时异常 if(cb != null) { throw new RuntimeException("Cached an already cached block" ); } //初始化一个新的CachedBlock cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); //得到最新的heapsize long newSize = size.addAndGet(cb.heapSize()); //将新增的block放到map中 map.put(blockName, cb); //elements记录了目前缓存的数目 elements.incrementAndGet(); //假如最新的heapsize大于了acceptableSize(见下面的方法),那么就需要进行evict动作 if(newSize > acceptableSize() && ! evictionInProgress) { runEviction(); } } //----------------------------- //假如没有特定的清理线程,那么就使用目前的线程来进行evict,这显然不是一个好主意,会造成阻塞,假如有清理线程,那么调用其evict方法 private void runEviction() { if(evictionThread == null) { evict(); } else { evictionThread.evict(); //事实上是触发了清理线程的notify } }
有必要来看一下这个清理线程,在初始化LruBlockCache的时候就已经将其启动:
private static class EvictionThread extends Thread { private WeakReference<LruBlockCache> cache; public EvictionThread(LruBlockCache cache) { super( "LruBlockCache.EvictionThread" ); setDaemon( true); this. cache = new WeakReference<LruBlockCache>(cache); } @Override //这里使用了wait和notify机制,线程将一直等待,知道有notify消息过来说需要进行清理了 public void run() { while( true) { synchronized(this ) { try { this.wait(); } catch(InterruptedException e) {} } //这里cache使用了弱引用 LruBlockCache cache = this.cache .get(); if(cache == null) break; cache.evict(); } } public void evict() { synchronized( this) { this.notify(); // FindBugs NN_NAKED_NOTIFY } } }
看具体的evict方法:
void evict () { // Ensure only one eviction at a time if(!evictionLock.tryLock()) return; try { evictionInProgress = true; long currentSize = this.size .get(); //需要释放掉的heap大小 long bytesToFree = currentSize - minSize(); if (LOG.isDebugEnabled()) { LOG.debug("Block cache LRU eviction started; Attempting to free " + StringUtils. byteDesc(bytesToFree) + " of total=" + StringUtils. byteDesc(currentSize)); } if(bytesToFree <= 0) return; // Instantiate priority buckets //初始化三个桶,来存放single,multi,和memory,比例分别为25%,50%,25% BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize , singleSize()); BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize , multiSize()); BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize , memorySize()); // Scan entire map putting into appropriate buckets for(CachedBlock cachedBlock : map.values()) { switch(cachedBlock.getPriority()) { case SINGLE : { bucketSingle.add(cachedBlock); break; } case MULTI : { bucketMulti.add(cachedBlock); break; } case MEMORY : { bucketMemory.add(cachedBlock); break; } } } //接下来将三个桶放入PriorityQueue PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); //会调用到CachedBlockQueue的add方法,下面分析 bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); bucketQueue.add(bucketMemory); int remainingBuckets = 3; long bytesFreed = 0; BlockBucket bucket; //溢出的多的那个桶,会越先被清理, 参看BlockBucket的compareTo方法 //这里也说明,三个桶本身没有优先级 while((bucket = bucketQueue.poll()) != null) { long overflow = bucket.overflow(); if(overflow > 0) { // 本次要释放掉的内存 long bucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); //free方法在下面解释 bytesFreed += bucket.free(bucketBytesToFree); } remainingBuckets--; } if (LOG.isDebugEnabled()) { long single = bucketSingle.totalSize(); long multi = bucketMulti.totalSize(); long memory = bucketMemory.totalSize(); LOG.debug("Block cache LRU eviction completed; " + "freed=" + StringUtils.byteDesc(bytesFreed) + ", " + "total=" + StringUtils.byteDesc( this.size .get()) + ", " + "single=" + StringUtils.byteDesc(single) + ", " + "multi=" + StringUtils.byteDesc(multi) + ", " + "memory=" + StringUtils.byteDesc(memory)); } } finally { stats.evict(); evictionInProgress = false; evictionLock.unlock(); } }
public void add(CachedBlock cb) { //如果当前的heapsize小于maxsize,直接加到queue中,这边的queue也是一个PriorityQueue if(heapSize < maxSize) { queue.add(cb); heapSize += cb.heapSize(); } else { // 否则先取出列表头 CachedBlock head = queue.peek(); //判断假如的cb是不是比head大,实际的意义就是看新加入的cb是不是比head新,参看CachedBlock的compareTo方法,假如新,则继续 if(cb.compareTo(head) > 0) { heapSize += cb.heapSize(); heapSize -= head.heapSize(); if(heapSize > maxSize ) { //取出head queue.poll(); } else { heapSize += head.heapSize(); } queue.add(cb); } } }
public long free(long toFree) { //这边的queue是CacheBlockQueue类型,这个get方法很重要,它对PriorityQueue做了反序,这样的话就把时间最早的放在队列头 LinkedList<CachedBlock> blocks = queue.get(); long freedBytes = 0; for(CachedBlock cb: blocks) { freedBytes += evictBlock(cb); if(freedBytes >= toFree) { return freedBytes; } } return freedBytes; }
//最后调用这个方法将block从map中移除: protected long evictBlock(CachedBlock block) { map.remove(block.getName()); size.addAndGet(-1 * block.heapSize()); elements.decrementAndGet(); stats.evicted(); return block.heapSize(); }
发表评论
-
简单HBase笔记
2012-10-26 16:35 1981一、Client-side write buffe ... -
诡异的超长时间GC问题定位
2012-10-19 16:45 4358HBase的GC策略采用PawNew+CMS, 这是大众化的配 ... -
ZK Timeout再讨论
2012-10-18 15:29 29886http://crazyjvm.iteye.com/blog/ ... -
HBase集群中的某几台rs挂掉后导致整个集群挂掉的案例
2012-10-10 09:35 0集群规模(小型):13dn 6rs 现象:2台rs在很短 ... -
HBase异常记录
2012-10-09 11:19 7645一、YouAreDeadException FA ... -
HBase日志中的异常记录1
2012-10-09 10:49 2晕菜了,这狗屁编辑器把我的格式全弄没了...mlgbd! 异 ... -
zookeeper超时--minSessionTimeout与maxSessionTimeout
2012-10-08 16:55 11125很多同学可能有这样的疑问,我明明把连接zk客户端的超时 ... -
HBase备份与还原
2012-09-18 13:53 2793转载两篇相关文章: 第一篇、http://blog.nosq ... -
Thrift安装中出现的问题(For HBase)
2012-09-06 10:55 1888安装巨简单: 进入thrif ... -
hadoop 0.20.203 数据迁移至 cdh3u3
2012-08-29 08:40 1483假如用hadoop0.20.203版本,hbase会提 ... -
HBase Backup Options
2012-08-23 15:24 1319If you are thinking about using ... -
HBase RegionServer挂掉后的源码分析
2012-08-13 11:20 4121首先肯定是HMaster通过ZK发现某RS挂掉了,HMaste ... -
HBase架构简介
2012-08-06 10:47 1159HBase的架构简介,有兴趣的可以看看。
相关推荐
### HBase源码解析与开发实战 #### 一、HBase简介 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了...
hadoop2.x、Hive、HBase源码解析+企业级应用视频,本人花钱买的视频,全部的还没有看,如果感觉老师讲的不好,请不要碰我,谢谢
HBase源码分析揭示了HBase在RPC通信机制方面的一些关键技术点,这包括了角色分配、通信信道建立、通信接口协议定义、对象序列化、传输控制和会话管理,以及在传输过程中可能出现的错误处理和重试机制。 HBase中的...
《HBase实战源码》是针对Apache HBase这一分布式、高性能、基于列族的NoSQL数据库的深度解析书籍。源码分析是理解HBase工作原理和技术细节的重要途径。HBase在大数据领域扮演着关键角色,它能够处理海量数据并提供...
本项目为HBase数据库设计源码解析,采用Java语言编写,共包含1248个文件,其中1128个为Java源文件,辅以71个文本文件、39个图片文件、7个图片文件以及2个XML文件和1个Git忽略文件。
HBase 1.2.0是该数据库的一个稳定版本,包含了众多优化和改进,对于想要深入理解HBase工作原理或者进行大数据分析的学习者来说,研究其源码是非常有价值的。 一、HBase架构与核心概念 1. 表与Region:HBase中的...
源码包“hbase-0.98.1-src.tar.gz”提供了HBase 0.98.1版本的完整源代码,对于理解其内部工作原理、进行二次开发或调试是非常有价值的。 HBase的核心概念包括: 1. 表:HBase中的表由行和列族组成,表名全局唯一。...
《深入理解HBase:源码剖析与测试实践》 HBase,全称为Hadoop Database,是一种基于Apache Hadoop的开源分布式非关系型数据库。它的设计灵感来源于Google的Bigtable,旨在处理大规模数据集,通常在PB级别以上。...
HBase源码(hbase-2.4.9-src.tar.gz)是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File ...
三、HBase源码解析 1. **Region分配与负载均衡**:在`org.apache.hadoop.hbase.master`包下,RegionServerTracker类负责监控Region服务器状态,LoadBalancer类实现Region的负载均衡策略。 2. **元数据管理**:元...
《HBase权威指南》是一本深入探讨分布式大数据存储系统HBase的专业书籍,其源码提供了对书中各个章节涉及技术的直观展示和实践操作。源码分析可以帮助读者更好地理解和应用书中的理论知识,以下是对这份源码包中可能...
《深入理解HBase:从HBaseTest源码解析开始》 HBase,作为Apache软件基金会的一个开源项目,是构建在Hadoop之上的分布式列式数据库,特别适合处理大规模数据。其设计灵感来源于Google的Bigtable,提供高吞吐量的...
在HBase 1.3.1的源码中,我们可以深入理解这个系统的内部工作原理,包括以下几个关键知识点: 1. **Maven构建系统**:HBase使用Maven作为构建工具,源码组织结构遵循Maven的标准目录结构,如src/main/java存放Java...
"基于SpringBoot集成HBase过程解析" SpringBoot集成HBase是当前大数据处理和存储解决方案中的一种常见组合。HBase是基于Hadoop的分布式、可扩展的NoSQL数据库,能够存储大量的结构化和非结构化数据。SpringBoot则...
Hbase权威指南 随书源代码 源码包 绝对完整版 maven工程,带pom文件,可以直接作为一个完整工程导入eclipse等ide。