SynchronousQueue:同步Queue,属于线程安全的BlockingQueue的一种,此队列设计的理念类似于"单工模式",对于每个put/offer操作,必须等待一个take/poll操作,类似于我们的现实生活中的"火把传递":一个火把传递地他人,需要2个人"触手可及"才行. 因为这种策略,最终导致队列中并没有一个真正的元素;这是一种pipleline思路的基于queue的"操作传递".
- void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素.
- boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者"碰巧"有poll操作,那么将返回true,否则返回false.
- E take():获取并删除一个元素,阻塞直到有其他线程offer/put.
- boolean poll():获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者"碰巧"有offer操作,那么将返回true,否则返回false.
- E peek():总会返回null,硬编码.
这个队列中,对我们有意义的操作时put/take,以及put/offer + take或者put/take + poll,对于无法进入队列的元素,需要有额外的"拒绝"策略支持.
SynchronousQueue经常用来,一端或者双端严格遵守"单工"(单工作者)模式的场景,队列的两个操作端分别是productor和consumer.常用于一个productor多个consumer的场景。
在ThreadPoolExecutor中,通过Executors创建的cachedThreadPool就是使用此类型队列.已确保,如果现有线程无法接收任务(offer失败),将会创建新的线程来执行.
下面为简单的例子:
public class SynchronousQueueTest { /** * @param args */ public static void main(String[] args) throws Exception{ SynchronousQueue<Object> queue = new SynchronousQueue<Object>(); for(int i=0;i<5;i++){ Thread t = new SQThread(queue, 1); t.start(); } //Thread.sleep(1000); for(int i=0;i<10;i++){ if(!queue.offer(new Object())){ System.out.println("Failure"); } } } public static class SQThread extends Thread{ private SynchronousQueue<Object> queue; int mode; SQThread(SynchronousQueue<Object> queue,int mode){ this.queue = queue; this.mode = mode; } @Override public void run(){ Object item = null; try{ System.out.println(Thread.currentThread().getId()); if(mode == 1){ while((item = queue.take()) != null){ System.out.println(item.toString()); } }else{ // } }catch(Exception e){ // } System.out.println("end"); } } }
事实上多次执行此程序,会发现不同的结果,同是注释部分Thread.sleep的时间不同,也会对结果不同.所以在线程池中,我们需要"拒绝策略"的支持.
SynchronousQueue支持2种策略:公平和非公平,默认为非公平.
1) 公平策略:内部实现为TransferQueue,即一个队列.(consumer和productor将会被队列化)
2) 非公平策略:内部实现为TransferStack,即一个stack(内部模拟了一个单向链表,允许闯入行为)
内部实现比较复杂,尽管支持线程安全,但是其内部并没有使用lock(事实上无法使用lock),使用了LockSupport来控制线程,使用CAS来控制栈的head游标(非公平模式下)。
如下为非公平策略下实现
对于consumer而言,其为data消费者,每次take将会检测是否已经有productor的线程在等待,如果有,此时head应该不为null并且head的数据类型为“DATA”(两种类型:DATA 和REQUEST)
此时consumer将消费数据。取消productor的阻塞效果(使用LockSupport.unpark).
如果此时没有productor提供数据,即head为null,此时consumer也将创建一个mode为REQUEST的元素,并阻塞线程。当有productor提供数据时,有productor来取消其阻塞效果。
对于productor而言,情况类似。。当put数据时,如果队列中无元素,和take一样,创建一个node为DATA的元素,并阻塞进程。。
如果队列中有元素,则CAS更新head,使head指向新元素,并不断去tryMatch是否有正在阻塞的尚未take到数据的consumer,如果有,就unpark它,并返回数据,并变更列表的next指针。(有点像俄罗斯方块)
如果队列中没有元素(即没有消费者),则自己阻塞。
每个NODE,都有一个waiter属性,标示此在此元素上操作时阻塞的线程(put操作产生的元素未被消费,那么waiter就是put线程;take产生的元素单无元素消费成功时,那么waiter就是take线程)。
相关推荐
论文中详细介绍了这两种同步队列的设计原理、实现细节以及性能评估,并将其与Java SE 5.0中的`SynchronousQueue`类进行了对比。 #### 同步队列的重要性 同步队列作为一种特殊类型的队列,它在并发编程中扮演着重要...
Java 中的同步器 SynchronousQueue 是一种特殊的阻塞队列,它最多只能放一个元素,这个元素如果不在特定的时间消费掉就会被删除,队列的长度始终为 0。SynchronousQueue 主要用于生产者消费者问题,下面是对 ...
同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...
这种同步机制使得SynchronousQueue非常适合于线程间的直接对象传递,就像生产者-消费者模型中的情况。与Exchanger相比,SynchronousQueue同样允许两个线程间交换数据,但Exchanger是在两个线程交换数据后才继续执行...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
2. 需要更多的同步机制:阻塞队列需要更多的同步机制来避免线程之间的冲突。 阻塞队列是一种非常有用的数据结构,它可以帮助我们解决很多问题,但是也需要我们小心地使用它,以免引起更多的问题。
阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...
队列类型有几种,如无界队列(如`LinkedBlockingQueue`)、有界队列(如`ArrayBlockingQueue`)和同步队列(如`SynchronousQueue`)。选择合适的队列取决于应用需求,例如,同步队列能强制新任务与已完成任务的线程...
它有一个构造参数来定义队列的容量,并使用一个锁来实现线程间的同步。 2. LinkedBlockingQueue:基于链表结构的可选界阻塞队列。默认情况下是无界的,也可以通过构造函数指定最大容量。 3. PriorityBlockingQueue:...
* SynchronousQueue:同步队列,是一个特殊的 BlockingQueue,它没有容量(这是因为在 SynchronousQueue 中,插入将等待另一个线程的删除操作,反之亦然) 阻塞队列的常见方法 阻塞队列常见的方法有: * add:...
SynchronousQueue 是一个同步队列,不存储信息,消费一个才生产下一个。非公平锁,每次‘AAA’线程 put 一个信息,只有当‘BBB’线程 take 后‘AAA’线程才能继续下一次的 put 操作。 11. 传统生产者消费者问题 ...
3. **同步队列处理**:为了解决上述问题,设计了一个同步队列(SynchronousQueue),将Class文件生成后的Apache Camel相关操作通知到其他线程执行。SynchronousQueue是一种特殊的阻塞队列,它的put和take操作是互斥...
- SynchronousQueue:同步队列,不允许存储元素,仅作为线程间传递数据的媒介。 - BlockingDeque:双端阻塞队列,支持从两端进行插入和删除操作。 三、ConcurrentMap(并发映射) ConcurrentMap是Java并发包中的一...
3. **SynchronousQueue**:一个不存储元素的阻塞队列,每个插入操作必须等到另一个删除操作对应接收,适合单生产者单消费者场景。 4. **PriorityBlockingQueue**:支持优先级的无界阻塞队列,按照自然顺序或自定义...
标题《2004_DISC_dual_DS.pdf》和描述“SynchronousQueue 底层算法相关实现论文”暗示了文档与Java并发编程中的非阻塞同步队列实现及其底层算法相关。从提供的部分内容,我们可以看出,文档讨论了线性化的经典理论...
3. **SynchronousQueue**:一个特殊的阻塞队列,它不存储元素,每个put操作必须等待一个take操作,反之亦然。SynchronousQueue提供了公平锁和非公平锁的选项,适用于需要精确控制线程间通信的场景。 以下是一些常用...
- `SynchronousQueue`: 同步队列,不存储任务,任务间直接传递,线程安全。 ### 5. 拒绝策略 当线程池无法接受新任务时,可以通过实现`RejectedExecutionHandler`接口自定义拒绝策略: - `AbortPolicy`:默认策略,...