`
haibin369
  • 浏览: 60533 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Java源码:阻塞队列(ArrayBlockingQueue)

    博客分类:
  • Java
阅读更多

一、简介

所谓阻塞队列,其实就是支持下面这两种阻塞功能的队列:

  • 当队列为空时,读取该队列可以阻塞直到队列不为空;
  • 当队列已满时,写入该队列可以阻塞直到队列不为满;

这种阻塞队列主要用于可以用来构建生产者-消费者模型,生产者只需要往队列中发送消息,而消费者也只需要专注于从队列中读取消息,剩下的同步、阻塞细节都交给阻塞队列把。


Java提供了下面7种阻塞队列,区别于底层数据结构的不同:

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的接口是:java.util.concurrent.BlockingQueue,主要提供了以下存取方法(根据队列空或者满时的响应方式,可分成3类):
  立即返回结果值 超时返回结果值 阻塞 抛出异常
插入 offer(e) offer(e,time,unit) put(e) add(e)
移除 poll() poll(time,unit) take() remove()
读取 peek() element()

二、源码

  • ArrayBlockingQueue

    下面以ArrayBlockingQueue为例看看JDK的源码,其他的实现类,有先看一下它的主要属性:
        public class ArrayBlockingQueue<E> extends AbstractQueue<E>  
                implements BlockingQueue<E>, java.io.Serializable{  
            /** 底层用于存放队列元素的数组 */  
            final Object[] items;  
          
            /** 下一个获取索引,take,poll,peek和remove会用到 */  
            int takeIndex;  
          
            /** 下一个插入索引,put,offer和add方法会用到 */  
            int putIndex;  
          
            /** 当前队列中元素的个数 */  
            int count;  
          
            /** 用于控制并发操作队列的锁对象 */  
            final ReentrantLock lock;  
            /** 队列为非空的条件对象,用于唤醒阻塞中的读操作 */  
            private final Condition notEmpty;  
            /** 队列为非满的条件对象,用于唤醒阻塞中的写操作 */  
            private final Condition notFull;  
        }  
  • 写入

    1. offer(e)与offer(e,time,unit)
    首先是offer(e),逻辑比较简单,上锁、判断插入(不满则插入,满了则返回)、解锁。
        public boolean offer(E e) {  
            if (e == null) throw new NullPointerException();  
            final ReentrantLock lock = this.lock;  
            //锁住队列,防止在插入过程中的并发读写  
            lock.lock();  
            try {  
                //满了,返回false  
                if (count == items.length)  
                    return false;  
                else {  
                    //执行插入  
                    insert(e);  
                    return true;  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    而offer(e,time,unit)相比offer(e)则多了阻塞给定时间的功能,注意下面的无条件for循环是为了防止多个线程同时被唤醒操作队列,因此每次都需要判断队列是否已满,是的话继续阻塞:
        public boolean offer(E e, long timeout, TimeUnit unit)  
                throws InterruptedException {  
          
            if (e == null) throw new NullPointerException();  
                long nanos = unit.toNanos(timeout);  
            //使用lockInterruptibly锁住队列,可以被中断  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                for (;;) {  
                    //如果队列不满,则执行插入并返回true  
                    if (count != items.length) {  
                        insert(e);  
                        return true;  
                    }  
          
                    //如果nanos<=0,说明已经阻塞超过了给定时间了,直接返回false  
                    if (nanos <= 0)  
                        return false;  
                    try {  
                        //若该条件没被唤醒或者该线程没被中断,等待给定时间  
                        //如果等待中被唤醒,返回剩余的等待时间  
                        nanos = notFull.awaitNanos(nanos);  
                    } catch (InterruptedException ie) {  
                        notFull.signal(); // propagate to non-interrupted thread  
                        throw ie;  
                    }  
                }  
            } finally {  
                //释放锁  
                lock.unlock();  
            }  
        }  

    看到两个方法都是调用的insert(e)执行实际的插入,insert方法也比较简单,插入、非空唤醒
        private void insert(E x) {  
            items[putIndex] = x;  
            putIndex = inc(putIndex);  
            ++count;  
            notEmpty.signal();  
        }  

    2.add(e)
    add方法其实是调用了父类AbstractQueue的add方法:
        public boolean add(E e) {  
            if (offer(e))  
                return true;  
            else  
                throw new IllegalStateException("Queue full");  
        }  
    很简单,其实就是通过offer判断当前队列是否满了,是就立刻抛出异常。

    3. put(e) 其实put就相当与无限阻塞的offer(e,time,unit),也是在无限循环里面判断队列是否已满并插入,否则阻塞:
        public void put(E e) throws InterruptedException {  
            if (e == null) throw new NullPointerException();  
            final E[] items = this.items;  
            final ReentrantLock lock = this.lock;  
            lock.lockInterruptibly();  
            try {  
                try {  
                    //每次被唤醒都判断一下队列是否满了,是则继续阻塞  
                    while (count == items.length)  
                        notFull.await();  
                } catch (InterruptedException ie) {  
                    notFull.signal(); // propagate to non-interrupted thread  
                    throw ie;  
                }  
                insert(e);  
            } finally {  
                lock.unlock();  
            }  
        }  
  • 移除

    读取操作和写入其实大同小异,底层和核心逻辑都差不多,如果理解了上面关于读取的几个方法的话,读取就无需过多解释了。
    1.poll()与poll(time,unit)
        public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0)
                    return null;
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                //无限循环,确保每次唤醒都要判断当前是否为空
                for (; ; ) {
                    if (count != 0) {
                        E x = extract();
                        return x;
                    }
                    if (nanos <= 0)
                        return null;
                    try {
                        nanos = notEmpty.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        notEmpty.signal(); // propagate to non-interrupted thread
                        throw ie;
                    }
    
                }
            } finally {
                lock.unlock();
            }
        }
    
        private E extract() {
            //读取队列头元素并唤醒都有等待非满线程
            final E[] items = this.items;
            E x = items[takeIndex];
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return x;
        }

    2.take()
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    //为空的情况下,无限阻塞
                    while (count == 0)
                        notEmpty.await();
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
                E x = extract();
                return x;
            } finally {
                lock.unlock();
            }
        }

    3.remove()
        public boolean remove(Object o) {
            if (o == null) return false;
            final E[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //遍历队列中的元素,使用equals方法判定相等则移除
                int i = takeIndex;
                int k = 0;
                for (;;) {
                    if (k++ >= count)
                        return false;
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    i = inc(i);
                }
    
            } finally {
                lock.unlock();
            }
        }
  • 读取

    首先说明一下,这下面的读取方法都是指读取队列头部的元素,因为如果构建消息队列,都是尾部插入,头部读取。读取的两个方法的核心逻辑其实都在peek()里:上锁 - 读取 - 解锁:
        public E element() {
            E x = peek();
            if (x != null)
                return x;
            else
                throw new NoSuchElementException();
        }
    
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //takeIndex会一直指向队列的头
                return (count == 0) ? null : items[takeIndex];
            } finally {
                lock.unlock();
            }
        }
以上就是我对ArrayBlockingQueue的理解,水平有限,恳请指教。
分享到:
评论

相关推荐

    Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java源码解析阻塞队列ArrayBlockingQueue介绍 Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行...

    Java源码解析阻塞队列ArrayBlockingQueue功能简介

    Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...

    Java源码解析阻塞队列ArrayBlockingQueue常用方法

    Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...

    ArrayBlockingQueue源码分析.docx

    `ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...

    ArrayBlockingQueue源码解析-动力节点共

    ArrayBlockingQueue是Java并发编程中一个重要的数据结构,它是JDK内置的并发队列,源自java.util.concurrent包下的`java.util.concurrent.ArrayBlockingQueue`类。这个队列基于数组实现,支持阻塞插入和移除操作,是...

    java队列

    2. **阻塞队列**:Java的`java.util.concurrent`包提供了阻塞队列,如`BlockingQueue`接口。这些队列在满时会阻止插入操作,空时会阻止提取操作,直到队列有足够空间或元素可用。`LinkedBlockingQueue`和`...

    java concurrent 精简源码

    Java并发库中实现阻塞队列的主要类有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue和DelayQueue等。它们在并发编程中起到缓冲和协调生产者与消费者线程的作用,实现高效的数据交换。 3. **线程...

    java队列Java系列2021.pdf

    阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...

    生产者消费者java源码

    在描述中提到的"生产者消费者java源码.txt"文件,可能包含了具体的实现细节。在这个文件中,我们可以预期找到`Producer`和`Consumer`类的定义,这两个类分别代表生产者和消费者。`Producer`类通常包含一个循环,用于...

    JAVA经典线程池源码

    - **工作队列**:用来存放等待执行的任务,常见的有无界队列(如`LinkedBlockingQueue`)和有界队列(如`ArrayBlockingQueue`)。 2. **线程池的创建与使用** - 使用`Executors`工厂类创建线程池,如`...

    javabitset源码-java_master:后端架构师技术图谱

    阻塞队列:ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue、PriorityBlockingQueue,采用锁机制;使用 ReentrantLock 锁。 集合 链表、数组 字典、关联数组 栈 Stack 是线程安全的。 内部使用...

    java爬虫源码

    7. **阻塞队列**:在Java中,如LinkedBlockingQueue、ArrayBlockingQueue等阻塞队列常用于线程间的通信和协调。它们允许生产者线程放入元素,消费者线程取出元素,而不会发生数据竞争。 8. **网络编程**:在爬虫...

    【死磕Java集合】-集合源码分析.pdf

    ArrayBlockingQueue是一种基于数组实现的阻塞队列,提供了线程安全的生产者消费者模型。ArrayBlockingQueue的继承体系中,它继承了AbstractQueue,实现了BlockingQueue接口。 ArrayBlockingQueue的主要属性包括元素...

    java 生产者消费者问题(源码)

    `BlockingQueue`是一个线程安全的数据结构,它提供了插入(`put`)和移除(`take`)操作,当队列为空时,`take()`操作会阻塞消费者线程,直到有新的元素被放入;同样,当队列满时,`put()`操作会阻塞生产者线程。 以下...

    简单实现BlockingQueue,BlockingQueue源码详解

    1. **ArrayBlockingQueue**:基于数组的有界阻塞队列,容量固定,插入和删除操作的性能较高,但是因为有界,所以需要预先知道队列的最大容量。 2. **LinkedBlockingQueue**:基于链表的阻塞队列,默认无界,也可以...

    java面试题合集

    - **ArrayBlockingQueue**: 由数组支持的固定长度的队列。 - **LinkedBlockingQueue**: 由链表支持的固定长度队列。 - **PriorityBlockingQueue**: 支持优先级排序的无界队列。 - **DelayQueue**: 元素具有延迟...

    队列(数据结构--Java版)

    3. **Queue 接口**:Java 的 `java.util.Queue` 是所有队列实现的抽象接口,定义了队列的基本操作,如 `offer()`(添加元素到队尾)、`poll()`(移除并返回队头元素)、`peek()`(查看但不移除队头元素)等。...

Global site tag (gtag.js) - Google Analytics