本文的主要内容是对jdk并发包中ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue类的代码解析,主要对比他们的不同。
前面两个类都是生产者消费者模型的实现,性能都不错,这毕竟都是大师的杰作。先来认识下ArrayBlockingQueue,这个类的内部数据结构是一个Object数组,通过ReentrantLock控制同步操作,由这个锁派生出两个条件对象,如下代码
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
这是非常经典的条件操作的应用。下面看下put方法的实现
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
它的思路是:如果队列满,等待,否则入队操作,并发出队列不空的信号。
以上的思路似乎很合理,但是仔细想想还可以改进:只要在队列为空的时候,入队成功才需发信号说队列不空了;还有一点就是,入队成功后,如果队列依然不满,可以唤醒正在等待的入队操作。这两点都是可以考虑的。
以上提到的在LinkedBlockingQueue中得到了实现,它的内部数据结构是一个单向链表,通过两个ReentrantLock来控制同步操作,下面是put方法的实现
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity) // 标记1
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) // 标记2
signalNotEmpty();
}
private void insert(E x) {
last = last.next = new Node<E>(x);
}
上述代码中的两个标记处就实现了前面提到的两个不足。
take()方法的对比类似。
下面再来说说ConcurrentLinkedQueue,这个类和上面两个有本质的不同,它的内部采用的是非阻塞算法实现,性能比前两个都要好(书上说的,用机会测试下),这个类中采用的原子类是域更新器,看下代码更清楚
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
tailUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "tail");
private static final
AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node>
headUpdater =
AtomicReferenceFieldUpdater.newUpdater
(ConcurrentLinkedQueue.class, Node.class, "head");
Node节点中还有两个
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;
private static final
AtomicReferenceFieldUpdater<Node, Node>
nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Node.class, "next");
private static final
AtomicReferenceFieldUpdater<Node, Object>
itemUpdater =
AtomicReferenceFieldUpdater.newUpdater
(Node.class, Object.class, "item");
Node(E x) { item = x; }
Node(E x, Node<E> n) { item = x; next = n; }
分别是用来更新队列的头结点、尾节点以及每个节点的item和next。
下面看下它的代码
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = tail;
Node<E> s = t.getNext();
if (t == tail) {
if (s == null) {
if (t.casNext(s, n)) {
casTail(t, n);
return true;
}
} else {
casTail(t, s);// 标记1
}
}
}
}
public E poll() {
for (;;) {
Node<E> h = head;
Node<E> t = tail;
Node<E> first = h.getNext();
if (h == head) {
if (h == t) {
if (first == null)
return null;
else
casTail(t, first);// 标记2
} else if (casHead(h, first)) {
E item = first.getItem();
if (item != null) {
first.setItem(null);
return item;
}
// else skip over deleted item, continue loop,
}
}
}
}
代码中的关键问题就是操作的原子性:单个原子操作当然是原子性的,但两个原子操作合起来就不是原子的了。比如在入队操作中要加入节点n,在更新tail节点的next成功后,tail节点指向n不一定会成功;这就需要在其他的操作中弥补这一“过错”,这在offer和poll中都有考虑的,如上述代码中的标记1和标记2。
分享到:
相关推荐
container Collection 标记: 顶级接口 List 标记: interface ArrayList 标记: class ...ArrayBlockingQueue ...LinkedBlockingQueue ...LinkedBlockingQueue 链表结构实现,无界队列(默认上限Integer.MAX_VALUE)
本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们的设计原理与应用场景。 首先,我们来看`ConcurrentLinkedQueue`。它是基于非阻塞算法(CAS,...
20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之ScheduledThreadPoolExecutor 23.FutureTask基本操作总结 24.Java中atomic包中的原子操作类...
它们分别基于链表、数组和优先级堆实现,其中 ArrayBlockingQueue 和 LinkedBlockingQueue 使用 ReentrantLock 和 Condition 实现线程安全,PriorityBlockingQueue 则是一个按优先级排序的队列。 在选择并发容器时...
1. 并发集合改进:如ArrayBlockingQueue、LinkedBlockingQueue等,提供了并发访问和线程安全的性能提升。 2. ForkJoinPool:Java 7引入的新特性,用于实现分治策略,提高并行计算能力。 七、实战应用 1. 高并发场景...
Java 5引入了`java.util.concurrent`包,其中`BlockingQueue`接口和其实现如`ArrayBlockingQueue`、`LinkedBlockingQueue`等提供了更高级的线程间通信方式,它们可以用于生产者-消费者模式或者工作窃取模式。...
例如,如果需要高效的并发操作,可以选择`ConcurrentLinkedQueue`或`LinkedBlockingQueue`;如果对元素的顺序有特定要求,可以使用`PriorityQueue`;对于一般用途且对性能要求较高,`ArrayDeque`是不错的选择。 ...
- **BlockingQueue**:一种线程安全的数据结构,用于线程间的数据交换,如ArrayBlockingQueue、LinkedBlockingQueue等。 4. **并发工具类**: - **CountDownLatch**:一次性计数器,用于线程同步,等待其他线程...
Java的`java.util.concurrent`包提供了多种阻塞队列实现,如ArrayBlockingQueue、LinkedBlockingQueue和PriorityBlockingQueue等。这些示例代码可能包含如何创建、插入、删除和操作阻塞队列的实例。 2. **卖票的...
阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...
9. **LinkedBlockingQueue**: 类似于ArrayBlockingQueue,但它基于链表结构,无固定容量限制,但是添加和获取元素的操作仍然有阻塞效果。 10. **LinkedBlockingDeque**: 双端阻塞队列,可以像LinkedBlockingQueue...
java bitset源码Java源码分析 ...ArrayBlockingQueue(done) LinkedBlockingDeque (done) LinkedBlockingQueue (done) PriorityBlockingQueue (done) ConcurrentHashMap (done) ConcurrentLinkedQueue (done) C
- `BlockingQueue`:阻塞队列,如`ArrayBlockingQueue`和`LinkedBlockingQueue`,在队列满或空时,线程会被阻塞,实现生产者-消费者模式。 - `CopyOnWriteArrayList` 和 `CopyOnWriteArraySet`:在读多写少的场景...
双向队列:ArrayBlockingQueue(有界),LinkedBlockingQueue(无界),DelayQueue,PriorityBlockingQueue,采用锁机制;使用ReentrantLock锁。 集合 链表,纹理 字典,关联数组 栈 Stack是线程安全的。 内部...
常见的`BlockingQueue`实现包括`ArrayBlockingQueue`、`LinkedBlockingQueue`和`PriorityBlockingQueue`等。 ### 5. 并发集合 Java并发库(`java.util.concurrent`包)提供了一系列优化的线程安全集合,如`...
3. **并发容器**:Java的`java.util.concurrent`包提供了多种并发队列,如`ArrayBlockingQueue`、`LinkedBlockingQueue`和`ConcurrentLinkedQueue`等,它们为多线程环境提供了高效的队列操作。 4. **工作窃取算法**...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...