- 浏览: 150473 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
EclipseEye:
fair_jm 写道不错 蛮详细的 谢谢分享
SWT/JFace专题 --- SWT中Display和多线程 -
fair_jm:
不错 蛮详细的 谢谢分享
SWT/JFace专题 --- SWT中Display和多线程
Queue
------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue, (基于数组的并发阻塞队列)
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue, (带优先级的无界阻塞队列)
9.SynchronousQueue (并发同步阻塞队列)
-----------------------------------------------------
阻塞队列和生产者-消费者模式
阻塞队列(Blocking queue)提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。如果Queue已经满了,put方法会被阻塞直到有空间可用;如果Queue是空的,那么take方法会被阻塞,直到有元素可用。Queue的长度可以有限,也可以无限;无限的Queue永远不会充满,所以它的put方法永远不会阻塞。
阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“生产产品”和“消费产品”。该模式不会发现一个工作便立即处理,而是把工作置于一个任务(“to do”)清单中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。
生产者-消费者设计是围绕阻塞队列展开的,生产者把数据放入队列,并使数据可用,当消费者为适当的行为做准备时会从队列中获取数据。生产者不需要知道消费者的省份或者数量,甚至根本没有消费者---它们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给它们安排的工作。BlockingQueue可以使用任意数量的生产者和消费者,从而简化了生产者-消费者设计的实现。最常见的生产者-消费者设计是将线程池与工作队列相结合。
阻塞队列简化了消费者的编码,因为take会保持阻塞直到可用数据出现。如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。
有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。
虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了。
类库中包含一些BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与LinkedList和ArrayList相似,但是却拥有比同步List更好的并发性能。PriorityBlockingQueue是一个按优先级顺序排序的队列,当你不希望按照FIFO的属性处理元素时,这个PriorityBolckingQueue是非常有用的。正如其他排序的容器一样,PriorityBlockingQueue可以比较元素本身的自然顺序(如果它们实现了Comparable),也可以使用一个Comparator进行排序。
最后一个BlockingQueue的实现是SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻止。SynchronousQueue这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。
生产者-消费者模式同样带来了一些性能方面的提高。生产者和消费者可以并发地执行,如果一个受限于I/O,另一个受限于CPU,那么并发执行的全部产出会高于顺序执行的产出。
------------
Deque是一个双端队列,允许高效地在头和尾部分别进行插入和移除。实现有ArrayDeque和LinkedBlockingDeque。
正如阻塞队列适用于生产者-消费者模式一样,双端队列使它们自身与一种叫做窃取工作(work stealing)的模式相关联。一个消费者生产者设计中,所有的消费者只共享一个工作队列;在窃取工作的设计中,每一个消费者都有一个自己的双端队列。如果一个消费者完成了自己双端队列中的全部工作,它可以窃取其他消费者的双端队列中的末尾任务。因为工作者线程并不会竞争一个共享的任务队列,所以窃取工作模式比传统的生产者-消费者设计有更佳的可伸缩性;大多数时候它们访问自己的双端队列,减少竞争。当一个工作者必须要访问另一个队列时,它会从尾部截取,而不是从头部,从而进一步降低对双端队列的争夺。
窃取工作恰好适合用于解决消费者与生产者同体的问题----当运行到一个任务的某单元时,可能会识别出更多的任务。比如垃圾回收时对堆做了记号,可以并行使用窃取工作。当一个线程发现了一个新的任务单元时,它会把它放在自己队列的末尾;当双端队列为空时,它会去其他队列的队尾寻找新的任务,这样能确保每一个线程都保持忙碌状态。
------------------------
非阻塞算法
基于锁的算法会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。
一个线程的失败或挂起不应该影响其他线程的失败或挂起,这样的算法成为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。在线程间使用CAS进行协调,这样的算法如果能构建正确的话,它既是非阻塞的,又是锁自由的。非竞争的CAS总是能够成功,如果多个线程以一个CAS竞争,总会有一个胜出并前进。非阻塞算法堆死锁和优先级倒置有“免疫性”(但它们可能会出现饥饿和活锁,因为它们允许重进入)。
非阻塞算法通过使用低层次的并发原语,比如比较交换,取代了锁。原子变量类向用户提供了这些底层级原语,也能够当做“更佳的volatile变量”使用,同时提供了整数类和对象引用的原子化更新操作。
-------------------------------
---------------------------
------------
1.ArrayDeque, (数组双端队列)
2.PriorityQueue, (优先级队列)
3.ConcurrentLinkedQueue, (基于链表的并发队列)
4.DelayQueue, (延期阻塞队列)(阻塞队列实现了BlockingQueue接口)
5.ArrayBlockingQueue, (基于数组的并发阻塞队列)
6.LinkedBlockingQueue, (基于链表的FIFO阻塞队列)
7.LinkedBlockingDeque, (基于链表的FIFO双端阻塞队列)
8.PriorityBlockingQueue, (带优先级的无界阻塞队列)
9.SynchronousQueue (并发同步阻塞队列)
-----------------------------------------------------
阻塞队列和生产者-消费者模式
阻塞队列(Blocking queue)提供了可阻塞的put和take方法,它们与可定时的offer和poll是等价的。如果Queue已经满了,put方法会被阻塞直到有空间可用;如果Queue是空的,那么take方法会被阻塞,直到有元素可用。Queue的长度可以有限,也可以无限;无限的Queue永远不会充满,所以它的put方法永远不会阻塞。
阻塞队列支持生产者-消费者设计模式。一个生产者-消费者设计分离了“生产产品”和“消费产品”。该模式不会发现一个工作便立即处理,而是把工作置于一个任务(“to do”)清单中,以备后期处理。生产者-消费者模式简化了开发,因为它解除了生产者和消费者之间相互依赖的代码。生产者和消费者以不同的或者变化的速度生产和消费数据,生产者-消费者模式将这些活动解耦,因而简化了工作负荷的管理。
生产者-消费者设计是围绕阻塞队列展开的,生产者把数据放入队列,并使数据可用,当消费者为适当的行为做准备时会从队列中获取数据。生产者不需要知道消费者的省份或者数量,甚至根本没有消费者---它们只负责把数据放入队列。类似地,消费者也不需要知道生产者是谁,以及是谁给它们安排的工作。BlockingQueue可以使用任意数量的生产者和消费者,从而简化了生产者-消费者设计的实现。最常见的生产者-消费者设计是将线程池与工作队列相结合。
阻塞队列简化了消费者的编码,因为take会保持阻塞直到可用数据出现。如果生产者不能足够快地产生工作,让消费者忙碌起来,那么消费者只能一直等待,直到有工作可做。同时,put方法的阻塞特性也大大地简化了生产者的编码;如果使用一个有界队列,那么当队列充满的时候,生产者就会阻塞,暂不能生成更多的工作,从而给消费者时间来赶进进度。
有界队列是强大的资源管理工具,用来建立可靠的应用程序:它们遏制那些可以产生过多工作量、具有威胁的活动,从而让你的程序在面对超负荷工作时更加健壮。
虽然生产者-消费者模式可以把生产者和消费者的代码相互解耦合,但是它们的行为还是间接地通过共享队列耦合在一起了。
类库中包含一些BlockingQueue的实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与LinkedList和ArrayList相似,但是却拥有比同步List更好的并发性能。PriorityBlockingQueue是一个按优先级顺序排序的队列,当你不希望按照FIFO的属性处理元素时,这个PriorityBolckingQueue是非常有用的。正如其他排序的容器一样,PriorityBlockingQueue可以比较元素本身的自然顺序(如果它们实现了Comparable),也可以使用一个Comparator进行排序。
最后一个BlockingQueue的实现是SynchronousQueue,它根本上不是一个真正的队列,因为它不会为队列元素维护任何存储空间。不过,它维护一个排队的线程清单,这些线程等待把元素加入(enqueue)队列或者移出(dequeue)队列。因为SynchronousQueue没有存储能力,所以除非另一个线程已经准备好参与移交工作,否则put和take会一直阻止。SynchronousQueue这类队列只有在消费者充足的时候比较合适,它们总能为下一个任务作好准备。
生产者-消费者模式同样带来了一些性能方面的提高。生产者和消费者可以并发地执行,如果一个受限于I/O,另一个受限于CPU,那么并发执行的全部产出会高于顺序执行的产出。
------------
Deque是一个双端队列,允许高效地在头和尾部分别进行插入和移除。实现有ArrayDeque和LinkedBlockingDeque。
正如阻塞队列适用于生产者-消费者模式一样,双端队列使它们自身与一种叫做窃取工作(work stealing)的模式相关联。一个消费者生产者设计中,所有的消费者只共享一个工作队列;在窃取工作的设计中,每一个消费者都有一个自己的双端队列。如果一个消费者完成了自己双端队列中的全部工作,它可以窃取其他消费者的双端队列中的末尾任务。因为工作者线程并不会竞争一个共享的任务队列,所以窃取工作模式比传统的生产者-消费者设计有更佳的可伸缩性;大多数时候它们访问自己的双端队列,减少竞争。当一个工作者必须要访问另一个队列时,它会从尾部截取,而不是从头部,从而进一步降低对双端队列的争夺。
窃取工作恰好适合用于解决消费者与生产者同体的问题----当运行到一个任务的某单元时,可能会识别出更多的任务。比如垃圾回收时对堆做了记号,可以并行使用窃取工作。当一个线程发现了一个新的任务单元时,它会把它放在自己队列的末尾;当双端队列为空时,它会去其他队列的队尾寻找新的任务,这样能确保每一个线程都保持忙碌状态。
------------------------
非阻塞算法
基于锁的算法会带来一些活跃度失败的风险。如果线程在持有锁的时候因为阻塞I/O,页面错误,或其他原因发生延迟,很可能所有的线程都不能前进了。
一个线程的失败或挂起不应该影响其他线程的失败或挂起,这样的算法成为非阻塞(nonblocking)算法;如果算法的每一个步骤中都有一些线程能够继续执行,那么这样的算法称为锁自由(lock-free)算法。在线程间使用CAS进行协调,这样的算法如果能构建正确的话,它既是非阻塞的,又是锁自由的。非竞争的CAS总是能够成功,如果多个线程以一个CAS竞争,总会有一个胜出并前进。非阻塞算法堆死锁和优先级倒置有“免疫性”(但它们可能会出现饥饿和活锁,因为它们允许重进入)。
非阻塞算法通过使用低层次的并发原语,比如比较交换,取代了锁。原子变量类向用户提供了这些底层级原语,也能够当做“更佳的volatile变量”使用,同时提供了整数类和对象引用的原子化更新操作。
-------------------------------
/** *DelayQueue(延时队列)是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象, *内部通过一个优先队列(PriorityQueue)的引用实现相关数据操作。 *其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即对头对象的延迟到期的时间最长。 *如果没有任何到期,那么就不会有任何头元素,并且poll将返回null(正因为这样,不能将null放入该队列中) *Delayed接口有一个名为getDelay()的方法,它用来告知延时到期有多长时间,或延迟在多长时间之前已经到期。 *为了排序,Delayed接口还继承了Comparable接口,因此必须实现comparaTo(),使其产生合理比较。 */ public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private transient final ReentrantLock lock = new ReentrantLock();//锁 /** *接口Condition :锁条件变量,其实例和特定的锁绑定。提供了: * await()、awaitUninterruptibly()、awaitNanos(long)、await(long, TimeUnit)、awaitUntil(Date) * signal()、signalAll() 方法,实现了对线程的“等待”和“唤醒”操作 */ private transient final Condition available = lock.newCondition();//锁的条件变量,提供“等待”“唤醒”线程的操作 private final PriorityQueue<E> q = new PriorityQueue<E>();//内部有一个优先队列(无界的队列)的引用 public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } /** * 插入元素到延时队列 (调用offer实现) */ public boolean add(E e) { return offer(e); } /** * 插入元素(加锁) */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); //调用优先队列的实现 if (first == null || e.compareTo(first) < 0) available.signalAll();//唤醒其他的线程 return true; } finally { lock.unlock(); } } /** * 插入元素 */ public void put(E e) { offer(e); } //由于是无界的,不会对offer插入元素阻塞,参数unit无效 public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } /** * 头元素出对列(加锁) */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } finally { lock.unlock(); } } /** * 获得并移除头元素,在延时到期的情况下。 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay); } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } } /** * 获得并移除头元素,延迟到期或指定时间到期后。 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { if (nanos <= 0) return null; if (delay > nanos) delay = nanos; long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); return x; } } } } finally { lock.unlock(); } } /** * 获取但不移除元素(调用优先队列的peek方法) */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } //把队列在的元素“剪切到”集合c中 public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } if (n > 0) available.signalAll(); return n; } finally { lock.unlock(); } } //把maxElements个元素“剪切”到集合c中 public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } if (n > 0) available.signalAll(); return n; } finally { lock.unlock(); } } public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } } public int remainingCapacity() { return Integer.MAX_VALUE; } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } } public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } …… }
---------------------------
发表评论
-
Nio Socket
2013-05-16 05:53 0asfda -
结合jdk源码解读,Error Exception
2013-05-10 04:00 0/* * @(#)Error.java 1.17 05/11 ... -
从不同的角度,重新审视class和interface
2013-05-07 03:40 0java开发中,对应class和interface的基本区别都 ... -
java.lang.Object
2013-05-07 03:35 0/* * @(#)Object.java 1.73 06/0 ... -
反射机制+类加载机制
2013-02-18 01:30 0反射机制+类加载机制 -
并发专题----使用开源软件Amino构建并发应用程序/多线程运行时分析工具MTRAT
2013-02-14 00:50 1378使用开源软件构建并发 ... -
并发专题 ---- 线程安全
2013-02-14 00:50 753线程安全 ================== ... -
并发专题 --- 锁
2013-02-14 00:50 806相比于synchronized,ReentrantLock 提 ... -
并发专题 ----(JMM)java内存模型
2013-02-14 00:50 543Java 内存模型 ------------ ... -
并发专题 ---java.util.concurrent 包
2013-02-13 02:26 1841java.util.concurrent 包 原 ... -
集合框架 Queue篇(8)---PriorityBlockingQueue、SynchronousQueue
2013-02-07 12:40 1317Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(7)---LinkedBlockingDeque
2013-02-07 12:40 854Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(6)---LinkedBlockingQueue
2013-02-07 12:39 836Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(5)---ArrayBlockingQueue
2013-02-06 10:39 707Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(3)---ConcurrentLinkedQueue
2013-02-06 10:38 1050Queue ------------ 1.ArrayDequ ... -
集合框架 Queue篇(2)---PriorityQueue
2013-02-06 10:38 835Queue ------------ 1.ArrayDeq ... -
集合框架 Queue篇(1)---ArrayDeque
2013-02-06 10:38 934Queue ------------ 1.ArrayDeq ... -
集合框架 Set篇---HashSet、LinkedHashSet、TreeSet、CopyOnWriteArraySet、ConcurrentSkipList
2013-02-05 08:43 1485Set --------- 1.HashSet 2.Link ... -
集合框架 List篇(2)---CopyOnWriteArrayList
2013-02-05 08:43 844List ----------- 1.ArrayList(jd ... -
集合框架 List篇(1)---ArrayList、LinkedList
2013-02-05 08:42 905List ----------- 1.ArrayList(jd ...
相关推荐
这个模型展示了阻塞队列如何有效地协调生产者和消费者的活动。 总之,Java并发编程中的阻塞队列和阻塞栈是高效并发处理的核心工具,它们通过内置的阻塞机制实现线程间的同步,简化了多线程环境下的复杂性,提升了...
3. 提高可靠性:阻塞队列可以提高系统的可靠性,因为它可以避免生产者和消费者之间的阻塞。 阻塞队列的缺点: 1. 增加了系统的复杂性:阻塞队列需要更多的代码和更多的资源。 2. 需要更多的同步机制:阻塞队列需要...
阻塞队列是Java中并发编程的一个重要组件,它属于Java.util.concurrent包中的一部分。...使用阻塞队列能够极大地简化生产者和消费者模型的实现,并且能够在多线程环境中更加有效地管理资源和任务。
2. **生产者-消费者模型**:阻塞队列可以有效地解决生产者和消费者之间的并发问题,避免线程间的竞争和冲突。 3. **消息队列**:消息队列使用阻塞队列来存储消息,可以实现异步通信,提高系统的吞吐量和响应性能。 4...
6. LinkedTransferQueue:基于链表的无界阻塞队列,支持高效的“传输”操作,可以在生产者和消费者之间直接传递元素。 7. LinkedBlockingDeque:基于链表的双向阻塞队列,既可以作为队列也可以作为栈使用。 在生产...
阻塞队列的特性使得生产者(在这里是`FileEnumerationTask`)和消费者(这里是`SearchTask`)能够协同工作,避免了传统的锁和条件变量带来的复杂性和潜在的死锁风险。当队列满时,生产者线程会被阻塞,直到有空间...
在生产者消费者模式中,BlockingQueue可以作为共享的队列,生产者不断地将元素放入队列中,而消费者则不断地从队列中取出元素。如果队列已满,生产者将被阻塞直到队列非满。如果队列为空,消费者将被阻塞直到队列...
`BlockingQueue`常用于实现生产者-消费者模型,生产者线程将数据放入队列,消费者线程从队列中取出数据。这种模型可以有效避免资源竞争,提高程序效率。 6. **线程安全** `BlockingQueue`的所有操作都是线程安全...
这种设计模式非常适合于实现生产者-消费者模型,即一个或多个线程(生产者)向队列添加元素,而另一个或多个线程(消费者)从队列中移除元素。 ##### 3.2 阻塞队列方法 `BlockingQueue` 提供了几种主要的方法来...
阻塞队列是一个线程安全的接口,它在生产者-消费者模式中扮演关键角色,允许生产者线程向队列添加元素,而消费者线程可以从队列中移除元素,同时在队列满或空时自动进行线程的阻塞和唤醒操作。 ### 阻塞队列的主要...
BlockQueue是Java并发编程中非常重要的一个接口,它位于`java.util.concurrent`包下,是线程安全的队列,特别适用于多生产者多消费者(multi-producer multi-consumer, MPMC)的场景。在本练习中,我们将通过`...
- 使用`Condition`实现通知机制,以同步队列中的生产者和消费者。 - **非阻塞队列**如`ConcurrentLinkedQueue`使用CAS算法来实现线程安全的操作。 **6. 反射中,Class.forName和ClassLoader的区别** - **Class....
【Java并发之BlockingQueue的使用】讲解了Java中用于并发编程的重要工具——BlockingQueue,它是一种线程安全的队列,特别适用于生产者/消费者的场景。 BlockingQueue的主要特性在于其在队列满或空时会阻塞相应的...