Jdk1.6 JUC源码解析(25)-ConcurrentHashMap
作者:大飞
- ConcurrentHashMap是一种线程安全的HashMap。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap具有更好的性能和伸缩性,是由于其使用了分段锁的策略,将内部数据分为多个段,每个段单独加锁,而不是整个HashMap加锁,这样能减少很多不必要的锁争用。
- ConcurrentHashMap实现了ConcurrentMap接口,先简单了解下这个接口:
public interface ConcurrentMap<K, V> extends Map<K, V> { /** * 如果map中已经存在给定的key,返回map中key对应的value; * 如果不存在给定的key,插入给定的key和value。 * 这个是一个原子操作,逻辑相当于: * if (!map.containsKey(key)) * return map.put(key, value); * else * return map.get(key); */ V putIfAbsent(K key, V value); /** * 如果map中存在给定的key,并且map中对应的value也等于给定的value, * 那么删除这个key和value。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key) && map.get(key).equals(value)) { * map.remove(key); * return true; * } else return false; */ boolean remove(Object key, Object value); /** * 如果map中存在给定的key,并且map中对应的value也等于给定的oldValue, * 那么将这个key对应的value替换成newValue。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key) && map.get(key).equals(oldValue)) { * map.put(key, newValue); * return true; * } else return false; */ boolean replace(K key, V oldValue, V newValue); /** * 如果map中已经存在给定的key, * 那么将这个key对应的value替换成给定的value。 * 这是一个原子操作,逻辑相当于: * if (map.containsKey(key)) { * return map.put(key, value); * } else return null; */ V replace(K key, V value); }ConcurrentMap扩展了Map接口,定义了上面4个原子操作方法。
- 接下来看下ConcurrentHashMap的内部结构:
/** * segment数组, 每一个segment都是一个hash table。 */ final Segment<K,V>[] segments;
重点看下segment的实现吧,首先看数据结构:
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 记录segment(哈希表)中的元素数量。 * 另一个重要角色就是其他操作会利用count的volatile读写来保证可见性,避免使用锁。 */ transient volatile int count; /** * 统计跟踪修改,用来保证一些批量操作的一致性。 * 比如统计所有segment元素个数时,如果统计过程发现modCount变化 * 那么需要重试。 */ transient int modCount; /** * 当哈希表的容量超过了这个阀值,表会扩容,里面的元素会重新散列。 * 这个值一般是:capacity * loadFactor */ transient int threshold; /** * 存放数组的哈希表。 */ transient volatile HashEntry<K,V>[] table; /** * 哈希表的加载因子。 * @serial */ final float loadFactor;
再看下segment的构造方法:
Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } void setTable(HashEntry<K,V>[] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry[i]; } }
/* ---------------- Constants -------------- */ //默认segment中hashTable长度。 static final int DEFAULT_INITIAL_CAPACITY = 16; //默认加载因子。 static final float DEFAULT_LOAD_FACTOR = 0.75f; //默认table的并发级别,其实就是segment数组长度。 static final int DEFAULT_CONCURRENCY_LEVEL = 16; //table的最大容量。 static final int MAXIMUM_CAPACITY = 1 << 30; //允许的最大的segment数组长度。 static final int MAX_SEGMENTS = 1 << 16; /** * 在size和containsValue方法中,加锁之前的尝试操作次数。 */ static final int RETRIES_BEFORE_LOCK = 2; /* ---------------- Fields -------------- */ /** * 计算segment下标的掩码。一个key的hash code高位(由segmentShift确定)用来确定segment下标。 */ final int segmentMask; /** * segment下标的位移值。 */ final int segmentShift; public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; //concurrencyLevel不能超过最大值 // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; //ssize最后是比concurrencyLevel大的最小的2的幂。 } /* * 假设传入的concurrencyLevel是50, * 那么ssize就是64,sshift就是6,segmentMask就是 00000000 00000000 00000000 00111111*/ segmentShift = 32 - sshift; segmentMask = ssize - 1; this.segments = Segment.newArray(ssize); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; //cap其实就是比总体容量平均分到每个segment的数量大的最小的2的幂...有点绕, for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment<K,V>(cap, loadFactor); //把segment都初始化一下。 } public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); } public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }可以看到构造方法中计算了几个重要的数:segment掩码、segment位移值、segment数组长度和segment内部哈希表容量,注意后两个都是2的幂,想到了什么? a & (b - 1)吧哈哈。
- 现在从插入和获取操作切入,来理解源码。先看下插入操作:
public V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); return segmentFor(hash).put(key, hash, value, false); }
注意到,put过程中,首先要根据key的hashCode,再次算一个hash值出来;其次是要根据这个hash值来确定一个segment,然后把key-value存到这个segment里面。
/** * Applies a supplemental hash function to a given hashCode, which * defends against poor quality hash functions. This is critical * because ConcurrentHashMap uses power-of-two length hash tables, * that otherwise encounter collisions for hashCodes that do not * differ in lower or upper bits. */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); } /** * Returns the segment that should be used for key with given hash * @param hash the hash code for the key * @return the segment */ final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
首先看下这个hash算法,它相当于在key本身的hashCode上做了加强,再次hash一次,使得hash值更加散列。这样做的原因是因为ConcurrentHashMap中哈希表的长度都是2的幂,会增加一些冲突几率,比如两个hashCode高位不同但低位相同,对哈希表长度取模时正好忽略了这些高位,造成冲突。这里是采用了Wang/Jenkins哈希算法的一个变种,更多相关信息可以google之。
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock();// 加锁 try { int c = count; if (c++ > threshold) // ensure capacity rehash(); // 如果添加一个元素后,超过扩容阀值,那么进行rehash。 HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); // 对hash取模算出key对应的哈希表的桶的下标。 HashEntry<K,V> first = tab[index]; // 找出桶的第一个节点 HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; // 遍历一下桶里的单链表,看看有没有相同的。 V oldValue; if (e != null) { // 如果找到了相同的,记录旧值 oldValue = e.value; if (!onlyIfAbsent) // 并且有覆盖标识 e.value = value; // 那么覆盖这个值 } else { // 如果没找到相同的。 oldValue = null; ++modCount; // 因为会改变哈希表元素个数,所以modCount累加。 // 将元素设置为桶内新的第一个节点。 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 注意这里做了一个volatile写。 } return oldValue; // 返回旧值。 } finally { unlock(); // 解锁。 } }代码也比较容易理解,注意有rehash的情况,看下rehash方法:
void rehash() { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; //不能超过最大容量。 /* * 将哈希表中所有桶里的节点重新分配到新哈希表中。 * 由于使用的容量是2的幂,所有一部分节点会分配到新哈希表中相同 * 下标的桶里,这样我们就可以重用这些节点,而无需重新创建。 * 按照统计数据,在默认的加载因子下,大约只有六分之一的节点在 * 哈希表扩容的时候需要拷贝(重新创建对象)。 */ HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null) newTable[idx] = e; // 链表上唯一的节点,直接复制到新table。 else { // 重用从尾部往前能定位到新table中同一个桶的,最长的连续节点。 HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 其他的节点就copy过去了。 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value); } } } } table = newTable; }
put的实现看完了,继续看下从ConcurrentHashMap中get的实现:
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
流程还是先算hash值,然后确定到segment,然后调用segment的get方法:
V get(Object key, int hash) { if (count != 0) { // 这里做一个volatile读 HashEntry<K,V> e = getFirst(hash); //获取相应桶的第一个节点。 while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; //如果是相同的key,返回value。 return readValueUnderLock(e); // recheck } e = e.next; } } return null; } HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } /** * 加锁读value。如果value为nul的情况下调用这个方法。 * 只有在编译器将HashEntry的初始化和其赋值给table的指令重排才会 * 出现这种情况,这在内存模型下是合法的,但从没发生过。 */ V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
- 理解了put和get过程,其他方法也很好理解了:
boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。
以上理解有偏差,感谢@不待人亲指正。
仔细看源码注释发现:
All (synchronized) write operations should write to the "count" field after structurally changing any bin.
也就是说只有bin(HashEntry链)的结构变化之后才会写count(覆盖的情况不会写count)。
所以这里纠正一下:所有改变bin结构的写操作都会写一下count,可以保证HashEntry的可见性(因为无论是添加还是删除,bin起始的HashEntry都会发生变化,由于HashEntry的next域是不变的,所以删除时需要将目标HashEntry之前的Entry都拷贝一下)。
而覆盖旧值的情况下不会写count,因为HashEntry的value本身也是volatile的,可以保证自身的可见性。
- 我们上面还看到了有一个RETRIES_BEFORE_LOCK值,看看这个值起什么作用:
public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { // Resort to locking all segments sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int)sum; }
上面size的过程就是,累加所有segment中的count,如果过程中segment中元素数量发生了变化,那么重试。如果重试了RETRIES_BEFORE_LOCK次(默认是2)都不行,那么将所有segment加锁,然后累加count,然后再解锁。在containsValue里面也是这么玩儿的,代码就不贴了。
- 其他代码也很容易看懂了,就分析到这里。最后注意下,ConcurrentHashMap也提供了Key和Value的集合视图,它们和ConcurrentHashMap共享一份数据,它们的迭代器是弱一致的。
相关推荐
`ConcurrentHashMap`在JDK 1.7和1.8中有着显著的区别。 在JDK 1.7中,`ConcurrentHashMap`采用了分段锁(Segment)的设计思想,每个Segment是一个独立的可锁容器,类似于线程安全的`HashMap`。Segment的数量由`...
该项目还深入解析了从JDK 5到JDK 8各版本的重要特性,为开发者提供了丰富的代码示例和源码分析。 1. **Java基础用法**: - **变量与数据类型**:Java支持基本数据类型如int、float、char等,以及引用类型如类、...
在深入探讨JDK1.8源码解析之前,先理解一下JDK的含义:Java Development Kit,即Java开发工具包,是Java编程语言的核心组成部分,提供了编写、编译和运行Java应用程序所需的所有工具。JDK1.8是Oracle公司发布的Java ...
JUC集合框架的目录整理如下:1. 【JUC】JUC集合框架综述【JUC】JDK1.8源码分析之ConcurrentHashMap(一)【JUC】JDK1.8源
本篇将深入解析JDK中的JUC包,探讨其核心组件和机制,帮助开发者更好地理解和利用这些高级并发工具。 一、线程池ExecutorService ExecutorService是JUC包中的核心接口,它扩展了Executor接口,提供了管理和控制线程...
总之,这份"javajdk8源码-concurrencyJava"的学习资料会带你深入理解JDK 8中并发编程的各种机制和工具,通过源码解析,你可以更好地掌握Java并发编程的精髓,从而编写出高效、安全的多线程应用。对于系统开源领域的...
JUC(java.util.concurrent)是Java提供的一个并发编程工具包...要掌握JUC并发编程及其底层原理,除了通过阅读官方文档和源码学习外,还需要大量实践和经验积累,才能够真正理解和应用JUC中的并发工具来解决实际问题。
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
起因:由于之前经常会翻看JDK的源码,尤其是JUC包下面的一些类库,发现一些类中开头的大段英语注释写的非常好,很多时候都说明了设计思路以及这样做的原因,所以想把这些注释翻译成中文,便于自己温习的同时也方便...
【JUC并发编程】 Java并发编程主要涉及java.util.concurrent包,包括并发工具类、原子类和锁。其中: - **java.util.concurrent** 包含并发工具类,如ExecutorService、Semaphore、CountDownLatch等。 - **java.util...
- **ConcurrentHashMap**:在并发环境下提供线程安全,采用分段锁策略,JDK1.8后使用CAS和Synchronized优化。 **4. JUC(Java并发工具包)** - **Volatile**:保证可见性,但不保证原子性,适合简单变量共享。 - **...
- **ConcurrentHashMap**:在JDK 8中采用了分段锁和哈希表优化。 - **原子类**:如`AtomicInteger`,提供原子更新整数值的方法。 - **CAS问题**:ABA问题,通过版本号或原子标志位解决。 #### Spring相关知识点 - ...
- Map接口及其实现类(HashMap、TreeMap、ConcurrentHashMap)的特点。 - 集合操作的优化,如迭代器的使用,避免在循环中修改集合。 10. **反射** - 反射的概念,如何通过反射获取类的信息,创建和调用对象。 -...
在Java世界中,JVM(Java Virtual Machine)和JUC(Java Concurrency Utilities)是两个极其重要的概念。JVM是Java程序的运行平台,而JUC则是Java并发编程的重要工具集。这个名为“JVM_JUC_Demo”的实践案例旨在帮助...
以下是基于这个主题的详细知识点解析: 1. **Java基础** - **语法**:包括变量、数据类型、运算符、控制结构(如if语句、switch、for、while循环)、方法、数组等。 - **面向对象**:类、对象、继承、封装、多态...
- JDK安装与配置:介绍如何下载安装JDK(Java Development Kit),包括环境变量的设置等。 - HelloWorld程序:通过编写第一个Java程序来熟悉Java开发环境。 - 数据类型与运算符:学习Java中的基本数据类型如整型...
│ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...
│ 高并发编程第二阶段46讲、ClassLoader链接阶段(验证,准备,解析)过程详细介绍.mp4 │ 高并发编程第二阶段47讲、ClassLoader初始化阶段详细介绍clinit.mp4 │ 高并发编程第二阶段48讲、JVM内置三大类加载器...
- 动态代理的理解,包括JDK动态代理和CGLIB动态代理。 7. **设计模式**: - 了解常见的设计模式,如单例、工厂、观察者、装饰者、适配器、代理等,并能结合实际场景应用。 8. **JVM**: - 垃圾回收机制(GC)的...
这个"jdkLearning"项目专注于深入理解JDK的源代码,特别是集合、并发编程(JUC)和Executor服务这三个核心领域。通过阅读和分析这些源码,开发者可以更深入地了解Java平台的工作原理,提升编程技能,并优化应用程序...