public interface BlockingQueue<E> extends Queue<E> { /** * 添加元素 */ boolean add(E e); //将指定的元素添加到此队列中(如果立即可行),在成功时返回 true,其他情况则抛出 IllegalStateException。 void put(E e) throws InterruptedException; //将指定元素添加到此队列中,如果没有可用空间,将一直等待(如果有必要) boolean offer(E e); //将指定元素插入此队列中 /** * 删除元素 */ E take() throws InterruptedException; //检索并移除此队列的头部,如果此队列不存在任何元素,则一直等待 E poll(long timeout, TimeUnit unit) throws InterruptedException; //检索并移除此队列的头部,如果此队列中没有任何元素,则等待指定等待的时间(如果有必要) boolean remove(Object o); //从队列中删除指定元素 E peek(); }
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, {
- private final E[] items;
- /** items index for next take, poll or remove */
- private int takeIndex;
- /** items index for next put, offer, or add. */
- private int putIndex;
- private int count;
- /*ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,
- * 由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue
- */
- private final ReentrantLock lock;
- private final Condition notEmpty;
- private final Condition notFull;
- ...
- }
/** * 构造函数 */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
- final int inc(int i) {
- return (++i == items.length)? 0 : i;
- }
- //塞数据、取数据内部主要引用这两个方法
- //塞数据:
- private void insert(E x) {
- items[putIndex] = x;
- putIndex = inc(putIndex);
- ++count;
- notEmpty.signal();
- }
- //取数据:
- private E extract() {
- final E[] items = this.items;
- E x = items[takeIndex];
- items[takeIndex] = null;
- takeIndex = inc(takeIndex);
- --count;
- notFull.signal();
- return x;
- }
put()/take() 可打断,并且会等待
put()/take() 可打断,并且会等待 /** * 放数据 */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } /** * 取数据 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } } public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
从 extract()方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
