`
BrokenDreams
  • 浏览: 253998 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100166
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue

阅读更多

Jdk1.6 JUC源码解析(12)-ArrayBlockingQueue

作者:大飞

 

功能简介:
  • ArrayBlockingQueue是一种基于数组实现的有界的阻塞队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
  • 和普通队列有所不同,该队列支持阻塞操作。比如从空队列中取元素,会导致当前线程阻塞,直到其他线程将元素放入队列;将元素插入已经满的队列,同样会导致当前线程阻塞,直到其他线程从队列中取出元素。
  • ArrayBlockingQueue也支持公平和非公平策略(针对队列中元素的存取线程,也可认为是元素的生产者和消费者)。
源码分析:
  • ArrayBlockingQueue继承了AbstractQueue并实现了BlockingQueue,AbstractQueue是Queue的公共骨架实现,这个不看了,简单看下BlockingQueue接口:
public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 将一个元素放入队列。
     * 成功返回true;失败抛IllegalStateException异常。
     */
    boolean add(E e);
    /**
     * 将一个元素放入队列。
     * 成功返回true;失败返回false。
     */
    boolean offer(E e);
    /**
     * 将一个元素放入队列。
     * 如果元素无法放入队列,当前操作线程会等待,直到元素可以放入队列。
     */
    void put(E e) throws InterruptedException;
    /**
     * 将一个元素放入队列。
     * 如果元素无法放入队列,当前操作线程会等待,直到元素可以放入队列或者
     * 给定的时间超时。
     * 成功返回true;超时返回false;
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 从队列头部获取并删除一个元素。
     * 如果无法获取元素,当前操作线程等待,直到有元素可以被获取。
     */
    E take() throws InterruptedException;
    /**
     * 从队列头部获取并删除一个元素。
     * 如果无法获取元素,当前操作线程等待,直到有元素可以被获取或者给定时间超时。
     * 如果超时,返回null。
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取队列剩余容量。
     */
    int remainingCapacity();
    /**
     * 移除队列中和给定元素相同的元素。
     */
    boolean remove(Object o);
    /**
     * 判断队列中是否包含给定元素。
     */
    public boolean contains(Object o);
    /**
     * 移除队列中所有的可用元素,并把它们添加到给定集合。
     */
    int drainTo(Collection<? super E> c);
    /**
     * 移除队列中不超过给定数量的可用元素,并把它们添加到给定集合。
     */
    int drainTo(Collection<? super E> c, int maxElements);
}
       可以重点关注下put和take方法的行为。
 
  • 接下来看下ArrayBlockingQueue内部的数据结构:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;
    /** 保存内部元素的数组  */
    private final E[] items;
    /** 取元素使用的下标 */
    private int takeIndex;
    /** 存元素使用的下标 */
    private int putIndex;
    /** 队列中元素数量 */
    private int count;
    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */
    /** 保护存取的锁 */
    private final ReentrantLock lock;
    /** 取的等待条件 */
    private final Condition notEmpty;
    /** 存的等待条件 */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }
       ArrayBlockingQueue内部结构非常简单,就是一个数组,一把锁,两个条件;也可以看到,上面提到的公平和非公平策略是由内部的重入锁来支持的。
 
  • 继续看下ArrayBlockingQueue的重要方法,重点看下put和take,先看下put方法:
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        //由于需要支持方法可中断行为,这里使用可中断的锁操作。
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();//队列满时,在notFull条件上等待。
            } catch (InterruptedException ie) {
                notFull.signal(); // 被中断后,唤醒其他等待notFull条件的线程。
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Circularly increment i.
     */
    final int inc(int i) {
        return (++i == items.length)? 0 : i;
    }
    /**
     * 在内部数组的putIndex位置插入元素,调整putIndex和count,然后唤醒notEmpty条件上等待的线程。
     * 本方法只有在持有锁的情况下才会被调用。
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

 

       再看下take方法: 

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();//队列空时,在notEmpty条件上等待。
            } catch (InterruptedException ie) {
                notEmpty.signal(); // 被中断后,唤醒其他等待notEmpty条件的线程。
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 从takeInde的位置取出元素,增加takeIndex,减少count,唤醒在notFull上等待的线程。
     * 本方法只有在持有锁的情况下才会被调用。
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

 

 

  • 其他方法的实现也都比较简单,不进行一一解析。最后注意一下,ArrayBlockingQueue的Iterator是弱一致的。
 
       ArrayBlockingQueue的代码解析完毕!
 
 
 

 

分享到:
评论

相关推荐

    aspose-words-15.8.0-jdk1.6

    aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...

    jdk-1.6-windows-64-01

    2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02

    okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.zip

    1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar

    JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))

    下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...

    jdk1.6集成jjwt的问题

    标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...

    jdk-1.6-linux-64-3

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    java-jdk1.6-jdk-6u45-windows-x64.zip

    1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...

    jdk-jdk1.6.0.24-windows-i586.exe

    标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...

    logback-cfca-jdk1.6-3.1.0.0.jar

    logback-cfca-jdk1.6-3.1.0.0.jar

    jdk-1.6-linux-64-2

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    jdk-1.6-windows-32-3

    三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3

    jdk-1.6-linux-64-1

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    JDK1.6安装及与JDK-1.5版本共存

    ### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...

    苹果电脑安装jdk1.6 mac for jdk1.6 jdk6 安装版

    mac for jdk1.6 jdk6 安装版 里面有两个jdk1.6的安装包,都可以用 如果电脑上安装有1.7,1.8等高版本jdk就不要再下安装包了,安装包安装会报错 命令是这个:brew install java6或 brew install homebrew/cask-...

    zxing jar包,支持jdk1.6,包括源码

    - 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...

    jdk-6u45-linux-x64.zip_jdk-1.6u45_jdk-6u45_jdk-6u45-linux-x64_jd

    这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...

    jdk-1.6-linux-32-2

    jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3

    JDK-1.6-Windows-32位 官方

    压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...

    jdk1.6官方版 jdk-6u45-windows-x64 下载

    java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64

    jdk1.6 Java编程工具jdk-6u10-windows-i586-p.exe

    Java编程开发工具包,最新版本,很好用,经典

Global site tag (gtag.js) - Google Analytics