`
g21121
  • 浏览: 696105 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java并发之BlockingQueue

 
阅读更多

        一、Queue

        Queue是队列接口是 Collection的子接口。除了基本的 Collection操作外,队列还提供其他的插入、提取和检查操作。每个方法都存在两种形式:一种抛出异常(操作失败时),另一种返回一个特殊值(null 或 false,具体取决于操作)。插入操作的后一种形式是用于专门为有容量限制的 Queue 实现设计的;在大多数实现中,插入操作不会失败。

 

  抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove(e) poll(e)
检查 element() peek()

 

        队列通常(但并非一定)以 FIFO(先进先出)的方式排序各个元素。不过优先级队列和 LIFO 队列(或堆栈)例外,前者根据提供的比较器或元素的自然顺序对元素进行排序,后者按 LIFO(后进先出)的方式对元素进行排序。无论使用哪种排序方式,队列的头 都是调用 remove() 或 poll() 所移除的元素。在 FIFO 队列中,所有的新元素都插入队列的末尾。其他种类的队列可能使用不同的元素放置规则。每个 Queue 实现必须指定其顺序属性。 

        如果可能,offer 方法可插入一个元素,失败则返回 false。这与 Collection.add 方法不同,该方法只能通过抛出未经检查的异常使添加元素失败。offer 方法设计用于正常的失败情况,而不是出现异常的情况,例如在容量固定(有界)的队列中。 

        remove() 和 poll() 方法可移除和返回队列的头。到底从队列中移除哪个元素是队列排序策略的功能,而该策略在各种实现中是不同的。remove() 和 poll() 方法仅在队列为空时其行为有所不同:remove() 方法抛出一个异常,而 poll() 方法则返回 null。 

        element() 和 peek() 获取但不移除队列的头,element与 peek 唯一的不同在于:此队列为空时将抛出一个异常。

        Queue 接口并未定义阻塞队列的方法,而这在并发编程中是很常见的。BlockingQueue 接口则定义了那些等待元素出现或等待队列中有可用空间的方法,这些方法扩展了此接口。 

        Queue 实现通常不允许插入 null 元素,尽管某些实现(如 LinkedList)并不禁止插入 null。即使在允许 null 的实现中,也不应该将 null 插入到 Queue 中,因为 null 也用作 poll 方法的一个特殊返回值,表明队列不包含元素。 

        Queue 实现通常未定义 equals 和 hashCode 方法的基于元素的版本,而是从 Object 类继承了基于身份的版本,因为对于具有相同元素但有不同排序属性的队列而言,基于元素的相等性并非总是定义良好的。 

        Queue 作为队列可以实现一个按固定顺序访问其内部元素的结构,与 LinkedList等实现不同,Queue并不能获取指定位置的元素。

        在 ThreadPoolExecutor类中创建线程池时使用的是 BlockingQueue。BlockingQueue是 Queue的子接口,BlockingQueue的实现类有很多:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue。

        Deque与 Queue不同在于,Deque是一个双端队列,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque 实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。 Deque不是我们要学习的重点,下面就不提了。

        我们要用到的实现为 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue,DelayedWorkQueue.其中 DelayedWorkQueue是 ScheduledThreadPoolExecutor的内部类实现。

        顶层接口为 Queue,然后是 Queue的抽象实现类 AbstractQueue和子接口 BlockingQueue。图中四个类均继承于 AbstractQueue并实现 BlockingQueue接口,DelayedWorkQueue同样实现了 BlockingQueue,但DelayedWorkQueue继承自 AbstractCollection。

        以下是Queue的源代码:

public interface Queue<E> extends Collection<E> {
	/**
	 * 将指定的元素插入此队列(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用的空间,则抛出IllegalStateException
	 */
	boolean add(E e);

	/**
	 * 将指定的元素插入此队列(如果立即可行且不会违反容量限制),当使用有容量限制的队列时,此方法通常要优于add(E),后者可能无法插入元素,而只是抛出一个异常
	 */
	boolean offer(E e);

	/**
	 * 获取并移除此队列的头。此方法与 poll 唯一的不同在于:此队列为空时将抛出一个异常
	 */
	E remove();

	/**
	 * 获取并移除此队列的头,如果此队列为空,则返回 null
	 */
	E poll();

	/**
	 * 获取,但是不移除此队列的头。此方法与 peek 唯一的不同在于:此队列为空时将抛出一个异常
	 */
	E element();

	/**
	 * 获取但不移除此队列的头;如果此队列为空,则返回 null
	 */
	E peek();
}

 

        二、AbstractQueue

        AbstractQueue提供某些 Queue 操作的主要实现。此类中的实现适用于基本实现不 允许包含 null 元素时。add、remove 和 element 方法分别基于 offer、poll 和 peek 方法,但是它们通过抛出异常而不是返回 false 或 null 来指示失败。 

        扩展此类的 Queue 实现至少必须定义一个不允许插入 null 元素的 Queue.offer(E) 方法,该方法以及 Queue.peek()、Queue.poll()、Collection.size() 和 Collection.iterator() 都支持 Iterator.remove() 方法。通常还要重写其他方法。如果无法满足这些要求,那么可以转而考虑为 AbstractCollection 创建子类。 

        以下是 AbstractQueue的源代码:

public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

	/**
	 * 子类使用的构造方法
	 */
	protected AbstractQueue() {
	}

	/**
	 * 将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。
	 */
	public boolean add(E e) {
		if (offer(e))
			return true;
		else
			throw new IllegalStateException("Queue full");
	}

	/**
	 * 获取并移除此队列的头。此方法与 poll 唯一的不同在于:此队列为空时将抛出一个异常。
	 * 除非队列为空,否则此实现返回 poll 的结果。 
	 */
	public E remove() {
		E x = poll();
		if (x != null)
			return x;
		else
			throw new NoSuchElementException();
	}

	/**
	 * 获取但不移除此队列的头。此方法与 peek 唯一的不同在于:此队列为空时将抛出一个异常。
	 * 除非队列为空,否则此实现返回 peek 的结果。
	 */
	public E element() {
		E x = peek();
		if (x != null)
			return x;
		else
			throw new NoSuchElementException();
	}

	/**
	 * 移除此队列中的所有元素。此调用返回后,队列将为空。 
	 * 此实现重复调用 poll,直到它返回 null 为止。 
	 */
	public void clear() {
		while (poll() != null)
			;
	}

	/**
	 * 将指定 collection 中的所有元素都添加到此队列中。
	 * 如果试图将某一队列 addAll 到该队列本身中,则会导致 IllegalArgumentException。
	 * 此外,如果正在进行此操作时修改指定的 collection,则此操作的行为是不确定的。
	 * 此实现在指定的 collection 上进行迭代,并依次将迭代器返回的每一个元素添加到此队列中。
	 * 在试图添加某一元素(尤其是 null 元素)时如果遇到了运行时异常,则可能导致在抛出相关异常时只成功地添加了某些元素。
	 */
	public boolean addAll(Collection<? extends E> c) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		boolean modified = false;
		Iterator<? extends E> e = c.iterator();
		while (e.hasNext()) {
			if (add(e.next()))
				modified = true;
		}
		return modified;
	}
}

 

        三、BlockingQueue

        阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用,从而产生阻塞。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的缓存容器,而消费者也只从容器里拿元素。

        BlockingQueue 的方法以四种形式出现,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

 

     抛出异常      返回特殊值       阻塞    超时
   插入    add(e) offer(e) put(e) offer(e, time, unit)
   移除    remove() poll() take() poll(time, unit)
   检查    element() peek() - -

 

        • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。 

        • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null 

        • 阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。 

        • 超时:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。 

        BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。 

        BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue 总是报告 Integer.MAX_VALUE 的剩余容量。 

        BlockingQueue 实现主要用于生产者-消费者队列,但它另外还支持 Collection 接口。因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。 

        题外话:所谓生产者消费者模式,这里简单介绍一下。比如我们在餐厅吃饭,我们就是消费者,餐厅的厨师就是生产者,而餐厅的服务员就是一个缓冲环节。当生产者制作好菜品(生产产品),交由服务员(缓冲区),由服务员将菜品送至顾客(消费者)品用。


        生产者-消费者模式最重要的作用就是解耦,利用缓冲区将两者分离。如果每一个厨师做完了菜都需要亲自送到顾客桌上,那么这样就是将厨师与顾客绑定到了一起。加入中间缓冲环节,也就是服务员,将送菜的任务交由服务员(缓冲区)去处理,这样生产者与消费者就可以各自做自己的事情了。在此模式下两者间支持并发操作,因为饭店的厨师肯定不止一个,顾客也是如此。再有就是支持两者间不同步,因为两者间的数量与效率是不同步的,这就会导致生产与消费的速度不同。

        BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。 

        BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。这种功能的需求和使用有依赖于实现的倾向。例如,一种常用的策略是:对于生产者,插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。 

       注意,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用。 

        以下是基于典型的生产者-消费者场景的一个用例:

class Producer implements Runnable {
	private final BlockingQueue queue;
	private int i;

	Producer(BlockingQueue q) {
		queue = q;
	}

	public void run() {
		try {
			while (true) {
				queue.put(produce());// 将产品放入缓冲队列
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	int produce() {
		return i++;// 生产产品
	}
}

class Consumer implements Runnable {
	private final BlockingQueue queue;

	Consumer(BlockingQueue q) {
		queue = q;
	}

	public void run() {
		try {
			while (true) {
				consume(queue.take());// 从缓冲队列取出产品
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	void consume(Object x) {
		System.out.println("消费:"+x);// 消费产品
	}
}

public class Runner {
	public static void main(String[] args) {
		BlockingQueue q = new LinkedBlockingQueue<Integer>(10);// 或其他实现
		Producer p = new Producer(q);
		Consumer c1 = new Consumer(q);
		Consumer c2 = new Consumer(q);
		new Thread(p).start();
		new Thread(c1).start();
		new Thread(c2).start();
	}
}
//结果:
...
消费:160607
消费:160608
消费:160609
消费:160610
消费:160611
...

        当生产者与消费者线程启动后,首先生产者会不断往队列中添加产品,一旦队列填满则生产停止,然后消费者从队列中取出产品使用,显然过程中使用了类似于 wait与 notify的流程,后面会详细分析。

        以下是 BlockingQueue的源代码:

public interface BlockingQueue<E> extends Queue<E> {
	/**
	 * 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,
	 * 如果当前没有可用的空间,则抛出 IllegalStateException。
	 * 当使用有容量限制的队列时,通常首选 offer
	 */
	boolean add(E e);

	/**
	 * 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,
	 * 如果当前没有可用的空间,则返回 false。
	 * 当使用有容量限制的队列时,此方法通常要优于 add(E),后者可能无法插入元素,而只是抛出一个异常
	 */
	boolean offer(E e);

	/**
	 * 将指定元素插入此队列中,将等待可用的空间(即阻塞)
	 */
	void put(E e) throws InterruptedException;

	/**
	 * 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间
	 */
	boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

	/**
	 * 获取并移除此队列的头部,在元素变得可用之前一直等待
	 */
	E take() throws InterruptedException;

	/**
	 * 获取并移除此队列的头部,在指定的等待时间前等待可用的元素
	 */
	E poll(long timeout, TimeUnit unit) throws InterruptedException;

	/**
	 * 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的附加元素数量;
	 * 如果没有内部限制,则返回 Integer.MAX_VALUE
	 */
	int remainingCapacity();

	/**
	 * 从此队列中移除指定元素的单个实例(如果存在)。
	 * 更确切地讲,如果此队列包含一个或多个满足 o.equals(e) 的元素 e,则移除该元素。
	 * 如果此队列包含指定元素(或者此队列由于调用而发生更改),则返回 true
	 */
	boolean remove(Object o);

	/**
	 * 如果此队列包含指定元素,则返回 true。更确切地讲,当且仅当此队列至少包含一个满足 o.equals(e) 的元素 e时,返回 true
	 */
	public boolean contains(Object o);

	/**
	 * 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。此操作可能比反复轮询此队列更有效。
	 * 在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时,
	 * 元素会同时在两个 collection 中出现,或者在其中一个 collection中出现,也可能在两个 collection 中都不出现。
	 * 如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException 异常。
	 * 此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的。
	 */
	int drainTo(Collection<? super E> c);

	/**
	 * 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
	 * 在试图向 collection c中添加元素没有成功时,可能导致在抛出相关异常时,
	 * 元素会同时在两个 collection 中出现,或者在其中一个 collection中出现,也可能在两个 collection 中都不出现。
	 * 如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException 异常。
	 * 此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的。
	 */
	int drainTo(Collection<? super E> c, int maxElements);
}

        后续几篇介绍阻塞队列的相关实现。

0
0
分享到:
评论

相关推荐

    Java并发之BlockingQueue的使用

    【Java并发之BlockingQueue的使用】讲解了Java中用于并发编程的重要工具——BlockingQueue,它是一种线程安全的队列,特别适用于生产者/消费者的场景。 BlockingQueue的主要特性在于其在队列满或空时会阻塞相应的...

    java并发编程2

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。以下是对标题和描述中所提及的几个知识点的详细解释: 1. **线程与并发** - **线程*...

    (PDF带目录)《Java 并发编程实战》,java并发实战,并发

    《Java 并发编程实战》是一本专注于Java并发编程的权威指南,对于任何希望深入了解Java多线程和并发控制机制的开发者来说,都是不可或缺的参考资料。这本书深入浅出地介绍了如何在Java环境中有效地管理和控制并发...

    Java中的BlockingQueue:深入理解与实践应用

    在Java并发编程中,BlockingQueue是一个非常重要的接口,它提供了线程安全的队列操作,特别是在生产者-消费者模式中发挥着核心作用。本文将深入探讨BlockingQueue的工作原理、常见实现、使用场景以及代码示例。 在...

    Java并发编程实践高清pdf及源码

    《Java并发编程实践》是一本深入探讨Java多线程编程的经典著作,由Brian Goetz、Tim Peierls、Joshua Bloch、Joseph Bowles和David Holmes等专家共同编写。这本书全面介绍了Java平台上的并发编程技术,是Java开发...

    java并发编程实战(英文版)

    ### Java并发编程实战知识点概述 #### 一、Java并发特性详解 在《Java并发编程实战》这本书中,作者深入浅出地介绍了Java 5.0和Java 6中新增的并发特性。这些特性旨在帮助开发者更高效、安全地编写多线程程序。书中...

    JAVA并发编程实践.pdf+高清版+目录 书籍源码

    此外,书中还介绍了Java并发容器,如ConcurrentHashMap、CopyOnWriteArrayList和BlockingQueue等,这些都是为并发环境设计的高效数据结构。它们在多线程环境下的性能和线程安全特性使得开发者能更方便地实现并发操作...

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    java并发编程与实践

    "Java并发编程与实践"文档深入剖析了这一主题,旨在帮助开发者理解和掌握如何在Java环境中有效地实现并发。 并发是指在单个执行单元(如CPU)中同时执行两个或更多任务的能力。在Java中,这主要通过线程来实现,...

    Java并发编程实践.pdf

    阻塞队列(BlockingQueue)是Java并发工具包中的一个重要组成部分,它能够保证在队列为空时取元素的操作会等待队列变为非空,而在队列满时插入元素的操作会等待队列中有空余空间。常见的阻塞队列实现有...

    Java并发编程设计原则和模式

    本资料“Java并发编程设计原则和模式”深入探讨了如何在Java环境中有效地进行并发处理,以充分利用系统资源并避免潜在的并发问题。 一、并发编程基础 并发是指两个或多个操作在同一时间段内执行,但并不意味着这些...

    JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf

    根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...

    java并发编程实战高清版pdf

    4. **并发容器**:Java并发库提供了一系列优化过的并发容器,如`ConcurrentHashMap`、`CopyOnWriteArrayList`和`BlockingQueue`等,它们在并发环境下有更好的性能表现。 5. **原子变量**:`java.util.concurrent....

    Java并发程序设计教程

    Java并发工具类库(java.util.concurrent)包含了许多实用的并发组件,如ConcurrentHashMap、CopyOnWriteArrayList和BlockingQueue等。ConcurrentHashMap在并发环境下提供高效且线程安全的哈希映射,而...

    Java并发编程--BlockingQueue.docx

    【Java并发编程--BlockingQueue详解】 BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,它扩展了 Queue 接口,并引入了线程安全的特性,特别适合于多线程环境下的数据共享。 BlockingQueue 的...

    Java并发编程实战

    6.3.5 CompletionService:Executor与BlockingQueue 6.3.6 示例:使用CompletionService实现页面渲染器 6.3.7 为任务设置时限 6.3.8 示例:旅行预定门户网站 第7章 取消与关闭 第8章 线程池的使用 第9章 图形...

    Java并发编程实践源码

    《Java并发编程实践》是一本深入探讨Java多线程与并发编程的经典著作,其源码提供了丰富的示例,帮助读者理解和应用并发编程的核心概念。在这些文件中,我们可以看到多种并发设计模式和策略的实际运用,下面将逐一...

    java并发实战pdf及demo源码

    《Java并发实战》这本书是Java并发编程领域的重要参考资料,它深入浅出地讲解了Java平台上的多线程和并发编程技术。这本书对于开发者来说,无论你是初学者还是经验丰富的程序员,都能从中获得宝贵的知识和实践经验。...

Global site tag (gtag.js) - Google Analytics