`
liuluo129
  • 浏览: 116634 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

BlockingQueue阻塞队列分析(一)-ArrayBlockingQueue和LinkedBlockingQueue

    博客分类:
  • java
阅读更多

BlockingQueue接口

BlockingQueue接口继承自Queue接口:

public interface BlockingQueue<E>extends Queue<E>

与Queue队列相比,它是线程安全的。添加和移除元素有四类方式,其中add()、remove()、offer()、poll()、element()、peek()方法继承自Queue。

  Throws exception Special value Blocks Times out
Insert boolean add(e) boolean offer(e) void put(e) boolean offer(e, time, unit)
Remove E remove() E poll() E take() E poll(time, unit)
Examine E element() E peek() not applicable not applicable

 

通俗的说:

  • 不能搞时抛出异常;
  • 不能搞时返回null或false
  • 不能搞时一直等,直到能搞且搞到为止
  • 一段时间内尝试去搞,实在搞不了返回null或false

除此之外,还有以下方法:

int remainingCapacity();                  队列剩余存储空间

boolean remove(Object o);             删除队列中的特定元素

public boolean contains(Object o); 队列中是否包含特定元素

int drainTo(Collection<? super E> c);     把队列中的元素删除并添加到指定的集合中

int drainTo(Collection<? super E> c, int maxElements);   把队列中的maxElements个元素删除添加到指定集合

ArrayBlockingQueue

底层以数组来存储元素,且构造函数必须至少指定队列大小,只使用一个可重入锁来来控制线程访问

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 底层存储元素数组  */
    private final E[] items;
    private int takeIndex;
    private int putIndex;
    private int count;

    // 使用经典的two-condition算法进行并发控制
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    // 指定队列容器,默认使用非公平锁
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }
...
}

put()和take()方法是经典的生产者-消费者算法:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();  //使用while自旋锁
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e); //插入元素,释放notEmpty信号
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

  take()方法如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

 

LinkedBlockingQueue

LinkedBlockingQueue与ArrayBlockingQueue相比有以下不同:

  • 底层使用链表而非数组存储元素;
  • 使用两个锁来控制线程访问,这样队列可以同时进行put和take的操作,因此吞吐量相对ArrayBlockingQueue就高。
  • 可以不指定队列大小,此时默认大小为Integer.MAX_VALUE
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** 队列边界大小,不指定为默认为Integer.MAX_VALUE */
    private final int capacity;
    private final AtomicInteger count = new AtomicInteger(0);
    private transient Node<E> head;
    private transient Node<E> last;

    /** 控制take、poll的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** 控制put、offer的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(e);
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }
}

 看下put操作源码:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 当队列大小等于队列边界大小时,使用自旋锁等待
            while (count.get() == capacity) { 
                    notFull.await();
            }
            enqueue(e);  // 队列最后插入元素
            c = count.getAndIncrement();
            // 队列不满,则发送notFull信号量,其他线程可以进行put
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //队列为空,发送notEmpty信号量,通知线程可以进行take
            signalNotEmpty(); 
    }
    private void enqueue(E x) {
        // assert putLock.isHeldByCurrentThread();
        last = last.next = new Node<E>(x);
    }

 take()方法与put类似:

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 使用Executors.newFixedThreadPool()方法,使用的就是LinkedBlockingQueue存储的等待线程队列。

 

分享到:
评论

相关推荐

    并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

    ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类,并实现了BlockingQueue接口,提供了线程安全的队列操作。 ...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    并发-线程池和阻塞队列

    常见的阻塞队列实现包括ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等。阻塞队列常被用作线程池的工作队列,连接生产者(添加任务的线程)和消费者(执行任务的线程),实现任务的异步处理。 ...

    spring-blockingqueue:用Spring Boot阻止队列

    BlockingQueue是Java并发包`java.util.concurrent`中的一个接口,它提供了在队列满时阻塞插入操作和队列空时阻塞删除操作的能力。这种设计模式被称为生产者-消费者模型,它有效地解决了线程间的同步问题,避免了不必...

    BlockingQueue队列自定义超时时间取消线程池任务

    首先,`BlockingQueue`是一个并发容器,它遵循先进先出(FIFO)原则,具有阻塞性质,当队列满时,生产者线程会被阻塞,直到有消费者取走元素;当队列空时,消费者线程会被阻塞,直到生产者放入新的元素。常用实现如`...

    线程----BlockingQueue

    `BlockingQueue`是一种特殊类型的队列,主要用于多线程环境中的任务管理。它具有以下特性:当队列为空时,从队列中获取元素的操作会被阻塞;同样地,当队列满时,向队列中添加元素的操作也会被阻塞。这种特性使得`...

    10、阻塞队列BlockingQueue实战及其原理分析.pdf

    ### 10、阻塞队列BlockingQueue 实战及其原理分析 #### 一、阻塞队列概述 阻塞队列(BlockingQueue)是Java语言中`java.util.concurrent`包下提供的一种重要的线程安全队列。它继承自`Queue`接口,并在此基础上...

    支持多线程和泛型的阻塞队列

    在Java中,`java.util.concurrent`包提供了多种阻塞队列实现,如`ArrayBlockingQueue`, `LinkedBlockingQueue`等。它们都实现了`BlockingQueue`接口,提供了一套线程安全的方法来添加和移除元素,如`put()`, `take()...

    阻塞队列阻塞队列阻塞队列

    LinkedBlockingQueue是基于链表结构实现的阻塞队列,其内部节点为Node类,包含元素值和指向下一个节点的引用。与ArrayBlockingQueue不同,LinkedBlockingQueue的容量可以是Integer.MAX_VALUE(即无界队列),也可以...

    java模拟阻塞队列

    Java中的阻塞队列实现主要依赖于`java.util.concurrent`包下的几个类,如`BlockingQueue`接口、`ArrayBlockingQueue`、`LinkedBlockingQueue`等。`BlockingQueue`接口定义了一组操作,如`put`、`take`、`offer`等,...

    简单实现BlockingQueue,BlockingQueue源码详解

    1. **ArrayBlockingQueue**:基于数组的有界阻塞队列,容量固定,插入和删除操作的性能较高,但是因为有界,所以需要预先知道队列的最大容量。 2. **LinkedBlockingQueue**:基于链表的阻塞队列,默认无界,也可以...

    java并发工具包详解

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...

    java中线程队列BlockingQueue的用法

    在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util.concurrent`包中,提供了线程安全的数据结构,可以用于实现生产者-...

    Java并发编程--BlockingQueue.docx

    在 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现中,当队列满时,生产者线程会进入等待状态;而当消费者线程从队列中取出元素,队列空位增加,此时会唤醒等待的生产者线程。这种同步机制通常依赖于 Java 的...

    java线程聊天室(阻塞队列实现)

    在Java中,阻塞队列的实现类包括ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等。它们都实现了BlockingQueue接口,提供了put()和take()方法,分别用于插入和移除元素,这两个方法在队列满或空时...

    Java并发编程(21)并发新特性-阻塞队列和阻塞栈(含代

    2. `LinkedBlockingQueue`: 基于链表结构的阻塞队列,容量可以无限大,但内部维护了一个容量参数来控制性能。 3. `PriorityBlockingQueue`: 一个无界的阻塞队列,其中元素按照优先级进行排序。 4. `DelayQueue`: 一...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    BlockingQueue的使用

    BlockingQueue是Java并发编程中非常重要的一个数据结构,它是一个具有阻塞特性的队列,主要用于线程间的协作。在多线程环境下,BlockingQueue能够有效地实现生产者-消费者模式,提高了程序的并发性能和效率。本文将...

Global site tag (gtag.js) - Google Analytics