//LinkedBlockQueue
//先看构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//保证可见性这里不会有竞争互斥现象。
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//offer插入一个元素
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果满了
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
//加入队列
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//唤醒在非满条件上等待的线程。
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
//当前已经有一个元素,唤醒在非空条件上等待的线程。
/**为什么只有c=0的时候唤醒?只有c为0的时候说明当前队列从一个也没有变为有一个,
也就是说之前take锁由于一个都没有所以阻塞在获取node上。*/
signalNotEmpty();
return c >= 0;
}
//这个方法也很奇怪为什么不直接notEmpty.signal()而要锁住以后在释放?
/**说说我的理解:这里涉及到一点Reentrant的源码知识,notEmpty.signal()这个方法
只是将在condition上等待的线程加入到执行队列中,此时还没有释放锁。只有在unlock的时候才会释放锁。
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒在notEmpty上等待的线程。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//poll获取一个元素
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//唤醒非空条件上的等待
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//当c=容量说明之前整个队列已满有线程在notfull条件上等待了所以唤醒他。
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//头节点
Node<E> h = head;
//头节点的下一个节点才是真正的节点初始化的时候会初始化一个空节点。
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//在指定时间内插入元素超时返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//当前队列已满
while (count.get() == capacity) {
//超时返回false
if (nanos <= 0)
return false;
//在notfull条件上等待
nanos = notFull.awaitNanos(nanos);
}
//入队
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
//还有空间那么唤醒在notfull条件上等待的线程。
notFull.signal();
} finally {
putLock.unlock();
}
//如果c=0说明之前队列是空的,唤醒在notEmpty条件上等待的线程。
if (c == 0)
signalNotEmpty();
return true;
}
//在指定时间内去获取元素超时返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前没有队列为空
while (count.get() == 0) {
//超时返回null
if (nanos <= 0)
return null;
//挂起线程
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
//说明队列中至少还有一个元素唤醒在非空条件上等待的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//说明之前队列是满的现在移出了一个堆了不满了所以唤醒在不满条件上等待的线程。
if (c == capacity)
signalNotFull();
return x;
}
//只取元素但是并不移除
public E peek() {
//没有元素
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
//插入元素如果队列已满则阻塞。
public void put(E e) throws InterruptedException {
//不能插入空元素
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列已满阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
//队列还未满唤醒在notfull条件上等待的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//说明队列之前是空的唤醒在notEmpty条件上等待的线程
if (c == 0)
signalNotEmpty();
}
//获取元素如果队列为空则阻塞
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//队列还有元素唤醒notEmpty条件上等待的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//说明之前队列已满所以唤醒在notFull条件上等待的线程
if (c == capacity)
signalNotFull();
return x;
}
//返回当前还可以容纳的线程节点数量
public int remainingCapacity() {
//这里为什么不用上锁?capacity是final的而count是AtomicInteger也是线程安全的。
return capacity - count.get();
}
//返回队列是否包含该元素
public boolean contains(Object o) {
if (o == null) return false;
//两把锁全部锁上
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
//两把锁全部释放
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
//size()返回队列中元素的个数
public int size() {
return count.get();
}
//从队列中移除该元素
public boolean remove(Object o) {
if (o == null) return false;
//全部锁上
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//全部释放
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果之前队列是满的,唤醒在notfull上等待的条件。
if (count.getAndDecrement() == capacity)
notFull.signal();
}
//clear清空所有元素整个队列变为初始化状态
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
//将count设置为0,如果之前队列是满的则唤醒在非满条件上等待的线程。
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
//将队列中的元素转换为数组类型为Object
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
//将队列中的元素封装到指定类型的数组
public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
//如果数组长度<队列长度重新创建一个等长的数组
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}
//将列队中的元素移到集合中
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
//如果是true说明之前队列是满的
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
//返回元素的迭代器
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return current != null;
}
/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
//说明该节点已经出列,在出列的时候有一句代码h.next = h;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
public void remove() {
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
/**
总结:LinkedBlockingQueue采用两把锁实现。一个锁住插入一个锁住移出。
为什么用两把锁实现可以保证并发?其主要有一个头,尾节点。而插入是从尾节点开始,
而移出是从头结点的下一个节点开始。这样可以保证无论怎么操作最后的结果都是一样的。
相当于他们操作于不同的node。一个操作的head一个操作的last。
*/
分享到:
相关推荐
11. **Java并发集合**:ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentHashMap等线程安全集合的使用和原理。 12. **Java虚拟机(JVM)**:了解JVM的基本概念、对象的创建、内存布局、内存模型、类加载机制、...
以下将根据给出的面试题目,详细解读相关知识点: 1. **项目优化**:这是面试中常见的问题,旨在了解候选人在实际项目中的处理能力和问题解决思路。优化可能涉及到性能调优(如减少响应时间、提高并发能力)、代码...
本文将基于提供的"JDKSourseNote",针对JDK1.8的源码进行详细的解读,旨在帮助开发者更好地掌握其内部机制,从而在实际开发中运用自如。 首先,我们要明确,JDK1.8是Java发展的一个重要里程碑,引入了许多新特性,...
T型三电平+SVPWM的下垂控制与双闭环中点电位平衡控制.pdf
STM32真实企业级项目:锅炉控制器源码、原理图与PCB图.pdf
STM32F103 Modbus主站源码:正常使役,支持多从机功能码通信及从机寄存器写入.pdf
Simulink永磁同步直驱风机PMSG一次调频离散模型:含虚拟惯性与下垂控制,可扩展至光伏储能研究.pdf
VSG仿真、并网与离网运行仿真、预同期并网控制及虚拟同步机逆变器仿真.pdf
VIC水文模型全程视频教学指导.pdf
vrep_coppeliasim+matlab机器人轨迹控制仿真:利用matlab读取轨迹并控制机械臂在墙上绘图的详细学习示例.pdf
2000-2022年上市公司行业异质性数据(技术密集型、劳动密集型、资本密集型)(含原始数据和处理代码) 1、时间:2000-2022年 2、指标:股票代码、年份、股票简称、统计日期、行业名称、行业代码、成立日期、上市日期、所在省份、所在城市、上市状态、保留两位行业代码、保留一位行业代码、高科技为1,非高科技为0、重污染为1,非重污染为0、制造业为1,非制造业为0、劳动密集型为1,资本密集型为2,技术密集型为3 3、来源:csmar 4、根据2012年中国证监会行业划分是否高科技、是否重污染、是否制造业、是否劳动密集型、资本密集型、技术密集型。 5、内容:包括原始数据、处理代码和计算结果
TMS320F28335电机控制程序:BLDC、PMSM无感有感及异步VF程序源代码与开发资料大全.pdf
tc275、s12x、s32k144基于CANoe的UDS诊断数据库CDD文件及CAPL Boot上位机、下位机程序移植说明文档.pdf
STM32系列通信透传技术:以太网、串口、CAN透传及OBD协议解析.pdf
STM32开发:IIR带阻滤波器设计与实现.pdf
UG后处理:CNC西门子828D后处理与西门子后处理工厂实战自用.pdf
MYSQL深入学习总结.pdf
Stewart六自由度平台反解算法 C#.pdf
1、文件说明: Centos8操作系统vim-ale-3.3.0-1.el8.rpm以及相关依赖,全打包为一个tar.gz压缩包 2、安装指令: #Step1、解压 tar -zxvf vim-ale-3.3.0-1.el8.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm
tc275、s12x和s32k144的Boot程序及UDS故障诊断与Bootloader移植的Python自制上位机源码.pdf