`
yinwufeng
  • 浏览: 287018 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

jdk1.6 ConcurrentHashMap

阅读更多

jdk1.6 ConcurrentHashMap

分享

    Concurrent包,鼎鼎大名的Doug Lea开发的。让我来好好向大师学习学习。

    HashMap在单线程时效率非常高,但多线程环境下会出现许多问题。

    HashTable支持多线程,但每个操作都会锁住整个数组,效率一般。

    为了在效率和多线程安全之间找一个平衡点,Doug Lea引入了ConcurrentHashMap。

    /*
     * The basic strategy is to subdivide the table among Segments,
     * each of which itself is a concurrently readable hash table.
     */

    1.存储数据结构:

    final int segmentMask;  //Segment长度减一
    final int segmentShift; //偏移量

    final Segment<K,V>[] segments;

    (static final class Segment<K,V> extends ReentrantLock implements Serializable {...})

    2.构造函数:注意参数concurrencyLevel:the estimated number of concurrently
     updating threads即同时修改的线程数。表明并发量,直接影响Segment数组的长度。都是2的N次方。

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;        
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;   
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

    3.put(),putIfAbsent()方法:

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

    public V putIfAbsent(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, true);
    }

    首先对key.hashCode()进行了二次hash:

    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

    然后通过hash值定位对应的segment,使用segmentFor(hash):

    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

    可以看到,hash>>>segmentShift是将低位去除,使用高若干位来和segmentMask相与运算,得到应放入哪个segment。

    然后调用segment.put()方法:

 V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

  3.1 这里有一点要非常注意,即:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }

    我们可以看到,在添加一个key-value时,next指针和key,hash都是final类型,也就是说不可变更,那么当我们获取一个HashEntry之后,不用担心它的next链表结构会被其他线程改变。


  

  4.上述put(),putIfAbsent()方法内部同步了,使用的方式是通过Segment<K,V> extends ReentrantLock,使用ReentrantLock的lock()和unlock()方法实现。那么也就是说,锁的粒度是一个Segment,而其他的Segment并不会锁住。这相比于原来的HashTable锁,性能吞吐上明显提高,HashTable由于内部使用一个数组来实现的数据存储,因此上锁时,就会锁住整个数组,所有对数组的操作(put,get等)都会被阻塞。而ConcurrentHashMap使用多个数组来作为内部存储实现,上锁只会锁住其中一个数组,那么其他数组还是可以对外提供操作的。把锁的粒度减小了。

  5.我们再来看看get()操作:

    public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

     V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;

//这个为什么会产生呢?这是以前JDK会出现的问题,目前已解决了,原因是在put()方//法调用时,分四步插入:
//  1          this.key = key;
//  2          this.hash = hash;
//  3          this.next = next;
//  4         this.value = value;
//       有可能执行了1,但还没执行4,所以value为默认null,因此需要readValueUnderLock(e)来读同步。

//说明:产生这个问题的根本原因在于之前jvm初始化对象操作是在堆上完成,而现在的jvm是在工作线程完成对//象初始化,然后同步到堆中去,不会出现初始化一半的情况。其他线程要么看不到,要么看到完整的对象。
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

 

     V readValueUnderLock(HashEntry<K,V> e) {
            lock();
            try {
                return e.value;
            } finally {
                unlock();
            }
        }

   6.从put(),get(),remove()我们可以清晰地了解ConcurrentHashMap的内部实现思想及原理。put,remove是同步的,get()是非同步的。大多数时候,读起来非常快,接近HashMap.get()的速度。

   7.remove()方法的实现:

   整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

   从remove()方法我们可以看出,在删除一个节点时,需要对链表该节点之前的所有节点重新创建,之后节点不用处理。

 V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;

                         //重新创建之前的节点。

                         for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

     8.我们来看一下几个跨段操作的方法,size(),isEmpty().

   public int size() {
        final Segment<K,V>[] segments = this.segments;
        long sum = 0;
        long check = 0;
        int[] mc = new int[segments.length];
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
            check = 0;
            sum = 0;
            int mcsum = 0;
            for (int i = 0; i < segments.length; ++i) {
                sum += segments[i].count;
                mcsum += mc[i] = segments[i].modCount;
            }
            if (mcsum != 0) {
                for (int i = 0; i < segments.length; ++i) {
                    check += segments[i].count;
                    if (mc[i] != segments[i].modCount) {
                        check = -1; // force retry
                        break;
                    }
                }
            }
            if (check == sum)
                break;
        }

        //检测发现两次求和操作的值不一致,则证明有线程先修改,加锁求和。
        if (check != sum) { // Resort to locking all segments
            sum = 0;
            for (int i = 0; i < segments.length; ++i)
                segments[i].lock();
            for (int i = 0; i < segments.length; ++i)
                sum += segments[i].count;
            for (int i = 0; i < segments.length; ++i)
                segments[i].unlock();
        }
        if (sum > Integer.MAX_VALUE)
            return Integer.MAX_VALUE;
        else
            return (int)sum;
    }

  size()实现思路就是先不加锁,求两次和,检测modCount是否有改变(表明是否有其他线程在修改ConcurrentHashMap的某个段),重复尝试2次后,若还不相等(元素个数发生改变),则对所有段加锁求和。

 public boolean isEmpty() {
        final Segment<K,V>[] segments = this.segments;
        /*
         * We keep track of per-segment modCounts to avoid ABA
         * problems in which an element in one segment was added and
         * in another removed during traversal, in which case the
         * table was never actually empty at any point. Note the
         * similar use of modCounts in the size() and containsValue()
         * methods, which are the only other methods also susceptible
         * to ABA problems.
         */
        int[] mc = new int[segments.length];
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            if (segments[i].count != 0)
                return false;
            else
                mcsum += mc[i] = segments[i].modCount;
        }
        // If mcsum happens to be zero, then we know we got a snapshot
        // before any modifications at all were made.  This is
        // probably common enough to bother tracking.
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                if (segments[i].count != 0 ||
                    mc[i] != segments[i].modCount)
                    return false;
            }
        }
        return true;
    }

   isEmpty()方法,也是比较两次,若不等则返回false,否则返回true。不需要加锁操作。

分享到:
评论

相关推荐

    jdk1.6 解压版-windows

    JDK1.6是Oracle公司发布的一个较早版本,适用于Windows操作系统。在这个解压版中,用户无需进行安装过程,可以直接在Windows环境下使用JDK的各个工具。 JDK1.6包含的主要组件有: 1. **Java编译器**(javac):...

    jdk1.6_green_32bit

    本资源"jdk1.6_green_32bit"是一个针对32位操作系统的绿色版JDK,无需安装即可使用,这在某些场景下非常方便,比如在没有管理员权限或者希望快速部署的环境中。 JDK 1.6,也被称为Java SE(标准版)6,是Oracle公司...

    jdk1.6老版本下载

    7. **并发编程**:Java 1.6的`java.util.concurrent`包得到了扩展,新增了一些并发工具类,如`ConcurrentHashMap`,提高了多线程环境下的性能。 8. **JMX增强**:Java Management Extensions(JMX)在1.6版本中得到...

    JDK 1.6 JDK 1.6 winxp专用 winxp专用

    **Java Development Kit (JDK) 1.6 for Windows XP** JDK 1.6,也被称为Java SE 6,是Oracle公司发布的Java平台标准版的一个重要版本,主要用于开发和运行Java应用程序。这个版本在2006年推出,带来了许多新特性、...

    32位jdk1.6

    **32位JDK1.6详解** Java Development Kit(JDK)是Oracle公司发布的用于开发和运行Java应用程序的重要工具集。JDK1.6,也被称为Java SE 6(Java Standard Edition 6),是Java平台的一个重要版本,它包含了Java...

    jdk1.6压缩包,老版本的jdk

    在解压提供的"jdk1.6.zip"文件后,你会得到两个主要部分:`jdk1.6`和`jre6`。`jdk1.6`包含了开发工具,如javac编译器、jar打包工具等,而`jre6`则是Java运行环境,包含JVM(Java Virtual Machine)和其他运行Java...

    jdk 1.6 64位安装包+安装过程

    **Java Development Kit (JDK) 1.6 64位安装详解** JDK(Java Development Kit)是Oracle公司(原Sun Microsystems)为Java开发者提供的核心工具集,它包含了Java运行环境(Java Runtime Environment,JRE)以及一...

    jdk1.6-bin.zip

    这个压缩包文件"jdk1.6-bin.zip"包含了JDK 1.6版本的二进制文件,适用于Linux操作系统,特别是64位系统。JDK 1.6,也被称为Java SE(标准版)6,发布于2006年,它在Java平台上具有里程碑式的意义,引入了许多新特性...

    jdk1.6windows-x64

    标题"jdk1.6windows-x64"指的是这个压缩包包含的是适用于Windows 64位系统的JDK 1.6版本。"windows-x64"标识了这是专为64位Windows操作系统设计的,确保在64位环境下可以正确运行Java应用。 描述中的"直接解压及用...

    JDK1.6 64位下载

    JDK1.6,也称为Java SE 6,是Java历史上的一个重要版本,它在2006年发布,提供了许多新特性、改进和优化。对于那些需要在旧系统上运行或开发针对该版本兼容应用的用户来说,JDK1.6 64位版本尤其有价值。 首先,64位...

    jdk1.6(linux和windows).rar

    JDK1.6是Oracle公司发布的一个早期版本,发布时间为2006年,它是Java SE(标准版)平台的重要组成部分。这个版本包含了Java虚拟机(JVM)、Java类库以及开发工具,如Java编译器(javac)、Java文档生成器(javadoc)...

    JDK 1.6 中文版.zip

    **JDK 1.6 中文版** JDK(Java Development Kit)是Oracle公司发布的用于开发和运行Java应用程序的工具集。JDK 1.6,也被称为Java SE 6,是Java平台的一个重要版本,它包含了Java编译器、JVM(Java虚拟机)、调试...

    Java开发工具jdk1.6

    在并发编程方面,JDK 1.6提供了更多的并发工具类,如`ConcurrentHashMap`、`CountDownLatch`和`CyclicBarrier`,这些工具简化了多线程编程,帮助开发者更好地管理和同步线程。此外,`java.util.concurrent`包的引入...

    jdk1.6gfb_55452.zip

    "jdk1.6gfb_55452.zip" 是一个包含了JDK 1.6特定版本的压缩包,它提供了Java编译器、Java虚拟机(JVM)、调试工具和其他必要的库,以便开发者能在其计算机上构建、测试和部署Java应用。 JDK 1.6,也被称为Java SE...

    jdk1.6.tar.gz

    `jdk1.6.tar.gz` 文件就是一个包含了JDK 1.6所有组件的压缩包,开发者可以通过解压这个文件来安装和使用Java 1.6环境。 JDK是Java程序员的基础,它包括了Java运行时环境(JRE)、编译器(javac)、Java调试器(jdb...

    jdk 1.6 API 中文版帮助文档

    **JDK 1.6 API 中文版帮助文档详解** Java Development Kit (JDK) 是Java编程语言的核心组件,它包含了一系列工具和类库,使得开发者能够编写、编译、调试以及运行Java应用程序。JDK 1.6是Java的一个重要版本,发布...

    jdk版本1.6

    10. **并发工具类**:Java并发包(java.util.concurrent)在1.6版本中进一步增强,包括新的并发数据结构(如ConcurrentHashMap)和线程池API(ExecutorService),帮助开发者编写高效的多线程程序。 11. **安全增强...

    java 官方原版jdk 1.6

    Java JDK 1.6,全称为Java Development Kit版本1.6,是Oracle公司发布的一款用于开发和运行Java应用程序的重要工具集。它包含了编译器、Java虚拟机(JVM)、类库以及各种开发者工具,是Java程序员必备的基础环境。在...

    jdk1.6安装文件-64位

    JDK 1.6,也称为Java SE 6,是Oracle公司发布的一个早期版本,主要面向Windows操作系统。在这个版本中,Java引入了许多新特性和改进,旨在提高开发者的生产力和应用程序的性能。 首先,让我们详细了解一下JDK 1.6的...

    jdk1.6安装包

    JDK1.6,也称为Java SE 6,是Java历史上的一个重要版本,它包含了Java编译器、Java运行环境、Java API库以及各种开发工具。这个安装包是针对Java开发者的,用于在他们的计算机上安装并配置Java开发环境。 首先,JDK...

Global site tag (gtag.js) - Google Analytics