Jdk1.6 JUC源码解析(24)-ConcurrentLinkedQueue
作者:大飞
功能简介:
- ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
- ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现。
源码分析:
- 首先看下内部结构:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable { private static final long serialVersionUID = 196745693267521676L; private static class Node<E> { private volatile E item; private volatile Node<E> next; Node(E item) { // Piggyback on imminent casNext() lazySetItem(item); } E getItem() { return item; } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void setItem(E val) { item = val; } void lazySetItem(E val) { UNSAFE.putOrderedObject(this, itemOffset, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } Node<E> getNext() { return next; } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); private static final long nextOffset = objectFieldOffset(UNSAFE, "next", Node.class); private static final long itemOffset = objectFieldOffset(UNSAFE, "item", Node.class); } /** * 表示第一个存活的节点(头节点),访问此节点时间复杂度为O(1)。 * Invariants: * - 所有存活的节点都能通过头结点到达。 * Non-invariants: * - 头节点的元素可以为null。 * - 可以允许尾节点比头节点滞后,也就是说,可能从头节点无法到达尾节点。 */ private transient volatile Node<E> head = new Node<E>(null); /** * 队列的尾节点(唯一的next为null的节点),访问此节点时间复杂度为O(1)。 * Invariants: * - 最后的节点可以通过对tail调用succ()方法访问到。 * Non-invariants: * - 尾节点的元素可以为null。 * - 可以允许尾节点比头节点滞后,也就是说,可能从头节点无法到达尾节点。 * - 尾节点的next可能指向自身。 */ private transient volatile Node<E> tail = head; public ConcurrentLinkedQueue() {}
可见,内部并没有锁,只有一个头节点和一个尾节点。初始时头节点和尾节点都指向同一节点。
- 接下来看一下添加元素到队列的方法:
public boolean add(E e) { return offer(e); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); retry: for (;;) { Node<E> t = tail; Node<E> p = t; for (int hops = 0; ; hops++) { Node<E> next = succ(p); // 1.获取p的后继节点。(如果p的next指向自身,返回head节点) if (next != null) { // 2.如果next不为null if (hops > HOPS && t != tail) continue retry; // 3.如果自旋次数大于HOPS,且t不是尾节点,跳出2层循环重试。 p = next; // 4.如果自旋字数小于HOPS或者t是尾节点,将p指向next。 } else if (p.casNext(null, n)) { // 5.如果next为null,尝试将p的next节点设置为n,然后自旋。 if (hops >= HOPS) casTail(t, n); // 6.如果设置成功且自旋次数大于HOPS,尝试将n设置为尾节点,失败也没关系。 return true; // 7.添加成功。 } else { p = succ(p); // 8。如果第5步尝试将p的next节点设置为n失败,那么将p指向p的后继节点,然后自旋。 } } } } final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); //如果p节点的next节点指向自身,那么返回head节点;否则返回p的next节点。 return (p == next) ? head : next; } /** * 允许头尾节点的指针滞后,所以当头尾节点离"实际位置"的距离 * (按节点)小于HOPS时,不会去更新头尾指针。这里是假设volatile写代价比volatile读高。 */ private static final int HOPS = 1;
简单分析一下offer方法里的过程:
1.假设没有竞争的情况下,初始状态时head和tail都指向一个节点,这时来了一个新节点n1,代码走向上面注释第1行,得到的next为null,然后走向第5行,然后将p(也就是tail节点)的next节点设置为n1,成功返回。注意这里并没有调整tail指针,head和tail还是指向之前的节点。然后再来一个新节点n2,还是走向第1行,得到的next是n1,然后走向第8行,将p指向n1,自旋一次,又来到第1行,得到的next为null,然后走向第5行,将p(也就是n1)的next节点设置为n2。由于已经自旋一次,所以这时还会走向第6行,将tail指向n2。
2.有竞争的情况也很好分析,假设在注释第5行竞争失败,那么会走向第8行,将p向队尾推进一步,然后重试;如果重试了多次后(超过一次),在注释第一行获取的next仍然不为空,且同时尾节点也被其他线程推进了,那说明当前节点离尾节点太远了,跳出循环,重新定位尾节点然后再试,这样可以减少自旋次数。
- 再看一下从队列中获取元素的方法:
public E poll() { Node<E> h = head; Node<E> p = h; for (int hops = 0; ; hops++) { E item = p.getItem(); // 1.获取p节点上的元素item。 if (item != null && p.casItem(item, null)) { // 2.如果item不为null,尝试将p的item设置为null。 if (hops >= HOPS) { Node<E> q = p.getNext(); updateHead(h, (q != null) ? q : p); // 3.如果自旋次数大于HOPS,尝试更新头节点。 } return item; // 4.获取元素成功。 } Node<E> next = succ(p); // 5.获取p的后继节点。(如果p的next指向自身,那么返回head节点) if (next == null) { updateHead(h, p); // 6.如果p的后继节点为null,尝试将p设置为头节点,然后跳出循环。 break; } p = next; } return null; // 7.从第6步过来,没有成功获取元素,返回null。 } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); //将h的next指向自身,帮助GC。 }
和offer方法一样的思路。
再看下peek方法,只获取元素,不移除节点:
public E peek() { Node<E> h = head; Node<E> p = h; E item; for (;;) { item = p.getItem(); if (item != null) break; Node<E> next = succ(p); if (next == null) { break; } p = next; } updateHead(h, p); return item; }
- 继续看一下其他方法:
看其他方法之前,先看一个内部支持方法first:
Node<E> first() { Node<E> h = head; Node<E> p = h; Node<E> result; for (;;) { E item = p.getItem(); if (item != null) { result = p; break; } Node<E> next = succ(p); if (next == null) { result = null; break; } p = next; } updateHead(h, p); // 这里会帮助推进一下头节点。 return result; }可见,和peek里面的逻辑基本一致。但之所以没有利用first来实现peek,而是单独写peek,是因为可以减少一次volatile读。
public boolean isEmpty() { return first() == null; } /** * 注意这个方法内部使用遍历实现,时间复杂度为O(n)。 */ public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) { if (p.getItem() != null) { // Collections.size() spec says to max out if (++count == Integer.MAX_VALUE) break; } } return count; } public boolean contains(Object o) { if (o == null) return false; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null && o.equals(item)) return true; } return false; } public boolean remove(Object o) { if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null && o.equals(item) && p.casItem(item, null)){ Node<E> next = succ(p); if(pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; } public Object[] toArray() { // Use ArrayList to deal with resizing. ArrayList<E> al = new ArrayList<E>(); for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null) al.add(item); } return al.toArray(); } @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { // try to use sent-in array int k = 0; Node<E> p; for (p = first(); p != null && k < a.length; p = succ(p)) { E item = p.getItem(); if (item != null) a[k++] = (T)item; } if (p == null) { if (k < a.length) a[k] = null; return a; } // If won't fit, use ArrayList version ArrayList<E> al = new ArrayList<E>(); for (Node<E> q = first(); q != null; q = succ(q)) { E item = q.getItem(); if (item != null) al.add(item); } return al.toArray(a); }这些方法都是基于first方法实现,代码很容易看懂,这里就不啰嗦了。
- 最后,ConcurrentLinkedQueue的迭代器也是弱一致的。
小总结一下:
ConcurrentLinkedQueue内部在插入或者移除元素时,不会即时设置头尾节点,而是有一个缓冲期(一个节点长的距离),这样能减少一些CAS操作;其次在设置头尾节点的时候,也不会CAS-Loop直到成功,只尝试一次,失败也没关系,下一次操作或者其他线程在操作时会帮忙推进头尾节点(将头尾指针指向正确位置)。
ConcurrentLinkedQueue的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
logback-cfca-jdk1.6-3.1.0.0.jar
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
JDK1.6是Oracle公司发布的一个较早版本,适用于Windows操作系统。在这个解压版中,用户无需进行安装过程,可以直接在Windows环境下使用JDK的各个工具。 JDK1.6包含的主要组件有: 1. **Java编译器**(javac):...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
《OkHttp3.8.0-JDK1.6:低版本环境下的高效网络通信库》 OkHttp3.8.0-jdk1.6.zip是一个专门为Java Web项目设计的网络通信库,它针对JDK1.6进行了优化和重新编译,确保在较低版本的Java环境中也能稳定运行。OkHttp,...