`

ConcurrentHashMap分析和研究

阅读更多

 

ConcurrentHashMap JDK 1.6的中文API

public class ConcurrentHashMap<K,V>

extends AbstractMap<K,V>

implements ConcurrentMap<K,V>, Serializable

 

    支持获取的完全并发和更新的所期望可调整并发的哈希表。此类遵守与 Hashtable 相同的功能规范,并且包括对应于 Hashtable 的每个方法的方法版本。     不过,尽管所有操作都是线程安全的,但 获取操作不 必锁定,并且不 支持以某种防止所有访问的方式锁定整个表。 此类可以通过程序完全与 Hashtable 进行互操作,这取决于其线程安全,而与其同步细节无关。 

 

    获取操作(包括 get)通常不会受阻塞,因此,可能与更新操作交迭(包括 put 和 remove)。获取会影响最近完成的 更新操作的结果。对于一些聚合操作,比如 putAll 和 clear,并发获取可能只影响某些条目的插入和移除。类似地,在创建迭代器/枚举时或自此之后, Iterators 和 Enumerations 返回在某一时间点上 影响哈希表状态的元素。它们不会 抛出 ConcurrentModificationException。不过,迭代器被设计成每次仅由一个线程使用。 

 

   这允许通过可选的 concurrencyLevel 构造方法参数(默认值为 16)来引导更新操作之间的并发,该参数用作内部调整大小的一个提示。      表是在内部进行分区的,试图允许指示无争用并发更新的数量。       因为哈希表中的位置基本上是随意的,所以实际的并发将各不相同。理想情况下,应该选择一个尽可能多地容纳并发修改该表的线程的值。使用一个比所需要的值高很多的值可能会浪费空间和时间,而使用一个显然低很多的值可能导致线程争用。     对数量级估计过高或估计过低通常都会带来非常显著的影响。当仅有一个线程将执行修改操作,而其他所有线程都只是执行读取操作时,才认为某个值是合适的。 此外,重新调整此类或其他任何种类哈希表的大小都是一个相对较慢的操作,因此,在可能的时候,提供构造方法中期望表大小的估计值是一个好主意。 

 

此类及其视图和迭代器实现了 Map 和 Iterator 接口的所有可 方法。 

此类与 Hashtable 相似, 但与 HashMap 不同,它不 允许将 null 用作键或值。 

此类是 Java Collections Framework 的成员。 

从以下版本开始: 1.5  另请参见: 序列化表格

 

本文参考的文章

http://www.cnblogs.com/dolphin0520/p/3932905.html

http://ifeve.com/ConcurrentHashMap/

http://www.iteye.com/topic/344876

另外建议研究 java.util.concurrent包的源码时,先深入阅读《JAVA并发编程实战 第二版》这本书

并发容器的简单介绍

  ConcurrentHashMap代替同步的Map(Collections.synchronized(new HashMap())),众所周知,HashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的那段,因此提高了并发性能。ConcurrentHashMap也增加了对常用复合操作的支持,比如"若没有则添加":putIfAbsent(),替换:replace()。这2个操作都是原子操作。

  CopyOnWriteArrayList和CopyOnWriteArraySet分别代替List和Set,主要是在遍历操作为主的情况下来代替同步的List和同步的Set,这也就是上面所述的思路:迭代过程要保证不出错,除了加锁,另外一种方法就是"克隆"容器对象。

  ConcurrentLinkedQuerue是一个先进先出的队列。它是非阻塞队列。

    ConcurrentSkipListMap可以在高效并发中替代SoredMap(例如用Collections.synchronzedMap包装的TreeMap)。

  ConcurrentSkipListSet可以在高效并发中替代SoredSet(例如用Collections.synchronzedSet包装的TreeMap)。

 

ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

 

ConcurrentHashMap

ConcurrentHashMap的内部结构

  ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:



 

 

   从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment, 第二次Hash定位到元素所在的链表的头部(桶头), 因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

 

segment

segment是ConcurrentHashMap存储元素的基本段,它本身是一个hashtable的实现,read操作时无锁的,write需要同步.

 

 

/**
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
static final class Segment<K,V> extends ReentrantLock implements Serializable {
        /*
         * Segments maintain a table of entry lists that are always
         * kept in a consistent state, so can be read (via volatile
         * reads of segments and tables) without locking.  This
         * requires replicating nodes when necessary during table
         * resizing, so the old lists can be traversed by readers
         * still using old version of table.
         *
         * This class defines only mutative methods requiring locking.
         * Except as noted, the methods of this class perform the
         * per-segment versions of ConcurrentHashMap methods.  (Other
         * methods are integrated directly into ConcurrentHashMap
         * methods.) These mutative methods use a form of controlled
         * spinning on contention via methods scanAndLock and
         * scanAndLockForPut. These intersperse tryLocks with
         * traversals to locate nodes.  The main benefit is to absorb
         * cache misses (which are very common for hash tables) while
         * obtaining locks so that traversal is faster once
         * acquired. We do not actually use the found nodes since they
         * must be re-acquired under lock anyway to ensure sequential
         * consistency of updates (and in any case may be undetectably
         * stale), but they will normally be much faster to re-locate.
         * Also, scanAndLockForPut speculatively creates a fresh node
         * to use in put if no node is found.
         */

        private static final long serialVersionUID = 2249069246763182397L;

        /**
         * The maximum number of times to tryLock in a prescan before
         * possibly blocking on acquire in preparation for a locked
         * segment operation. On multiprocessors, using a bounded
         * number of retries maintains cache acquired while locating
         * nodes.
         */
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

       /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;

        /**
         * The number of elements. Accessed only either within locks
         * or among other volatile reads that maintain visibility.
         */
        transient int count;
        transient int modCount;

        transient int threshold;

        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
      }

      final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //锁是从ReentrantLock那里继承来的,是一个单一锁,所以就单Segement来说是独占排斥的
            HashEntry<K,V> node = tryLock() ? null :  //要更新段之前,必须获得段锁
                scanAndLockForPut(key, hash, value);

            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;//k==key完全相同  或者  e.hash == hash && key.equals(k)值等且hash值相等,替换
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null) //scanAndLockForPut(key, hash, value)新建的node
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node); //node是这个桶的首元素
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();//!!!释放段锁
            }
            return oldValue;
        }


       /**
         * Scans for a node containing given key while trying to
         * acquire lock, creating and returning one if not found. Upon
         * return, guarantees that lock is held. UNlike in most
         * methods, calls to method equals are not screened: Since
         * traversal speed doesn't matter, we might as well help warm
         * up the associated code and accesses as well.
         *
         * @return a new node if key not found, else null 没找到新建一个node,找到返回null!
         */
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) { //If the current thread already holds this lock then the hold count is incremented by one and the method returns true
                                                                                        //If the lock is held by another thread then this method will return immediately with the value false
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {//表示key没找到,创建一个新node
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key)) //找到key
                        retries = 0;
                    else //key没找到,继续找下个点e.next求证,此时retries = -1保持不变
                        e = e.next;
//创建新node 或者 找到后,进入else if (++retries > MAX_SCAN_RETRIES) ,尝试MAX_SCAN_RETRIES次后,锁住 for put!
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

   
      //在rehash被调用的地方,之前总是获得了Segment的ReentranLock的!
      @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;//遍历原桶,找到“最后一个节点(其实叫做最后一个部分桶,部分桶的元素根据newTable计算出的hash值是一样的)”,以及其所在新桶 将在外层newTable上的位置!
                            }
                        }
                        newTable[lastIdx] = lastRun;

                        // Clone remaining nodes //原桶除去部分桶之外的元素
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;//新节点总是加载桶的最前端!
            table = newTable;
        }

   /**
         * Scans for a node containing the given key while trying to
         * acquire lock for a remove or replace operation. Upon
         * return, guarantees that lock is held.  Note that we must
         * lock even if the key is not found, to ensure sequential
         * consistency of updates.
         */
        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))//key所在的桶为空  或者  在key所在的桶找到与key相等的元素表示找到了
                                         //进入设置retries = 0,进而进入了“尝试MAX_SCAN_RETRIES次”再被锁住
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }
}

 

  我们再来具体了解一下Segment的数据结构:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    transient volatile int count;

    transient int modCount;

    transient int threshold;

    transient volatile HashEntry<K,V>[] table;

    final float loadFactor;

}

  

详细解释一下Segment里面的成员变量的意义

count:Segment中元素的数量

modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)

threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容

table:链表数组,数组中的每一个元素代表了一个链表的头部

loadFactor:负载因子,用于确定threshold

 

HashEntry

Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:

 

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics.  (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
}

 

 

可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现ConcurrentModification的情况。

 

ConcurrentHashMap初始化方法

 下面我们来结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:

 

@SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //根据并发度concurrencyLevel,计算得出segment的数目
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;//段移位数 = 32 - log2(最大段数目的)
        this.segmentMask = ssize - 1;//段数目掩码  (最大段数目-1)
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;

        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        //根据初始容量 和 段的最小容量,计算得到段的容量

        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);

        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

 

 

CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量, ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

 

 整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

 

  这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

 

static {
        int ss, ts;
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            TBASE = UNSAFE.arrayBaseOffset(tc);
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ts = UNSAFE.arrayIndexScale(tc);
            ss = UNSAFE.arrayIndexScale(sc); //此数组对应的index位置范围
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
            throw new Error("data type scale not a power of two");
      
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); 
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    }

 

 

ConcurrentHashMap的get操作

前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

 

 

public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
//this.segmentShift = 32 - sshift;//段移位数 = 32 - log2(最大段数目的)
//  SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); //除去前导零后 的位数
        long u = (((h >>> segmentShift) & segmentMask)  //让高位参与hash
                                                    << SSHIFT) //让低位参与hash
                                                     + SBASE;   //SBASE = UNSAFE.arrayBaseOffset(sc);
//根据低 log2(最大段数目的)位 就能确定位置

        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

 

 

 

参考文章的讲解,附在这儿

public V get(Object key) {

    int hash = hash(key.hashCode());

    return segmentFor(hash).get(key, hash);

}

 

  看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们看下这个函数的实现:

final Segment<K,V> segmentFor(int hash) {

    return segments[(hash >>> segmentShift) & segmentMask];

}

   这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。

 

  在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的Segment的get方法:

V get(Object key, int hash) {

    if (count != 0) { // read-volatile

        HashEntry<K,V> e = getFirst(hash);

        while (e != null) {

            if (e.hash == hash && key.equals(e.key)) {

                V v = e.value;

                if (v != null)

                    return v;

                return readValueUnderLock(e); // recheck

            }

            e = e.next;

        }

    }

    return null;

}

   先看第二行代码,这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义:

 

transient volatile int count;

   可以看到count是volatile的,实际上这里里面利用了volatile的语义:

    对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。

 

    因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

 

  然后,在第三行,调用了getFirst()来取得链表的头部:

HashEntry<K,V> getFirst(int hash) {

    HashEntry<K,V>[] tab = table;

    return tab[hash & (tab.length - 1)];

}

   同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。

 

  在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

 

 

ConcurrentHashMap的put操作

 

@SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        //先找到桶,再向桶中放
        return s.put(key, hash, value, false);
    }

 

 

ConcurrentHashMap的remove操作

 

public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

    /**
     * {@inheritDoc}
     *
     * @throws NullPointerException if the specified key is null
     */
    public boolean remove(Object key, Object value) {
        int hash = hash(key);
        Segment<K,V> s;
        return value != null && (s = segmentForHash(hash)) != null &&
            s.remove(key, hash, value) != null;
    }

 /**
     * Get the segment for the given hash
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> segmentForHash(int h) {
        long u = (((h >>> segmentShift) & segmentMask)
                    << SSHIFT) 
                    + SBASE;
        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
    }

 

 

定位Segment

既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。

 

private static int hash(int h) {

h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10);

h += (h << 3); h ^= (h >>> 6);

h += (h << 2) + (h << 14); return h ^ (h >>> 16);

}

再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。

 

System.out.println(Integer.parseInt("0001111", 2) & 15);

System.out.println(Integer.parseInt("0011111", 2) & 15);

System.out.println(Integer.parseInt("0111111", 2) & 15);

System.out.println(Integer.parseInt("1111111", 2) & 15);

计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。

0100|0111|0110|0111|1101|1010|0100|1110

1111|0111|0100|0011|0000|0001|1011|1000

0111|0111|0110|1001|0100|0110|0011|1110

1000|0011|0000|0000|1100|1000|0001|1010

可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。ConcurrentHashMap通过以下哈希算法定位segment。

 

默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。

 

final Segment<K,V> segmentFor(int hash) {

        return segments[(hash >>> segmentShift) & segmentMask];

}

 

跨段操作 

些操作需要涉及到多个段,比如说size(), containsValaue()。先来看下size()方法:

 

前面我们提到了一个Segment中的有一个modCount变量,代表的是对Segment中元素的数量造成影响的操作的次数,这个值只增不减,size操作就是遍历了两次Segment,每次记录Segment的modCount值,然后将两次的modCount进行比较,如果相同, 则表示期间没有发生过写入操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次,如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了,具体的实现大家可以看ConcurrentHashMap的源码,这里就不贴了。

 

public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {//经过RETRIES_BEFORE_LOCK次前后比较后,还不同,则就需要将所有的Segment都锁住,然后一个一个遍历
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;//在没锁的情况下主要是利用Segment中的modCount进行检测,在遍历过程中保存每个Segment的modCount,遍历完成之后再检测每个Segment的modCount有没有改变,如果有改变表示有其它线程正在对Segment进行结构性并发更新,需要重新计算。
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {//!!! 注意注意最后一定要释放锁
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

public boolean containsValue(Object value) {
        // Same idea as size()
        if (value == null)
            throw new NullPointerException();
        final Segment<K,V>[] segments = this.segments;
        boolean found = false;
        long last = 0;
        int retries = -1;
        try {
            outer: for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                long hashSum = 0L;
                int sum = 0;
                for (int j = 0; j < segments.length; ++j) {
                    HashEntry<K,V>[] tab;
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null && (tab = seg.table) != null) {
                        for (int i = 0 ; i < tab.length; i++) {
                            HashEntry<K,V> e;
                            for (e = entryAt(tab, i); e != null; e = e.next) {
                                V v = e.value;
                                if (v != null && value.equals(v)) {
                                    found = true;
                                    break outer;
                                }
                            }
                        }
                        sum += seg.modCount;
                    }
                }
                if (retries > 0 && sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return found;
    }

 

   跨段方法中还一个isEmpty()方法,其实现比size()方法还要简单,也不介绍了。最后简单地介绍下迭代方法,如keySet(), values(), entrySet()方法,这些方法都返回相应的迭代器,所有迭代器都继承于Hash_Iterator类(提交时居然提醒我,只得加了下划线),里实现了主要的方法。其结构是:

 

abstract class HashIterator {
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() {
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        }

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() {
            for (;;) {
                if (nextTableIndex >= 0) {
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                }
                else if (nextSegmentIndex >= 0) {
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);//从最后一个段的哈希表 
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1; //的最后一个桶 开始迭代遍历!
                }
                else
                    break;
            }
        }

        final HashEntry<K,V> nextEntry() {
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        }

        public final boolean hasNext() { return nextEntry != null; }
        public final boolean hasMoreElements() { return nextEntry != null; }

        public final void remove() {
            if (lastReturned == null)
                throw new IllegalStateException();
            ConcurrentHashMap.this.remove(lastReturned.key);
            lastReturned = null;
        }
    }
 

 

   nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex对应段中中hash链的索引,currentTable是nextSegmentIndex对应段的table。 

 

思考总结:

  其实并发编程中,为了提升效率, 常常通过 前后(重复)比较/校验 ,如果不变就表示没变化;  如果变化了,再校验,再校验有一定次数的,如果不变就表示没变化;  没变化,就认为得到预期值了,可以返回了

如果还是变化的,此时才来加锁,然后操作。  还有就是细化,散列化。

 

 

 

 

 

  • 大小: 36.3 KB
分享到:
评论

相关推荐

    并行环境下Java哈希机制的对比及重构.pdf

    本研究针对这一问题进行了深入探讨,并着重分析了在并行程序中,这两种数据结构的性能对比以及如何重构相关代码,以提高并行程序的运行效率和稳定性。 首先,要了解Hashtable与ConcurrentHashMap的基本区别。...

    Java源码分析及个人总结

    6. **并发编程**:Java提供了丰富的并发工具类,如线程池、并发容器(如ConcurrentHashMap)、锁机制(如synchronized、ReentrantLock)等,理解这些并发工具的原理和使用方式对于编写高并发程序至关重要。...

    Java 性能分析

    在Java性能分析领域,开发者需要深入理解程序运行的效率,以优化系统性能并解决潜在问题。本文将基于标题“Java性能分析...在实际工作中,结合标签中提到的“工具”和对“源码”的深入研究,是提升Java应用性能的关键。

    数据结构与算法分析(Java版)

    例如,Java集合框架(如ArrayList、LinkedList、HashMap等)的内部机制,以及并发编程时如何使用ConcurrentHashMap和CopyOnWriteArrayList等线程安全的数据结构。 四、实践应用 除了理论知识,书中还可能包含大量...

    数据结构与算法分析(java英文)

    算法分析则是研究算法的时间复杂度和空间复杂度,以评估算法的效率。时间复杂度衡量了算法执行所需的基本操作数量,通常用大O记法表示,如O(1)、O(n)、O(log n)、O(n log n)、O(n²)等。空间复杂度则关注算法运行时...

    基于JAVA技术的搜索引擎的研究报告及实现收藏.doc

    索引构建过程中,JAVA的集合框架(如ArrayList和HashMap)和并发库(如ConcurrentHashMap)为构建高性能索引提供了基础。同时,JAVA的反射机制和动态代理功能使得索引结构的扩展和优化变得灵活。 **2.2.3 Web服务器...

    别人家的面经21

    【Elasticsearch】:Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,用于全文检索、分析和存储。在面试中,面试者提到用Elasticsearch替代Solr是因为在Spring Boot项目中集成Solr遇到困难,而Elastic...

    java并发源码分析之实战编程

    "java并发源码分析之实战编程"这个主题深入探讨了Java平台上的并发处理机制,旨在帮助开发者理解并有效地利用这些机制来提高程序性能和可扩展性。在这个专题中,我们将围绕Java并发库、线程管理、锁机制、并发容器...

    基于数据结构与简化内存模型的Java集合教学方法研究.zip

    本研究主要探讨如何结合数据结构与简化内存模型来提升Java集合的教学效果,帮助学生更好地理解和运用这些概念。 在Java中,集合框架包括接口(如List、Set、Queue等)和实现类(如ArrayList、HashSet、LinkedList等...

    Java企业面试题

    - 研究并发容器如ConcurrentHashMap和CopyOnWriteArrayList的特点和使用场景。 - 讨论集合框架中的泛型、迭代器和枚举。 4. **异常处理** - 理解Checked异常和Unchecked异常的区别。 - 掌握try-catch-finally...

    搜索引擎的研究与实现(Java)(含源码).zip

    总的来说,"搜索引擎的研究与实现(Java)(含源码)"这份资料为学习者提供了一个全面了解和实践搜索引擎技术的平台,无论你是对搜索引擎原理感兴趣,还是希望开发自己的搜索引擎系统,都能从中受益匪浅。通过深入学习和...

    最新java公司面试真实题目2023

    - 研究集合的并发问题,如ConcurrentHashMap和CopyOnWriteArrayList的应用。 4. **多线程** - 了解线程的创建方式,如继承Thread类和实现Runnable接口。 - 掌握synchronized关键字,理解锁的原理。 - 讲解wait...

    JDK library usage analyze.

    标题“JDK库使用分析”指的是对Java Development Kit(JDK)中的库进行深入研究,了解各个类库的使用方式、功能以及优化技巧。在这个分析过程中,开发者通常会关注JDK提供的各种API如何被有效利用,以提升代码的效率...

    Java并发编程最全面试题 123道

    11. ConcurrentHashMap:深入理解并发容器ConcurrentHashMap的内部实现,包括分段锁和CAS操作。 12. CopyOnWriteArrayList和CopyOnWriteArraySet:了解这些线程安全的集合类是如何实现无锁并发的。 13. ...

    高并发Java服务器设计研究.zip

    本资料“高并发Java服务器设计研究”深入探讨了如何有效地处理大量并发请求,以确保系统的稳定性和效率。以下将对这个主题展开详细的解析。 1. **并发基础** - 并发性是指系统在同一时间处理多个任务的能力。在...

    java学习教程

    8. **集合框架的高级用法**:深入研究List、Set和Map接口的实现类,了解并发容器(如ConcurrentHashMap)和高级数据结构(如TreeSet和TreeMap)。 9. **Java虚拟机(JVM)**:理解JVM的工作原理,包括类加载机制、...

    java源码分析工具-multiThread:一个java多线程的学习库,包含:源码分析,工具实现,使用样例等

    - 可能还包括线程安全的数据结构,如`ConcurrentHashMap`,以及用于同步的工具类,如`CountDownLatch`, `CyclicBarrier`, `Semaphore`等。 4. **使用样例** - 项目中的样例代码能够演示如何在实际场景中应用多...

    jdk学习笔记

    例如,通过分析`HashMap`和`ConcurrentHashMap`的实现,可以了解不同场景下选择哪种数据结构更为合适;研究`StringBuilder`和`StringBuffer`的源码,可以明白多线程环境下字符串操作的差异。 总的来说,林信良的JDK...

    Java并发编程的艺术1

    源代码可以在出版社网站和并发编程网站上获取,以便读者进一步研究和实践。 最后,作者对在写作过程中给予支持的个人和机构表示了感谢,包括编辑、导师、同事和家人,他们的鼓励和帮助使本书得以完成。

    java-java面试题库整理-基础-JVM-线程并发-框架等.zip

    - 内存区域:分析堆、栈、方法区、程序计数器和本地方法栈的功能。 - 虚拟机调优:学习JVM参数设置,如何优化内存分配和垃圾回收性能。 3. **线程并发** - 线程概念:理解线程的生命周期,创建线程的方式...

Global site tag (gtag.js) - Google Analytics