论坛首页 Java企业应用论坛

java.util.concurrent 之ConcurrentHashMap 源码分析

浏览 6918 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-03-28  

      最近想研究一下 java.util.concurrent 包中一些类的实现。实际工作中这个包里的类用得不少,但一直没有深入理解,只是听说它们在高并发场景下性能不错。接下来将对其中几个比较常用的类的源码进行分析。

 

   ConcurrentHashMap类

   研究源码时,我一般喜欢从实际应用出发,一步步调试分析,这样理解起来容易很多。

 

   实际应用:

 

   ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();
		// All four calls operate on the same key.
		final String key = "zhxing";
		// No mapping yet: put returns null.
		String oldValue = map.put(key, "value");
		// Overwrites "value": put returns the previous value.
		String oldValue1 = map.put(key, "value1");
		// Key already present: putIfAbsent keeps "value1" and returns it.
		String oldValue2 = map.putIfAbsent(key, "value2");
		String value = map.get(key);

		System.out.println("oldValue:" + oldValue);
		System.out.println("oldValue1:" + oldValue1);
		System.out.println("oldValue2:" + oldValue2);
		System.out.println("value:" + value);
 

输出结果:

oldValue:null
oldValue1:value
oldValue2:value1
value:value1
 

先从new 方法开始

	/**
	 * Creates a new, empty map with a default initial capacity (16), load
	 * factor (0.75) and concurrencyLevel (16). The concurrency level is the
	 * number of segments, and therefore the number of independent locks.
	 * 
	 */
public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }
// Full constructor; the no-arg constructor delegates here with the default arguments.
	/**
	 * Creates a new, empty map with the given initial capacity, load factor and
	 * concurrency level.
	 *
	 * @param initialCapacity  total initial capacity, split evenly across the
	 *                         segments (capped at MAXIMUM_CAPACITY)
	 * @param loadFactor       per-segment resize factor; must be positive
	 * @param concurrencyLevel requested number of segments (locks); capped at
	 *                         MAX_SEGMENTS and rounded up to a power of two
	 * @throws IllegalArgumentException if loadFactor is not positive (or is NaN),
	 *         initialCapacity is negative, or concurrencyLevel is not positive
	 */
	public ConcurrentHashMap(int initialCapacity, float loadFactor,
			int concurrencyLevel) {
		boolean invalid = !(loadFactor > 0) || initialCapacity < 0
				|| concurrencyLevel <= 0;
		if (invalid)
			throw new IllegalArgumentException();
		// Never create more than MAX_SEGMENTS segments.
		int level = Math.min(concurrencyLevel, MAX_SEGMENTS);

		// Round the level up to a power of two: segmentCount == 1 << shiftBits.
		int shiftBits = 0;
		int segmentCount = 1;
		for (; segmentCount < level; segmentCount <<= 1)
			shiftBits++;
		// The segment index comes from the top shiftBits bits of the hash.
		// With the default level of 16: shiftBits = 4, segmentShift = 28,
		// segmentMask = 15.
		segmentShift = 32 - shiftBits;
		segmentMask = segmentCount - 1;
		// One Segment per lock; each Segment behaves like a small locked hash table.
		this.segments = Segment.newArray(segmentCount);

		int capacity = Math.min(initialCapacity, MAXIMUM_CAPACITY);
		// Per-segment share, rounded up so the segments together hold at least
		// `capacity` entries.
		int share = capacity / segmentCount;
		if (share * segmentCount < capacity)
			share++;
		// Each segment's table length is the smallest power of two >= share.
		int segCap = 1;
		while (segCap < share)
			segCap <<= 1;
		for (int i = 0; i < this.segments.length; ++i)
			this.segments[i] = new Segment<K, V>(segCap, loadFactor);
	}
 

这里提到了 Segment 这个类。它其实是整个 map 的一个分段,目的是实现分段锁机制:每个分段各自持有一把锁。

