- 浏览: 980134 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
HashMap详解:http://donald-draper.iteye.com/blog/2361702
ConcurrentMap介绍:http://donald-draper.iteye.com/blog/2361719
HashMap是线程非安全的,Hashtable是线程安全的,并发访问支持较差,但已经过时,今天我们来看,并发包中的
线程安全且可并发访问的ConcurrentHashMap
先看一下HashEntry
再来看一下Segment定义
从上面来看:
Segment拥有与ConCurrentHashMap相同的负载因子,临界条件,拥有一个hash table。
来看Segment的put操作
Segment的put操,我们有一下几点要分析
1.
2.
3.
4.
我们一点一点的看,
1.
scanAndLockForPut函数的作用主要是:
在尝试获取锁失败时,遍历HashEntry,确认key存不存在,如果不存在,
则创建一个新Hash Entry,返回,确保一直持有锁。
2.
4.
从分析上面4步,可以看出,Segment的put操作,首先尝试获取锁,如果获取锁失败,
则Key在片段hash table中的索引,遍历索引对应的Hash Entry链,如找不到key对应
HashEntry,则创建一个HashEntry,这都是在尝试次数小于最大尝试次数MAX_SCAN_RETRIES情况下,MAX_SCAN_RETRIES默认为2。这样做的目的是为确保,进行put操作时,仍持有锁。
然后定位key在片段table中的索引,并放在链头,如果实际size达到临界条件,则重新hash,
创建2倍原始容量的hash table,重新建立hash table。
再看Segment的remove操作
remove方法中有一点要看,
来看scanAndLock
从上面来看
Segment的remove操作,首先尝试获取锁失败,则继续尝试获取锁,在获取锁的过程中,
定位key在片段table的HashEntry链表索引,遍历链表,如果找到对应的HashEntry,则移除,
如果尝试次数超过最大尝试次数,则lock,则遍历链表,找到对应的HashEntry,则移除。
再来看Segment的replace操作
从上面的replace(K key, int hash, V value)和 replace(K key, int hash, V oldValue, V newValue) 来看,与remove的基本思路相同,这里就不在说,唯一的区别是,当替换值时,修改计数要自增1。
再看Segment清除
从上面可以看出:Clear锁住整个table。
再看ConcurrentHashMap的构造
}
ConcurrentHashMap的构造主要是计算片段偏移量,片段掩码,临界条件,创建0片段和片段数组;片段数组中,片段的容量,负载因子,临界条件,并发访问量为默认值。
总结:
ConcurrentHashMap是线程安全的,可并发访问,不允许key或value的值为null,默认的容量为16,负载因子为0.75,并发访问量为16。ConcurrentHashMap中有一个Segment数组,默认数组大小为16,Segment中有一个HashEntry数组类似于HashMap中的table,Segment继承了可重入锁ReentrantLock,Segment的修改其hash table的操作都要使用lock。Segment的put操作,首先尝试获取锁,如果获取锁失败,则Key在片段hash table中的索引,遍历索引对应的Hash Entry链,如找不到key对应HashEntry,则创建一个HashEntry,这都是在尝试次数小于最大尝试次数MAX_SCAN_RETRIES情况下,MAX_SCAN_RETRIES默认为2。这样做的目的是为确保,进行put操作时,仍持有锁。然后定位key在片段table中的索引,并放在链头,如果实际size达到临界条件,则重新hash,创建2倍原始容量的hash table,重新建立hash table。Segment的remove操作,首先尝试获取锁失败,则继续尝试获取锁,在获取锁的过程中,定位key在片段table的HashEntry链表索引,遍历链表,如果找到对应HashEntry,则移除,如果尝试次数超过最大尝试次数,则lock,则遍历链表,找到对应的HashEntry,则移除。replace与remove的基本思路相同,唯一的区别是,当替换值时,修改计数要自增1。
put,remove和replace操作是锁住片段table中,key对应的索引HashEntry链表,而Clear为锁住整个table。ConcurrentHashMap通过将所有HashEntry分散在不同的Segment,及锁机制实现了并发访问。
ConcurrentHashMap的剩下部分,我们下一篇再讲。
ConcurrentHashMap解析后续:http://donald-draper.iteye.com/blog/2363201
ConcurrentMap介绍:http://donald-draper.iteye.com/blog/2361719
HashMap是线程非安全的,Hashtable是线程安全的,并发访问支持较差,但已经过时,今天我们来看,并发包中的
线程安全且可并发访问的ConcurrentHashMap
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; import java.io.Serializable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** * A hash table supporting full concurrency of retrievals and * adjustable expected concurrency for updates. This class obeys the * same functional specification as {@link java.util.Hashtable}, and * includes versions of methods corresponding to each method of * <tt>Hashtable</tt>. However, even though all operations are * thread-safe, retrieval operations do [i]not[/i] entail locking, * and there is [i]not[/i] any support for locking the entire table * in a way that prevents all access. This class is fully * interoperable with <tt>Hashtable</tt> in programs that rely on its * thread safety but not on its synchronization details. * ConcurrentHashMap提供完全线程安全的并发访问。ConcurrentHashMap与HashTable的 功能基本相同。即使所有操作是线程安全,检索操作不许要lock,也不支持lock the entry table,那样会阻止所有访问。在编程的时候ConcurrentHashMap与HashTable是 协作的,使用哪一个依赖于是否线程安全,而不是,是否需要同步。 * <p> Retrieval operations (including <tt>get</tt>) generally do not * block, so may overlap with update operations (including * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results * of the most recently [i]completed[/i] update operations holding * upon their onset. For aggregate operations such as <tt>putAll</tt> * and <tt>clear</tt>, concurrent retrievals may reflect insertion or * removal of only some entries. Similarly, Iterators and * Enumerations return elements reflecting the state of the hash table * at some point at or since the creation of the iterator/enumeration. * They do [i]not[/i] throw {@link ConcurrentModificationException}. * However, iterators are designed to be used by only one thread at a time. * 检索操作(get等)一般不会阻塞,也许会与更新操作(put,remove等)出现overlap(重叠)。 检索操作反应的是最近更新操作完成的结果。进一步说,putAll和clear操作,并发检索也许 会反应插入或移除Entry的结果。相似的,Iterators和Enumerations返回的Entry,反应者 hash table在某一点,比如创建iterator/enumeration。它们并不会抛出异常,iterators 设计为只能有一个线程访问,在同一时间点。 * <p> The allowed concurrency among update operations is guided by * the optional <tt>concurrencyLevel</tt> constructor argument * (default <tt>16</tt>), which is used as a hint for internal sizing. The * table is internally partitioned to try to permit the indicated * number of concurrent updates without contention. Because placement * in hash tables is essentially random, the actual concurrency will * vary. Ideally, you should choose a value to accommodate as many * threads as will ever concurrently modify the table. Using a * significantly higher value than you need can waste space and time, * and a significantly lower value can lead to thread contention. But * overestimates and underestimates within an order of magnitude do * not usually have much noticeable impact. A value of one is * appropriate when it is known that only one thread will modify and * all others will only read. Also, resizing this or any other kind of * hash table is a relatively slow operation, so, when possible, it is * a good idea to provide estimates of expected table sizes in * constructors. * ConcurrentHashMap可以同时并发访问,并发数量与构造函数的concurrencyLevel有关, 默认为16,concurrencyLevel表示内部的hashtable的容量。内部的hash table是分块的,块 的数量表示无竞争的情况下,可以并发更新的数量。由于元素放到hash table中,是随机的, 实际的并发数,以实际为准。理想情况下,我们应该选择一个合适值,使更多的线程可以同时 修改hash table。用于个较高的值,可能会浪费不必要的空间和时间,太低的话,将导致 线程的竞争。当时在一个数量级的情况下过多或过少,没有太有的不同。一个近似的值为 当一个线程修改,可以有多少个线程可进行读操作。重新扩容或其他种类hash table是一个 较慢的操作,如果可能的话,在构造hash table的时候,最后给一个预估的size。 * <p>This class and its views and iterators implement all of the * [i]optional[/i] methods of the {@link Map} and {@link Iterator} * interfaces. * ConcurrentHashMap实现所有Map接口中的视图和iterators。这有点像hash table,而不是 HashMap,ConcurrentHashMap不允许key和value为null。 * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class * does [i]not[/i] allow <tt>null</tt> to be used as a key or value. * * <p>This class is a member of the * <a href="{@docRoot}/../technotes/guides/collections/index.html"> * Java Collections Framework</a>. * * @since 1.5 * @author Doug Lea * @param <K> the type of keys maintained by this map * @param <V> the type of mapped values */ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { private static final long serialVersionUID = 7249069246763182397L; /* * The basic strategy is to subdivide the table among Segments, * each of which itself is a concurrently readable hash table. To * reduce footprint, all but one segments are constructed only * when first needed (see ensureSegment). To maintain visibility * in the presence of lazy construction, accesses to segments as * well as elements of segment's table must use volatile access, * which is done via Unsafe within methods segmentAt etc * below. These provide the functionality of AtomicReferenceArrays * but reduce the levels of indirection. Additionally, * volatile-writes of table elements and entry "next" fields * within locked operations use the cheaper "lazySet" forms of * writes (via putOrderedObject) because these writes are always * followed by lock releases that maintain sequential consistency * of table updates. * ConcurrentHashMap最基本的策略是将table分布在不同的Segments,每个 Segments都是一个并发的可读hash table。 To reduce footprint, all but one segments are constructed only when first needed (see ensureSegment). (为了减少footprint,当需要的时候,我们才构造segments)。 为了保证懒构造的可见性,访问segments和访问segments中table中elements一样, 必须用volatile访问,实现的方式是通过Unsafe和segmentAt等。 这个方式提供了AtomicReferenceArrays的功能,除了减少间接访问的次数。 另外,volatile-writes table元素和修改Entry的next指针,通过putOrderedObject, 使用比较简单的lazySet形式,因为这些写操作总是跟着lock的释放,以维持 表更新的一致性。 * Historical note: The previous version of this class relied * heavily on "final" fields, which avoided some volatile reads at * the expense of a large initial footprint. Some remnants of * that design (including forced construction of segment 0) exist * to ensure serialization compatibility. */ 经验建议:ConcurrentHashMap的先前版本,过多的依赖于final,这避免了 在大量初始化封装实体的情况,可见性的读操作。ConcurrentHashMap有一些 保留性的设计,不如在构造是强制构造segment,以保证序列化的兼容性。 /* ---------------- Constants -------------- */ /** * The default initial capacity for this table, * used when not otherwise specified in a constructor. */ table的默认容量 static final int DEFAULT_INITIAL_CAPACITY = 16; /** * The default load factor for this table, used when not * otherwise specified in a constructor. */ 默认的负载因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * The default concurrency level for this table, used when not * otherwise specified in a constructor. */ table的并发访问级别,在构造中非必须 static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * The maximum capacity, used if a higher value is implicitly * specified by either of the constructors with arguments. MUST * be a power of two <= 1<<30 to ensure that entries are indexable * using ints. */ 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; /** * The minimum capacity for per-segment tables. Must be a power * of two, at least two to avoid immediate resizing on next use * after lazy construction. */ 每个片段 table的最小容量,默认至少为2,避免当next指针使用时,需要立即扩容 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; /** * The maximum number of segments to allow; used to bound * constructor arguments. Must be power of two less than 1 << 24. */ 最大片段数 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */ 在重排序lock之前,非同步化尝试调用size和containsValue方法的次数。 这个为了避免无限制的尝试,当table需要持续性的修改,这样做是为了 尽可能的保证获取准确的结果 static final int RETRIES_BEFORE_LOCK = 2; /* ---------------- Fields -------------- */ /** * holds values which can't be initialized until after VM is booted. */ private static class Holder { /** * Enable alternative hashing of String keys? * * <p>Unlike the other hash map implementations we do not implement a * threshold for regulating whether alternative hashing is used for * String keys. Alternative hashing is either enabled for all instances * or disabled for all instances. */ static final boolean ALTERNATIVE_HASHING; static { // Use the "threshold" system property even though our threshold // behaviour is "ON" or "OFF". String altThreshold = java.security.AccessController.doPrivileged( new sun.security.action.GetPropertyAction( "jdk.map.althashing.threshold")); int threshold; try { threshold = (null != altThreshold) ? Integer.parseInt(altThreshold) : Integer.MAX_VALUE; // disable alternative hashing if -1 if (threshold == -1) { threshold = Integer.MAX_VALUE; } if (threshold < 0) { throw new IllegalArgumentException("value must be positive integer."); } } catch(IllegalArgumentException failed) { throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed); } ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY; } } /** * A randomizing value associated with this instance that is applied to * hash code of keys to make hash collisions harder to find. */ hash种子 private transient final int hashSeed = randomHashSeed(this); //获取ConcurrentHashMap实例的随机种子 private static int randomHashSeed(ConcurrentHashMap instance) { if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) { return sun.misc.Hashing.randomHashSeed(instance); } return 0; } /** * Mask value for indexing into segments. The upper bits of a * key's hash code are used to choose the segment. */ //片段索引的掩码,可以hash值的高位,用于选择片段索引 final int segmentMask; /** * Shift value for indexing within segments. */ 索引在片段中的偏移量 final int segmentShift; /** * The segments, each of which is a specialized hash table. */ 片段是一个特殊的Hash table; final Segment<K,V>[] segments; transient Set<K> keySet;//Key集合 transient Set<Map.Entry<K,V>> entrySet;//Entry集合 transient Collection<V> values;//vaule集合 }
先看一下HashEntry
/** * ConcurrentHashMap list entry. Note that this is never exported * out as a user-visible Map.Entry. */ //HashEntry内部使用,对用户不可见 static final class HashEntry<K,V> { final int hash;//hash值 final K key;//key //key和hash值不可修改 volatile V value;//value值和next内存可见 volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ //使用UNSAFE的putOrderedObject设置next指针 final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
再来看一下Segment定义
/** * Segments are specialized versions of hash tables. This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */ //片段是一个特殊版本的hash table。为可重入锁ReentrantLock的子类, 仅仅为了简化加锁和避免分离构造 static final class Segment<K,V> extends ReentrantLock implements Serializable { /* * Segments maintain a table of entry lists that are always * kept in a consistent state, so can be read (via volatile * reads of segments and tables) without locking. This * requires replicating nodes when necessary during table * resizing, so the old lists can be traversed by readers * still using old version of table. * Segments维护者一个Entry列表table,总是保持一致性状态,因此可以 不通过锁,我们就可以读取segments and tables的最新值。在重新扩容的时候, 旧的entry列表被迁移到新的segments and tables上,读线程,能可以用旧版本的table。 * This class defines only mutative methods requiring locking. * Except as noted, the methods of this class perform the * per-segment versions of ConcurrentHashMap methods. (Other * methods are integrated directly into ConcurrentHashMap * methods.) These mutative methods use a form of controlled * spinning on contention via methods scanAndLock and * scanAndLockForPut. These intersperse tryLocks with * traversals to locate nodes. The main benefit is to absorb * cache misses (which are very common for hash tables) while * obtaining locks so that traversal is faster once * acquired. We do not actually use the found nodes since they * must be re-acquired under lock anyway to ensure sequential * consistency of updates (and in any case may be undetectably * stale), but they will normally be much faster to re-locate. * Also, scanAndLockForPut speculatively creates a fresh node * to use in put if no node is found. */ Segments只会在修改hash table的方法中,是用lock,之所以加锁,是为了 保证ConcurrentHashMap中每segment的一致性。而其他一些方法,则直接放在 ConcurrentHashMap中。这些更新table的方式是通过scanAndLock和scanAndLockForPut 方法,控制自旋竞争。tryLocks方法,只会在遍历table时,锁住NODE。 这样做的好处是,在遍历table的情况,尽快获取locks,以保证缓存的可用性与可靠性。 为了保证更新的一致性,在锁住的情况下,由于要重新获取锁,我们一般不会访问锁住的节点。 但实际上的速度,要比重新定位要快。如果node不存在,则scanAndLockForPut会创建一个新的节点 放到table中。 private static final long serialVersionUID = 2249069246763182397L; /** * The maximum number of times to tryLock in a prescan before * possibly blocking on acquire in preparation for a locked * segment operation. On multiprocessors, using a bounded * number of retries maintains cache acquired while locating * nodes. */ 最大尝试获取锁次数,tryLock可能会阻塞,准备锁住segment操作获取锁。 在多处理器中,用一个有界的尝试次数,保证在定位node的时候,可以从缓存直接获取。 static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ segment内部的Hash table,访问HashEntry,通过具有volatile的entryAt/setEntryAt方法 transient volatile HashEntry<K,V>[] table; /** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */ segment的table中HashEntry的数量,只有在lock或其他保证可见性的volatile reads 中,才可以访问count transient int count; /** * The total number of mutative operations in this segment. * Even though this may overflows 32 bits, it provides * sufficient accuracy for stability checks in CHM isEmpty() * and size() methods. Accessed only either within locks or * among other volatile reads that maintain visibility. */ 在segment上所有的修改操作数。尽管可能会溢出,但它为isEmpty和size方法, 提供了有效准确稳定的检查或校验。只有在lock或其他保证可见性的volatile reads 中,才可以访问 transient int modCount; /** * The table is rehashed when its size exceeds this threshold. * (The value of this field is always <tt>(int)(capacity * * loadFactor)</tt>.) */ table重新hash的临界条件,为(capacity * loadFactor) transient int threshold; /** * The load factor for the hash table. Even though this value * is same for all segments, it is replicated to avoid needing * links to outer object. * @serial */ hash table的负载因子,尽管这个值是通过复制的,所有的segments相等, 为了避免需要连接到外部object final float loadFactor; //构造Segment,负载因子,临界条件,table Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } }
从上面来看:
Segment拥有与ConCurrentHashMap相同的负载因子,临界条件,拥有一个hash table。
来看Segment的put操作
//Segment /*如果key存在且onlyIfAbsent为false,则更新旧值,否则创建新hash Entry,添加到table中 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //尝试获取锁,获取失败返回node为null,否则scanAndLockForPut HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; //获取table索引 int index = (tab.length - 1) & hash; //获取table索引为index的第一个HashEntry HashEntry<K,V> first = entryAt(tab, index); //遍历table的index索引上的HashEntry链 for (HashEntry<K,V> e = first;;) { if (e != null) { //如果HashEntry不为null K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { //如果存在key且hash相等,onlyIfAbsent为false,则更新旧值为value e.value = value; //修改数+1 ++modCount; } break; } e = e.next; } else { if (node != null) //如果创建的节点不为null,则将node放在table的index索引对应的HashEntry链的头部 node.setNext(first); else //否创建新的HashEntry,放在链头,next指向链的原始头部。 node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果table的Hash Entry数量size大于临界条件,且小于最大容量,则重新hash rehash(node); else //添加node到table的索引对应的链表 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
Segment的put操,我们有一下几点要分析
final V put(K key, int hash, V value, boolean onlyIfAbsent)
1.
//尝试获取锁,获取失败返回node为null,否则scanAndLockForPut HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
2.
HashEntry<K,V>[] tab = table; //获取table索引 int index = (tab.length - 1) & hash; //获取table索引为index的第一个HashEntry HashEntry<K,V> first = entryAt(tab, index);
3.
if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果table的Hash Entry数量size大于临界条件,且小于最大容量,则重新hash rehash(node);
4.
else //添加node到table的索引对应的链表 setEntryAt(tab, index, node);
我们一点一点的看,
1.
//尝试获取锁,获取失败返回node为null,否则scanAndLockForPut HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
//Segment /** * Scans for a node containing given key while trying to * acquire lock, creating and returning one if not found. Upon * return, guarantees that lock is held. UNlike in most * methods, calls to method equals are not screened(筛选): Since * traversal speed doesn't matter, we might as well help warm * up the associated code and accesses as well. * 在尝试获取锁失败时,遍历HashEntry,确认key存不存在,如果不存在,则创建一个新Hash Entry,返回, 保证持有锁。 * @return a new node if key not found, else null */ private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //根据Segment片段和hash值,返回对应的Hash Entry HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node,运行尝试的次数 //当尝试获取锁失败 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { //当尝试获取锁失败,且尝试次数小于0,首次尝试,如果Entry为null,则创建新节点 if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { //如果尝试次数大于最大尝试次数,则锁住,跳出循环 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } /* Gets the table entry for the given segment and hash */ //根据Segment片段和hash值,返回对应的Hash Entry @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) { HashEntry<K,V>[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); }
scanAndLockForPut函数的作用主要是:
在尝试获取锁失败时,遍历HashEntry,确认key存不存在,如果不存在,
则创建一个新Hash Entry,返回,确保一直持有锁。
2.
HashEntry<K,V>[] tab = table; //获取table索引 int index = (tab.length - 1) & hash; //获取table索引为index的第一个HashEntry HashEntry<K,V> first = entryAt(tab, index);
/** * Gets the ith element of given table (if nonnull) with volatile * read semantics. Note: This is manually integrated into a few * performance-sensitive methods to reduce call overhead. */ @SuppressWarnings("unchecked") //返回片段table中,索引i对应的HashEntry 链的第一个HashEntry static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); } 3. if (c > threshold && tab.length < MAXIMUM_CAPACITY) //如果table的Hash Entry数量size大于临界条件,且小于最大容量,则重新hash rehash(node); /** * Doubles size of table and repacks entries, also adding the * given node to new table */ @SuppressWarnings("unchecked") private void rehash(HashEntry<K,V> node) { /* * Reclassify nodes in each list to new table. Because we * are using power-of-two expansion, the elements from * each bin must either stay at same index, or move with a * power of two offset. We eliminate unnecessary node * creation by catching cases where old nodes can be * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; //扩容容量为原来的2倍,重新计算临界条件 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //遍历table,将HashEntry,放在新table的对应的索引HashEntry链上。 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //重新获取原始HashEntry在新table中的索引 int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
4.
else //添加node到table的索引对应的链表 setEntryAt(tab, index, node);
/** * Sets the ith element of given table, with volatile write * semantics. (See above about use of putOrderedObject.) */ //将HashEntry添加到片段table中的索引i对应的HashEntry链中。 static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); }
从分析上面4步,可以看出,Segment的put操作,首先尝试获取锁,如果获取锁失败,
则Key在片段hash table中的索引,遍历索引对应的Hash Entry链,如找不到key对应
HashEntry,则创建一个HashEntry,这都是在尝试次数小于最大尝试次数MAX_SCAN_RETRIES情况下,MAX_SCAN_RETRIES默认为2。这样做的目的是为确保,进行put操作时,仍持有锁。
然后定位key在片段table中的索引,并放在链头,如果实际size达到临界条件,则重新hash,
创建2倍原始容量的hash table,重新建立hash table。
再看Segment的remove操作
//Segment /** * Remove; match on key only if value null, else match both. */ 从片段的table中移除key,value值相等的hashEntry final V remove(Object key, int hash, Object value) { //尝试获取锁失败时,遍历table中key所在索引的HashEntry链表,主要为确保持有锁 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; //定位key在table中的hashEntry链表索引 int index = (tab.length - 1) & hash; HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; //如果索引位置上HashEntry链表存在,则遍历链表,找到对应的HashEntry,则移除 while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; }
remove方法中有一点要看,
//尝试获取锁失败时,遍历table中key所在索引的HashEntry链表,主要为确保持有锁 if (!tryLock()) scanAndLock(key, hash);
来看scanAndLock
//Segment /** * Scans for a node containing the given key while trying to * acquire lock for a remove or replace operation. Upon * return, guarantees that lock is held. Note that we must * lock even if the key is not found, to ensure sequential * consistency of updates. */ //如果尝试获取锁失败,则遍历key在table索引链表,查看对应的HashEntry,是否存在,存在则移除 private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut //获取key在片段table中的HashEntry链表 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; int retries = -1; //尝试获取锁失败,则遍历HashEntry链表,如果尝试次数超过最大尝试次数,则lock while (!tryLock()) { HashEntry<K,V> f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } }
从上面来看
Segment的remove操作,首先尝试获取锁失败,则继续尝试获取锁,在获取锁的过程中,
定位key在片段table的HashEntry链表索引,遍历链表,如果找到对应的HashEntry,则移除,
如果尝试次数超过最大尝试次数,则lock,则遍历链表,找到对应的HashEntry,则移除。
再来看Segment的replace操作
//Segment final boolean replace(K key, int hash, V oldValue, V newValue) { if (!tryLock()) scanAndLock(key, hash); boolean replaced = false; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { if (oldValue.equals(e.value)) { e.value = newValue; ++modCount; replaced = true; } break; } } } finally { unlock(); } return replaced; } //Segment final V replace(K key, int hash, V value) { if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V> e; for (e = entryForHash(this, hash); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; e.value = value; ++modCount; break; } } } finally { unlock(); } return oldValue; }
从上面的replace(K key, int hash, V value)和 replace(K key, int hash, V oldValue, V newValue) 来看,与remove的基本思路相同,这里就不在说,唯一的区别是,当替换值时,修改计数要自增1。
再看Segment清除
//Segment final void clear() { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) //设置片段table的索引i的HashEntry链表为null setEntryAt(tab, i, null); ++modCount; count = 0; } finally { unlock(); } }
从上面可以看出:Clear锁住整个table。
再看ConcurrentHashMap的构造
/** * Creates a new, empty map with a default initial capacity (16), * load factor (0.75) and concurrencyLevel (16). */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with the specified initial capacity, * and with default load factor (0.75) and concurrencyLevel (16). * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @throws IllegalArgumentException if the initial capacity of * elements is negative. */ public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with the specified initial capacity * and load factor and with the default concurrencyLevel (16). * * @param initialCapacity The implementation performs internal * sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @throws IllegalArgumentException if the initial capacity of * elements is negative or the load factor is nonpositive * * @since 1.6 */ public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } /** * Creates a new, empty map with the specified initial * capacity, load factor and concurrency level. * * @param initialCapacity the initial capacity. The implementation * performs internal sizing to accommodate this many elements. * @param loadFactor the load factor threshold, used to control resizing. * Resizing may be performed when the average number of elements per * bin exceeds this threshold. * @param concurrencyLevel the estimated number of concurrently * updating threads. The implementation performs internal sizing * to try to accommodate this many threads. concurrencyLevel为并发更数 * @throws IllegalArgumentException if the initial capacity is * negative or the load factor or concurrencyLevel are * nonpositive. */ @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //参数值,异常,则抛出IllegalArgumentException if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; //ConcurrentHashMap的片段数量 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift;//片段偏移量 this.segmentMask = ssize - 1;//片段掩码 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //计算片段中Hash table的容量 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] //创建0片段 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); //创建片段数组 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss;
}
ConcurrentHashMap的构造主要是计算片段偏移量,片段掩码,临界条件,创建0片段和片段数组;片段数组中,片段的容量,负载因子,临界条件,并发访问量为默认值。
总结:
ConcurrentHashMap是线程安全的,可并发访问,不允许key或value的值为null,默认的容量为16,负载因子为0.75,并发访问量为16。ConcurrentHashMap中有一个Segment数组,默认数组大小为16,Segment中有一个HashEntry数组类似于HashMap中的table,Segment继承了可重入锁ReentrantLock,Segment的修改其hash table的操作都要使用lock。Segment的put操作,首先尝试获取锁,如果获取锁失败,则Key在片段hash table中的索引,遍历索引对应的Hash Entry链,如找不到key对应HashEntry,则创建一个HashEntry,这都是在尝试次数小于最大尝试次数MAX_SCAN_RETRIES情况下,MAX_SCAN_RETRIES默认为2。这样做的目的是为确保,进行put操作时,仍持有锁。然后定位key在片段table中的索引,并放在链头,如果实际size达到临界条件,则重新hash,创建2倍原始容量的hash table,重新建立hash table。Segment的remove操作,首先尝试获取锁失败,则继续尝试获取锁,在获取锁的过程中,定位key在片段table的HashEntry链表索引,遍历链表,如果找到对应HashEntry,则移除,如果尝试次数超过最大尝试次数,则lock,则遍历链表,找到对应的HashEntry,则移除。replace与remove的基本思路相同,唯一的区别是,当替换值时,修改计数要自增1。
put,remove和replace操作是锁住片段table中,key对应的索引HashEntry链表,而Clear为锁住整个table。ConcurrentHashMap通过将所有HashEntry分散在不同的Segment,及锁机制实现了并发访问。
ConcurrentHashMap的剩下部分,我们下一篇再讲。
ConcurrentHashMap解析后续:http://donald-draper.iteye.com/blog/2363201
发表评论
-
Executors解析
2017-04-07 14:38 1244ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4450ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2116ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4986Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9096Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6078Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3036Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20513Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1501Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1071Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1586Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1061Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1324package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1190/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1158Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1671package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2023线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 920Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1732Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2133Queue接口定义:http://donald-draper. ...
相关推荐
初始化ConcurrentHashMap时,会根据初始化参数计算出Segment数组的长度,并根据该长度初始化数组,然后对数组中的每个Segment进行初始化。初始化过程中,会计算段偏移量segmentShift和段掩码segmentMask,这些参数...
而ConcurrentHashMap是线程安全的HashMap实现,它在Java 7中采用了分段锁(Segment)的设计,每个Segment实际上是一个小型的HashMap,通过锁来确保并发安全。put过程包括: 1. 确保Segment初始化,如果需要则创建新...
在Java 7中,`ConcurrentHashMap`采用了分段锁的设计,将整个哈希表分成若干个段(Segment),每个段有自己的锁,不同段之间可以并发操作,提高了并发性能。每个段内部再由数组和链表组成,类似于Java 7的`HashMap`...
为了加速对Segment和Hash槽的定位,`ConcurrentHashMap`中的每个Segment的大小都被设置为2的幂次方。这意味着可以通过简单的位运算来计算出给定哈希值对应的Segment和Hash槽的位置。 **3. 特殊的扩容机制** 当...
《并发HashMap 1.7的源码解析》 在Java并发编程中,`ConcurrentHashMap`是一个非常重要的数据结构,它提供了线程安全的哈希映射功能,且在性能上优于传统的`synchronized HashMap`。本文主要分析`ConcurrentHashMap...
本节我们将深入解析`ConcurrentHashMap`的`put`和`get`方法,以及其初始化过程。 首先,`ConcurrentHashMap`的初始化过程在第一次`put`操作时触发,其核心在于`initTable`方法。这个方法确保在多线程环境下安全地...
与HashMap不同,ConcurrentHashMap采用了分段锁(Segment)的设计,将整个散列表分为多个独立的段,每个段有自己的锁。这样,在多线程环境下,不同的线程可以同时对不同段进行操作,提高了并发性能。每个段内部仍然...
基于Java并发容器ConcurrentHashMap#put方法解析 Java并发容器ConcurrentHashMap是Java中最常用的数据结构之一,它的出现是为了解决HashMap在多线程并发环境下的线程不安全问题。在ConcurrentHashMap中,put方法是...
《并发编程中的 ConcurrentHashMap 深度解析》 在Java编程领域,ConcurrentHashMap是一个至关重要的数据结构,尤其在多线程环境下,它提供了高效、安全的并发访问性能。本资料"ConcurrentHashMap共18页.pdf.zip...
【标题】: "Java Map深度解析:从面试题看HashMap与ConcurrentHashMap" 【描述】: 本资源针对Java后端开发人员,由有7年大厂经验的专家精心整理,通过一系列面试题目来深入剖析Java中的Map,特别是HashMap和...
- `ConcurrentHashMap`使用了分段锁技术(Segment Locking),即将整个哈希表分为多个段(Segment),每个段使用独立的锁,这样可以支持高并发下的读写操作。 - `ConcurrentHashMap`在读取时不加锁,只在写入时才...
### Java集合面试题全集解析 #### 一、List、Set、Map的区别 - **List**:有序集合,允许重复元素。典型实现包括`ArrayList`(基于数组)、`LinkedList`(基于双向链表)。适用于频繁的插入和删除操作时选择`...
对于并发环境,可以使用`ConcurrentHashMap`,它在JDK1.7中使用锁分段策略(Segment),而在JDK1.8中则使用CAS操作和同步块,提供了更好的并发性能。 12. `ConcurrentHashMap`的底层: - JDK1.7:`Segment`数组,...
- 每个Segment相当于一个小的HashMap。 - 实现了线程安全,并且性能优于传统的同步机制。 #### 七、设计模式 1. **单例模式**:确保某个类只有一个实例,并提供一个全局访问点。例如,在用户数据管理中使用单例...
以下是对Java集合相关知识点的详细解析: 1. **List、Set、Map的区别**: - **List**:有序、允许重复元素,存储元素的位置有顺序关系。常见的实现类包括ArrayList(基于数组实现,提供快速随机访问)和LinkedList...
### BAT互联网Java面试题汇总知识点解析 #### 一、HashMap的底层原理及冲突解决机制 - **HashMap** 是一种常用的数据结构,在Java中用于存储Key-Value键值对。其核心是一个数组`table`,数组中的每个元素是一个...
根据提供的文档信息,本文将详细解析并发容器(Map、List、Set)的实战应用及其原理。并发容器在Java多线程环境下发挥着至关重要的作用,它们的设计旨在解决非线程安全容器在高并发场景下的性能瓶颈问题。接下来,...
- 每个段(segment)包含一个子哈希表,段之间互不干扰,每个段有自己的锁,这使得多个线程可以同时访问不同的段而不会发生冲突。 #### 五、乐观锁概念 乐观锁是一种并发控制机制,它假设数据在大部分情况下不会发生...