一、简介
所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:
- 当队列为空时,读取该队列可以阻塞直到队列不为空;
- 当队列已满时,写入该队列可以阻塞直到队列不为满;
这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。
Java提供了下面7种阻塞队列,区别于底层数据结构的不同:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
立即返回结果值 | 超时返回结果值 | 阻塞 | 抛出异常 | |
插入 | offer(e) | offer(e,time,unit) | put(e) | add(e) |
移除 | poll() | poll(time,unit) | take() | remove() |
读取 | peek() | 无 | 无 | element() |
二、源码
-
ArrayBlockingQueue
下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable{ /** 底层用于存放队列元素的数组 */ final Object[] items; /** 下一个获取索引,take,poll,peek和remove会用到 */ int takeIndex; /** 下一个插入索引,put,offer和add方法会用到 */ int putIndex; /** 当前队列中元素的个数 */ int count; /** 用于控制并发操作队列的锁对象 */ final ReentrantLock lock; /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */ private final Condition notEmpty; /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */ private final Condition notFull; }
-
写入
1. offer(e)与offer(e,time,unit)
首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //锁住队列,防止在插入过程中的并发读写 lock.lock(); try { //满了,返回false if (count == items.length) return false; else { //执行插入 insert(e); return true; } } finally { //释放锁 lock.unlock(); } }
而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); //使用lockInterruptibly锁住队列,可以被中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //如果队列不满,则执行插入并返回true if (count != items.length) { insert(e); return true; } //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false if (nanos <= 0) return false; try { //若该条件没被唤醒或者该线程没被中断,等待给定时间 //如果等待中被唤醒,返回剩余的等待时间 nanos = notFull.awaitNanos(nanos); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { //释放锁 lock.unlock(); } }
看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
2.add(e)
add方法其实是调用了父类AbstractQueue的add方法:public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。
3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //每次被唤醒都判断一下队列是否满了,是则继续阻塞 while (count == items.length) notFull.await(); } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } }
-
移除
读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
1.poll()与poll(time,unit)public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //无限循环,确保每次唤醒都要判断当前是否为空 for (; ; ) { if (count != 0) { E x = extract(); return x; } if (nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } private E extract() { //读取队列头元素并唤醒都有等待非满线程 final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; }
2.take()public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { //为空的情况下,无限阻塞 while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } E x = extract(); return x; } finally { lock.unlock(); } }
3.remove()public boolean remove(Object o) { if (o == null) return false; final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //遍历队列中的元素,使用equals方法判定相等则移除 int i = takeIndex; int k = 0; for (;;) { if (k++ >= count) return false; if (o.equals(items[i])) { removeAt(i); return true; } i = inc(i); } } finally { lock.unlock(); } }
-
读取
首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //takeIndex会一直指向队列的头 return (count == 0) ? null : items[takeIndex]; } finally { lock.unlock(); } }
相关推荐
Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...
Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...
Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...
`ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...
ArrayBlockingQueue是Java并发编程中一个重要的数据结构,它是JDK内置的并发队列,源自java.util.concurrent包下的`java.util.concurrent.ArrayBlockingQueue`类。这个队列基于数组实现,支持阻塞插入和移除操作,是...
2. **阻塞队列**:Java的`java.util.concurrent`包提供了阻塞队列,如`BlockingQueue`接口。这些队列在满时会阻止插入操作,空时会阻止提取操作,直到队列有足够空间或元素可用。`LinkedBlockingQueue`和`...
Java并发库中实现阻塞队列的主要类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue和DelayQueue等。它们在并发编程中起到缓冲和协调生产者与消费者线程的作用,实现高效的数据交换。 3. **线程...
阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...
在描述中提到的"生产者消费者java源码.txt"文件,可能包含了具体的实现细节。在这个文件中,我们可以预期找到`Producer`和`Consumer`类的定义,这两个类分别代表生产者和消费者。`Producer`类通常包含一个循环,用于...
- **工作队列**:用来存放等待执行的任务,常见的有无界队列(如`LinkedBlockingQueue`)和有界队列(如`ArrayBlockingQueue`)。 2. **线程池的创建与使用** - 使用`Executors`工厂类创建线程池,如`...
阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...
7. **阻塞队列**:在Java中,如LinkedBlockingQueue、ArrayBlockingQueue等阻塞队列常用于线程间的通信和协调。它们允许生产者线程放入元素,消费者线程取出元素,而不会发生数据竞争。 8. **网络编程**:在爬虫...
ArrayBlockingQueue是一种基于数组实现的阻塞队列,提供了线程安全的生产者消费者模型。ArrayBlockingQueue的继承体系中,它继承了AbstractQueue,实现了BlockingQueue接口。 ArrayBlockingQueue的主要属性包括元素...
`BlockingQueue`是一个线程安全的数据结构,它提供了插入(`put`)和移除(`take`)操作,当队列为空时,`take()`操作会阻塞消费者线程,直到有新的元素被放入;同样,当队列满时,`put()`操作会阻塞生产者线程。 以下...
1. **ArrayBlockingQueue**:基于数组的有界阻塞队列,容量固定,插入和删除操作的性能较高,但是因为有界,所以需要预先知道队列的最大容量。 2. **LinkedBlockingQueue**:基于链表的阻塞队列,默认无界,也可以...
- **ArrayBlockingQueue**: 由数组支持的固定长度的队列。 - **LinkedBlockingQueue**: 由链表支持的固定长度队列。 - **PriorityBlockingQueue**: 支持优先级排序的无界队列。 - **DelayQueue**: 元素具有延迟...
3. **Queue 接口**:Java 的 `java.util.Queue` 是所有队列实现的抽象接口,定义了队列的基本操作,如 `offer()`(添加元素到队尾)、`poll()`(移除并返回队头元素)、`peek()`(查看但不移除队头元素)等。...