/**
	 * Segments are specialized versions of hash tables. This subclasses from
	 * ReentrantLock opportunistically, just to simplify some locking and avoid
	 * separate construction. Each Segment is one shard of the map and is itself
	 * the lock that guards that shard.
	 */
	static final class Segment<K, V> extends ReentrantLock implements
			Serializable {
// ... (fields such as count, modCount, threshold, table and loadFactor, and other
// members, are elided in this excerpt)
		/**
		 * Creates a segment whose table has the given length.
		 *
		 * @param initialCapacity table length (the map constructor passes a power of two)
		 * @param lf              load factor used to compute the resize threshold
		 */
Segment(int initialCapacity, float lf) {
			loadFactor = lf;
			// Allocate this segment's bucket array; setTable also sets threshold.
			setTable(HashEntry.<K, V> newArray(initialCapacity));
		}
		/**
		 * Sets table to new HashEntry array. Call only while holding lock or in
		 * constructor. Also recomputes threshold as (int) (length * loadFactor).
		 */
		void setTable(HashEntry<K, V>[] newTable) {
			threshold = (int) (newTable.length * loadFactor);
			table = newTable;
		}
}

	// One key/value node in a segment's bucket chain. Its role is similar to HashMap's
	// entry class, but it does not implement Map.Entry; the entry-set iterator wraps
	// each node in a WriteThroughEntry instead.
	// key, hash and next are final, so a chain can never be relinked in place (rehash
	// clones nodes instead). Only value, which is volatile, changes after construction.
	static final class HashEntry<K, V> {
		final K key;
		// Cached hash(key); compared before calling key.equals during lookups.
		final int hash;
		volatile V value;
		final HashEntry<K, V> next;

		HashEntry(K key, int hash, HashEntry<K, V> next, V value) {
			this.key = key;
			this.hash = hash;
			this.next = next;
			this.value = value;
		}

		// Creates a new bucket array of length i. Generic array creation needs the
		// unchecked conversion, hence the suppression.
		@SuppressWarnings("unchecked")
		static final <K, V> HashEntry<K, V>[] newArray(int i) {
			return new HashEntry[i];
		}
		// NOTE(review): the excerpt ends here; the class's closing brace is not shown.
 

get方法实现

//ConcurrentHashMap class
	/**
	 * Returns the value mapped to key, or null if there is no mapping.
	 * The segment lookup normally takes no lock; see Segment.get.
	 *
	 * @throws NullPointerException if key is null (raised by hash(key))
	 */
	public V get(Object key) {
		final int h = hash(key); // throws NullPointerException if key null
		final Segment<K, V> segment = segmentFor(h);
		return segment.get(key, h);
	}

	// Supplemental hash applied on top of x.hashCode(). It mixes the bits so that both
	// the high bits (used by segmentFor to choose a segment) and the low bits (used to
	// choose a bucket inside a segment's table) depend on the whole hashCode.
	// Throws NullPointerException if x is null.
	static int hash(Object x) {
		int spread = x.hashCode();
		spread = spread + ~(spread << 9);
		spread = spread ^ (spread >>> 14);
		spread = spread + (spread << 4);
		return spread ^ (spread >>> 10);
	}

	/**
	 * Returns the segment that should be used for key with given hash.
	 * The top bits of the (already spread) hash select the segment: with the
	 * default 16 segments the index is (hash >>> 28) & 15.
	 *
	 * @param hash
	 *            the hash code for the key
	 * @return the segment
	 */
	final Segment<K, V> segmentFor(int hash) {
		int index = (hash >>> segmentShift) & segmentMask;
		return segments[index];
	}

//Segment class methods
		/* Specialized implementations of map methods */
		/**
		 * Looks up key in this segment without taking the lock. The one exception
		 * is the rare case where a matching entry's value reads as null, which is
		 * re-read under the lock.
		 *
		 * @param key  the key to look up (non-null)
		 * @param hash hash(key)
		 * @return the mapped value, or null if absent
		 */
		V get(Object key, int hash) {
			// count is read first (a volatile read); an empty segment needs no scan.
			if (count == 0) // read-volatile
				return null;
			// getFirst (not shown in this excerpt) returns the head of hash's bucket chain.
			for (HashEntry<K, V> e = getFirst(hash); e != null; e = e.next) {
				if (e.hash == hash && key.equals(e.key)) {
					V v = e.value;
					// A null here can only be an unpublished initialization; re-read under lock.
					return (v != null) ? v : readValueUnderLock(e); // recheck
				}
			}
			return null;
		}

		/**
		 * Reads value field of an entry under lock. Called if value field ever
		 * appears to be null. This is possible only if a compiler happens to
		 * reorder a HashEntry initialization with its table assignment, which
		 * is legal under memory model but is not known to ever occur.
		 * Taking this segment's lock orders the read after any in-progress put.
		 *
		 * @param e the entry whose value is re-read
		 * @return e.value as read while holding the segment lock
		 */
		V readValueUnderLock(HashEntry<K, V> e) {
			V current;
			lock();
			try {
				current = e.value;
			} finally {
				unlock();
			}
			return current;
		}

 

put 方法

//ConcurrentHashMap class
	/**
	 * Maps key to value, replacing any existing mapping. Neither key nor value
	 * may be null.
	 *
	 * @return the previous value for key, or null if there was none
	 * @throws NullPointerException if value is null, or if key is null (via hash)
	 */
	public V put(K key, V value) {
		if (value == null) {
			throw new NullPointerException();
		}
		// Same hash spreading as get(); a null key fails here.
		final int h = hash(key);
		final Segment<K, V> segment = segmentFor(h);
		return segment.put(key, h, value, false);
	}

//Segment class

	/**
	 * Inserts or updates key in this segment while holding the segment lock.
	 *
	 * @param key          the key (non-null; equals is called on it)
	 * @param hash         hash(key), precomputed by the caller
	 * @param value        the new value (the visible ConcurrentHashMap.put rejects null)
	 * @param onlyIfAbsent if true, an existing value is left unchanged
	 *                     (presumably how putIfAbsent calls this — confirm)
	 * @return the previous value, or null if key was not present
	 */
	V put(K key, int hash, V value, boolean onlyIfAbsent) {
			// Every write to the segment happens under the segment's own lock.
			lock();
			try {
				int c = count;
				// Grow the table first if the current count exceeds threshold.
				// c is post-incremented here, but it is only published
				// (count = c) when a new entry is actually added below.
				if (c++ > threshold) // ensure capacity
					rehash();
				HashEntry<K, V>[] tab = table;
				int index = hash & (tab.length - 1);
				HashEntry<K, V> first = tab[index];
				HashEntry<K, V> e = first;
				// Walk the bucket chain: afterwards e is the entry whose hash and key
				// match, or null if the key is not present.
				while (e != null && (e.hash != hash || !key.equals(e.key)))
					e = e.next;

				V oldValue;
				if (e != null) {
					oldValue = e.value;
					// With onlyIfAbsent the existing value is kept (putIfAbsent semantics).
					if (!onlyIfAbsent)
						e.value = value;
				} else {
					oldValue = null;
					++modCount;
					// New entries are prepended at the head of the chain (next = first).
					tab[index] = new HashEntry<K, V>(key, hash, first, value);
					// Publish the new count last; lock-free readers check count first.
					count = c; // write-volatile
				}
				return oldValue;
			} finally {
				unlock();
			}
		}

		// Doubles this segment's table. Unlike a plain HashMap resize, the old table is
		// never modified. Nodes cannot be relinked because HashEntry.next is final, and
		// lock-free readers may still be walking the old chains. For each chain, the
		// trailing run of nodes that all land in the same new bucket is reused as-is;
		// the nodes in front of it are cloned. Called from put while the segment lock
		// is held.
		void rehash() {
			HashEntry<K, V>[] oldTable = table;
			int oldCapacity = oldTable.length;
			// At MAXIMUM_CAPACITY (1 << 30 per the original note) the table stops growing,
			// so chains just get longer and operations gradually slow down.
			if (oldCapacity >= MAXIMUM_CAPACITY)
				return;

			/*
			 * Reclassify nodes in each list to new Map. Because we are using
			 * power-of-two expansion, the elements from each bin must either
			 * stay at same index, or move with a power of two offset. We
			 * eliminate unnecessary node creation by catching cases where old
			 * nodes can be reused because their next fields won't change.
			 * Statistically, at the default threshold, only about one-sixth of
			 * them need cloning when a table doubles. The nodes they replace
			 * will be garbage collectable as soon as they are no longer
			 * referenced by any reader thread that may be in the midst of
			 * traversing table right now.
			 */
			// Double the capacity and recompute the resize threshold.
			HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);
			threshold = (int) (newTable.length * loadFactor);
			int sizeMask = newTable.length - 1;
			// Move every bucket of the old table into newTable (old table left intact).
			for (int i = 0; i < oldCapacity; i++) {
				// We need to guarantee that any existing reads of old Map can
				// proceed. So we cannot yet null out each bin.
				HashEntry<K, V> e = oldTable[i];

				if (e != null) {
					HashEntry<K, V> next = e.next;
					int idx = e.hash & sizeMask;

					// Single node on list
					if (next == null)
						newTable[idx] = e;

					else {
						// Reuse trailing consecutive sequence at same slot:
						// find lastRun, the start of the longest suffix whose nodes
						// all map to the same new index lastIdx.
						HashEntry<K, V> lastRun = e;
						int lastIdx = idx;
						for (HashEntry<K, V> last = next; last != null; last = last.next) {
							int k = last.hash & sizeMask;
							if (k != lastIdx) {
								lastIdx = k;
								lastRun = last;
							}
						}
						newTable[lastIdx] = lastRun;

						// Clone all remaining nodes (those before lastRun), prepending
						// each copy to the head of its new bucket.
						for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
							int k = p.hash & sizeMask;
							HashEntry<K, V> n = newTable[k];
							newTable[k] = new HashEntry<K, V>(p.key, p.hash, n,
									p.value);
						}
					}
				}
			}
			table = newTable;
		}

 

