`
BrokenDreams
  • 浏览: 253746 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100042
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque

阅读更多

Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque

作者:大飞

 

功能简介:
  • LinkedBlockingDeque是一种基于双向链表实现的有界的(可选的,不指定默认int最大值)阻塞双端队列。
       双端队列一般适用于工作密取模式,即每个消费者都拥有自己的双端队列,如果某个消费者完成了自己队列的全部任务,可以到其他消费者双端队列尾部秘密获取任务来处理。
 
源码分析:
  • LinkedBlockingDeque实现了BlockingDeque接口,简单看下这个接口:

 

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    /**
     * 将元素插入队头,如果队列满了,抛出IllegalStateException。
     */
    void addFirst(E e);
    /**
     * 将元素插入队尾,如果队列满了,抛出IllegalStateException。
     */
    void addLast(E e);
    /**
     * 将元素插入队头,如果成功,返回true;如果队列满了,返回false。
     */
    boolean offerFirst(E e);
    /**
     * 将元素插入队尾,如果成功,返回true;如果队列满了,返回false。
     */
    boolean offerLast(E e);
    /**
     * 将元素插入队头。如果队列满了,阻塞等待,直到队列有可用空间。
     */
    void putFirst(E e) throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待,直到队列有可用空间。
     */
    void putLast(E e) throws InterruptedException;
    /**
     * 将元素插入队头。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E takeFirst() throws InterruptedException;
    /**
     * 获取并移除队尾元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E takeLast() throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队尾元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 从队列中删除第一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean removeFirstOccurrence(Object o);
    /**
     * 从队列中删除最后一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean removeLastOccurrence(Object o);
    // *** BlockingQueue methods ***
    /**
     * 添加一个元素到队尾,如果成功,返回true。如果队列满了,抛出IllegalStateException。
     */
    boolean add(E e);
    /**
     * 添加一个元素到队尾。添加成功,返回treue;如果队列满了,返回false。
     */
    boolean offer(E e);
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     */
    void put(E e) throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,抛出NoSuchElementException异常。
     */
    E remove();
    /**
     * 获取并移除队头元素,如果队头没有元素,返回null。
     */
    E poll();
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E take() throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 查看队头元素,如果队头没有元素,抛出NoSuchElementException。
     */
    E element();
    /**
     * 查看队头元素,如果队头没有元素,返回null。
     */
    E peek();
    /**
     * 从队列中删除第一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean remove(Object o);
    /**
     * 查看队列是否包含给定的元素,包含返回true;不包含返回false。
     */
    public boolean contains(Object o);
    /**
     * 返回队列中元素数量。
     */
    public int size();
    /**
     * 返回一个从头到尾排序的迭代器。
     */
    Iterator<E> iterator();
    // *** Stack methods ***
    /**
     * 相当于addFirst(Object)
     */
    void push(E e);
}
 

 

 

  • 接下来看下LinkedBlockingDeque内部数据结构:

 

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>,  java.io.Serializable {
    private static final long serialVersionUID = -387911632671998426L;
    /** Doubly-linked list node class */
    static final class Node<E> {
       /**
        * 保存元素的域,如果为nul说明当前节点已被删除。
        */
	E item;
        /**
         * - 指向其前驱节点。
         * - 如果指向自身,说明前面是队尾节点。
         * - 如果为null,说明没有前驱节点。
         */
        Node<E> prev;
        /**
         * - 指向其后继节点。
         * - 如果指向自身,说明后面是队头节点。
         * - 如果为null,说明没有后继节点。
         */
        Node<E> next;
        Node(E x, Node<E> p, Node<E> n) {
            item = x;
            prev = p;
            next = n;
        }
    }
    /** 指向队头节点 */
    transient Node<E> first;
    /** 指向队尾节点 */
    transient Node<E> last;
    /** 队列中元素数量 */
    private transient int count;
    /** 队列最大容量 */
    private final int capacity;
    /** 队列中保护访问使用的锁 */
    final ReentrantLock lock = new ReentrantLock();
    /** 获取元素的等待条件(队列非空) */
    private final Condition notEmpty = lock.newCondition();
    /** 插入元素的等待条件(队列非满) */
    private final Condition notFull = lock.newCondition();
    /**
     * 不指定容量,默认为Integer.MAX_VALUE(可认为无限)。
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

       内部结构:一个双向链表、一把锁和两个锁条件。

 
  • 再看下内部的一些基础操作,这些操作必须在持有锁的前提下调用:
    /**
     * Links e as first element, or returns false if full.
     */
    private boolean linkFirst(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false; //如果队列已满,返回false;
        Node<E> f = first;
        //新建节点x,用来存放数据e,将e插入到队头节点前面。
        Node<E> x = new Node<E>(e, null, f);
        //然后将e设置为队头节点。 
        first = x;
        if (last == null)
            last = x; //如果没有队尾节点,那么将x设置为队尾节点。
        else
            f.prev = x; //如果有队尾节点,那么将f的prev指向x,完成节点拼接。
        ++count; // 累加当前元素计数
        notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
        return true;
    }

       linkFirst就是将一个元素插入到队头,并成为新的队头元素。 

 

    /**
     * Links e as last element, or returns false if full.
     */
    private boolean linkLast(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false; //如果队列已满,返回false;
        Node<E> l = last;
        //新建节点x,用来存放数据e,将e插入到队尾节点后面。
        Node<E> x = new Node<E>(e, l, null);
        last = x;
        if (first == null)
            first = x; //如果没有队头节点,那么将x设置为队头节点。
        else
            l.next = x; //如果有队头节点,那么将l的next指向x,完成节点拼接。
        ++count; // 累加当前元素计数
        notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
        return true;
    }

       linkLast就是将一个元素插入到队尾,并成为新的队尾元素。 

 

    /**
     * Removes and returns first element, or null if empty.
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first; //获取队头节点f。
        if (f == null)
            return null; //如果没有队头节点,返回null。
        Node<E> n = f.next; //获取f的后继节点。
        E item = f.item; //获取f的元素item。
        f.item = null; //将f的item域置空。
        f.next = f; // 将f的next域指向自身,帮助GC。
        first = n; //将f的后继节点n设置为新的队头节点。
        if (n == null)
            last = null; //如果n为空,说明队列为空了,把队尾节点也置空一下。
        else
            n.prev = null; //如果n不为空,现在n是队头节点,需要将其prev域置空。
        --count; //递减当前元素计数
        notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        return item; //返回元素item。
    }

       unlinkFirst就是将现有的队头节点移除,并将其后继节点设置为新的队头节点,并返回移除的队头节点中保存的元素。 

 

    /**
     * Removes and returns last element, or null if empty.
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last; //获取队尾节点l。
        if (l == null)
            return null; //如果没有队尾节点,返回null。
        Node<E> p = l.prev; //获取l的前驱节点。
        E item = l.item; //获取l的元素item。
        l.item = null; //将l的item域置空。
        l.prev = l; // 将f的prev域指向自身,帮助GC。
        last = p; //将l的前驱节点p设置为新的队尾节点。
        if (p == null)
            first = null; //如果p为空,说明队列为空了,把队头节点也置空一下。
        else
            p.next = null; //如果p不为空,现在p是队尾节点,需要将其next域置空。
        --count; //递减当前元素计数
        notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        return item; //返回元素item。
    }

      unlinkLast就是将现有的队尾节点移除,并将其前驱节点设置为新的队尾节点,并返回移除的队尾节点中保存的元素。 

 

    /**
     * Unlinks x
     */
    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        Node<E> p = x.prev;
        Node<E> n = x.next;
        if (p == null) {
           unlinkFirst(); //如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。
        } else if (n == null) {
          unlinkLast(); //如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。
        } else {
            p.next = n; //将x的前驱节点p的next指向x的后继节点n。
            n.prev = p; //将x的后继节点n的prev指向x的前驱节点n。
            x.item = null; //置空x的item域。
            // 注意上面并没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。
            --count; //递减当前元素计数
            notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        }
    }

 

 

  • LinkedBlockingDeque中利用上述基础方法来实现BlockingDeque接口定义的方法,随便看几个:

 

    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(e))
                notFull.await(); //如果插入元素到队头失败,在notFull条件上等待。
        } finally {
            lock.unlock();
        }
    }

    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(e))
                notFull.await(); //如果插入元素到队尾失败,在notFull条件上等待。
        } finally {
            lock.unlock();
        }
    }
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await(); //如果从队头获取并删除元素失败,在notEmpty条件上等待。
            return x;
        } finally {
            lock.unlock();
        }
    }
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await(); //如果从队尾获取并删除元素失败,在notEmpty条件上等待。
            return x;
        } finally {
            lock.unlock();
        }
    }

 

 

  • 其他的方法也很容易看懂了,这里就不啰嗦了。最后注意LinkedBlockingDeque的迭代器是弱一致的,而且支持双向迭代器。
 
       LinkedBlockingDeque的代码解析完毕!
 
 
 
分享到:
评论

相关推荐

    aspose-words-15.8.0-jdk1.6

    aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...

    jdk-1.6-windows-64-01

    2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02

    java-jdk1.6-jdk-6u45-windows-x64.zip

    1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...

    JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))

    下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...

    jdk1.6集成jjwt的问题

    标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...

    jdk-1.6-linux-64-3

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.zip

    1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar

    jdk-jdk1.6.0.24-windows-i586.exe

    标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...

    jdk-1.6-linux-64-2

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    jdk-1.6-windows-32-3

    三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3

    jdk-1.6-linux-64-1

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    JDK1.6安装及与JDK-1.5版本共存

    ### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...

    logback-cfca-jdk1.6-3.1.0.0.jar

    logback-cfca-jdk1.6-3.1.0.0.jar

    zxing jar包,支持jdk1.6,包括源码

    - 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...

    jdk-6u45-linux-x64.zip_jdk-1.6u45_jdk-6u45_jdk-6u45-linux-x64_jd

    这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...

    jdk-1.6-linux-32-2

    jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3

    JDK-1.6-Windows-32位 官方

    压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...

    jdk1.6官方版 jdk-6u45-windows-x64 下载

    java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64

    Linux64位JDK1.6(jdk-6u45-linux-x64.bin)

    Linux64位环境下的jdk6安装包:jdk-6u45-linux-x64.bin。 由于积分无法修改,现提供网盘下载地址: https://pan.baidu.com/s/1BE55ImTxZTQO6T22051P2g 提取码:5wvm

    jdk1.6 Java编程工具jdk-6u10-windows-i586-p.exe

    Java编程开发工具包,最新版本,很好用,经典

Global site tag (gtag.js) - Google Analytics