`

并发容器——BlockingQueue相关类

阅读更多
java.util.concurrent提供了多种并发容器,总体上来说有4类
  • Queue类:BlockingQueue  ConcurrentLinkedQueue
  • Map类:ConcurrentMap
  • Set类:ConcurrentSkipListSet   CopyOnWriteArraySet
  • List类:CopyOnWriteArrayList

接下来一系列文章,我会对每一类的源码进行分析,试图让它们的实现机制完全暴露在大家面前。这篇主要是BlockingQueue及其相关类。
先给出结构图:


下面我按这样的顺序来展开:
  • 1、BlockingQueue
  • 2、ArrayBlockingQueue
  •      2.1 添加新元素的方法:add/put/offer
         2.2 该类的几个实例变量:takeIndex/putIndex/count/
         2.3 Condition实现
  • 3、LinkedBlockingQueue
  • 4、PriorityBlockingQueue
  • 5、DelayQueue<E extends Delayed>
  • 6、BlockingDque+LinkedBlockingQueue

其中前两个分析的尽量详细,为了方便大家看,基本贴出了所有相关源码。后面几个就用尽量用文字论述,如果看得吃力,建议对着jdk的源码看。


1、BlockingQueue
BlockingQueue继承了Queue,Queu是先入先出(FIFO),BlockingQueue是JDK 5.0新引入的。
根据队列null/full时的表现,BlockingQueue的方法分为以下几类:



至于为什么要使用并发容器,一个典型的例子就是生产者-消费者的例子,为了精简本文篇幅,放到附件中见附件:“生产者-消费者 测试.rar”。

另外,BlockingQueue接口定义的所有方法实现都是线程安全的,它的实现类里面都会用锁和其他控制并发的手段保证这种线程安全,但是这些类同时也实现了Collection接口(主要是AbstractQueue实现),所以会出现BlockingQueue的实现类也能同时使用Conllection接口方法,而这时会出现的问题就是像addAll,containsAll,retainAll和removeAll这类批量方法的实现不保证线程安全,举个例子就是addAll 10个items到一个ArrayBlockingQueue,可能中途失败但是却有几个item已经被放进这个队列里面了。

2、ArrayBlockingQueue
ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容)以及是否为公平锁(fair参数)。
在创建ArrayBlockingQueue的时候默认创建的是非公平锁,不过我们可以在它的构造函数里指定。这里调用ReentrantLock的构造函数创建锁的时候,调用了:
public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
}
FairSync/ NonfairSync是ReentrantLock的内部类:
线程按顺序请求获得公平锁,而一个非公平锁可以闯入,如果锁的状态可用,请求非公平锁的线程可在等待队列中向前跳跃,获得该锁。内部锁synchronized没有提供确定的公平性保证。

分三点来讲这个类:
  • 2.1 添加新元素的方法:add/put/offer
  • 2.2 该类的几个实例变量:takeIndex/putIndex/count/
  • 2.3 Condition实现


2.1 添加新元素的方法:add/put/offer

首先,谈到添加元素的方法,首先得分析以下该类同步机制中用到的锁:
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();//Condition Variable 1
notFull =  lock.newCondition();//Condition Variable 2

这三个都是该类的实例变量,只有一个锁lock,然后lock实例化出两个Condition,notEmpty/noFull分别用来协调多线程的读写操作。

1、
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;//每个对象对应一个显示的锁
        lock.lock();//请求锁直到获得锁(不可以被interrupte)
        try {
            if (count == items.length)//如果队列已经满了
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();//
        }
}
看insert方法:
private void insert(E x) {
        items[putIndex] = x;
		//增加全局index的值。
		/*
		Inc方法体内部:
		final int inc(int i) {
        return (++i == items.length)? 0 : i;
    		}
		这里可以看出ArrayBlockingQueue采用从前到后向内部数组插入的方式插入新元素的。如果插完了,putIndex可能重新变为0(在已经执行了移除操作的前提下,否则在之前的判断中队列为满)
   		*/
        putIndex = inc(putIndex); 
        ++count;
        notEmpty.signal();//wake up one waiting thread
}


2
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//请求锁直到得到锁或者变为interrupted
        try {
            try {
                while (count == items.length)//如果满了,当前线程进入noFull对应的等waiting状态
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
}


3、
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
	long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
				//如果没有被 signal/interruptes,需要等待nanos时间才返回
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }


4、public boolean add(E e) {
	return super.add(e);
    }
父类:
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }




2.2 该类的几个实例变量:takeIndex/putIndex/count

用三个数字来维护这个队列中的数据变更:
/** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

提取元素的三个方法take/poll/remove内部都调用了这个方法:
private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;//移除已经被提取出的元素
        takeIndex = inc(takeIndex);//策略和添加元素时相同
        --count;
        notFull.signal();//提醒其他在notFull这个Condition上waiting的线程可以尝试工作了
        return x;
    }

从这个方法里可见,tabkeIndex维护一个可以提取/移除元素的索引位置,因为takeIndex是从0递增的,所以这个类是FIFO队列。
putIndex维护一个可以插入的元素的位置索引。
count显然是维护队列中已经存在的元素总数。




2.3 Condition实现

Condition现在的实现只有java.util.concurrent.locks.AbstractQueueSynchoronizer内部的ConditionObject,并且通过ReentranLock的newCondition()方法暴露出来,这是因为Condition的await()/sinal()一般在lock.lock()与lock.unlock()之间执行,当执行condition.await()方法时,它会首先释放掉本线程持有的锁,然后自己进入等待队列。直到sinal(),唤醒后又会重新试图去拿到锁,拿到后执行await()下的代码,其中释放当前锁和得到当前锁都需要ReentranLock的tryAcquire(int arg)方法来判定,并且享受ReentranLock的重进入特性。
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
           //加一个新的condition等待节点
 Node node = addConditionWaiter();
//释放自己的锁
            int savedState = fullyRelease(node); 
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
            //如果当前线程 等待状态时CONDITION,park住当前线程,等待condition的signal来解除
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }




3、LinkedBlockingQueue
单向链表结构的队列。如果不指定容量默认为Integer.MAX_VALUE。通过putLock和takeLock两个锁进行同步,两个锁分别实例化notFull和notEmpty两个Condtion,用来协调多线程的存取动作。其中某些方法(如remove,toArray,toString,clear等)的同步需要同时获得这两个锁,并且总是先putLock.lock紧接着takeLock.lock(在同一方法fullyLock中),这样的顺序是为了避免可能出现的死锁情况(我也想不明白为什么会是这样?)

4、PriorityBlockingQueue
看它的三个属性,就基本能看懂这个类了:
private final PriorityQueue<E> q;
    private final ReentrantLock lock = new ReentrantLock(true);
    private final Condition notEmpty = lock.newCondition();

q说明,本类内部数据结构是PriorityQueue,至于PriorityQueue怎么排序看我之前一篇文章:http://jiadongkai-sina-com.iteye.com/blog/825683
lock说明本类使用一个lock来同步读写等操作。
notEmpty协调队列是否有新元素提供,而队列满了以后会调用PriorityQueue的grow方法来扩容。


5、DelayQueue<E extends Delayed>
Delayed接口继承自Comparable<Delayed>,我们插入的E元素都要实现这个接口。
DelayQueue的设计目的间API文档:

An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past. If no delay has expired there is no head and poll will returnnull. Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero. Even though unexpired elements cannot be removed using take or poll, they are otherwise treated as normal elements. For example, the size method returns the count of both expired and unexpired elements. This queue does not permit null elements.

因为DelayQueue构造函数了里限定死不允许传入comparator(之前的PriorityBlockingQueue中没有限定死),即只能在compare方法里定义优先级的比较规则。再看上面这段英文,“The head of the queue is that Delayed element whose delay expired furthest in the past.”说明compare方法实现的时候要保证最先加入的元素最早结束延时。而 “Expiration occurs when an element's getDelay(TimeUnit.NANOSECONDS) method returns a value less than or equal to zero.”说明getDelay方法的实现必须保证延时到了返回的值变为<=0的int。

上面这段英文中,还说明了:在poll/take的时候,队列中元素会判定这个elment有没有达到超时时间,如果没有达到,poll返回null,而take进入等待状态。但是,除了这两个方法,队列中的元素会被当做正常的元素来对待。例如,size方法返回所有元素的数量,而不管它们有没有达到超时时间。而协调的Condition available只对take和poll是有意义的。

另外需要补充的是,在ScheduledThreadPoolExecutor中工作队列类型是它的内部类DelayedWorkQueue,而DelayedWorkQueue的Task容器是DelayQueue类型,而ScheduledFutureTask作为Delay的实现类作为Runnable的封装后的Task类。也就是说ScheduledThreadPoolExecutor是通过DelayQueue优先级判定规则来执行任务的。


6、BlockingDque+LinkedBlockingQueue
BlockingDque为阻塞双端队列接口,实现类有LinkedBlockingDque。双端队列特别之处是它首尾都可以操作。LinkedBlockingDque不同于LinkedBlockingQueue,它只用一个lock来维护读写操作,并由这个lock实例化出两个Condition notEmpty及notFull,而LinkedBlockingQueue读和写分别维护一个lock。

部分参考:http://rdc.taobao.com/team/jm/archives/539
  • 大小: 58.1 KB
  • 大小: 26.1 KB
2
3
分享到:
评论

相关推荐

    【2018最新最详细】并发多线程教程

    19.并发容器之BlockingQueue 20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之ScheduledThreadPoolExecutor 23.FutureTask基本操作总结 24.Java...

    并发编程——线程基础.pdf

    此外,线程同步还可以通过使用线程安全的集合,比如ConcurrentHashMap、BlockingQueue等,以及使用java.util.concurrent包下的并发工具类来实现。这些工具类提供了原子操作、线程安全的集合等,能够在多线程环境下...

    2011.08.30(2)——— java BlockingQueue ExecutorService

    标题 "2011.08.30(2)——— java BlockingQueue ExecutorService" 涉及到Java并发编程中的两个核心组件:BlockingQueue(阻塞队列)和ExecutorService。这篇博客可能深入探讨了如何使用这两个工具来优化多线程环境...

    JAVA并发容器代码随读1

    在`java.util.concurrent`包中,有四种主要的并发容器类型:队列(BlockingQueue)、Map(ConcurrentMap)、Set(ConcurrentSkipListSet和CopyOnWriteArraySet)以及List(CopyOnWriteArrayList)。这些容器的设计...

    Java 常见并发容器总结

    Java 常见并发容器总结 JDK 提供的这些容器大部分在 `java.util.concurrent` 包中。 - **`ConcurrentHashMap`** : 线程安全的 `HashMap` - **`CopyOnWriteArrayList`** : 线程安全的 `List`,在读多写少的场合性能...

    Java并发编程--BlockingQueue.docx

    【Java并发编程--BlockingQueue详解】 BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,它扩展了 Queue 接口,并引入了线程安全的特性,特别适合于多线程环境下的数据共享。 BlockingQueue 的...

    java并发学习之BlockingQueue实现生产者消费者详解

    "java并发学习之BlockingQueue实现生产者消费者详解" BlockingQueue是Java util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。在多线程应用中,常用于生产-消费场景。BlockingQueue有多种实现,...

    Java并发之BlockingQueue的使用

    【Java并发之BlockingQueue的使用】讲解了Java中用于并发编程的重要工具——BlockingQueue,它是一种线程安全的队列,特别适用于生产者/消费者的场景。 BlockingQueue的主要特性在于其在队列满或空时会阻塞相应的...

    并发容器和线程池,java并发编程3

    ### 并发容器和线程池 #### 一、并发容器 ##### 1.1 概述 在Java中,为了提高程序的并发处理能力,Java标准库提供了多个线程安全的并发容器,它们主要位于`java.util.concurrent`包中。这些容器能够有效地管理...

    并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

    它们都继承自AbstractQueue类,并实现了BlockingQueue接口,提供了线程安全的队列操作。 ArrayBlockingQueue实现原理: ArrayBlockingQueue内部使用数组存储元素,使用ReentrantLock来保证线程安全,使用Condition...

    java线程并发blockingqueue类使用示例

    Java中的`BlockingQueue`是一个非常重要的并发工具类,它提供了线程安全的队列操作,主要用于生产者-消费者模式。这个接口定义了一种在多线程环境下高效、可靠的队列操作,它允许线程在队列为空时等待新元素的到来,...

    (PDF带目录)《Java 并发编程实战》,java并发实战,并发

    3. **并发容器**:书中详细讨论了`java.util.concurrent`包下的并发容器,如`ConcurrentHashMap`、`CopyOnWriteArrayList`和`BlockingQueue`等。这些容器设计为线程安全,可以提高多线程环境下的性能。 4. **并发...

    BlockingQueue队列自定义超时时间取消线程池任务

    在Java编程中,`BlockingQueue`是一个非常重要的并发工具类,它主要用于线程间的数据通信。`newFixedThreadPool`是`java.util.concurrent`包中的一个线程池工厂方法,用于创建固定数量线程的线程池。`FutureTask`则...

    26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue.pdf

    【描述】:此文档是关于Java并发编程的学习资料,以漫画形式讲解,聚焦于Java并发编程中的核心概念——BlockingQueue接口及其具体实现ArrayBlockingQueue。 【标签】:“java”、“并发”、“编程”、“宝典” ...

    Java中的BlockingQueue:深入理解与实践应用

    在Java并发编程中,BlockingQueue是一个非常重要的接口,它提供了线程安全的队列操作,特别是在生产者-消费者模式中发挥着核心作用。本文将深入探讨BlockingQueue的工作原理、常见实现、使用场景以及代码示例。 在...

    实战Java高并发程序设计(高清版)

    2. **并发容器**:Java提供了一系列优化过的并发容器,如`ConcurrentHashMap`、`BlockingQueue`、`CountDownLatch`和`CyclicBarrier`等,这些容器在多线程环境下提供了高效且安全的数据共享机制。 3. **并发设计...

    Java并发编程

    书中也可能分析了并发集合类的内部实现,如ConcurrentHashMap的工作机制,以及并发容器如BlockingQueue的应用场景。 《JAVA并发编程实践》(Doug Lea著)是Java并发领域的经典之作,作者是Java并发API的主要设计者...

    JAVA并发编程实践.pdf+高清版+目录 书籍源码

    此外,书中还介绍了Java并发容器,如ConcurrentHashMap、CopyOnWriteArrayList和BlockingQueue等,这些都是为并发环境设计的高效数据结构。它们在多线程环境下的性能和线程安全特性使得开发者能更方便地实现并发操作...

    线程----BlockingQueue

    `BlockingQueue`接口有几个具体实现类,根据不同的应用场景可以选择不同的实现: - **ArrayBlockingQueue**: 一种固定大小的`BlockingQueue`,在构造时需要指定其容量大小。队列中的元素按照FIFO(先进先出)原则...

Global site tag (gtag.js) - Google Analytics