`
Donald_Draper
  • 浏览: 985200 次
社区版块
存档分类
最新评论

ArrayBlockingQueue解析

    博客分类:
  • JUC
阅读更多
Queue接口定义:http://donald-draper.iteye.com/blog/2363491
AbstractQueue简介:http://donald-draper.iteye.com/blog/2363608
ConcurrentLinkedQueue解析:http://donald-draper.iteye.com/blog/2363874
BlockingQueue接口的定义:http://donald-draper.iteye.com/blog/2363942
LinkedBlockingQueue解析:http://donald-draper.iteye.com/blog/2364007
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * [i]head[/i] of the queue is that element that has been on the
 * queue the longest time.  The [i]tail[/i] of the queue is that
 * element that has been on the queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 基于数组的有界阻塞FIFO队列。head是待在队列中,最久的元素;
 tail则是最短的元素。新元素插入时放在队尾,消费时,则从head获取。
 * <p>This is a classic "bounded buffer", in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * changed.  Attempts to {@code put} an element into a full queue
 * will result in the operation blocking; attempts to {@code take} an
 * element from an empty queue will similarly block.
 *
 这一个是个节点的有界缓存队列,生产者生产消息,消费者消费消息。
 队列创建后,容量不能被修改。尝试向一个已满队列生产元素,则会被阻塞,
 尝试从一个空队列消费元素,同样会被阻塞。
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed. However, a queue constructed with fairness set
 * to {@code true} grants threads access in FIFO order. Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
ArrayBlockingQueue支持消费和生产线程等待条件时公平性与非公平性,默认为非公平锁。
这个我们可以通过构造公平性参数来设置。公平锁一般会降低吞吐量,但是可以减少不确定性,
避免锁饥饿情况的发生。
 * <p>This class and its iterator implement all of the
 * [i]optional[/i] methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 实现了所有Collection和Iterator接口 
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;
    //存放元素的数组
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    //下一个take,poll,peek,remove元素的数组index
    int takeIndex;

    /** items index for next put, offer, or add */
    //下一个put, offer, or add元素的数组index
    int putIndex;

    /** Number of elements in the queue */
    //当前队列的元素数量
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    //可重入锁
    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes ,队列非空条件*/
    private final Condition notEmpty;
    /** Condition for waiting puts 队列非满条件*/
    private final Condition notFull;
     /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     待容量参数的构造
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     待容量和公平性参数的构造
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    /**
     * Circularly increment i.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Circularly decrement i.
     */
    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        return (E) item;
    }
}

从上面来看,ArrayBlockingQueue是一个有界的线程安全FIFO队列,队列元素放在
一个元素数组中,一个takeIndex,表示下一个take,poll,peek,remove元素的数组index。
一个putIndex,表示下一个put, offer, or add元素的数组index,一个int Count,表示当前
队列元素数量,一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两条件都是由ReentrantLock创建。
来看put操作:
/**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        //检查元素是否为null
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
	//以可中断方式,获取锁
        lock.lockInterruptibly();
        try {
            while (count == items.length)
	        //如果队列已满,等待notFull条件
                notFull.await();
	    //否则插入元素
            insert(e);
        } finally {
            lock.unlock();
        }
    }

put操作首先检查元素是否为null,然后以可中断方式,获取锁,
如果队列已满,等待notFull条件,否则插入元素;
来看put操作的几个要点:
1.
//检查元素是否为null
checkNotNull(e);

/**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
       //为null,则抛出null指针异常
        if (v == null)
            throw new NullPointerException();
    }


2.
//否则插入元素
 insert(e);


/**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
	//put索引自增1
        putIndex = inc(putIndex);
	//容量自增
        ++count;
	//唤醒等待消费的线程
        notEmpty.signal();
    }

来看offer操作
public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

offer操作与put操作的区别,当队列满时,返回false;
再看超时offer:
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

超时offer与put操作没有太大的区别,区别是当队列满时,
超时等待notFull条件,插入成功,则返回true。

再看add操作:
 public boolean add(E e) {
        return super.add(e);
    }

//AbstractQueue
 
 public boolean add(E e) {
        //委托给offer
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }


add操作实际上调用的offer操作。

再看take操作
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
	//以可中断方式获取锁
        lock.lockInterruptibly();
        try {
            while (count == 0)
	        //当队列为空,等待非空条件
                notEmpty.await();
            //返回队列中takeIndex,索引对应的元素
            return extract();
        } finally {
            lock.unlock();
        }
    }

   /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final Object[] items = this.items;
	//获取队列中takeIndex,索引对应的元素
        E x = this.<E>cast(items[takeIndex]);
	//设置takeIndex索引对应的元素为null
        items[takeIndex] = null;
	//takeIndex索引自增
        takeIndex = inc(takeIndex);
	//容量count自减
        --count;
	//唤醒等待notFull条件的线程
        notFull.signal();
        return x;
    }

从上来看,take操作首先以可中断方式获取锁,当队列为空,等待非空条件,
否则返回队列中takeIndex,索引对应的元素,akeIndex索引自增,容量count自减,
唤醒等待notFull条件的线程。
再看超时poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

超时poll的唯一区别是当当队列为空,超时等待非空条件。

再看poll
 
 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

poll操作与take操作的最大的区别为当队列为空,返回null。


再看peek操作:
 public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }

peek操作首先获取锁,如果队列为空,则返回null,否则,返回takeIndex所对应的元素。
来看peek操作关键点itemAt,
 
/**
     * Returns item at index i.
     */
    final E itemAt(int i) {
        return this.<E>cast(items[i]);
    }

再看remove操作:
public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
	//获取锁
        lock.lock();
        try {
	    //遍历队列,找到元素相等,移除
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

remove操作,首先获取锁,遍历队列,找到元素相等,移除。

contains操作:
 public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                if (o.equals(items[i]))
                    return true;
            return false;
        } finally {
            lock.unlock();
        }
    }

clear操作:
 /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
	    //遍历队列,将队列清空
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                items[i] = null;
            count = 0;
            putIndex = 0;
            takeIndex = 0;
	    //通知所有等待put的线程
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

再来看drainTo
drainTo(Collection<? super E> c)和drainTo(Collection<? super E> c, int maxElements) 基本上相同,我们来看
drainTo(Collection<? super E> c, int maxElements)
/**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int n = 0;
            int max = (maxElements < count) ? maxElements : count;
	    //取出所有可用元素,添加集合
            while (n < max) {
                c.add(this.<E>cast(items[i]));
                items[i] = null;
                i = inc(i);
                ++n;
            }
            if (n > 0) {
                count -= n;
                takeIndex = i;
		//唤醒所有等待put的线程
                notFull.signalAll();
            }
	    //返回取出的元素数量
            return n;
        } finally {
            lock.unlock();
        }
    }

//获取队列中的元素数量
 public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

//获取队列剩余空间
public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }




总结:
    ArrayBlockingQueue是一个有界的线程安全FIFO队列,队列元素放在
一个元素数组中,一个takeIndex,表示下一个take,poll,peek,remove元素的数组index。
一个putIndex,表示下一个put, offer, or add元素的数组index,一个int Count,表示当前
队列元素数量,一把锁ReentrantLock用于访问控制,一个队列非空条件notEmpty,一个队列非满条件notFull,这两个条件都是由ReentrantLock创建。
    put操作首先检查元素是否为null,然后以可中断方式,获取锁,
如果队列已满,等待notFull条件,否则插入元素;putIndex索引自增,容量count自增,唤醒等待消费的线程。offer操作与put操作的区别,当队列满时,返回false;超时offer与put操作没有太大的区别,区别是当队列满时超时等待notFull条件,插入成功,则返回true。
具体选择何中操作,视具体的场景需求。
    take操作首先以可中断方式获取锁,当队列为空,等待非空条件,否则返回队列中takeIndex,索引对应的元素,akeIndex索引自增,容量count自减,唤醒等待notFull条件的线程。超时poll的唯一区别是当当队列为空,超时等待非空条件。poll操作与take操作的区别为当队列为空,返回null。具体选择何中操作,视具体的场景需求。
   peek操作首先获取锁,如果队列为空,则返回null,否则,返回takeIndex所对应的元素。
   remove操作,首先获取锁,遍历队列,找到元素相等,移除。
   ArrayBlockingQueue与LinkedBlockingQueue区别是LinkedBlockingQueue中元素以节点链表来存储,而ArrayBlockingQueue是放在数组中;LinkedBlockingQueue中有两把锁分别为put和take锁,读写分离,而ArrayBlockingQueue只有一把锁控制take和put。
0
0
分享到:
评论

相关推荐

    ArrayBlockingQueue源码解析-动力节点共

    本文将深入解析ArrayBlockingQueue的内部实现机制和关键特性。 **1. ArrayBlockingQueue的构造** ArrayBlockingQueue在初始化时需要指定其容量大小,这个容量一旦设定就无法改变。构造函数通常需要传入两个参数:...

    Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个固定大小的数组`items`,用于存储队列中的元素。此外,它还有3个关键的索引变量:`takeIndex`、`putIndex`和`count`...

    Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...

    Java源码解析阻塞队列ArrayBlockingQueue功能简介

    Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...

    java concurrent 包 详细解析

    5. **阻塞队列**:如`ArrayBlockingQueue`、`LinkedBlockingQueue`,它们是线程安全的队列,用于存储待处理的任务,线程池会从队列中取出任务进行执行。 6. **并发集合**:Java并发包提供了线程安全的集合,如`...

    java阻塞队列实现原理及实例解析.docx

    在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...

    面试必备:Java线程池解析.pdf

    - ArrayBlockingQueue:基于数组的有界阻塞队列。 - LinkedBlockingQueue:基于链表的可选有界阻塞队列。 - SynchronousQueue:不存储元素的阻塞队列,提交任务必须等待另一个线程取走。 - PriorityBlockingQueue:...

    JAVA技能鉴定试题大全.pdf

    这可以使用`Thread`类或`Runnable`接口实现,队列可以使用`ArrayBlockingQueue`,限制其大小。线程二则负责按照特定规则选择话务员。 5. **数据排序**:题目要求按照成绩、姓名、年龄升序排列。Java集合框架中的`...

    完全解析Android多线程中线程池ThreadPool的原理和使用

    5. **工作队列** (workQueue):用于存放待执行任务的队列,如ArrayBlockingQueue、LinkedBlockingQueue等。 6. **线程工厂** (threadFactory):创建新线程的工厂,用于定制线程的创建行为。 线程池的工作逻辑如下:...

    15个顶级Java多线程面试题及回答

    Java提供了多种实现阻塞队列的方式,例如使用`ArrayBlockingQueue`、`LinkedBlockingQueue`等预定义类,也可以使用`wait`/`notify`机制自定义实现。 #### 5. 生产者-消费者问题 - **题目**: 用Java写代码来解决...

    java线程池实例

    常见的有`ArrayBlockingQueue`、`LinkedBlockingQueue`和`SynchronousQueue`等。 6. `threadFactory`: 用于创建新线程的工厂,可以自定义线程的命名、优先级等。 7. `handler`: 当线程池无法接受新任务时,例如达到...

    基于Java线程池技术的数据爬虫设计与实现.zip

    3. 工作队列:存放待执行任务的队列,可以是无界队列(如LinkedBlockingQueue)或有界队列(如ArrayBlockingQueue)。 4. 线程存活时间:当线程池中线程数超过核心线程数,且没有任务执行时,线程存活的最长时间。 ...

    java阻塞队列实现原理及实例解析

    Java阻塞队列实现原理及实例解析 Java阻塞队列是一种特殊的队列,它能够在队列为空或满时阻塞线程,使得线程之间能够更好地协作和通信。阻塞队列的实现原理是基于锁机制和条件变量机制的,通过wait和notify方法来...

    Android校招面试指南 2018最新版本

    - **ArrayBlockingQueue**:基于数组的有界阻塞队列。 - **LinkedBlockingQueue**:基于链表的阻塞队列。 - **ConcurrentHashMap**:线程安全的哈希表。 #### 三、Java虚拟机(JVM) - **对象的创建、内存布局和...

    Android面试复习资料大全(包含java源码)

    11. **Java并发集合**:ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentHashMap等线程安全集合的使用和原理。 12. **Java虚拟机(JVM)**:了解JVM的基本概念、对象的创建、内存布局、内存模型、类加载机制、...

    java爬虫源码

    7. **阻塞队列**:在Java中,如LinkedBlockingQueue、ArrayBlockingQueue等阻塞队列常用于线程间的通信和协调。它们允许生产者线程放入元素,消费者线程取出元素,而不会发生数据竞争。 8. **网络编程**:在爬虫...

    JAVA技能鉴定试题大全[归纳].pdf

    可以使用`ArrayBlockingQueue`,它是一个有限大小的阻塞队列,线程安全,适合生产者-消费者模型。线程间的同步可以通过`synchronized`关键字或`java.util.concurrent.locks`包中的锁来实现,确保操作的原子性和避免...

Global site tag (gtag.js) - Google Analytics