//LinkedBlockQueue
//先看构造函数
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//保证可见性这里不会有竞争互斥现象。
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
//offer插入一个元素
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果满了
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
//加入队列
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//唤醒在非满条件上等待的线程。
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
//当前已经有一个元素,唤醒在非空条件上等待的线程。
/**为什么只有c=0的时候唤醒?只有c为0的时候说明当前队列从一个也没有变为有一个,
也就是说之前take锁由于一个都没有所以阻塞在获取node上。*/
signalNotEmpty();
return c >= 0;
}
//这个方法也很奇怪为什么不直接notEmpty.signal()而要锁住以后在释放?
/**说说我的理解:这里涉及到一点Reentrant的源码知识,notEmpty.signal()这个方法
只是将在condition上等待的线程加入到执行队列中,此时还没有释放锁。只有在unlock的时候才会释放锁。
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//唤醒在notEmpty上等待的线程。
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//poll获取一个元素
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//唤醒非空条件上的等待
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
//当c=容量说明之前整个队列已满有线程在notfull条件上等待了所以唤醒他。
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//头节点
Node<E> h = head;
//头节点的下一个节点才是真正的节点初始化的时候会初始化一个空节点。
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
//在指定时间内插入元素超时返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//当前队列已满
while (count.get() == capacity) {
//超时返回false
if (nanos <= 0)
return false;
//在notfull条件上等待
nanos = notFull.awaitNanos(nanos);
}
//入队
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
//还有空间那么唤醒在notfull条件上等待的线程。
notFull.signal();
} finally {
putLock.unlock();
}
//如果c=0说明之前队列是空的,唤醒在notEmpty条件上等待的线程。
if (c == 0)
signalNotEmpty();
return true;
}
//在指定时间内去获取元素超时返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//当前没有队列为空
while (count.get() == 0) {
//超时返回null
if (nanos <= 0)
return null;
//挂起线程
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
//说明队列中至少还有一个元素唤醒在非空条件上等待的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//说明之前队列是满的现在移出了一个堆了不满了所以唤醒在不满条件上等待的线程。
if (c == capacity)
signalNotFull();
return x;
}
//只取元素但是并不移除
public E peek() {
//没有元素
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
//插入元素如果队列已满则阻塞。
public void put(E e) throws InterruptedException {
//不能插入空元素
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列已满阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
//队列还未满唤醒在notfull条件上等待的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//说明队列之前是空的唤醒在notEmpty条件上等待的线程
if (c == 0)
signalNotEmpty();
}
//获取元素如果队列为空则阻塞
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//队列还有元素唤醒notEmpty条件上等待的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//说明之前队列已满所以唤醒在notFull条件上等待的线程
if (c == capacity)
signalNotFull();
return x;
}
//返回当前还可以容纳的线程节点数量
public int remainingCapacity() {
//这里为什么不用上锁?capacity是final的而count是AtomicInteger也是线程安全的。
return capacity - count.get();
}
//返回队列是否包含该元素
public boolean contains(Object o) {
if (o == null) return false;
//两把锁全部锁上
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
//两把锁全部释放
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
//size()返回队列中元素的个数
public int size() {
return count.get();
}
//从队列中移除该元素
public boolean remove(Object o) {
if (o == null) return false;
//全部锁上
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//全部释放
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果之前队列是满的,唤醒在notfull上等待的条件。
if (count.getAndDecrement() == capacity)
notFull.signal();
}
//clear清空所有元素整个队列变为初始化状态
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
//将count设置为0,如果之前队列是满的则唤醒在非满条件上等待的线程。
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
//将队列中的元素转换为数组类型为Object
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
//将队列中的元素封装到指定类型的数组
public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
//如果数组长度<队列长度重新创建一个等长的数组
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}
//将列队中的元素移到集合中
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
//如果是true说明之前队列是满的
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
//返回元素的迭代器
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return current != null;
}
/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
//说明该节点已经出列,在出列的时候有一句代码h.next = h;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}
public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}
public void remove() {
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}
/**
总结:LinkedBlockingQueue采用两把锁实现。一个锁住插入一个锁住移出。
为什么用两把锁实现可以保证并发?其主要有一个头,尾节点。而插入是从尾节点开始,
而移出是从头结点的下一个节点开始。这样可以保证无论怎么操作最后的结果都是一样的。
相当于他们操作于不同的node。一个操作的head一个操作的last。
*/
分享到:
相关推荐
《LinkedBlockingQueue深度解析与应用实践》 在Java并发编程领域,`LinkedBlockingQueue`是一个不可或缺的工具。作为`java.util.concurrent`包中的一个线程安全的队列,它基于链表结构实现,具备先进先出(FIFO)的...
在分析`SingleLinked.java`源码时,我们可以看到具体如何实现这些概念。文件中的类可能定义了一个简单的单向链表节点,包括节点的数据成员(如数据item和指向下一个节点的引用next)以及可能的辅助方法,例如插入新...
《LinkedBlockingQueue与ConcurrentLinkedQueue的比较与应用》 在Java并发编程中,队列是一种重要的数据结构,尤其在多线程环境下的任务调度和数据传递中扮演着关键角色。LinkedBlockingQueue和...
### 并发队列 ConcurrentLinkedQueue 和阻塞队列 LinkedBlockingQueue 用法详解 #### 一、并发队列 ConcurrentLinkedQueue 概述 `ConcurrentLinkedQueue` 是 Java 并发包 `java.util.concurrent` 提供的一个高性能...
《LinkedBlockingQueue in Java》 Java中的`LinkedBlockingQueue`是`java.util.concurrent`包下的一种线程安全的阻塞队列,它是基于链表结构实现的,具有很好的性能表现。这种队列在多线程环境下的并发操作中被广泛...
如ConcurrentHashMap、LinkedBlockingQueue等,这些工具类提供了线程安全的数据结构和算法,对于理解和实现高性能并发程序至关重要。 11. **字符串处理**: String类是Java中最常用的类之一,其内部实现涉及字符...
详细分析Java并发集合LinkedBlockingQueue的用法 Java并发集合LinkedBlockingQueue是Java并发集合中的一种阻塞队列实现,使用链表方式实现的阻塞队列。LinkedBlockingQueue的实现主要包括链表实现、插入和删除节点...
"并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解" ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类...
本篇文章将深入探讨`LinkedBlockingQueue`,这是一个基于链表结构的无界阻塞队列,常用于线程池的创建中作为任务缓冲队列。 首先,我们来看一下`LinkedBlockingQueue`的底层数据结构。与数组结构的`...
《元素唯一的LinkedBlockingQueue阻塞队列》 在Java并发编程中,`LinkedBlockingQueue`是一种基于链表结构的阻塞队列,它在多线程环境下的性能表现优秀,常用于实现生产者消费者模型。这个队列的一个关键特性是其...
常见的有无界队列(如ArrayBlockingQueue)和有界队列(如LinkedBlockingQueue),以及优先级队列(PriorityBlockingQueue)。队列的选择直接影响线程池的处理策略。 `ThreadPoolExecutor`的工作流程如下: 1. **...
TaskQueue通常使用阻塞队列(如LinkedBlockingQueue)实现,它具有很好的性能特性,如线程安全和无锁操作。当队列满时,提交任务的线程会被阻塞,直到队列中有空闲位置。反之,当队列为空时,等待工作的线程会被阻塞...
在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...
在这个"java队列源码"中,我们可以看到如何利用Java来实现多线程环境下的安全队列,特别适用于抢购等高并发场景。以下将详细讨论队列、多线程以及源码实现的关键知识点。 1. **Java 队列接口与实现** - Java 提供...
Java中的`LinkedBlockingQueue`和`ArrayBlockingQueue`都是`java.util.concurrent`包下的线程安全队列,它们都实现了`BlockingQueue`接口,提供了一种高效、线程安全的数据同步方式。这两种队列在很多方面都有相似之...
4. **并发容器**:如`ConcurrentHashMap`、`BlockingQueue`、`ArrayBlockingQueue`、`LinkedBlockingQueue`等,这些容器在并发环境下提供高效的数据共享。源码分析可以帮助理解它们如何保证线程安全和并发性能。 5....
囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue(ArrayBlockingQueue、...
Java的`java.util.Queue`接口和`java.util.concurrent.LinkedBlockingQueue`类提供了队列的实现。源码可能涵盖入队(enqueue)、出队(dequeue)操作。 5. **堆(Heap)**:堆是一种特殊的树形数据结构,通常用于...