`

Java多线程(四)之ConcurrentSkipListMap深入分析

 
阅读更多

一、前言 

concurrentHashMap与ConcurrentSkipListMap性能测试

在4线程1.6万数据的条件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。

但ConcurrentSkipListMap有几个ConcurrentHashMap 不能比拟的优点

1、ConcurrentSkipListMap 的key是有序的。

2、ConcurrentSkipListMap 支持更高的并发。ConcurrentSkipListMap 的存取时间是log(N),和线程数几乎无关。也就是说在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出他的优势。 

 

二、使用建议

在非多线程的情况下,应当尽量使用TreeMap。此外对于并发性相对较低的并行程序可以使用Collections.synchronizedSortedMap将TreeMap进行包装,也可以提供较好的效率。对于高并发程序,应当使用ConcurrentSkipListMap,能够提供更高的并发度。


所以在多线程程序中,如果需要对Map的键值进行排序时,请尽量使用ConcurrentSkipListMap,可能得到更好的并发度。
注意,调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。

 

二、什么是SkipList

 

Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。


从概率上保持数据结构的平衡比显示的保持数据结构平衡要简单的多。对于大多数应用,用Skip list要比用树算法相对简单。由于Skip list比较简单,实现起来会比较容易,虽然和平衡树有着相同的时间复杂度(O(logn)),但是skip list的常数项会相对小很多。Skip list在空间上也比较节省。一个节点平均只需要1.333个指针(甚至更少)。
                
图1-1 Skip list结构图(以7,14,21,32,37,71,85序列为例)

 

Skip list的性质

(1) 由很多层结构组成,level是通过一定的概率随机产生的。
(2) 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的Comparator进行排序,具体取决于使用的构造方法。
(3) 最底层(Level 1)的链表包含所有元素。
(4) 如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现。
(5) 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

 

三、什么是ConcurrentSkipListMap

 

ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。
      注意,调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。

 

 ConcurrentSkipListMap存储结构

 


ConcurrentSkipListMap存储结构图

 

跳跃表(SkipList):(如上图所示)
1.多条链构成,是关键字升序排列的数据结构;
2.包含多个级别,一个head引用指向最高的级别,最低(底部)的级别,包含所有的key;
3.每一个级别都是其更低级别的子集,并且是有序的;
4.如果关键字 key在 级别level=i中出现,则,level<=i的链表中都会包含该关键字key;

 

------------------------

ConcurrentSkipListMap主要用到了Node和Index两种节点的存储方式,通过volatile关键字实现了并发的操作

  

[java] view plaincopy
 
  1. static final class Node<K,V> {  
  2.         final K key;  
  3.         volatile Object value;//value值  
  4.         volatile Node<K,V> next;//next引用  
  5.         ……  
  6. }  
  7. static class Index<K,V> {  
  8.         final Node<K,V> node;  
  9.         final Index<K,V> down;//downy引用  
  10.        volatile Index<K,V> right;//右边引用  
  11.        ……  
  12. }  

 

 

------------------------

ConcurrentSkipListMap的查找

 

通过SkipList的方式进行查找操作:(下图以“查找91”进行说明:)

 


红色虚线,表示查找的路径,蓝色向右箭头表示right引用;黑色向下箭头表示down引用;

 

/get方法,通过doGet操作实现

 

[java] view plaincopy
 
  1. public V get(Object key) {  
  2.       return doGet(key);  
  3.  }  
  4.  //doGet的实现  
  5. private V doGet(Object okey) {  
  6.         Comparable<? super K> key = comparable(okey);  
  7.         Node<K,V> bound = null;  
  8.         Index<K,V> q = head;//把头结点作为当前节点的前驱节点  
  9.         Index<K,V> r = q.right;//前驱节点的右节点作为当前节点  
  10.         Node<K,V> n;  
  11.         K k;  
  12.         int c;  
  13.         for (;;) {//遍历  
  14.             Index<K,V> d;  
  15.             // 依次遍历right节点  
  16.             if (r != null && (n = r.node) != bound && (k = n.key) != null) {  
  17.                 if ((c = key.compareTo(k)) > 0) {//由于key都是升序排列的,所有当前关键字大于所要查找的key时继续向右遍历  
  18.                     q = r;  
  19.                     r = r.right;  
  20.                     continue;  
  21.                 } else if (c == 0) {  
  22.                     //如果找到了相等的key节点,则返回该Node的value如果value为空可能是其他并发delete导致的,于是通过另一种  
  23.                     //遍历findNode的方式再查找  
  24.                     Object v = n.value;  
  25.                     return (v != null)? (V)v : getUsingFindNode(key);  
  26.                 } else  
  27.                     bound = n;  
  28.             }  
  29.             //如果一个链表中right没能找到key对应的value,则调整到其down的引用处继续查找  
  30.             if ((d = q.down) != null) {  
  31.                 q = d;  
  32.                 r = d.right;  
  33.             } else  
  34.                 break;  
  35.         }  
  36.         // 如果通过上面的遍历方式,还没能找到key对应的value,再通过Node.next的方式进行查找  
  37.         for (n = q.node.next;  n != null; n = n.next) {  
  38.             if ((k = n.key) != null) {  
  39.                 if ((c = key.compareTo(k)) == 0) {  
  40.                     Object v = n.value;  
  41.                     return (v != null)? (V)v : getUsingFindNode(key);  
  42.                 } else if (c < 0)  
  43.                     break;  
  44.             }  
  45.         }  
  46.         return null;  
  47.     }  

 

 

 

------------------------------------------------

ConcurrentSkipListMap的删除

 

通过SkipList的方式进行删除操作:(下图以“删除23”进行说明:)

 


红色虚线,表示查找的路径,蓝色向右箭头表示right引用;黑色向下箭头表示down引用;

 

 

[java] view plaincopy
 
  1. //remove操作,通过doRemove实现,把所有level中出现关键字key的地方都delete掉  
  2. public V remove(Object key) {  
  3.         return doRemove(key, null);  
  4.  }  
  5.  final V doRemove(Object okey, Object value) {  
  6.         Comparable<? super K> key = comparable(okey);  
  7.         for (;;) {  
  8.             Node<K,V> b = findPredecessor(key);//得到key的前驱(就是比key小的最大节点)  
  9.             Node<K,V> n = b.next;//前驱节点的next引用  
  10.             for (;;) {//遍历  
  11.                 if (n == null)//如果next引用为空,直接返回  
  12.                     return null;  
  13.                 Node<K,V> f = n.next;  
  14.                 if (n != b.next)                    // 如果两次获得的b.next不是相同的Node,就跳转到第一层循环重新获得b和n  
  15.                     break;  
  16.                 Object v = n.value;  
  17.                 if (v == null) {                    // 当n被其他线程delete的时候,其value==null,此时做辅助处理,并重新获取b和n  
  18.                     n.helpDelete(b, f);  
  19.                     break;  
  20.                 }  
  21.                 if (v == n || b.value == null)      // 当其前驱被delet的时候直接跳出,重新获取b和n  
  22.                     break;  
  23.                 int c = key.compareTo(n.key);  
  24.                 if (c < 0)  
  25.                     return null;  
  26.                 if (c > 0) {//当key较大时就继续遍历  
  27.                     b = n;  
  28.                     n = f;  
  29.                     continue;  
  30.                 }  
  31.                 if (value != null && !value.equals(v))  
  32.                     return null;  
  33.                 if (!n.casValue(v, null))  
  34.                     break;  
  35.                 if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是通过比较和设置b(前驱)的next节点的方式来实现删除操作  
  36.                     findNode(key);                  // 通过尝试findNode的方式继续find  
  37.                 else {  
  38.                     findPredecessor(key);           // Clean index  
  39.                     if (head.right == null)   //如果head的right引用为空,则表示不存在该level  
  40.                         tryReduceLevel();  
  41.                 }  
  42.                 return (V)v;  
  43.             }  
  44.         }  
  45.     }  



 

-------------------------------------

 

ConcurrentSkipListMap的插入

 

 

通过SkipList的方式进行插入操作:(下图以“添加55”的两种情况,进行说明:)


在level=2(该level存在)的情况下添加55的图示:只需在level<=2的合适位置插入55即可

--------


在level=4(该level不存在,图示level4是新建的)的情况下添加55的情况:首先新建level4,然后在level<=4的合适位置插入55

-----------

 

[java] view plaincopy
 
  1. //put操作,通过doPut实现  
  2.  public V put(K key, V value) {  
  3.         if (value == null)  
  4.             throw new NullPointerException();  
  5.         return doPut(key, value, false);  
  6.  }  
  7. private V doPut(K kkey, V value, boolean onlyIfAbsent) {  
  8.         Comparable<? super K> key = comparable(kkey);  
  9.         for (;;) {  
  10.             Node<K,V> b = findPredecessor(key);//前驱  
  11.             Node<K,V> n = b.next;  
  12.            //定位的过程就是和get操作相似  
  13.             for (;;) {  
  14.                 if (n != null) {  
  15.                     Node<K,V> f = n.next;  
  16.                     if (n != b.next)               // 前后值不一致的情况下,跳转到第一层循环重新获得b和n  
  17.                         break;;  
  18.                     Object v = n.value;  
  19.                     if (v == null) {               // n被delete的情况下  
  20.                         n.helpDelete(b, f);  
  21.                         break;  
  22.                     }  
  23.                     if (v == n || b.value == null// b 被delete的情况,重新获取b和n  
  24.                         break;  
  25.                     int c = key.compareTo(n.key);  
  26.                     if (c > 0) {  
  27.                         b = n;  
  28.                         n = f;  
  29.                         continue;  
  30.                     }  
  31.                     if (c == 0) {  
  32.                         if (onlyIfAbsent || n.casValue(v, value))  
  33.                             return (V)v;  
  34.                         else  
  35.                             break// restart if lost race to replace value  
  36.                     }  
  37.                     // else c < 0; fall through  
  38.                 }  
  39.                 Node<K,V> z = new Node<K,V>(kkey, value, n);  
  40.                 if (!b.casNext(n, z))  
  41.                     break;         // restart if lost race to append to b  
  42.                 int level = randomLevel();//得到一个随机的level作为该key-value插入的最高level  
  43.                 if (level > 0)  
  44.                     insertIndex(z, level);//进行插入操作  
  45.                 return null;  
  46.             }  
  47.         }  
  48.     }  
  49.   
  50.  /** 
  51.      * 获得一个随机的level值 
  52.      */  
  53.     private int randomLevel() {  
  54.         int x = randomSeed;  
  55.         x ^= x << 13;  
  56.         x ^= x >>> 17;  
  57.         randomSeed = x ^= x << 5;  
  58.         if ((x & 0x8001) != 0// test highest and lowest bits  
  59.             return 0;  
  60.         int level = 1;  
  61.         while (((x >>>= 1) & 1) != 0) ++level;  
  62.         return level;  
  63.     }  
  64. //执行插入操作:如上图所示,有两种可能的情况:  
  65. //1.当level存在时,对level<=n都执行insert操作  
  66. //2.当level不存在(大于目前的最大level)时,首先添加新的level,然后在执行操作1   
  67. private void insertIndex(Node<K,V> z, int level) {  
  68.         HeadIndex<K,V> h = head;  
  69.         int max = h.level;  
  70.         if (level <= max) {//情况1  
  71.             Index<K,V> idx = null;  
  72.             for (int i = 1; i <= level; ++i)//首先得到一个包含1~level个级别的down关系的链表,最后的inx为最高level  
  73.                 idx = new Index<K,V>(z, idx, null);  
  74.             addIndex(idx, h, level);//把最高level的idx传给addIndex方法  
  75.         } else { // 情况2 增加一个新的级别  
  76.             level = max + 1;  
  77.             Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];  
  78.             Index<K,V> idx = null;  
  79.             for (int i = 1; i <= level; ++i)//该步骤和情况1类似  
  80.                 idxs[i] = idx = new Index<K,V>(z, idx, null);  
  81.             HeadIndex<K,V> oldh;  
  82.             int k;  
  83.             for (;;) {  
  84.                 oldh = head;  
  85.                 int oldLevel = oldh.level;  
  86.                 if (level <= oldLevel) { // lost race to add level  
  87.                     k = level;  
  88.                     break;  
  89.                 }  
  90.                 HeadIndex<K,V> newh = oldh;  
  91.                 Node<K,V> oldbase = oldh.node;  
  92.                 for (int j = oldLevel+1; j <= level; ++j)  
  93.                     newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//创建新的  
  94.                 if (casHead(oldh, newh)) {  
  95.                     k = oldLevel;  
  96.                     break;  
  97.                 }  
  98.             }  
  99.             addIndex(idxs[k], oldh, k);  
  100.         }  
  101.     }  
  102. /** 
  103.      *在1~indexlevel层中插入数据  
  104.      */  
  105.     private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {  
  106.         //  insertionLevel 代表要插入的level,该值会在indexLevel~1间遍历一遍  
  107.         int insertionLevel = indexLevel;  
  108.         Comparable<? super K> key = comparable(idx.node.key);  
  109.         if (key == nullthrow new NullPointerException();  
  110.         // 和get操作类似,不同的就是查找的同时在各个level上加入了对应的key  
  111.         for (;;) {  
  112.             int j = h.level;  
  113.             Index<K,V> q = h;  
  114.             Index<K,V> r = q.right;  
  115.             Index<K,V> t = idx;  
  116.             for (;;) {  
  117.                 if (r != null) {  
  118.                     Node<K,V> n = r.node;  
  119.                     // compare before deletion check avoids needing recheck  
  120.                     int c = key.compareTo(n.key);  
  121.                     if (n.value == null) {  
  122.                         if (!q.unlink(r))  
  123.                             break;  
  124.                         r = q.right;  
  125.                         continue;  
  126.                     }  
  127.                     if (c > 0) {  
  128.                         q = r;  
  129.                         r = r.right;  
  130.                         continue;  
  131.                     }  
  132.                 }  
  133.                 if (j == insertionLevel) {//在该层level中执行插入操作  
  134.                     // Don't insert index if node already deleted  
  135.                     if (t.indexesDeletedNode()) {  
  136.                         findNode(key); // cleans up  
  137.                         return;  
  138.                     }  
  139.                     if (!q.link(r, t))//执行link操作,其实就是inset的实现部分  
  140.                         break// restart  
  141.                     if (--insertionLevel == 0) {  
  142.                         // need final deletion check before return  
  143.                         if (t.indexesDeletedNode())  
  144.                             findNode(key);  
  145.                         return;  
  146.                     }  
  147.                 }  
  148.                 if (--j >= insertionLevel && j < indexLevel)//key移动到下一层level  
  149.                     t = t.down;  
  150.                 q = q.down;  
  151.                 r = q.right;  
  152.             }  
  153.         }  
  154.     }  

 

分享到:
评论

相关推荐

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、介绍三种高效优雅的Singleton实现方式.wmv │ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细...

    多线程排序---希尔排序、快速排序、堆排序

    在计算机科学中,排序是数据处理的一个重要环节,尤其是在大数据处理和数据分析中。多线程技术则能够有效地利用现代...对于希望深入理解和应用多线程编程以及优化排序算法的Java开发者来说,这是一个非常有价值的资源。

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、介绍三种高效优雅的Singleton实现方式.wmv │ 高并发编程第二阶段04讲、多线程的休息室WaitSet详细...

    Java后端体系高级面试题

    3. **多线程**: - 线程的创建方式:实现Runnable接口、继承Thread类 - 同步机制:synchronized关键字,volatile变量 - 线程池:ExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor - 死锁、...

    Java基础学习25.pdf

    7. **并发集合类**:在多线程环境中,为了保证线程安全,Java提供了专门的线程安全集合类: - CopyOnWriteArrayList:线程安全的ArrayList实现,在写操作时复制底层数组。 - ConcurrentHashMap:线程安全的HashMap...

    关于 Java Collections API 您不知道的 5 件事,第 2 部分

    这些类在多线程环境中能保证并发访问的安全,但要注意它们的设计目标和性能特性,因为有些是牺牲了写入性能来提高读取性能的。 5. **集合工厂方法** 自Java 8起,Collections API引入了工厂方法,如`List.of()`, `...

    [电子书][java类]java并发编程实践

    《Java并发编程实践》这本书是Java开发者深入理解并发编程的重要参考资料。并发编程是现代多核处理器环境下不可或缺的技能,它涉及到如何在多个线程或进程之间有效地分配计算资源,以提高程序性能。Java语言提供了...

    java面试题(非常好)

    5. **多线程**:线程的创建方式(Thread类和Runnable接口),同步机制(synchronized关键字,Lock接口及其实现类),线程池(ExecutorService,ThreadPoolExecutor,ScheduledExecutorService)的使用和配置。...

    java_collection_source_code_analyze:Java集合部分源码分析-Source code collection

    本项目"java_collection_source_code_analyze"专注于对Java集合框架的源代码进行深入分析,帮助开发者理解其内部机制,从而更好地利用这些工具。下面我们将详细探讨Java集合框架中的主要类、接口以及它们的实现和...

    Java-Trees:与使用Java Trees的数据结构相关的程序

    - 并发性:虽然它们不是线程安全的,但可以使用`Collections.synchronizedXXX()`方法或者`ConcurrentSkipListSet`和`ConcurrentSkipListMap`来实现并发访问。 3. **自定义比较器**:如果你的元素不支持自然顺序...

    A Tree Map_map_tree_

    在多线程环境下,需要手动同步或者使用`ConcurrentSkipListMap`。 - `TreeMap`允许空键但不允许空值。 - `TreeMap`可以创建子视图,如头尾视图(headMap())、尾部视图(tailMap())和从指定键到尾部的视图...

    最小松弛优先

    "最小松弛优先"是一种任务调度算法,...通过深入分析这些文件,我们可以更好地理解和学习如何在Java中实现和应用“最小松弛优先”算法。这将是一个有价值的资源,特别是对于那些对任务调度和优化感兴趣的Java开发者。

Global site tag (gtag.js) - Google Analytics