`
BrokenDreams
  • 浏览: 254000 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100168
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue

阅读更多

Jdk1.6 JUC源码解析(13)-LinkedBlockingQueue

作者:大飞

 

功能简介:
  • LinkedBlockingQueue是一种基于单向链表实现的有界的(可选的,不指定默认int最大值)阻塞队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。(在并发程序中,基于链表实现的队列和基于数组实现的队列相比,往往具有更高的吞吐量,但性能稍差一些)
源码分析:
  • 首先看下LinkedBlockingQueue内部的数据结构:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;
    /** 这里的count为原子量,避免了一些使用count的地方需要加两把锁。 */
    private final AtomicInteger count = new AtomicInteger(0);
    /** Head of linked list */
    private transient Node<E> head;
    /** Tail of linked list */
    private transient Node<E> last;
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }
        首先可见,内部为单向链表;其次,内部为两把锁:存锁和取锁,并分别关联一个条件(是一种双锁队列)。

 

 

  • 还是从put和take入手,先看下put方法:
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                /* 
                 * 注意这里的处理:和单锁队列不同,count为原子量,不需要锁保护。
                 * put过程中可能有其他线程执行多次get,所以这里需要判断一下当前
                 * 如果还有剩余容量,那么继续唤醒notFull条件上等待的线程。
                 */
                notFull.signal(); 
        } finally {
            putLock.unlock();
        }
        if (c == 0) //如果count又0变为1,说明在队列是空的情况下插入了1个元素,唤醒notNull条件上等待的线程。
            signalNotEmpty();
    }
    /**
     * Creates a node and links it at end of queue.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }
    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

 

       代码很容易看懂,再看下take方法实现:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    /**
     * Removes a node from head of queue,
     * @return the node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

       和put对等的逻辑,也很容易看懂。

 
  • 上面看到,主要方法里并没有同时用两把锁,但有些方法里会同时使用两把锁,比如remove方法等:
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }
    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

 

 

  • 其他方法的实现也比较容易看懂,这里就不做具体分析了。
 

       LinkedBlockingQueue的代码解析完毕! 

 

       参见:Jdk1.6 JUC源码解析(7)-locks-ReentrantLock

 

分享到:
评论

相关推荐

    aspose-words-15.8.0-jdk1.6

    aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...

    jdk-1.6-windows-64-01

    2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02

    okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.zip

    1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar

    JDK 1.6 64位(jdk-6u45-windows-x64(1.6 64))

    下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...

    jdk1.6集成jjwt的问题

    标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...

    jdk-1.6-linux-64-3

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    java-jdk1.6-jdk-6u45-windows-x64.zip

    1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...

    jdk-jdk1.6.0.24-windows-i586.exe

    标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...

    logback-cfca-jdk1.6-3.1.0.0.jar

    logback-cfca-jdk1.6-3.1.0.0.jar

    jdk-1.6-linux-64-2

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    jdk-1.6-windows-32-3

    三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3

    jdk-1.6-linux-64-1

    三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3

    JDK1.6安装及与JDK-1.5版本共存

    ### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...

    苹果电脑安装jdk1.6 mac for jdk1.6 jdk6 安装版

    mac for jdk1.6 jdk6 安装版 里面有两个jdk1.6的安装包,都可以用 如果电脑上安装有1.7,1.8等高版本jdk就不要再下安装包了,安装包安装会报错 命令是这个:brew install java6或 brew install homebrew/cask-...

    zxing jar包,支持jdk1.6,包括源码

    - 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...

    jdk-6u45-linux-x64.zip_jdk-1.6u45_jdk-6u45_jdk-6u45-linux-x64_jd

    这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...

    jdk-1.6-linux-32-2

    jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3

    JDK-1.6-Windows-32位 官方

    压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...

    jdk1.6官方版 jdk-6u45-windows-x64 下载

    java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64

    jdk1.6 Java编程工具jdk-6u10-windows-i586-p.exe

    Java编程开发工具包,最新版本,很好用,经典

Global site tag (gtag.js) - Google Analytics