size 方法

	/**
	 * Returns the number of key-value mappings in this map. If the map contains
	 * more than <tt>Integer.MAX_VALUE</tt> elements, returns
	 * <tt>Integer.MAX_VALUE</tt>.
	 * To avoid lock contention, it first sums the per-segment counts without
	 * locking, retrying a bounded number of times. It locks every segment only
	 * if those unlocked passes keep observing concurrent modification.
	 * 
	 * @return the number of key-value mappings in this map
	 */
	public int size() {
		final Segment<K, V>[] segments = this.segments;
		long sum = 0;
		long check = 0;
		int[] mc = new int[segments.length];
		// Try a few times to get accurate count. On failure due to
		// continuous async changes in table, resort to locking.
		// At most RETRIES_BEFORE_LOCK unlocked attempts (2 per the original note).
		for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
			check = 0;
			sum = 0; // total of the per-segment counts
			int mcsum = 0; // total of the per-segment modCounts
			// First pass: sum the counts and snapshot each segment's modCount into mc.
			for (int i = 0; i < segments.length; ++i) {
				sum += segments[i].count;
				mcsum += mc[i] = segments[i].modCount;
			}
			// mcsum == 0 means no segment has recorded a modification, so the
			// verification pass is skipped; check then stays 0 and equals sum only
			// if every count read 0.
			if (mcsum != 0) {
				// Second pass: recount and make sure no segment's modCount changed.
				for (int i = 0; i < segments.length; ++i) {
					check += segments[i].count;
					if (mc[i] != segments[i].modCount) {
						check = -1; // force retry
						break;
					}
				}
			}
			// Both passes agree, so sum is a consistent count.
			if (check == sum)
				break;
		}
		// The unlocked attempts kept failing: lock all segments and count exactly.
		if (check != sum) { // Resort to locking all segments
			sum = 0;
			for (int i = 0; i < segments.length; ++i)
				segments[i].lock();
			for (int i = 0; i < segments.length; ++i)
				sum += segments[i].count;
			for (int i = 0; i < segments.length; ++i)
				segments[i].unlock();
		}
		// Saturate at Integer.MAX_VALUE, as documented above.
		if (sum > Integer.MAX_VALUE)
			return Integer.MAX_VALUE;
		else
			return (int) sum;
	}

 

