一:功能介绍
基于数组的有界阻塞队列,基于FIFO的存储模式,支持公平非公平锁。
二:源码分析
//数组 final Object[] items; //出队索引 int takeIndex; //入队索引 int putIndex; //队列大小 int count; //可重入锁 final ReentrantLock lock; //等待通知条件 private final Condition notEmpty; //等待通知条件 private final Condition notFull;
构造函数
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //初始化数组容量 this.items = new Object[capacity]; //内容采用可重入锁ReentrantLock实现,支持公平非公平选择 lock = new ReentrantLock(fair); //阻塞队列,等待条件 notEmpty = lock.newCondition(); //阻塞队列,等待条件 notFull = lock.newCondition(); }
入队操作
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; //可中断获取锁,如果出现了interrupted,不用一直阻塞 lock.lockInterruptibly(); try { //如果队列已满 while (count == items.length) //入队线程阻塞 notFull.await(); //插入数据 insert(e); } finally { lock.unlock(); } } private void insert(E x) { //将新的数据赋值在数组的某一个索引处 items[putIndex] = x; //重新赋值putIndex,设置下一个被取出元素的索引 putIndex = inc(putIndex); //队列大小+1 ++count; //唤醒take线程 notEmpty.signal(); } final int inc(int i) { //如果队列满了,重新初始化为0 return (++i == items.length) ? 0 : i; }
出队操作
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //同上,获取中断锁 lock.lockInterruptibly(); try { //队列没有值,阻塞 while (count == 0) notEmpty.await(); //返回被取走的数据 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; //获取takeIndex处的元素 E x = this.<E>cast(items[takeIndex]); //置空takeIndex处的元素,引用不存在,便于GC,释放内存 items[takeIndex] = null; //重新赋值takeIndex,设置下一个被取出的元素 takeIndex = inc(takeIndex); //队列大小-1 --count; //唤醒put线程 notFull.signal(); return x; }
移除数据
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //从takeIndex处开始计算,每次i加1,最大为队列最大容量count for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { //如果移除元素在数组某个下标找到 if (o.equals(items[i])) { removeAt(i); return true; } } return false; } finally { lock.unlock(); } } void removeAt(int i) { final Object[] items = this.items; //如果准备移除的索引和下一个被取出的元素索引一样,直接移除 if (i == takeIndex) { //赋值null,便于GC items[takeIndex] = null; //重新设置下一个被取出元素的索引 takeIndex = inc(takeIndex); //如果需要删除的元素索引不是当前被取出的索引 } else { //一直循环,直到删除为止 for (;;) { //假设队列容量是4,目前存了3个元素,即takeIndex=0,putIndex=3,目前我打算删除数组下标为1的元素 // nexti第一次为2 int nexti = inc(i); if (nexti != putIndex) { //相当于将队列往前移 items[i] = items[nexti]; //相当于i+1 i = nexti; //待删除的索引与待put的索引相等,比如putIndex=2,i=1,inc(i) = 2 } else { //索引i处置null,偏于GC items[i] = null; //重新赋值下一个即将放入元素的索引 putIndex = i; break; } } } //队列大小-1 --count; //唤醒put线程,公平的话按FIFO顺序,非公平的话可以抢占 notFull.signal(); }
遍历队列
public Iterator<E> iterator() { return new Itr(); } private class Itr implements Iterator<E> { //队列里面还剩的元素个数 private int remaining; //下一次调用next()返回的索引 private int nextIndex; //下一次调用next()返回的元素 private E nextItem; //上一次调用next()返回的元素 private E lastItem; //上一次调用next()返回的索引 private int lastRet; Itr() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { lastRet = -1; //只有队列里面还有元素 if ((remaining = count) > 0) //获取takeIndex处的元素 nextItem = itemAt(nextIndex = takeIndex); } finally { lock.unlock(); } } public boolean hasNext() { return remaining > 0; } public E next() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { //如果队列没有值 if (remaining <= 0) throw new NoSuchElementException(); lastRet = nextIndex; //获取下一次获取索引处的元素 E x = itemAt(nextIndex); // check for fresher value if (x == null) { x = nextItem; // we are forced to report old value lastItem = null; // but ensure remove fails } else //将刚获取的元素当做上一次获取的元素 lastItem = x; //当下一次获取的元素不存在的时候 while (--remaining > 0 && // skip over nulls (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) ; return x; } finally { lock.unlock(); } } public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { int i = lastRet; if (i == -1) throw new IllegalStateException(); lastRet = -1; E x = lastItem; lastItem = null; // only remove if item still at index if (x != null && x == items[i]) { boolean removingHead = (i == takeIndex); removeAt(i); if (!removingHead) nextIndex = dec(nextIndex); } } finally { lock.unlock(); } } }
相关推荐
`ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...
Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...
首先,ArrayBlockingQueue是一个有界的阻塞队列,这意味着它有一个固定的容量限制。在创建时必须提供该容量大小,一旦队列满,生产者尝试插入元素时会被阻塞,直到其他消费者消费掉一部分元素;反之,当队列为空时,...
八、ArrayBlockingQueue源码分析 ArrayBlockingQueue是一种基于数组实现的阻塞队列,提供了线程安全的生产者消费者模型。ArrayBlockingQueue的继承体系中,它继承了AbstractQueue,实现了BlockingQueue接口。 ...
其中,`ArrayBlockingQueue`是基于数组的有界队列,内部使用了`ReentrantLock`实现同步;`LinkedBlockingQueue`则是基于链表结构的无界队列,它使用了`Node`节点来存储元素,并通过`Condition`来控制生产者和消费者...
1. **ArrayBlockingQueue**:基于数组的有界阻塞队列,容量在创建时确定且不可变。它是线程安全的,内部通过锁(`ReentrantLock`)和条件变量(`Condition`)来实现同步。 2. **LinkedBlockingQueue**:基于链表...
例如,`ArrayBlockingQueue`是一个基于数组的有界阻塞队列,提供了公平或非公平的锁策略。 backport-util-concurrent还包含了其他一些并发工具,如`Semaphore`(信号量)、`CountDownLatch`(倒计数器)和`...
其次,`ArrayBlockingQueue`是一个有界的阻塞队列,内部使用数组实现。它通过公平或非公平锁来保证线程安全,提供了固定的容量,从而避免了资源无限消耗的问题。其吞吐量通常低于`ConcurrentLinkedQueue`,但在高...
8. **阻塞队列**:如`LinkedBlockingQueue`和`ArrayBlockingQueue`,前者基于链表结构,后者基于数组。它们都使用内部锁机制来保证线程安全,但实现方式不同,例如ArrayBlockingQueue使用的是公平锁。 9. **HashMap...
- **ArrayBlockingQueue**:基于数组的有界阻塞队列。 - **LinkedBlockingQueue**:基于链表的阻塞队列,可以设置容量也可以不限制。 以上总结了多线程面试中的常见问题及解决方案,包括但不限于`synchronized`...
- **ArrayBlockingQueue**:基于数组的阻塞队列。 - **LinkedBlockingQueue**:基于链表的阻塞队列。 - **PriorityBlockingQueue**:支持优先级排序的阻塞队列。 #### 25. Tomcat 如何管理servlet? Tomcat使用...
常见的实现有ArrayBlockingQueue、LinkedBlockingQueue和PriorityBlockingQueue等。阻塞队列常用于生产者消费者模型,可以有效地管理并发任务。 4. **Atomic* 类**:如AtomicInteger、AtomicLong等,提供了原子操作...
asynThread框架可能就是基于这样的理念,利用`ArrayBlockingQueue`作为线程池的工作队列,这是一种有界的阻塞队列,可以限制线程池中并发任务的数量,防止过度消耗系统资源。 `ArrayBlockingQueue`是线程安全的数据...
2. 并发容器及典型源码分析并发容器是Java并发编程中不可或缺的一部分,它们提供了线程安全的数据结构,可以高效地处理多线程环境下的数据共享。以下是一些重要的并发容器: 2.1 ConcurrentHashMap并发哈希映射表,...