//先看构造函数
//初始化一个给定容量的ArrayBlockingQueue
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//通过给定的容量初始化内部的数组和锁以及条件。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个给定容量的数组
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//通过给定的集合初始化数组超出大小会抛出异常这里为什么要上锁?源码注释写的是这里不会有互斥,只是为了保证可见性。防止指令重排?
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//将数量设置为i
count = i;
//设置putIndex的位置如果数组已满的话设置为0
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
//add方法 调用offer方法新增元素如果成功返回true失败抛出异常
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
//不能加入null元素
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果数组已经满了
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
//设置putIndex位置
putIndex = inc(putIndex);
//总数+1
++count;
//因为数组已经非空所以唤醒notEmpty条件上的等待队列
notEmpty.signal();
}
final int inc(int i) {
//队列已满put设置为0否则设置为i+1
return (++i == items.length) ? 0 : i;
}
//poll方法没有元素获取null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果当前没有元素了返回null。
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
//获取E的实际类型
E x = this.<E>cast(items[takeIndex]);
//将当前位置的值设置为null防止内存溢出。
items[takeIndex] = null;
//将takeIndex加1如果数组已满设置为0
takeIndex = inc(takeIndex);
//数组元素数量减少一个
--count;
//当前数组已经是非满状态所以唤醒在notFull条件上等待的队列。
notFull.signal();
return x;
}
@SuppressWarnings("unchecked")
static <E> E cast(Object item) {
return (E) item;
}
//poll(long timeout, TimeUnit unit)在该时间内一直尝试去获取元素超时返回null。
/**这个方法实现的很精妙,直接在notEmpty条件上挂起当前线程,如果该条件被唤醒,
他就去获取一次值。
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
//在notEmpty条件上等待nanos时间
nanos = notEmpty.awaitNanos(nanos);
}
return extract();
} finally {
lock.unlock();
}
}
//offer(E e, long timeout, TimeUnit unit)在指定时间内插入元素超时返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
//如果当前队列是满的那么在notFull条件上等待唤醒
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
}
//remove方法成功返回true失败返回false
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//从takeIndex开始遍历查找o
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
//如果删除的是头元素直接删除takeIndex+1,否则滑动整个数组,重新构建整个数组。
void removeAt(int i) {
final Object[] items = this.items;
//如果移除正好位于takeindex上。
if (i == takeIndex) {
items[takeIndex] = null;
takeIndex = inc(takeIndex);
} else {
// slide over all others up through putIndex.
for (;;) {
int nexti = inc(i);
//移动整个元素列表
if (nexti != putIndex) {
items[i] = items[nexti];
i = nexti;
} else {
items[i] = null;
//将putindex往前移
putIndex = i;
break;
}
}
}
--count;
//当前队列已经非满唤醒在非满条件上等待的线程
notFull.signal();
}
//peek:获取头元素如果没有获取到则返回null。仅仅是获取不移除。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return this.<E>cast(items[i]);
}
//put()如果队列已满。则阻塞。直到notFull条件被唤醒。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
//take()如果队列为空则阻塞.直到notEmpty条件被唤醒。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
//remainingCapacity()返回当前还能够容下的元素个数
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
//contains返回是否包含当前元素
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
if (o.equals(items[i]))
return true;
return false;
} finally {
lock.unlock();
}
}
//返回一个包含当前元素的新数组Object类型
public Object[] toArray() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
Object[] a = new Object[count];
for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
a[k] = items[i];
return a;
} finally {
lock.unlock();
}
}
//返回指定类型的新数组
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
//如果传入的数组长度<当前的元素个数就重新创建一个count长度的数组
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
a[k] = (T) items[i];
//如果长度>count
if (len > count)
a[count] = null;
return a;
} finally {
lock.unlock();
}
}
//移除此队列中的所有元素
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
items[i] = null;
//将这几个变量设置为初始值。
count = 0;
putIndex = 0;
takeIndex = 0;
notFull.signalAll();
} finally {
lock.unlock();
}
}
//返回迭代器一个内部类实现这里就不写了
public Iterator<E> iterator() {
return new Itr();
}
//移除队列中的所有元素并封装到collection中
public int drainTo(Collection<? super E> c) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
int max = count;
while (n < max) {
c.add(this.<E>cast(items[i]));
//帮助垃圾回收器回收,防止内存泄漏
items[i] = null;
i = inc(i);
++n;
}
if (n > 0) {
//将当前集合置为初始化状态
count = 0;
putIndex = 0;
takeIndex = 0;
//唤醒非满条件上的等待。
notFull.signalAll();
}
return n;
} finally {
lock.unlock();
}
}
//从此队列中最多移出maxElements个元素。
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex;
int n = 0;
//如果maxElements<count就移出maxElements个否则全部移出。
int max = (maxElements < count) ? maxElements : count;
while (n < max) {
c.add(this.<E>cast(items[i]));
items[i] = null;
i = inc(i);
++n;
}
if (n > 0) {
count -= n;
takeIndex = i;
notFull.signalAll();
}
return n;
} finally {
lock.unlock();
}
}
/**
总结:ArrayBlockingQueue低层是用数组+锁实现。是一种可以用于生产者消费者模式的可阻塞的队列。但是插入和删除这些操作都是用的一把锁。这可能会导致效率不高?
*/
分享到:
相关推荐
下面我们将深入分析其主要的实现机制、方法以及源码。 1. **数据结构与容量** `ArrayBlockingQueue` 内部使用一个数组 `items` 来存储元素,因此它的容量在创建时就需要指定,且不可改变。这个容量限制确保了队列...
在深入理解ArrayBlockingQueue的源码之前,我们需要先了解其基本概念和特性。 ArrayBlockingQueue的核心特点在于其固定大小的容量,这使得它在处理高并发场景时能有效控制资源消耗。队列的元素按照FIFO(先进先出)...
Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...
Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...
八、ArrayBlockingQueue源码分析 ArrayBlockingQueue是一种基于数组实现的阻塞队列,提供了线程安全的生产者消费者模型。ArrayBlockingQueue的继承体系中,它继承了AbstractQueue,实现了BlockingQueue接口。 ...
Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...
Java并发集合ArrayBlockingQueue的用法详解 Java并发集合ArrayBlockingQueue是Java并发集合框架下的一个重要组件,它提供了阻塞队列的实现,用于多线程环境下的并发操作。下面是对ArrayBlockingQueue的用法详解: ...
java ...[ArrayBlockingQueue] [ConcurrentLinkedQueue] [PriorityBlockingQueue] [DelayQueue] 并发安全集合 [HashMap, ConcurrentHashMap源码] [ArrayList, LinkedList, CopyOnWriteArrayList源码]
"并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解" ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类...
常见的有无界队列(如ArrayBlockingQueue)和有界队列(如LinkedBlockingQueue),以及优先级队列(PriorityBlockingQueue)。队列的选择直接影响线程池的处理策略。 `ThreadPoolExecutor`的工作流程如下: 1. **...
Java并发之ArrayBlockingQueue详细介绍 ArrayBlockingQueue是Java并发编程中常用的线程安全队列,经常被用作任务队列在线程池中。它是基于数组实现的循环队列,具有线程安全的实现。 ArrayBlockingQueue的实现 ...
ArrayList核心源码+扩容机制分析LinkedList核心源码分析HashMap核心源码+底层数据结构分析ConcurrentHashMap核心源码+底层数据结构分析LinkedHashMap核心源码分析CopyOnWriteArrayList核心源码分析...
ArrayBlockingQueue是Java并发编程中一个重要的集合类,它属于 BlockingQueue 接口的一个实现,主要特点是线程安全、有界以及基于数组的阻塞队列。线程安全得益于其内部使用了Java并发包中的ReentrantLock(可重入锁...
—BlockingQueue和ArrayBlockingQueue.pdf” 【描述】:此文档是关于Java并发编程的学习资料,以漫画形式讲解,聚焦于Java并发编程中的核心概念——BlockingQueue接口及其具体实现ArrayBlockingQueue。 【标签】:...
Java中的`LinkedBlockingQueue`和`ArrayBlockingQueue`都是`java.util.concurrent`包下的线程安全队列,它们都实现了`BlockingQueue`接口,提供了一种高效、线程安全的数据同步方式。这两种队列在很多方面都有相似之...
4. **并发容器**:如`ConcurrentHashMap`、`BlockingQueue`、`ArrayBlockingQueue`、`LinkedBlockingQueue`等,这些容器在并发环境下提供高效的数据共享。源码分析可以帮助理解它们如何保证线程安全和并发性能。 5....
囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue(ArrayBlockingQueue、...
Java并发库中实现阻塞队列的主要类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue和DelayQueue等。它们在并发编程中起到缓冲和协调生产者与消费者线程的作用,实现高效的数据交换。 3. **线程...