keys 方法

	/**
	 * Returns an enumeration of the keys in this map.
	 * The returned KeyIterator implements both Iterator and Enumeration.
	 */
	public Enumeration<K> keys() {
		// A fresh inner iterator; the traversal itself lives in HashIterator.
		return new KeyIterator();
	}
// Key iterator: all traversal logic is inherited from HashIterator. This class only
// projects each entry to its key, for both the Iterator and Enumeration APIs.
	final class KeyIterator extends HashIterator implements Iterator<K>,
			Enumeration<K> {
		public K next() {
			return nextElement();
		}

		public K nextElement() {
			HashEntry<K, V> entry = super.nextEntry();
			return entry.key;
		}
	}

	/* ---------------- Iterator Support -------------- */
	// Base class for the key/value/entry iterators. It takes no locks while traversing;
	// per the ConcurrentHashMap Javadoc the resulting iterators are "weakly consistent".
	// Segments are visited from last to first. Within a segment's table, buckets are
	// visited from the highest index to the lowest.
	abstract class HashIterator {
		int nextSegmentIndex; // index of the next segment to visit (counts down)
		int nextTableIndex; // next bucket index in currentTable (counts down; -1 = none left)
		HashEntry<K, V>[] currentTable; // table of the segment currently being traversed
		HashEntry<K, V> nextEntry; // entry the next call to nextEntry() will return
		HashEntry<K, V> lastReturned; // entry most recently returned; target of remove()

		HashIterator() {
			nextSegmentIndex = segments.length - 1;
			nextTableIndex = -1;
			// Position on the first entry, if any.
			advance();
		}

		public boolean hasMoreElements() {
			return hasNext();
		}

		// Moves nextEntry to the following entry:
		//   1. along the current chain,
		//   2. else to the next non-empty bucket of the current table,
		//   3. else to the next segment with a non-zero count.
		// Leaves nextEntry null when everything has been visited.
		final void advance() {
			// 1. Follow the current chain.
			if (nextEntry != null && (nextEntry = nextEntry.next) != null)
				return;
			// 2. Scan the remaining buckets of the current table.
			while (nextTableIndex >= 0) {
				if ((nextEntry = currentTable[nextTableIndex--]) != null)
					return;
			}
			// 3. Move on to the next non-empty segment and find its first non-empty bucket.
			while (nextSegmentIndex >= 0) {
				Segment<K, V> seg = segments[nextSegmentIndex--];
				if (seg.count != 0) {
					currentTable = seg.table;
					for (int j = currentTable.length - 1; j >= 0; --j) {
						if ((nextEntry = currentTable[j]) != null) {
							nextTableIndex = j - 1;
							return;
						}
					}
				}
			}
		}

		public boolean hasNext() {
			return nextEntry != null;
		}

		// Returns the current entry and advances to the following one.
		HashEntry<K, V> nextEntry() {
			if (nextEntry == null)
				throw new NoSuchElementException();
			// Remember the entry being returned (used by remove()).
			lastReturned = nextEntry;
			// Pre-fetch the following entry.
			advance();
			return lastReturned;
		}

		// Removes the last returned key from the map via ConcurrentHashMap.remove(key).
		public void remove() {
			if (lastReturned == null)
				throw new IllegalStateException();
			ConcurrentHashMap.this.remove(lastReturned.key);
			lastReturned = null;
		}
	}

 

