- 浏览: 566345 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (267)
- 随笔 (4)
- Spring (13)
- Java (61)
- HTTP (3)
- Windows (1)
- CI(Continuous Integration) (3)
- Dozer (1)
- Apache (11)
- DB (7)
- Architecture (41)
- Design Patterns (11)
- Test (5)
- Agile (1)
- ORM (3)
- PMP (2)
- ESB (2)
- Maven (5)
- IDE (1)
- Camel (1)
- Webservice (3)
- MySQL (6)
- CentOS (14)
- Linux (19)
- BI (3)
- RPC (2)
- Cluster (9)
- NoSQL (7)
- Oracle (25)
- Loadbalance (7)
- Web (5)
- tomcat (1)
- freemarker (1)
- 制造 (0)
最新评论
-
panamera:
如果设置了连接需要密码,Dynamic Broker-Clus ...
ActiveMQ 集群配置 -
panamera:
请问你的最后一种模式Broker-C节点是不是应该也要修改持久 ...
ActiveMQ 集群配置 -
maosheng:
longshao_feng 写道楼主使用 文件共享 模式的ma ...
ActiveMQ 集群配置 -
longshao_feng:
楼主使用 文件共享 模式的master-slave,produ ...
ActiveMQ 集群配置 -
tanglanwen:
感触很深,必定谨记!
少走弯路的十条忠告
ConcurrentHashMap 的分析和使用:
为什么要使用 ConcurrentHashMap:
线程不安全的 HashMap。因为多线程环境下,使用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率接近 100%,所以在并发情况下不能使用 HashMap。
效率低下的 HashTable。HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法时,其他线程访问 HashTable 的同步方法时,可能会进入阻塞或轮询状态。如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法添加元素,并且也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。
ConcurrentHashMap 的锁分段技术:
HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问 HashTable 的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap 的结构:
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护者一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。
ConcurrentHashMap 的初始化:
ConcurrentHashMap 初始化方法是通过 initialCapacity,loadFactor, concurrencyLevel几个参数来初始化 segments 数组,段偏移量 segmentShift,段掩码 segmentMask 和每个segment 里的 HashEntry数组。
...........
/* ---------------- Constants -------------- */
/**
* The default initial capacity for this table,
* used when not otherwise specified in a constructor.
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* The default load factor for this table, used when not
* otherwise specified in a constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* The maximum capacity, used if a higher value is implicitly
* specified by either of the constructors with arguments. MUST
* be a power of two <= 1<<30 to ensure that entries are indexable
* using ints.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* The minimum capacity for per-segment tables. Must be a power
* of two, at least two to avoid immediate resizing on next use
* after lazy construction.
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* Number of unsynchronized retries in size and containsValue
* methods before resorting to locking. This is used to avoid
* unbounded retries if tables undergo continuous modification
* which would make it impossible to obtain an accurate result.
*/
static final int RETRIES_BEFORE_LOCK = 2;
..........
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
final int segmentMask;
/**
* Shift value for indexing within segments.
*/
final int segmentShift;
............
/**
* Creates a new, empty map with the specified initial
* capacity, load factor and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of elements per
* bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation performs internal sizing
* to try to accommodate this many threads.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive.
*/
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
由上面的代码可知 segments 数组的长度 ssize 通过 concurrencyLevel 计算得出。为了能通过按位与的哈希算法来定位 segments 数组的索引,必须保证 segments 数组的长度是 2 的 N 次方(power-of-two size),所以必须计算出一个是大于或等于 concurrencyLevel 的最小的 2 的 N 次方值来作为 segments 数组的长度。假如 concurrencyLevel 等于 14,15 或 16,ssize 都会等于 16,即容器里锁的个数也是 16。
注意:concurrencyLevel 的最大大小是 65535,意味着 segments 数组的长度最大为65536,对应的二进制是 16 位。
初始化 segmentShift 和 和 segmentMask。这两个全局变量在定位 segment 时的哈希算法里需要使用,sshift 等于 ssize 从 1 向左移位的次数,在默认情况下 concurrencyLevel 等于 16,1 需要向左移位移动 4 次,所以 sshift 等于 4。segmentShift 用于定位参与 hash 运算的位数,segmentShift 等于32 减 sshift,所以等于 28,这里之所以用 32 是因为 ConcurrentHashMap 里的 hash()方法输出的最大数是 32 位的。segmentMask 是哈希运算的掩码,等于 ssize 减1,即 15,掩码的二进制各个位的值都是 1。因为 ssize 的最大长度是 65536,所以 segmentShift 最大值是 16,segmentMask 最大值是 65535,对应的二进制是 16 位,每个位都是 1。
上面代码中,initialCapacity 是 ConcurrentHashMap 的初始化容量,loadfactor 是每个 segment 的负载因子,变量 cap 就是 segment 里 HashEntry 数组的长度,它等于initialCapacity 除以 ssize的倍数 c,如果 c 大于 1,就会取大于等于 c 的 2 的 N 次方值,所以 cap 不是 1,就是 2 的 N 次方。segment 的容量 threshold=(int)cap*loadFactor,默认情况下 initialCapacity 等于 16,loadfactor 等于0.75,通过运算 cap 等于 1,threshold 等于零。
定位 Segment:
/**
* Applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions. This is critical
* because ConcurrentHashMap uses power-of-two length hash tables,
* that otherwise encounter collisions for hashCodes that do not
* differ in lower or upper bits.
*/
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
既然 ConcurrentHashMap 使用分段锁 Segment 来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到 Segment。可以看到 ConcurrentHashMap 会首先使用Wang/Jenkinshash 的变种算法对元素的 hashCode 进行一次再哈希。
之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的 Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个 Segment 中,不仅存取元素缓慢,分段锁也会失去意义。
ConcurrentHashMap 的 get 操作:
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
ConcurrentHashMap 通过以下哈希算法定位 segment, long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; Segment 的 get 操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到 segment,再通过哈希算法定位到元素,get 操作的高效之处在于整个 get 过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道 HashTable 容器的 get 方法是需要加锁的,那么 ConcurrentHashMap 的 get 操作是如何做到不加锁的呢?原因是它的 get 方法里将要使用的共享变量都定义成 volatile,如用于统计当前 Segement大小的 count 字段和用于存储值的 HashEntry 的 value。定义成volatile 的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在 get 操作里只需要读不需要写共享变量 count和 value,所以可以不用加锁。只所以不会读到过期的值,是根据 java 内存模型的 happen before 原则,对 volatile 字段的写入操作先于读操作,即使两个线程同时修改和获取 volatile 变量,get 操作也能拿到最新的值,这是用 volatile 替换锁的经典应用场景。
/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics. (See above
* about use of putOrderedObject.)
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
在定位元素的代码里我们可以发现定位 HashEntry 和定位 Segment 的哈希算法虽然一样,都与数组的长度减去1相与,但是相与的值不一样,定位 Segment 使用的是元素的 hashcode 通过再哈希后得到的值的高位,而定位 HashEntry 直接使用的是再哈希后的值。其目的是避免两次哈希后的值一样,导致元素虽然在 Segment 里散列开了,但是却没有在 HashEntry 里散列开。
ConcurrentHashMap 的 put 操作:
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
........
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
..........
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
/**
* The total number of mutative operations in this segment.
* Even though this may overflows 32 bits, it provides
* sufficient accuracy for stability checks in CHM isEmpty()
* and size() methods. Accessed only either within locks or
* among other volatile reads that maintain visibility.
*/
transient int modCount;
/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold;
/**
* The load factor for the hash table. Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
..........
/**
* Doubles size of table and repacks entries, also adding the
* given node to new table
*/
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
........
}
由于 put 方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对 Segment 里的 HashEntry 数组进行扩容,第二步定位添加元素的位置然后放在 HashEntry 数组里。是否需要扩容。在插入元素前会先判断 Segment里的 HashEntry 数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment 的扩容判断比 HashMap 更恰当,因为HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap 就进行了一次无效的扩容。
如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash 后插入到新的数组里。为了高效 ConcurrentHashMap 不会对整个容器进行扩容,而只对某个segment 进行扩容。
ConcurrentHashMap 的 size 操作:
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
如果我们要统计整个 ConcurrentHashMap 里元素的大小,就必须统计所有 Segment 里元素的大小后求和。Segment 里的全局变量 count 是一个 volatile 变量,那么在多线程场景下,我们是不是直接把所有 Segment 的 count 相加就可以得到整个 ConcurrentHashMap 大小了呢?不是的,虽然相加时可以获取每个 Segment 的 count 的最新值,但是拿到之后可能累加前使用的 count 发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计 size 的时候把所有 Segment 的 put,remove和 clean 方法全部锁住,但是这种做法显然非常低效。
因为在累加 count 操作过程中,之前累加过的 count 发生变化的几率非常小,所以ConcurrentHashMap 的做法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有Segment 的大小。
那么 ConcurrentHashMap 是如何判断在统计的时候容器是否发生了变化呢?使用 modCount变量,在 put , remove 和 clean 方法里操作元素前都会将变量 modCount 进行加 1,那么在统计 size 前后比较 modCount 是否发生变化,从而得知容器的大小是否发生变化。
ReentrantLock:
在 ReentrantLock 中,调用 lock()方法获取锁;调用 unlock()方法释放锁。
ReentrantLock 的实现依赖于 java 同步器框架 AbstractQueuedSynchronizer,AbstractQueuedSynchronizer 使用一个整型的 volatile 变量(命名为 state)来维护同步状态,这个 volatile 变量是 ReentrantLock 内存语义实现的关键。
ReentrantLock 分为公平锁和非公平锁,我们首先分析公平锁。
使用公平锁时,加锁方法 lock()的方法调用轨迹如下:
1. ReentrantLock : lock()
2. FairSync : lock()
3. AbstractQueuedSynchronizer : acquire(int arg)
4. ReentrantLock : tryAcquire(int acquires)
在第 4 步真正开始加锁,下面是该方法的源代码:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从上面源代码中我们可以看出,加锁方法首先读 volatile 变量 state。
在使用公平锁时,解锁方法 unlock()的方法调用轨迹如下:
1. ReentrantLock : unlock()
2. AbstractQueuedSynchronizer : release(int arg)
3. Sync : tryRelease(int releases)
在第 3 步真正开始释放锁,下面是该方法的源代码:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
从上面的源代码我们可以看出,在释放锁的最后写 volatile 变量 state。
公平锁在释放锁的最后写 volatile 变量 state;在获取锁时首先读这个 volatile 变量。根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享变量,在获取锁的线程读取同一个 volatile 变量后将立即变的对获取锁的线程可见。
现在我们分析非公平锁的内存语义的实现。
非公平锁的释放和公平锁完全一样,所以这里仅仅分析非公平锁的获取。
使用公平锁时,加锁方法 lock()的方法调用轨迹如下:
1. ReentrantLock : lock()
2. NonfairSync : lock()
3. AbstractQueuedSynchronizer : compareAndSetState(int expect, int
update)
在第 3 步真正开始加锁,下面是该方法的源代码:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
该方法以原子操作的方式更新 state 变量,本文把 java 的 compareAndSet() 方法调用简称为CAS。JDK 文档对该方法的说明如下: 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义。
这里我们分别从编译器和处理器的角度来分析,CAS 如何同时具有 volatile 读和volatile 写的内存语义。
编译器不会对 volatile 读与 volatile 读后面的任意内存操作重排序;编译器不会对volatile 写与 volatile 写前面的任意内存操作重排序。组合这两个条件,意味着为了同时实现 volatile 读和 volatile 写的内存语义,编译器不能对CAS 与 CAS 前面和后面的任意内存操作重排序。
现在对公平锁和非公平锁的内存语义做个总结:
公平锁和非公平锁释放时,最后都要写一个 volatile 变量 state。
公平锁获取时,首先会去读这个 volatile 变量。
非公平锁获取时,首先会用 CAS 更新这个 volatile 变量,这个操作同时具有 volatile 读和volatile 写的内存语义。
从ReentrantLock 的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:
1. 利用 volatile 变量的写-读所具有的内存语义。
2. 利用 CAS 所附带的 volatile 读和 volatile 写的内存语义。
为什么要使用 ConcurrentHashMap:
线程不安全的 HashMap。因为多线程环境下,使用 HashMap 进行 put 操作会引起死循环,导致 CPU 利用率接近 100%,所以在并发情况下不能使用 HashMap。
效率低下的 HashTable。HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法时,其他线程访问 HashTable 的同步方法时,可能会进入阻塞或轮询状态。如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法添加元素,并且也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。
ConcurrentHashMap 的锁分段技术:
HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问 HashTable 的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap 的结构:
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap 类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护者一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。
ConcurrentHashMap 的初始化:
ConcurrentHashMap 初始化方法是通过 initialCapacity,loadFactor, concurrencyLevel几个参数来初始化 segments 数组,段偏移量 segmentShift,段掩码 segmentMask 和每个segment 里的 HashEntry数组。
...........
/* ---------------- Constants -------------- */
/**
* The default initial capacity for this table,
* used when not otherwise specified in a constructor.
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* The default load factor for this table, used when not
* otherwise specified in a constructor.
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* The default concurrency level for this table, used when not
* otherwise specified in a constructor.
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* The maximum capacity, used if a higher value is implicitly
* specified by either of the constructors with arguments. MUST
* be a power of two <= 1<<30 to ensure that entries are indexable
* using ints.
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* The minimum capacity for per-segment tables. Must be a power
* of two, at least two to avoid immediate resizing on next use
* after lazy construction.
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* Number of unsynchronized retries in size and containsValue
* methods before resorting to locking. This is used to avoid
* unbounded retries if tables undergo continuous modification
* which would make it impossible to obtain an accurate result.
*/
static final int RETRIES_BEFORE_LOCK = 2;
..........
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
final int segmentMask;
/**
* Shift value for indexing within segments.
*/
final int segmentShift;
............
/**
* Creates a new, empty map with the specified initial
* capacity, load factor and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation
* performs internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of elements per
* bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently
* updating threads. The implementation performs internal sizing
* to try to accommodate this many threads.
* @throws IllegalArgumentException if the initial capacity is
* negative or the load factor or concurrencyLevel are
* nonpositive.
*/
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
由上面的代码可知 segments 数组的长度 ssize 通过 concurrencyLevel 计算得出。为了能通过按位与的哈希算法来定位 segments 数组的索引,必须保证 segments 数组的长度是 2 的 N 次方(power-of-two size),所以必须计算出一个是大于或等于 concurrencyLevel 的最小的 2 的 N 次方值来作为 segments 数组的长度。假如 concurrencyLevel 等于 14,15 或 16,ssize 都会等于 16,即容器里锁的个数也是 16。
注意:concurrencyLevel 的最大大小是 65535,意味着 segments 数组的长度最大为65536,对应的二进制是 16 位。
初始化 segmentShift 和 和 segmentMask。这两个全局变量在定位 segment 时的哈希算法里需要使用,sshift 等于 ssize 从 1 向左移位的次数,在默认情况下 concurrencyLevel 等于 16,1 需要向左移位移动 4 次,所以 sshift 等于 4。segmentShift 用于定位参与 hash 运算的位数,segmentShift 等于32 减 sshift,所以等于 28,这里之所以用 32 是因为 ConcurrentHashMap 里的 hash()方法输出的最大数是 32 位的。segmentMask 是哈希运算的掩码,等于 ssize 减1,即 15,掩码的二进制各个位的值都是 1。因为 ssize 的最大长度是 65536,所以 segmentShift 最大值是 16,segmentMask 最大值是 65535,对应的二进制是 16 位,每个位都是 1。
上面代码中,initialCapacity 是 ConcurrentHashMap 的初始化容量,loadfactor 是每个 segment 的负载因子,变量 cap 就是 segment 里 HashEntry 数组的长度,它等于initialCapacity 除以 ssize的倍数 c,如果 c 大于 1,就会取大于等于 c 的 2 的 N 次方值,所以 cap 不是 1,就是 2 的 N 次方。segment 的容量 threshold=(int)cap*loadFactor,默认情况下 initialCapacity 等于 16,loadfactor 等于0.75,通过运算 cap 等于 1,threshold 等于零。
定位 Segment:
/**
* Applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions. This is critical
* because ConcurrentHashMap uses power-of-two length hash tables,
* that otherwise encounter collisions for hashCodes that do not
* differ in lower or upper bits.
*/
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
既然 ConcurrentHashMap 使用分段锁 Segment 来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到 Segment。可以看到 ConcurrentHashMap 会首先使用Wang/Jenkinshash 的变种算法对元素的 hashCode 进行一次再哈希。
之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的 Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个 Segment 中,不仅存取元素缓慢,分段锁也会失去意义。
ConcurrentHashMap 的 get 操作:
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
ConcurrentHashMap 通过以下哈希算法定位 segment, long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; Segment 的 get 操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到 segment,再通过哈希算法定位到元素,get 操作的高效之处在于整个 get 过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道 HashTable 容器的 get 方法是需要加锁的,那么 ConcurrentHashMap 的 get 操作是如何做到不加锁的呢?原因是它的 get 方法里将要使用的共享变量都定义成 volatile,如用于统计当前 Segement大小的 count 字段和用于存储值的 HashEntry 的 value。定义成volatile 的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在 get 操作里只需要读不需要写共享变量 count和 value,所以可以不用加锁。只所以不会读到过期的值,是根据 java 内存模型的 happen before 原则,对 volatile 字段的写入操作先于读操作,即使两个线程同时修改和获取 volatile 变量,get 操作也能拿到最新的值,这是用 volatile 替换锁的经典应用场景。
/**
* ConcurrentHashMap list entry. Note that this is never exported
* out as a user-visible Map.Entry.
*/
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics. (See above
* about use of putOrderedObject.)
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
在定位元素的代码里我们可以发现定位 HashEntry 和定位 Segment 的哈希算法虽然一样,都与数组的长度减去1相与,但是相与的值不一样,定位 Segment 使用的是元素的 hashcode 通过再哈希后得到的值的高位,而定位 HashEntry 直接使用的是再哈希后的值。其目的是避免两次哈希后的值一样,导致元素虽然在 Segment 里散列开了,但是却没有在 HashEntry 里散列开。
ConcurrentHashMap 的 put 操作:
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p> The value can be retrieved by calling the <tt>get</tt> method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>
* @throws NullPointerException if the specified key or value is null
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
........
/**
* Segments are specialized versions of hash tables. This
* subclasses from ReentrantLock opportunistically, just to
* simplify some locking and avoid separate construction.
*/
static final class Segment<K,V> extends ReentrantLock implements Serializable {
..........
/**
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
/**
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
/**
* The total number of mutative operations in this segment.
* Even though this may overflows 32 bits, it provides
* sufficient accuracy for stability checks in CHM isEmpty()
* and size() methods. Accessed only either within locks or
* among other volatile reads that maintain visibility.
*/
transient int modCount;
/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold;
/**
* The load factor for the hash table. Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
..........
/**
* Doubles size of table and repacks entries, also adding the
* given node to new table
*/
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
........
}
由于 put 方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。put 方法首先定位到 Segment,然后在 Segment 里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对 Segment 里的 HashEntry 数组进行扩容,第二步定位添加元素的位置然后放在 HashEntry 数组里。是否需要扩容。在插入元素前会先判断 Segment里的 HashEntry 数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment 的扩容判断比 HashMap 更恰当,因为HashMap 是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap 就进行了一次无效的扩容。
如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash 后插入到新的数组里。为了高效 ConcurrentHashMap 不会对整个容器进行扩容,而只对某个segment 进行扩容。
ConcurrentHashMap 的 size 操作:
/**
* Returns the number of key-value mappings in this map. If the
* map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
*
* @return the number of key-value mappings in this map
*/
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
如果我们要统计整个 ConcurrentHashMap 里元素的大小,就必须统计所有 Segment 里元素的大小后求和。Segment 里的全局变量 count 是一个 volatile 变量,那么在多线程场景下,我们是不是直接把所有 Segment 的 count 相加就可以得到整个 ConcurrentHashMap 大小了呢?不是的,虽然相加时可以获取每个 Segment 的 count 的最新值,但是拿到之后可能累加前使用的 count 发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计 size 的时候把所有 Segment 的 put,remove和 clean 方法全部锁住,但是这种做法显然非常低效。
因为在累加 count 操作过程中,之前累加过的 count 发生变化的几率非常小,所以ConcurrentHashMap 的做法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有Segment 的大小。
那么 ConcurrentHashMap 是如何判断在统计的时候容器是否发生了变化呢?使用 modCount变量,在 put , remove 和 clean 方法里操作元素前都会将变量 modCount 进行加 1,那么在统计 size 前后比较 modCount 是否发生变化,从而得知容器的大小是否发生变化。
ReentrantLock:
在 ReentrantLock 中,调用 lock()方法获取锁;调用 unlock()方法释放锁。
ReentrantLock 的实现依赖于 java 同步器框架 AbstractQueuedSynchronizer,AbstractQueuedSynchronizer 使用一个整型的 volatile 变量(命名为 state)来维护同步状态,这个 volatile 变量是 ReentrantLock 内存语义实现的关键。
ReentrantLock 分为公平锁和非公平锁,我们首先分析公平锁。
使用公平锁时,加锁方法 lock()的方法调用轨迹如下:
1. ReentrantLock : lock()
2. FairSync : lock()
3. AbstractQueuedSynchronizer : acquire(int arg)
4. ReentrantLock : tryAcquire(int acquires)
在第 4 步真正开始加锁,下面是该方法的源代码:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
从上面源代码中我们可以看出,加锁方法首先读 volatile 变量 state。
在使用公平锁时,解锁方法 unlock()的方法调用轨迹如下:
1. ReentrantLock : unlock()
2. AbstractQueuedSynchronizer : release(int arg)
3. Sync : tryRelease(int releases)
在第 3 步真正开始释放锁,下面是该方法的源代码:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
从上面的源代码我们可以看出,在释放锁的最后写 volatile 变量 state。
公平锁在释放锁的最后写 volatile 变量 state;在获取锁时首先读这个 volatile 变量。根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享变量,在获取锁的线程读取同一个 volatile 变量后将立即变的对获取锁的线程可见。
现在我们分析非公平锁的内存语义的实现。
非公平锁的释放和公平锁完全一样,所以这里仅仅分析非公平锁的获取。
使用公平锁时,加锁方法 lock()的方法调用轨迹如下:
1. ReentrantLock : lock()
2. NonfairSync : lock()
3. AbstractQueuedSynchronizer : compareAndSetState(int expect, int
update)
在第 3 步真正开始加锁,下面是该方法的源代码:
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
该方法以原子操作的方式更新 state 变量,本文把 java 的 compareAndSet() 方法调用简称为CAS。JDK 文档对该方法的说明如下: 如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义。
这里我们分别从编译器和处理器的角度来分析,CAS 如何同时具有 volatile 读和volatile 写的内存语义。
编译器不会对 volatile 读与 volatile 读后面的任意内存操作重排序;编译器不会对volatile 写与 volatile 写前面的任意内存操作重排序。组合这两个条件,意味着为了同时实现 volatile 读和 volatile 写的内存语义,编译器不能对CAS 与 CAS 前面和后面的任意内存操作重排序。
现在对公平锁和非公平锁的内存语义做个总结:
公平锁和非公平锁释放时,最后都要写一个 volatile 变量 state。
公平锁获取时,首先会去读这个 volatile 变量。
非公平锁获取时,首先会用 CAS 更新这个 volatile 变量,这个操作同时具有 volatile 读和volatile 写的内存语义。
从ReentrantLock 的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式:
1. 利用 volatile 变量的写-读所具有的内存语义。
2. 利用 CAS 所附带的 volatile 读和 volatile 写的内存语义。
发表评论
-
java 类的加载 以及 ClassLoader
2020-04-16 09:43 454Class Loader 类加载器: 类加载器负责加载 ... -
Stack 的实现原理深入剖析
2020-04-06 13:26 493Stack 介绍: Stack是栈。 ... -
Vector 的实现原理深入剖析
2020-04-06 13:17 368Vector介绍: Vector 是矢量队列,它是JDK1. ... -
JDK 分析工具
2020-04-05 17:30 369常用分析工具: jps:显示指定系统中所有的HotSpot虚 ... -
二叉树的深度优先遍历和广度优先遍历
2020-03-10 09:33 599概述: 1、深度优先遍历(Depth-First-Sear ... -
Hashtable 的实现原理深入剖析
2020-02-18 20:59 538一、Hashtable的基本方法: 1、定义: HashT ... -
jdk 1.8 新特性
2020-02-17 13:43 3661、default关键字 ... -
Java IO 架构
2019-11-11 16:39 351主要两类: 磁盘I/O 网络I/O 基于字节 ... -
Java 数据结构与算法
2019-04-03 10:25 519程序=数据结构+算法 ... -
Java语言异常(Exception)
2018-10-09 11:40 550异常,是Java中非常常用 ... -
Java并发问题--乐观锁与悲观锁以及乐观锁的一种实现方式-CAS
2018-08-17 09:47 1474首先介绍一些乐观锁与 ... -
Java 高性能编程注意事项
2016-11-17 09:55 6481. 尽量在合适的场合使用单例 使用单例可以减轻加载的负担, ... -
Netty 解析
2017-03-07 13:47 1224Linux网络IO模型: Linux ... -
2016年Java 面试题总结
2016-01-18 13:34 54794多线程、并发及线程的基础问题: 1)Java 中能创建 vo ... -
java 内存模型
2015-12-29 13:44 817JAVA内存模型: Java内存 ... -
JVM 深入剖析
2015-12-29 12:51 1095JVM是JAVA虚拟机(JAVA Virtual Machin ... -
Java 并发编程_Synchronized
2015-12-16 12:42 872硬件的效率和一致性: 由于计算机的运算速度和它的存储和通讯子 ... -
Java 并发编程_Volatile
2015-12-15 13:42 620术语定义: 共享变量:在多个线程之间能够被共享的变量被称为共 ... -
Java 并发编程_ConcurrentLinkedQueue
2015-12-15 13:32 908ConcurrentLinkedQueue 的分析和使用: ... -
JVM 垃圾回收原理
2015-10-29 13:38 489基本回收算法: 1.引用 ...
相关推荐
Java并发编程是软件开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。本资源"Java并发编程_设计原则和模式(CHM)"聚焦于Java语言在并发环境下的编程技巧、...
### Java并发编程之ConcurrentHashMap #### 一、概述 `ConcurrentHashMap`是Java并发编程中的一个重要组件,它提供了一种线程安全的哈希表实现方式。与传统的`Hashtable`或`synchronized`关键字相比,`...
《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...
《Java并发编程实战》这本书是关于Java语言中并发编程技术的经典著作。它详细介绍了如何在Java环境中有效地实现多线程程序和并发控制机制。在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程...
《Java并发编程实战》是一本深入探讨Java平台并发编程的权威指南。这本书旨在帮助开发者理解和掌握在Java环境中创建高效、可扩展且可靠的多线程应用程序的关键技术和实践。它涵盖了从基本概念到高级主题的广泛内容,...
《Java并发编程的艺术》这本书是Java开发者深入理解并发编程的重要参考书籍。这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者解决在实际工作中遇到的并发问题,提高程序的性能和可伸缩性。 ...
并发集合是Java并发编程中的重要组成部分,如`ConcurrentHashMap`, `CopyOnWriteArrayList`, `ConcurrentLinkedQueue`等,它们设计为线程安全,能够在并发环境中高效地工作。书中的章节可能会详细解释这些集合的设计...
Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。以下是对标题和描述中所提及的几个知识点的详细解释: 1. **线程与并发** - **线程*...
《JAVA并发编程实践》这本书是Java开发者深入理解并发编程的重要参考资料。它涵盖了Java并发的核心概念、工具和最佳实践,旨在帮助读者在多线程环境下编写高效、安全的代码。 并发编程是现代软件开发中的关键技能,...
《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...
Java并发编程还包括对并发容器的使用,如ArrayList、LinkedList、HashSet、HashMap等基础容器在并发环境下可能存在问题,Java提供了一些线程安全的容器,如Vector、HashTable以及java.util.concurrent包下的...
### Java并发编程实战知识点概述 #### 一、Java并发特性详解 在《Java并发编程实战》这本书中,作者深入浅出地介绍了Java 5.0和Java 6中新增的并发特性。这些特性旨在帮助开发者更高效、安全地编写多线程程序。书中...
《Java并发编程实战》这本书深入探讨了Java平台上的并发编程技术,涵盖了理论与实践的各个方面。在Java开发中,理解并掌握并发编程是至关重要的,因为它可以帮助开发者充分利用多核处理器的性能,提升程序的运行效率...
《Java并发编程从入门到精通》是一本专为Java开发者设计的深度学习并发编程的书籍。作者韩剑锋,凭借其12年的IT行业经验,曾担任多家IT公司的研发总监和技术总监,以其丰富的实战经验和深厚的理论知识,为读者提供了...
《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...
Java并发编程实践是Java开发中不可或缺的一个领域,它涉及到如何高效、正确地处理多线程环境中的任务。这本书的读书笔记涵盖了多个关键知识点,旨在帮助读者深入理解Java并发编程的核心概念。 1. **线程和进程的...
Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...