`

集合框架 Queue篇(6)---LinkedBlockingQueue

 
阅读更多
Queue

------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)

4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue,           (基于数组的并发阻塞队列)
6.LinkedBlockingQueue,        (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue,        (带优先级的无界阻塞队列)
9.SynchronousQueue                       (并发同步阻塞队列)
-----------------------------------------------------
LinkedBlockingQueue

采用对于的next构成链表的方式来存储对象。
由于读只操作队头,而写只操作队尾,这里巧妙地采用了两把锁,
对put和offer采用putLock,对take和poll采用takeLock,避免了读写时相互竞争锁的现象,
因此LinkedBlockingQueue在高并发读写操作都多的情况下,性能会比ArrayBlockingQueue好很多,
在遍历以及删除元素时则要把两把锁都锁住。

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    /**
     * 链表节点node类结构
     */
    static class Node<E> {
        volatile E item;//volatile使得所有的write happen-befor read,保证了数据的可见性 
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 队列容量,默认为Integer.MAX_VALUE*/
    private final int capacity;

    /** 用原子变量 表示当前元素的个数 */
    private final AtomicInteger count = new AtomicInteger(0);

    /** 表头节点 */
    private transient Node<E> head;

    /** 表尾节点 */
    private transient Node<E> last;

    /** 获取元素或删除元素时 要加的takeLock锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 获取元素 notEmpty条件 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 插入元素时 要加putLock锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 插入时,要判满 */
    private final Condition notFull = putLock.newCondition();

    /**
     * 唤醒等待的take操作,在put/offer中调用(因为这些操作中不会用到takeLock锁)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 唤醒等待插入操作,在take/poll中调用.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 插入到尾部
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * 获取并移除头元素
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * 锁住两把锁,在remove,clear等方法中调用
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * 和fullyLock成对使用
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * 默认构造,容量为 Integer.MAX_VALUE 
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    /**
     *指定容量的构造
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    /**
     * 指定初始化集合的构造
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }

    /**
     * 通过原子变量,直接获得大小
     */
    public int size() {
        return count.get();
    }

    /**
     *返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(e);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     *将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),
     *在成功时返回 true,如果此队列已满,则返回 false。
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    //获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    //获取并移除此队列的头,如果此队列为空,则返回 null。
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    //获取但不移除此队列的头;如果此队列为空,则返回 null。
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 从此队列移除指定元素的单个实例(如果存在)。
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }
    ……
}

--------------------------------------------------------
分享到:
评论

相关推荐

    Implementation-of-Queue-in-Java

    Java集合框架提供了一个名为`java.util.Queue`的接口,它是`Collection`接口的子接口。这个接口定义了队列操作的基本方法,如`add(E e)`(入队)、`remove()`(出队)、`element()`(获取但不移除队头元素)等。 3...

    java集合详细解释

    这篇博文链接虽然没有提供具体的内容,但是我们可以基于Java集合框架的常见知识点进行深入的解释。 首先,我们来了解一下Java集合框架的基本层次结构。在Java中,集合框架主要分为两大类:接口和实现。接口定义了...

    Java--collection.rar_SEP_java 集合

    6. **LinkedBlockingQueue** 和 **ConcurrentLinkedQueue** 是线程安全的队列实现,常用于多线程环境下的并发编程,它们实现了`Queue`接口。 7. **Collections** 类是集合框架的一个工具类,提供了静态方法来操作...

    关于java的集合类

    集合框架是Java SE(标准版)库的一部分,它包含多个接口和类,这些接口和类共同构成了处理对象集合的基石。在这个主题中,我们将深入探讨Java集合类的关键知识点。 1. **集合接口**: - `List`:有序的集合,允许...

    java 面试题 集合

    Java集合框架是Java编程语言中的核心部分,它提供了一种高效、灵活的数据组织方式,使得开发者可以方便地存储和操作对象。在Java面试中,集合相关的知识点常常是考察的重点,因为它们涉及到数据结构、内存管理和多...

    java面试——深圳-OPPO-Java高级.zip

    3. **集合框架**: - List、Set、Queue的实现及区别:ArrayList、LinkedList、HashSet、TreeSet、ArrayDeque等的特性与使用场景。 - 高级集合:ConcurrentHashMap、CopyOnWriteArrayList、LinkedBlockingQueue等...

    Using_Java_Queue.zip_java队列

    Java队列是Java集合框架中的一个关键组成部分,主要用于在多个线程之间同步数据传输或实现异步处理。队列遵循先进先出(FIFO)的原则,即最早添加到队列中的元素将首先被处理。本教程将深入探讨如何在Java中使用队列...

    Java Collections

    《Java Collections》是一本详尽阐述Java集合框架的书籍,作者为John Zukowski。本书深入探讨了Java集合类库的核心概念和技术细节,是学习和掌握Java集合的重要参考资料。 ### 书籍内容简介 #### 版权声明与出版...

    Java Collections.pdf

    集合框架主要包括了List、Set、Queue和Map四大接口,它们各自代表了不同类型的数据结构。List接口存储有序的元素,允许重复;Set接口存储无序且不允许重复的元素;Queue接口提供了先进先出(FIFO)的数据结构;而Map...

    数据结构面试专题.docx

    3. **Java集合框架** - **Collection接口**:所有集合类的顶级接口,包含List、Set等子接口。 - **Map接口**:不继承Collection,提供键值对映射,如HashMap、TreeMap和LinkedHashMap。 - **Set接口**:不包含...

    数据结构面试题及正确答案

    4. **Java集合框架**: - **Collection**接口是所有单列集合的父接口,包括`List`、`Set`。 - **Map**接口则用于存储键值对,如`HashMap`、`TreeMap`等。 - **集合之间的继承关系**:`ArrayList`和`LinkedList`...

    java中queue接口的使用详解

    Java中的`Queue`接口是Java集合框架的一部分,它位于`java.util`包中,是`Collection`接口的子接口。`Queue`接口主要用于实现队列数据结构,它遵循先进先出(FIFO)的原则,即最先添加的元素会被最先移除。在Java中...

    Java面试题

    #### 一、集合框架(Collection Framework) 在Java中,集合框架(Collection Framework)是处理各种不同类型的数据集的核心组件之一。它主要包括两大类:`Collection`和`Map`。 ##### Collection接口 `Collection`...

    Java数据结构(三)

    在Java中,我们可以使用多种方式来实现队列,包括使用集合框架中的Queue接口以及其具体实现类。 首先,Java的`java.util.Queue`接口定义了队列的主要操作,如`add()`用于入队,`remove()`或`poll()`用于出队,`peek...

    android-mvc

    - `LinkedList`或`ArrayList`可以作为基础队列结构,但它们在并发环境中不是线程安全的,需配合`synchronized`关键字或`java.util.concurrent`包中的并发集合使用。 - `java.util.concurrent`包提供了`...

    Android校招面试指南 2018最新版本

    - **Java集合框架**:这是Java编程中非常重要的一个部分,主要用于处理数据集合。 - **ArrayList**:一种动态数组实现的数据结构,适用于频繁查询场景。 - **LinkedList**:双链表实现的数据结构,适合频繁插入...

    Java 队列 Queue 用法实例详解

    首先,`java.util.Queue`是Java集合框架的一部分,它是所有队列类型的父接口。Queue接口提供了多种方法用于操作队列,包括添加元素、移除元素以及检查队首元素等。 1. 添加元素: - `add(E e)`:将指定的元素添加...

    java面试题合集

    - **Queue**: 如`ArrayDeque`、`LinkedBlockingQueue`等,用于存储FIFO(先进先出)的数据结构。 - **Map**: 如`HashMap`、`TreeMap`等,用于存储键值对。 **4. ArrayList与LinkedList的区别** - **数据结构**: -...

    Java技术手册:第6版.pdf.zip

    4. **集合框架**:手册详细阐述了Java集合框架,包括List、Set、Queue、Map等接口,以及ArrayList、HashSet、LinkedList、HashMap等实现类的使用。同时,也会讨论新引入的LinkedBlockingQueue和ConcurrentHashMap等...

    [计算机软件及应用]Java高级程序设计实验指导.doc

    集合框架包括List、Set、Queue等接口以及ArrayList、LinkedList、HashSet、HashMap等实现类。实验内容可能涵盖以下知识点: 1. **List接口**:List接口代表一个有序的集合,允许有重复元素。ArrayList和LinkedList...

Global site tag (gtag.js) - Google Analytics