keySet/values/elements 这几个方法都和 keys 方法非常相似,就不再解释了。而 entrySet 方法有点特别,我也不是很明白。

// Returns the entry-set view, creating and caching it on first use. The lazy init is
// unsynchronized; racing callers may each create a view (presumably harmless, since
// EntrySet holds no state of its own in this excerpt).
public Set<Map.Entry<K, V>> entrySet() {
		Set<Map.Entry<K, V>> es = entrySet;
		if (es == null) {
			es = new EntrySet();
			entrySet = es;
		}
		return es;
	}

	// Entry-set view backed by the map; iteration is delegated to EntryIterator.
	// NOTE(review): only iterator() is shown in this excerpt. AbstractSet leaves size()
	// abstract, so the real class must define it (and likely more); those members are
	// elided here.
	final class EntrySet extends AbstractSet<Map.Entry<K, V>> {
		public Iterator<Map.Entry<K, V>> iterator() {
			return new EntryIterator();
		}
}
// Entry-set iterator: each call wraps the current node's key and value in a new
// WriteThroughEntry, which implements Map.Entry on HashEntry's behalf.
	final class EntryIterator extends HashIterator implements
			Iterator<Entry<K, V>> {
		public Map.Entry<K, V> next() {
			HashEntry<K, V> node = super.nextEntry();
			K k = node.key;
			V v = node.value;
			return new WriteThroughEntry(k, v);
		}
	}

	/**
	 * Custom Entry class used by EntryIterator.next(), that relays setValue
	 * changes to the underlying map. It is a detached snapshot of a node's key
	 * and value.
	 * On why HashEntry does not implement Map.Entry itself: in the code shown,
	 * HashEntry values are only written inside Segment while the segment lock is
	 * held. A detached entry that writes through via put(key, value) keeps it
	 * that way. This is a reading of the visible code, not a documented rationale.
	 */
	final class WriteThroughEntry extends AbstractMap.SimpleEntry<K, V> {
		WriteThroughEntry(K key, V val) {
			super(key, val);
		}

		/**
		 * Set our entry's value and write through to the map. The value to
		 * return is somewhat arbitrary here. Since a WriteThroughEntry does not
		 * necessarily track asynchronous changes, the most recent "previous"
		 * value could be different from what we return (or could even have been
		 * removed in which case the put will re-establish). We do not and
		 * cannot guarantee more.
		 *
		 * @throws NullPointerException if value is null
		 */
		public V setValue(V value) {
			if (value != null) {
				V previous = super.setValue(value);
				ConcurrentHashMap.this.put(getKey(), value);
				return previous;
			}
			throw new NullPointerException();
		}
	}
 

   从上面可以看出,ConcurrentHashMap 也没什么特别的,大致思路就是采用分段锁机制:把原来用一个 Entry 数组(table)装的键值对,分散到多个 Segment 各自的 table 中。方法内部也做了不少为减少锁竞争而进行的优化。从 ConcurrentHashMap 类里可以看出,它实现了一大堆内部类,比如 Segment/KeyIterator/ValueIterator/EntryIterator 等等。个人觉得有些代码比较难理解。比如 Segment 类为什么继承 ReentrantLock 而不用组合呢(Segment 的 Javadoc 给出的解释是:这样只是为了简化加锁操作,并省去单独构造锁对象)?还有上面提到的,HashEntry 为什么不像 HashMap 的 Entry 一样实现 Map.Entry 接口?建立这么多内部类,搞得人头晕晕的。

 

   发表时间:2011-03-31  
http://www.iteye.com/topic/344876
刚发现有人写的比我还好,我的理解还是不大够透彻的。。哈哈。。看我的看不懂的童鞋可以参考上面滴。。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics