一:功能介绍
基于单项链表,FIFO的有界阻塞队列,内部采用可重入锁ReentrantLock实现,一个take锁,一个put锁,相应的等待条件也为二个。
二:源码分析
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; //静态内部类Node static class Node<E> { //节点元素 E item; //指向某个节点的下一个节点的引用 Node<E> next; Node(E x) { item = x; } } //链表的容量 private final int capacity; //原子类型,统计队列里面链表节点的个数 private final AtomicInteger count = new AtomicInteger(0); //链表的头节点 private transient Node<E> head; //链表的尾节点 private transient Node<E> last; //定义可重入获取节点锁 private final ReentrantLock takeLock = new ReentrantLock(); //获取链表头部节点的等待条件 private final Condition notEmpty = takeLock.newCondition(); //定义可重入插入节点锁 private final ReentrantLock putLock = new ReentrantLock(); //获取链表尾部节点的等待条件 private final Condition notFull = putLock.newCondition(); //队列有数据,唤醒所有等待获取队列数据的线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } //队列没有满,唤醒所有等待插入数据的线程 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //插入节点到尾部 private void enqueue(Node<E> node) { //1:设置未插入节点前尾部节点的下一个节点引用为当前需要插入的节点 //2:将当前需要插入的节点定义为尾部节点 last = last.next = node; } //移除数据 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC //将头节点的下一个节点设为新的头部节点 head = first; //返回节点数据 E x = first.item; first.item = null; return x; } //获取插入锁,获取锁 void fullyLock() { putLock.lock(); takeLock.lock(); } //释放插入锁,获取锁 void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } //若不指定初始化容量,默认为int的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //构造函数 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化时,头部节点,尾部节点都为null last = head = new Node<E>(null); } //初始化时刻指定存储默认的数据 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); //队列满了,抛异常 if (n == capacity) throw new IllegalStateException("Queue full"); //入队操作 enqueue(new Node<E>(e)); ++n; } //原子设置节点大小 count.set(n); } finally { putLock.unlock(); } } //返回链表节点的数量 public int size() { return count.get(); } //返回还可以有多少的容量可供插入 public int remainingCapacity() { return capacity - count.get(); } //入队操作 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //new一个Node Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //插入数据的可中断锁,只要interrupted,立马中断 putLock.lockInterruptibly(); try { //如果队列满了 while (count.get() == capacity) { notFull.await(); } //入队 enqueue(node); c = count.getAndIncrement(); //如果插入数据后队列容量未满 if (c + 1 < capacity) //唤醒等待插入的线程 notFull.signal(); } finally { putLock.unlock(); } //如果未插入节点前,队列为空,那么在插入节点后,就需要唤醒等待take的线程 if (c == 0) signalNotEmpty(); } //也是入队操作,成功返回true,多了个超时,其他同put,这里就不解释了 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } //入队操作,入队OK返回true public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //如果队列容量满了,直接返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //如果队列容量未满 if (count.get() < capacity) { //入队,插入到队尾 enqueue(node); //数量+1 c = count.getAndIncrement(); //如果入队尾后,容量还有,唤醒后面阻塞的等待入队的线程 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //如果未插入节点前,队列为空,那么在插入节点后,就需要唤醒等待take的线程 if (c == 0) signalNotEmpty(); return c >= 0; } //出队操作,获取头部节点 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果没有节点,take线程阻塞 while (count.get() == 0) { notEmpty.await(); } //获取头部节点,出队 x = dequeue(); c = count.getAndDecrement(); //如果还有节点,唤醒后面等待take的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果未take之前,队列是满的,那么在take一次之后,队列未满,唤醒阻塞的put操作的线程 if (c == capacity) signalNotFull(); return x; } //同poll(),只是多了超时限制 public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //获取并移除队列的头部节点 public E poll() { final AtomicInteger count = this.count; //没有数据,直接返回null if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { //获取头部节点 x = dequeue(); c = count.getAndDecrement(); //唤醒阻塞的take线程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } //如果未移除前,队列是满的,移除后,队列未满,唤醒阻塞的put线程 if (c == capacity) signalNotFull(); return x; } //获取头部数据,但不移除队列头 public E peek() { //队列没有数据,return null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } //是否包含,遍历链表equals判断,如果链表较长,请求太靠后,比较耗性能 public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } } //返回数组 public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) //数组的每一项存在队列链表节点的内容 a[k++] = p.item; return a; } finally { fullyUnlock(); } } //遍历 public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { //当前节点 private Node<E> current; //上一个返回的节点 private Node<E> lastRet; //当前节点对应的节点数据 private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } //只要当前节点的下一个节点引用还存在 public boolean hasNext() { return current != null; } /** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } } //获取下一个节点 public E next() { fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; //将当前节点当做上一次返回的节点 lastRet = current; //获取当前节点的下一个节点 current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } } }
相关推荐
总的来说,LinkedBlockingQueue通过单向链表的数据结构,结合阻塞队列的特性,为Java并发编程提供了一种高效的解决方案。了解其内部工作原理有助于我们更好地利用这一工具,优化并发程序的性能和可维护性。在实际...
`ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能的线程安全队列实现,基于链表结构,它适用于对吞吐量有较高要求的场景。`ConcurrentLinkedQueue` 不提供容量限制,并且在队列为空...
LinkedBlockingQueue是一个基于链表结构的阻塞队列,它实现了BlockingQueue接口,具备线程安全的特性。队列内部通过双向链表维护元素顺序,这使得插入和删除操作具有O(1)的时间复杂度。同时,作为阻塞队列,当生产者...
课程大作业基于easyx的使用单向链表的数据结构C++源码(带详细注释+exe可执行文件).zip课程大作业基于easyx的使用单向链表的数据结构C++源码(带详细注释+exe可执行文件).zip课程大作业基于easyx的使用单向链表的数据...
单向链表是一种基本的数据结构,它在计算机科学中被广泛应用,特别是在算法和数据结构的实现中。在Java编程中,单向链表通常通过定义一个节点类来实现,每个节点包含数据和指向下一个节点的引用。下面我们将深入探讨...
在本压缩包"C语言基于单向链表的图书管理系统源码.zip"中,包含了一个用C语言实现的图书管理系统。这个系统使用了数据结构中的单向链表来存储和管理图书信息,使得我们可以进行诸如添加、删除、查找和显示图书等操作...
单向链表是一种基本的数据结构,它在计算机科学和编程中有着广泛的应用。与数组不同,链表中的元素不是在内存中连续存储的,而是通过指针或引用连接在一起,形成一个逻辑上的线性序列。单向链表的每个节点包含两部分...
单向链表是一种常见的数据结构,它由一系列节点组成,每个节点包含数据和指向下一个节点的指针。在C++编程中,为了实现通用性,我们通常会使用模板类来创建单向链表,以便它可以处理不同类型的元素。标题"单向链表类...
04.单向链表以及单向链表的应用.ppt
本文将详细讲解如何在C#中实现单向链表,结合源码解析来帮助你深入理解其内部机制。 首先,我们要知道什么是单向链表。单向链表是由一系列节点组成,每个节点包含两个部分:数据域和指针域。数据域存储实际的数据,...
在Intel的实现中,单向链表和双向链表被广泛应用于内存管理、数据结构优化和其他系统级任务。这两种链表的主要区别在于节点之间的链接方式,这直接影响了它们的操作效率和用途。 **单向链表** 是一种线性数据结构,...
在本篇数据结构实验报告中,我们关注的核心是单向链表这一数据结构。单向链表是一种线性数据结构,每个节点包含一个数据元素和一个指向下一个节点的指针。实验使用VC++ 6.0作为编程工具,旨在通过实践来深入理解和...
1.随机产生或键盘输入一组元素,建立一个带头结点的单向链表(无序)。 2.遍历单向链表。 3.把单向链表中元素逆置(不允许申请新的结点空间)。 4.在单向链表中删除所有的偶数元素结点。 5.编写在非递减...
这是一个单向链表,它具有插入与删除节点的功能。Entry类实现了链表的各节点。
假定一个单向循环链表来表示队列
Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现单向链表Java SE程序 类实现...
1.项目代码功能经验证ok,确保稳定可靠运行。欢迎下载使用!在使用过程中,如有问题或建议,请及时私信沟通,帮助解答。 ...基于C语言实现的链表、栈、队列、排序算法以及二叉树源码(课程作业).zip
数据结构 基于链表实现的优先队列 Cpp文件
双向链表与单向链表类似,但每个节点除了有指向前一个节点的指针外,还有一个指向下个节点的指针。这使得双向链表支持双向遍历,可以从前往后,也可以从后往前。双向链表的操作比单向链表更灵活,但也因此在内存...
将一个单向链表反向连接