第21章 - 并发 - BlcokingQueue
1. BlockingQueue简介
BlockingQueue 不接受 null 元素。试图 add、put 或 offer 一个 null 元素时,
某些实现会抛出 NullPointerException。null 被用作指示 poll 操作失败的警戒值。
BlockingQueue 可以是限定容量的。它在任意给定时间都可以有一个 remainingCapacity,
超出此容量,便无法无阻塞地 put 附加元素。没有任何内部容量约束的 BlockingQueue
总是报告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口。
因此,举例来说,使用 remove(x) 从队列中移除任意一个元素是有可能的。
然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。
BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的
并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、
retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。
因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。
BlockingQueue 实质上不 支持使用任何一种“close”或“shutdown”操作来指示不再添加任何项。
插入特殊的 end-of-stream 或 poison 对象,并根据使用者获取这些对象的时间来对它们进行解释。
2. BlockingQueue接口的继承关系
Iterable <- Collection <- Queue <- BlockingQueue
(1) public interface Iterable<T> 接口只有一个方法
Iterator<T> iterator() 返回一个在一组 T 类型的元素上进行迭代的迭代器。
boolean hasNext() 如果仍有元素可以迭代,则返回 true。
E next() 返回迭代的下一个元素。
void remove() 从迭代器指向的 collection 中移除迭代器返回的最后一个元素(可选操作)。
(2) public interface Collection<E>extends Iterable<E>
boolean add(E e) 确保此 collection 包含指定的元素(可选操作)。
boolean addAll(Collection<? extends E> c) 将指定 collection 中的所有元素都添加到此 collection 中(可选操作)。
void clear() 移除此 collection 中的所有元素(可选操作)。
boolean contains(Object o) 如果此 collection 包含指定的元素,则返回 true。
boolean containsAll(Collection<?> c) 如果此 collection 包含指定 collection 中的所有元素,则返回 true。
boolean equals(Object o) 比较此 collection 与指定对象是否相等。
int hashCode() 返回此 collection 的哈希码值。
boolean isEmpty() 如果此 collection 不包含元素,则返回 true。
Iterator<E> iterator() 返回在此 collection 的元素上进行迭代的迭代器。
boolean remove(Object o) 从此 collection 中移除指定元素的单个实例,如果存在的话(可选操作)。
boolean removeAll(Collection<?> c) 移除此 collection 中那些也包含在指定 collection 中的所有元素(可选操作)。
boolean retainAll(Collection<?> c) 仅保留此 collection 中那些也包含在指定 collection 的元素(可选操作)。
int size() 返回此 collection 中的元素数。
Object[] toArray() 返回包含此 collection 中所有元素的数组。
<T> T[] toArray(T[] a) 返回包含此 collection 中所有元素的数组;返回数组的运行时类型与指定数组的运行时类型相同。
(3) public interface Queue<E>extends Collection<E>
boolean add(E e) 将指定的元素插入此队列(如果立即可行且不会违反容量限制),在成功时返回 true,
如果当前没有可用的空间,则抛出 IllegalStateException。
E element() 获取,但是不移除此队列的头。
boolean offer(E e) 将指定的元素插入此队列(如果立即可行且不会违反容量限制),当使用有容量限制的队列时,
此方法通常要优于 add(E),后者可能无法插入元素,而只是抛出一个异常。
E peek() 获取但不移除此队列的头;如果此队列为空,则返回 null。
E poll() 获取并移除此队列的头,如果此队列为空,则返回 null。
E remove() 获取并移除此队列的头。如果此队列为空抛异常 NoSuchElementException
抛出异常 返回特殊值
插入 add(e) offer(e) (插入失败,则返回false)
移除 remove() poll() (队列为空,返回null)
检查 element() peek() (队列为空,返回null)
(4) public interface BlockingQueue<E>extends Queue<E>
boolean add(E e) 将指定元素插入此队列中(如果立即可行且不会违反容量限制),
成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。
boolean contains(Object o) 如果此队列包含指定元素,则返回 true。
int drainTo(Collection<? super E> c) 移除此队列中所有可用的元素,
并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements) 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
boolean offer(E e) 将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
boolean offer(E e, long timeout, TimeUnit unit) 将指定元素插入此队列中,在到达指定的等待时间前等待可用的空间(如果有必要)。
E poll(long timeout, TimeUnit unit) 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
void put(E e) 将指定元素插入此队列中,将等待可用的空间(如果有必要)。
int remainingCapacity() 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的附加元素数量;
如果没有内部限制,则返回 Integer.MAX_VALUE。
boolean remove(Object o) 从此队列中移除指定元素的单个实例(如果存在)。
E take() 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
比如 BlockingQueue 常用于实现生产者-消费者问题,最常用方法take()和put(),offer(),poll(),等
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,
第二种是返回一个特殊值(null 或 false,具体取决于操作),
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
3. BlockingQueue 接口实现类
(1)public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。
队列的头部 是在队列中存在时间最长的元素。
队列的尾部 是在队列中存在时间最短的元素。
然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。
(2) public class LinkedBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。
队列的头部 是在队列中时间最长的元素。
队列的尾部 是在队列中时间最短的元素。
如果未指定容量,则它等于 Integer.MAX_VALUE。
此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。
(3)public class PriorityBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。
虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。
此类不允许使用 null 元素。
依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。
iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。
此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。
要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> { final static AtomicLong seq = new AtomicLong(); final long seqNum; final E entry; public FIFOEntry(E entry) { seqNum = seq.getAndIncrement(); this.entry = entry; } public E getEntry() { return entry; } public int compareTo(FIFOEntry<E> other) { int res = entry.compareTo(other.entry); if (res == 0 && other.entry != this.entry) res = (seqNum < other.seqNum ? -1 : 1); return res; } }
(4) public class SynchronousQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。
不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;
队列的头 是尝试添加到队列中的首个已排队插入线程的元素;
如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。
对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。
此队列不允许 null 元素。
同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。
但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。
(5) public class DelayQueue<E extends Delayed>extends AbstractQueue<E>implements BlockingQueue<E>
Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。
该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。
即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
例如,size 方法同时返回到期和未到期元素的计数。
此队列不允许使用 null 元素。
此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。
(6)public class LinkedBlockingDeque<E>extends AbstractQueue<E>implements BlockingDeque<E>, Serializable
如果未指定容量,那么容量将等于 Integer.MAX_VALUE。
异常包括 remove、removeFirstOccurrence、removeLastOccurrence、contains、iterator.remove()
此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。
p.s. 这个类实际上实现的是 public interface BlockingDeque<E>extends BlockingQueue<E>, Deque<E>
4. BlockingQueue 类接口实现类 用法示例:
(1) 示例1,制作吐司.一台机器具有三个任务,一个制作吐司,一个给吐司抹黄油,一个给抹过黄油的吐司上涂果酱.
package concurrency; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue<Toast> { } class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toastQueue = tq; } public void run() { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500)); // Make toast Toast t = new Toast(count++); System.out.println(t); // Insert into queue toastQueue.put(t); } } catch (InterruptedException e) { System.out.println("Toaster interrupted"); } System.out.println("Toaster off"); } } // Apply butter to toast: class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = dryQueue.take(); t.butter(); System.out.println(t); butteredQueue.put(t); } } catch (InterruptedException e) { System.out.println("Butterer interrupted"); } System.out.println("Butterer off"); } } // Apply jam to buttered toast: class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = butteredQueue.take(); t.jam(); System.out.println(t); finishedQueue.put(t); } } catch (InterruptedException e) { System.out.println("Jammer interrupted"); } System.out.println("Jammer off"); } } // Consume the toast: class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } public void run() { try { while (!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = finishedQueue.take(); // Verify that the toast is coming in order, // and that all pieces are getting jammed: if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { System.out.println(">>>> Error: " + t); System.exit(1); } else System.out.println("Chomp! " + t); } } catch (InterruptedException e) { System.out.println("Eater interrupted"); } System.out.println("Eater off"); } } public class ToastOMatic { public static void main(String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue, butteredQueue)); //exec.execute(new Butterer(dryQueue, butteredQueue)); 可以使用2个抹黄油线程 exec.execute(new Jammer(butteredQueue, finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
(2) 示例2
package concurrency; public class LiftOff implements Runnable { protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() { } public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while (countDown-- > 0) { System.out.print(status()); Thread.yield(); } System.out.println(""); } } package concurrency; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets = queue; } public void add(LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { System.out.println("Interrupted during put()"); } } public void run() { try { while (!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); // Use this thread } } catch (InterruptedException e) { System.out.println("Waking from take()"); } System.out.println("Exiting LiftOffRunner"); } } public class TestBlockingQueues { static void getkey() { try { // Compensate for Windows/Linux difference in the // length of the result produced by the Enter key: new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (java.io.IOException e) { throw new RuntimeException(e); } } static void getkey(String message) { System.out.println(message); getkey(); } static void test(String msg, BlockingQueue<LiftOff> queue) { System.out.println(msg); LiftOffRunner runner = new LiftOffRunner(queue); Thread t = new Thread(runner); t.start(); for (int i = 0; i < 5; i++) runner.add(new LiftOff(5)); getkey("Press 'Enter' (" + msg + ")"); t.interrupt(); System.out.println("Finished " + msg + " test"); } //执行main的线程调用add方法,往BlockingQueue中插入Runnable,相当与生产者 //在插入之前,已经启动一个新线程,并start.start的时候执行消费者的run方法, //在run方法中,调用BlockingQueue的take()方法,如果Queue为空,则消费者线程阻塞. //这个例子演示了 单生产者和单消费者 的情况 public static void main(String[] args) { test("LinkedBlockingQueue", // 不限制容量的阻塞队列 new LinkedBlockingQueue<LiftOff>()); test("ArrayBlockingQueue", // 限制容量为3个的阻塞队列 new ArrayBlockingQueue<LiftOff>(3)); test("SynchronousQueue", // 队列容量始终为1 new SynchronousQueue<LiftOff>()); } }