Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque
作者:大飞
功能简介:
- LinkedBlockingDeque是一种基于双向链表实现的有界的(可选的,不指定默认int最大值)阻塞双端队列。
双端队列一般适用于工作密取模式,即每个消费者都拥有自己的双端队列,如果某个消费者完成了自己队列的全部任务,可以到其他消费者双端队列尾部秘密获取任务来处理。
源码分析:
- LinkedBlockingDeque实现了BlockingDeque接口,简单看下这个接口:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { /** * 将元素插入队头,如果队列满了,抛出IllegalStateException。 */ void addFirst(E e); /** * 将元素插入队尾,如果队列满了,抛出IllegalStateException。 */ void addLast(E e); /** * 将元素插入队头,如果成功,返回true;如果队列满了,返回false。 */ boolean offerFirst(E e); /** * 将元素插入队尾,如果成功,返回true;如果队列满了,返回false。 */ boolean offerLast(E e); /** * 将元素插入队头。如果队列满了,阻塞等待,直到队列有可用空间。 */ void putFirst(E e) throws InterruptedException; /** * 将元素插入队尾。如果队列满了,阻塞等待,直到队列有可用空间。 */ void putLast(E e) throws InterruptedException; /** * 将元素插入队头。如果队列满了,阻塞等待。 * 如果插入成功,返回true;如果等待超时,返回false。 */ boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 将元素插入队尾。如果队列满了,阻塞等待。 * 如果插入成功,返回true;如果等待超时,返回false。 */ boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。 */ E takeFirst() throws InterruptedException; /** * 获取并移除队尾元素,如果队头没有元素,阻塞等待,直到有元素可以获取。 */ E takeLast() throws InterruptedException; /** * 获取并移除队头元素,如果队头没有元素,阻塞等待。 * 如果过了给定的时间还不能获取元素,返回null。 */ E pollFirst(long timeout, TimeUnit unit) throws InterruptedException; /** * 获取并移除队尾元素,如果队头没有元素,阻塞等待。 * 如果过了给定的时间还不能获取元素,返回null。 */ E pollLast(long timeout, TimeUnit unit) throws InterruptedException; /** * 从队列中删除第一个出现的指定元素。 * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。 */ boolean removeFirstOccurrence(Object o); /** * 从队列中删除最后一个出现的指定元素。 * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。 */ boolean removeLastOccurrence(Object o); // *** BlockingQueue methods *** /** * 添加一个元素到队尾,如果成功,返回true。如果队列满了,抛出IllegalStateException。 */ boolean add(E e); /** * 添加一个元素到队尾。添加成功,返回treue;如果队列满了,返回false。 */ boolean offer(E e); /** * 将元素插入队尾。如果队列满了,阻塞等待。 */ void put(E e) throws InterruptedException; /** * 将元素插入队尾。如果队列满了,阻塞等待。 * 如果插入成功,返回true;如果等待超时,返回false。 */ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 获取并移除队头元素,如果队头没有元素,抛出NoSuchElementException异常。 */ E remove(); /** * 获取并移除队头元素,如果队头没有元素,返回null。 */ E poll(); /** * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。 */ E take() throws InterruptedException; /** * 获取并移除队头元素,如果队头没有元素,阻塞等待。 * 如果过了给定的时间还不能获取元素,返回null。 */ E poll(long timeout, TimeUnit unit) throws InterruptedException; /** * 查看队头元素,如果队头没有元素,抛出NoSuchElementException。 */ E element(); /** * 查看队头元素,如果队头没有元素,返回null。 */ E peek(); /** * 从队列中删除第一个出现的指定元素。 * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。 */ boolean remove(Object o); /** * 查看队列是否包含给定的元素,包含返回true;不包含返回false。 */ public boolean contains(Object o); /** * 返回队列中元素数量。 */ public int size(); /** * 返回一个从头到尾排序的迭代器。 */ Iterator<E> iterator(); // *** Stack methods *** /** * 相当于addFirst(Object) */ void push(E e); }
- 接下来看下LinkedBlockingDeque内部数据结构:
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { private static final long serialVersionUID = -387911632671998426L; /** Doubly-linked list node class */ static final class Node<E> { /** * 保存元素的域,如果为nul说明当前节点已被删除。 */ E item; /** * - 指向其前驱节点。 * - 如果指向自身,说明前面是队尾节点。 * - 如果为null,说明没有前驱节点。 */ Node<E> prev; /** * - 指向其后继节点。 * - 如果指向自身,说明后面是队头节点。 * - 如果为null,说明没有后继节点。 */ Node<E> next; Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; next = n; } } /** 指向队头节点 */ transient Node<E> first; /** 指向队尾节点 */ transient Node<E> last; /** 队列中元素数量 */ private transient int count; /** 队列最大容量 */ private final int capacity; /** 队列中保护访问使用的锁 */ final ReentrantLock lock = new ReentrantLock(); /** 获取元素的等待条件(队列非空) */ private final Condition notEmpty = lock.newCondition(); /** 插入元素的等待条件(队列非满) */ private final Condition notFull = lock.newCondition(); /** * 不指定容量,默认为Integer.MAX_VALUE(可认为无限)。 */ public LinkedBlockingDeque() { this(Integer.MAX_VALUE); } public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; }
内部结构:一个双向链表、一把锁和两个锁条件。
- 再看下内部的一些基础操作,这些操作必须在持有锁的前提下调用:
/** * Links e as first element, or returns false if full. */ private boolean linkFirst(E e) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; //如果队列已满,返回false; Node<E> f = first; //新建节点x,用来存放数据e,将e插入到队头节点前面。 Node<E> x = new Node<E>(e, null, f); //然后将e设置为队头节点。 first = x; if (last == null) last = x; //如果没有队尾节点,那么将x设置为队尾节点。 else f.prev = x; //如果有队尾节点,那么将f的prev指向x,完成节点拼接。 ++count; // 累加当前元素计数 notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。 return true; }
linkFirst就是将一个元素插入到队头,并成为新的队头元素。
/** * Links e as last element, or returns false if full. */ private boolean linkLast(E e) { // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; //如果队列已满,返回false; Node<E> l = last; //新建节点x,用来存放数据e,将e插入到队尾节点后面。 Node<E> x = new Node<E>(e, l, null); last = x; if (first == null) first = x; //如果没有队头节点,那么将x设置为队头节点。 else l.next = x; //如果有队头节点,那么将l的next指向x,完成节点拼接。 ++count; // 累加当前元素计数 notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。 return true; }
linkLast就是将一个元素插入到队尾,并成为新的队尾元素。
/** * Removes and returns first element, or null if empty. */ private E unlinkFirst() { // assert lock.isHeldByCurrentThread(); Node<E> f = first; //获取队头节点f。 if (f == null) return null; //如果没有队头节点,返回null。 Node<E> n = f.next; //获取f的后继节点。 E item = f.item; //获取f的元素item。 f.item = null; //将f的item域置空。 f.next = f; // 将f的next域指向自身,帮助GC。 first = n; //将f的后继节点n设置为新的队头节点。 if (n == null) last = null; //如果n为空,说明队列为空了,把队尾节点也置空一下。 else n.prev = null; //如果n不为空,现在n是队头节点,需要将其prev域置空。 --count; //递减当前元素计数 notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。 return item; //返回元素item。 }
unlinkFirst就是将现有的队头节点移除,并将其后继节点设置为新的队头节点,并返回移除的队头节点中保存的元素。
/** * Removes and returns last element, or null if empty. */ private E unlinkLast() { // assert lock.isHeldByCurrentThread(); Node<E> l = last; //获取队尾节点l。 if (l == null) return null; //如果没有队尾节点,返回null。 Node<E> p = l.prev; //获取l的前驱节点。 E item = l.item; //获取l的元素item。 l.item = null; //将l的item域置空。 l.prev = l; // 将f的prev域指向自身,帮助GC。 last = p; //将l的前驱节点p设置为新的队尾节点。 if (p == null) first = null; //如果p为空,说明队列为空了,把队头节点也置空一下。 else p.next = null; //如果p不为空,现在p是队尾节点,需要将其next域置空。 --count; //递减当前元素计数 notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。 return item; //返回元素item。 }
unlinkLast就是将现有的队尾节点移除,并将其前驱节点设置为新的队尾节点,并返回移除的队尾节点中保存的元素。
/** * Unlinks x */ void unlink(Node<E> x) { // assert lock.isHeldByCurrentThread(); Node<E> p = x.prev; Node<E> n = x.next; if (p == null) { unlinkFirst(); //如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。 } else if (n == null) { unlinkLast(); //如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。 } else { p.next = n; //将x的前驱节点p的next指向x的后继节点n。 n.prev = p; //将x的后继节点n的prev指向x的前驱节点n。 x.item = null; //置空x的item域。 // 注意上面并没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。 --count; //递减当前元素计数 notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。 } }
- LinkedBlockingDeque中利用上述基础方法来实现BlockingDeque接口定义的方法,随便看几个:
public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(e)) notFull.await(); //如果插入元素到队头失败,在notFull条件上等待。 } finally { lock.unlock(); } } public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(e)) notFull.await(); //如果插入元素到队尾失败,在notFull条件上等待。 } finally { lock.unlock(); } } public E takeFirst() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkFirst()) == null) notEmpty.await(); //如果从队头获取并删除元素失败,在notEmpty条件上等待。 return x; } finally { lock.unlock(); } } public E takeLast() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { E x; while ( (x = unlinkLast()) == null) notEmpty.await(); //如果从队尾获取并删除元素失败,在notEmpty条件上等待。 return x; } finally { lock.unlock(); } }
- 其他的方法也很容易看懂了,这里就不啰嗦了。最后注意LinkedBlockingDeque的迭代器是弱一致的,而且支持双向迭代器。
LinkedBlockingDeque的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
logback-cfca-jdk1.6-3.1.0.0.jar
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
mac for jdk1.6 jdk6 安装版 里面有两个jdk1.6的安装包,都可以用 如果电脑上安装有1.7,1.8等高版本jdk就不要再下安装包了,安装包安装会报错 命令是这个:brew install java6或 brew install homebrew/cask-...
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
Java编程开发工具包,最新版本,很好用,经典