一:功能介绍
基于单项链表,FIFO的有界阻塞队列,内部采用可重入锁ReentrantLock实现,一个take锁,一个put锁,相应的等待条件也为二个。
二:源码分析
package java.util.concurrent; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -6903933977591709194L; //静态内部类Node static class Node<E> { //节点元素 E item; //指向某个节点的下一个节点的引用 Node<E> next; Node(E x) { item = x; } } //链表的容量 private final int capacity; //原子类型,统计队列里面链表节点的个数 private final AtomicInteger count = new AtomicInteger(0); //链表的头节点 private transient Node<E> head; //链表的尾节点 private transient Node<E> last; //定义可重入获取节点锁 private final ReentrantLock takeLock = new ReentrantLock(); //获取链表头部节点的等待条件 private final Condition notEmpty = takeLock.newCondition(); //定义可重入插入节点锁 private final ReentrantLock putLock = new ReentrantLock(); //获取链表尾部节点的等待条件 private final Condition notFull = putLock.newCondition(); //队列有数据,唤醒所有等待获取队列数据的线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } //队列没有满,唤醒所有等待插入数据的线程 private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } //插入节点到尾部 private void enqueue(Node<E> node) { //1:设置未插入节点前尾部节点的下一个节点引用为当前需要插入的节点 //2:将当前需要插入的节点定义为尾部节点 last = last.next = node; } //移除数据 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC //将头节点的下一个节点设为新的头部节点 head = first; //返回节点数据 E x = first.item; first.item = null; return x; } //获取插入锁,获取锁 void fullyLock() { putLock.lock(); takeLock.lock(); } //释放插入锁,获取锁 void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } //若不指定初始化容量,默认为int的最大值 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //构造函数 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化时,头部节点,尾部节点都为null last = head = new Node<E>(null); } //初始化时刻指定存储默认的数据 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); //队列满了,抛异常 if (n == capacity) throw new IllegalStateException("Queue full"); //入队操作 enqueue(new Node<E>(e)); ++n; } //原子设置节点大小 count.set(n); } finally { putLock.unlock(); } } //返回链表节点的数量 public int size() { return count.get(); } //返回还可以有多少的容量可供插入 public int remainingCapacity() { return capacity - count.get(); } //入队操作 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //new一个Node Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //插入数据的可中断锁,只要interrupted,立马中断 putLock.lockInterruptibly(); try { //如果队列满了 while (count.get() == capacity) { notFull.await(); } //入队 enqueue(node); c = count.getAndIncrement(); //如果插入数据后队列容量未满 if (c + 1 < capacity) //唤醒等待插入的线程 notFull.signal(); } finally { putLock.unlock(); } //如果未插入节点前,队列为空,那么在插入节点后,就需要唤醒等待take的线程 if (c == 0) signalNotEmpty(); } //也是入队操作,成功返回true,多了个超时,其他同put,这里就不解释了 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; } //入队操作,入队OK返回true public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; //如果队列容量满了,直接返回false if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //如果队列容量未满 if (count.get() < capacity) { //入队,插入到队尾 enqueue(node); //数量+1 c = count.getAndIncrement(); //如果入队尾后,容量还有,唤醒后面阻塞的等待入队的线程 if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } //如果未插入节点前,队列为空,那么在插入节点后,就需要唤醒等待take的线程 if (c == 0) signalNotEmpty(); return c >= 0; } //出队操作,获取头部节点 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果没有节点,take线程阻塞 while (count.get() == 0) { notEmpty.await(); } //获取头部节点,出队 x = dequeue(); c = count.getAndDecrement(); //如果还有节点,唤醒后面等待take的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果未take之前,队列是满的,那么在take一次之后,队列未满,唤醒阻塞的put操作的线程 if (c == capacity) signalNotFull(); return x; } //同poll(),只是多了超时限制 public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } //获取并移除队列的头部节点 public E poll() { final AtomicInteger count = this.count; //没有数据,直接返回null if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { //获取头部节点 x = dequeue(); c = count.getAndDecrement(); //唤醒阻塞的take线程 if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } //如果未移除前,队列是满的,移除后,队列未满,唤醒阻塞的put线程 if (c == capacity) signalNotFull(); return x; } //获取头部数据,但不移除队列头 public E peek() { //队列没有数据,return null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } } //是否包含,遍历链表equals判断,如果链表较长,请求太靠后,比较耗性能 public boolean contains(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> p = head.next; p != null; p = p.next) if (o.equals(p.item)) return true; return false; } finally { fullyUnlock(); } } //返回数组 public Object[] toArray() { fullyLock(); try { int size = count.get(); Object[] a = new Object[size]; int k = 0; for (Node<E> p = head.next; p != null; p = p.next) //数组的每一项存在队列链表节点的内容 a[k++] = p.item; return a; } finally { fullyUnlock(); } } //遍历 public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { //当前节点 private Node<E> current; //上一个返回的节点 private Node<E> lastRet; //当前节点对应的节点数据 private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } //只要当前节点的下一个节点引用还存在 public boolean hasNext() { return current != null; } /** * Returns the next live successor of p, or null if no such. * * Unlike other traversal methods, iterators need to handle both: * - dequeued nodes (p.next == p) * - (possibly multiple) interior removed nodes (p.item == null) */ private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } } //获取下一个节点 public E next() { fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; //将当前节点当做上一次返回的节点 lastRet = current; //获取当前节点的下一个节点 current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } } }
相关推荐
总的来说,LinkedBlockingQueue通过单向链表的数据结构,结合阻塞队列的特性,为Java并发编程提供了一种高效的解决方案。了解其内部工作原理有助于我们更好地利用这一工具,优化并发程序的性能和可维护性。在实际...
本篇文章将深入探讨`LinkedBlockingQueue`,这是一个基于链表结构的无界阻塞队列,常用于线程池的创建中作为任务缓冲队列。 首先,我们来看一下`LinkedBlockingQueue`的底层数据结构。与数组结构的`...
chatbox 本地部署大模型 私有化部署
Delphi 12.3控件之pdfium-win-x86.rar
内容概要:本文详细探讨了图神经网络(GNN)与注意力机制的结合,特别是在图结构数据处理中的应用。文章首先简要介绍了图神经网络和注意力机制的概念,接着重点介绍了图注意力网络(GAT),以及其他几种基于注意力机制的图神经网络模型,如GATE、GaAN、RGAT等。文中还讨论了这些模型在节点分类、图分类、链接预测等任务中的具体应用,并指出了现有模型存在的问题及改进措施。最后,文章展望了未来的研究方向,强调了提升模型表达能力、增强可解释性和构建多尺度结构的重要性。 适合人群:从事图神经网络研究的科研人员、研究生及相关领域的从业人员。 使用场景及目标:① 提升图神经网络在处理复杂图结构数据时的性能;② 改善图神经网络的可解释性和可视化能力;③ 设计更高效的图注意力机制以应对大规模图数据。 其他说明:本文不仅回顾了图注意力网络的经典模型,还介绍了最新的研究成果,为未来的研究提供了有价值的参考。
文案: “CSDN博客之星”是技术圈的年度盛事,助力博主闪耀成长!通过参与评选,你不仅能提升个人品牌、链接行业大牛,还能在创作与交流中精进技术。活动汇聚优质内容与活跃作者,为技术人提供展示舞台。无论你是资深博主还是新人,这里都有机会被看见、被认可。原创干货+粉丝互动,让你的技术分享更有影响力!快来加入,与同行共赴星光之约! (注:严格控制在100字左右,突出活动价值与参与收益,简洁有力。)
内容概要:本文详细介绍了使用Qt编写的串口调试助手的源代码及其功能特性。该工具不仅支持基本的串口通信,还集成了自定义协议解析、帧判断、通信数据保存等功能。文章重点展示了通信模块的核心代码,如帧同步处理、协议配置界面的设计、数据持久化、帧同步配置、文件保存功能以及定时发送功能等。此外,还提到了一些实用的小技巧和注意事项,如协议解析窗口的隐藏调试控制台、文件名生成规则、跨线程数据传递等。 适合人群:具备一定Qt编程基础,从事嵌入式开发或串口通信相关工作的工程师和技术爱好者。 使用场景及目标:适用于需要频繁进行串口调试的开发者,帮助他们提高调试效率,快速定位问题。具体应用场景包括但不限于智能设备调试、工业控制系统开发、物联网设备测试等。 其他说明:文中提供了大量代码示例,便于读者理解和实践。同时,作者分享了许多实际开发中的经验和技巧,有助于读者避开常见的陷阱并优化代码性能。
内容概要:本文详细介绍了如何利用粒子群优化(PSO)算法对PID控制器进行参数整定。首先解释了PSO的基本概念和工作原理,即通过模拟自然界中鸟群或鱼群的行为,在三维参数空间中寻找最佳的Kp、Ki、Kd值。文中提供了完整的Matlab代码示例,涵盖了从初始化设置、适应度函数定义到粒子位置更新的具体步骤。同时,作者分享了一些实用的经验技巧,如选择合适的粒子数量、惯性权重以及学习因子等参数,并讨论了不同适应度函数的选择对优化结果的影响。此外,还展示了PSO-PID的实际应用案例,包括与传统方法的性能对比,以及在非线性系统中的优越表现。 适合人群:自动化控制领域的研究人员和技术人员,尤其是那些希望提高PID控制器性能并减少手动调参工作量的人。 使用场景及目标:适用于各种工业控制系统中PID控制器的参数优化任务,旨在通过智能化手段获得更好的动态响应特性,如更快的调节时间和更低的超调量。对于复杂的非线性系统尤为有用。 其他说明:附带详细的代码注释和可视化工具,帮助读者更好地理解和掌握PSO-PID的工作流程。建议读者在实践中灵活调整相关参数,以适应不同的应用场景。
运行程序,弹出选择本地图片窗口,选择一张带有人员的图片,检测出图片中的人员个数并用方框进行标注
Delphi 12.3控件之Sublime Text 4 Build 4189 x64.7z
Java项目基于ssm框架的课程设计,包含LW+ppt
内容概要:本文详细介绍了三菱FX3U PLC与台达变频器之间通过Modbus协议进行通信的方法。首先概述了Modbus通信协议及其master-slave模式的工作原理,接着深入分析了通信程序的具体结构,包括初始化通信、读取通信参数、执行通信任务以及错误处理等环节。文中提供了详细的代码示例,解释了如何通过RS指令配置通信参数、构建Modbus帧、处理CRC校验及通信触发逻辑。此外,还分享了一些实用的调试技巧和常见问题解决方案,如通信超时处理、硬件接线注意事项等。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是那些需要掌握PLC与变频器通信技能的人群。 使用场景及目标:适用于需要实现三菱FX3U PLC与台达变频器之间高效通信的实际工程项目。通过学习本文,读者能够掌握Modbus通信协议的应用,编写可靠的通信程序,确保工业控制系统稳定运行。 其他说明:本文不仅提供了理论知识,还包括大量实践经验,帮助读者更好地理解和应对实际工程中的挑战。
Java项目基于ssm框架的课程设计,包含LW+ppt
精选30套企业通用HR认证的极简求职模板,覆盖应届生/转行/社招全场景,同步整合高频率面试问答话术库+避坑指南(含薪酬谈判、离职原因黑话翻译)。面试场景分类与应对包括个人背景类、行为类问题、技术类问题、职业规划类、情景模拟类、公司文化类、压力测试类、薪资谈判类、团队合作类以及行业认知类等面试技巧类包括面试的自我介绍的时长、自我介绍的内容、自我介绍的表达、自我介绍的要点等,结合配套的企业求职模板,全程无废话纯干货版式,手机电脑即拿即用,帮你省下80%海投时间,把简历变成精准收割offer的流量入口。
win11专业版24H2设置共享服务和访问共享
Delphi 12.3控件之RADStudio-12-1-29-0-51961-7529-KeyPatch.7z
RunApi 是一款集调试、测试、文档输出以及项目协作的接口工具(功能上类似postman)。目前支持客户端版和在线精简版 ,包含接口测试/项目协作等功能。
文案: “博客之星2024技术博客大赛”火热开启!参赛需提交全年20篇原创博客(均分60+),评选依据创作影响力、文章质量及个人影响力综合评分。优胜者可赢取硬件设备、荣誉证书及专属虚拟福利。活动旨在发掘优秀技术博主,推动知识共享。IT创作者快来参与,展现你的专业价值! (100字)
谷歌浏览器,安卓离线版APK
内容概要:本文详细介绍了基于STM32F1平台的BLDC电机控制源码,涵盖有传感器(霍尔)和无传感(反电动势过零检测)两种驱动方式。文中展示了关键代码片段,如霍尔信号处理、反电动势过零检测、双闭环PID控制器的设计与实现。霍尔方案通过中断捕获霍尔信号并更新换相表,确保电机稳定运行;无传感方案则依靠ADC采样相电压,通过过零检测实现换相。双闭环PID用于精确控制电机的速度和电流,避免振荡。此外,文章提供了详细的调参建议和避坑指南,帮助开发者快速掌握BLDC电机控制的核心技术。 适合人群:具有一定嵌入式开发经验,尤其是对电机控制感兴趣的工程师和技术爱好者。 使用场景及目标:适用于希望深入了解BLDC电机控制原理及其具体实现的技术人员。通过学习本文,可以掌握有传感器和无传感两种控制方法的具体实现细节,以及如何优化PID参数以提高控制系统性能。 其他说明:文章不仅提供理论讲解,还包括大量实用的代码片段和调试建议,有助于读者快速上手并在实践中不断改进。