ArrayBlockingQueue
public interface BlockingQueue<E> extends Queue<E> { /** * 添加元素 */ boolean add(E e); //将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。 void put(E e) throws InterruptedException; //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要) boolean offer(E e); //将指定元素插入此队列中 /** * 删除元素 */ E take() throws InterruptedException; //检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待 E poll(long timeout, TimeUnit unit) throws InterruptedException; //检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要) boolean remove(Object o); //从队列中删除指定元素 E peek(); }
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- private int count;
- /*ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,
- * 由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue
- */
- private final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
- ...
- }
/** * 构造函数 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
- final int inc(int i) {
- return (++i == items.length)? 0 : i;
- }
- //塞数据、取数据内部主要引用这两个方法
- //塞数据:
- private void insert(E x) {
- items[putIndex] = x;
- putIndex = inc(putIndex);
- ++count;
- notEmpty.signal();
- }
- //取数据:
- private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
- items[takeIndex] = null;
- takeIndex = inc(takeIndex);
- --count;
- notFull.signal();
- return x;
- }
put()/take() 可打断,并且会等待
put()/take() 可打断,并且会等待 /** * 放数据 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } /** * 取数据 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } } public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
从 extract()方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列。
putIndex维护一个可以插入的元素的位置索引。
count显然是维护队列中已经存在的元素总数。
ArrayBlockingQueue是JAVA5中的一个阻塞队列,能够自定义队列大小,当插入时,如果队列已经没有空闲位置,那么新的插入线程将阻塞到该队列,一旦该队列有空闲位置,那么阻塞的线程将执行插入。从队列中取数据为:take,放数据为:put
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
相关推荐
`ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...
ArrayBlockingQueue是Java并发编程中一个重要的数据结构,它是Java并发包`java.util.concurrent`中的一个线程安全的阻塞队列。这个队列基于数组实现,提供了在生产者和消费者之间进行数据交换的能力,同时确保了线程...
"并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解" ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类...
—BlockingQueue和ArrayBlockingQueue.pdf” 【描述】:此文档是关于Java并发编程的学习资料,以漫画形式讲解,聚焦于Java并发编程中的核心概念——BlockingQueue接口及其具体实现ArrayBlockingQueue。 【标签】:...
Java并发集合ArrayBlockingQueue的用法详解 Java并发集合ArrayBlockingQueue是Java并发集合框架下的一个重要组件,它提供了阻塞队列的实现,用于多线程环境下的并发操作。下面是对ArrayBlockingQueue的用法详解: ...
Java并发之ArrayBlockingQueue详细介绍 ArrayBlockingQueue是Java并发编程中常用的线程安全队列,经常被用作任务队列在线程池中。它是基于数组实现的循环队列,具有线程安全的实现。 ArrayBlockingQueue的实现 ...
Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...
ArrayBlockingQueue是Java并发编程中一个重要的集合类,它属于 BlockingQueue 接口的一个实现,主要特点是线程安全、有界以及基于数组的阻塞队列。线程安全得益于其内部使用了Java并发包中的ReentrantLock(可重入锁...
Java中的`LinkedBlockingQueue`和`ArrayBlockingQueue`都是`java.util.concurrent`包下的线程安全队列,它们都实现了`BlockingQueue`接口,提供了一种高效、线程安全的数据同步方式。这两种队列在很多方面都有相似之...
Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...
Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...
在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue, ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...
通常,我们会选择实现BlockingQueue的类,如ArrayBlockingQueue、LinkedBlockingQueue或PriorityBlockingQueue,根据实际需求选择合适的实现。 例如,我们可以创建一个配置类,如下所示: ```java @Configuration ...
ArrayBlockingQueue支持公平和非公平两种策略,公平策略意味着等待最久的线程优先获得资源,而非公平策略则不保证这一点。在高并发场景下,非公平策略通常能提供更好的性能。 2. LinkedBlockingQueue ...
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue(10); Thread producer = new Thread(new Producer(queue)); Thread consumer = new Thread(new Consumer(queue)); producer.start(); consumer....
本节我们将深入探讨Disruptor框架以及与ArrayBlockingQueue的对比。 ArrayBlockingQueue是Java并发库中的一个阻塞队列实现,基于循环数组,提供了线程安全的入队和出队操作。在上述代码中,我们创建了一个固定大小...