`

Java并发包中的同步队列SynchronousQueue实现原理

阅读更多

介绍

Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

实现原理

同步队列的实现方法有许多:

阻塞算法实现

阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。

  1. publicclassNativeSynchronousQueue<E>{
  2. boolean putting =false;
  3. E item =null;
  4. publicsynchronized E take()throwsInterruptedException{
  5. while(item ==null)
  6. wait();
  7. E e = item;
  8. item =null;
  9. notifyAll();
  10. return e;
  11. }
  12. publicsynchronizedvoid put(E e)throwsInterruptedException{
  13. if(e==null)return;
  14. while(putting)
  15. wait();
  16. putting =true;
  17. item = e;
  18. notifyAll();
  19. while(item!=null)
  20. wait();
  21. putting =false;
  22. notifyAll();
  23. }
  24. }

信号量实现

经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:

  1. publicclassSemaphoreSynchronousQueue<E>{
  2. E item =null;
  3. Semaphore sync =newSemaphore(0);
  4. Semaphore send =newSemaphore(1);
  5. Semaphore recv =newSemaphore(0);
  6. public E take()throwsInterruptedException{
  7. recv.acquire();
  8. E x = item;
  9. sync.release();
  10. send.release();
  11. return x;
  12. }
  13. publicvoid put (E x)throwsInterruptedException{
  14. send.acquire();
  15. item = x;
  16. recv.release();
  17. sync.acquire();
  18. }
  19. }

在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。

Java 5实现

  1. publicclassJava5SynchronousQueue<E>{
  2. ReentrantLock qlock =newReentrantLock();
  3. Queue waitingProducers =newQueue();
  4. Queue waitingConsumers =newQueue();
  5. staticclassNodeextendsAbstractQueuedSynchronizer{
  6. E item;
  7. Nodenext;
  8. Node(Object x){ item = x;}
  9. void waitForTake(){/* (uses AQS) */}
  10. E waitForPut(){/* (uses AQS) */}
  11. }
  12. public E take(){
  13. Node node;
  14. boolean mustWait;
  15. qlock.lock();
  16. node = waitingProducers.pop();
  17. if(mustWait =(node ==null))
  18. node = waitingConsumers.push(null);
  19. qlock.unlock();
  20. if(mustWait)
  21. return node.waitForPut();
  22. else
  23. return node.item;
  24. }
  25. publicvoid put(E e){
  26. Node node;
  27. boolean mustWait;
  28. qlock.lock();
  29. node = waitingConsumers.pop();
  30. if(mustWait =(node ==null))
  31. node = waitingProducers.push(e);
  32. qlock.unlock();
  33. if(mustWait)
  34. node.waitForTake();
  35. else
  36. node.item = e;
  37. }
  38. }

Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。

Java6实现

Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 – 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。

代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:

  1. 持有数据 - put()方法的元素
  2. 持有请求 - take()方法

这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。

其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。

  1. /**
  2. * Shared internal API for dual stacks and queues.
  3. */
  4. staticabstractclassTransferer{
  5. /**
  6. * Performs a put or take.
  7. *
  8. * @param e if non-null, the item to be handed to a consumer;
  9. * if null, requests that transfer return an item
  10. * offered by producer.
  11. * @param timed if this operation should timeout
  12. * @param nanos the timeout, in nanoseconds
  13. * @return if non-null, the item provided or received; if null,
  14. * the operation failed due to timeout or interrupt --
  15. * the caller can distinguish which of these occurred
  16. * by checking Thread.interrupted.
  17. */
  18. abstractObject transfer(Object e,boolean timed,long nanos);
  19. }

TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:

  1. /**
  2. * Puts or takes an item.
  3. */
  4. Object transfer(Object e,boolean timed,long nanos){
  5. /* Basic algorithm is to loop trying to take either of
  6. * two actions:
  7. *
  8. * 1. If queue apparently empty or holding same-mode nodes,
  9. * try to add node to queue of waiters, wait to be
  10. * fulfilled (or cancelled) and return matching item.
  11. *
  12. * 2. If queue apparently contains waiting items, and this
  13. * call is of complementary mode, try to fulfill by CAS'ing
  14. * item field of waiting node and dequeuing it, and then
  15. * returning matching item.
  16. *
  17. * In each case, along the way, check for and try to help
  18. * advance head and tail on behalf of other stalled/slow
  19. * threads.
  20. *
  21. * The loop starts off with a null check guarding against
  22. * seeing uninitialized head or tail values. This never
  23. * happens in current SynchronousQueue, but could if
  24. * callers held non-volatile/final ref to the
  25. * transferer. The check is here anyway because it places
  26. * null checks at top of loop, which is usually faster
  27. * than having them implicitly interspersed.
  28. */
  29. QNode s =null;// constructed/reused as needed
  30. boolean isData =(e !=null);
  31. for(;;){
  32. QNode t = tail;
  33. QNode h = head;
  34. if(t ==null|| h ==null)// saw uninitialized value
  35. continue;// spin
  36. if(h == t || t.isData == isData){// empty or same-mode
  37. QNode tn = t.next;
  38. if(t != tail)// inconsistent read
  39. continue;
  40. if(tn !=null){// lagging tail
  41. advanceTail(t, tn);
  42. continue;
  43. }
  44. if(timed && nanos <=0)// can't wait
  45. returnnull;
  46. if(s ==null)
  47. s =newQNode(e, isData);
  48. if(!t.casNext(null, s))// failed to link in
  49. continue;
  50. advanceTail(t, s);// swing tail and wait
  51. Object x = awaitFulfill(s, e, timed, nanos);
  52. if(x == s){// wait was cancelled
  53. clean(t, s);
  54. returnnull;
  55. }
  56. if(!s.isOffList()){// not already unlinked
  57. advanceHead(t, s);// unlink if head
  58. if(x !=null)// and forget fields
  59. s.item = s;
  60. s.waiter =null;
  61. }
  62. return(x !=null)? x : e;
  63. }else{// complementary-mode
  64. QNode m = h.next;// node to fulfill
  65. if(t != tail || m ==null|| h != head)
  66. continue;// inconsistent read
  67. Object x = m.item;
  68. if(isData ==(x !=null)||// m already fulfilled
  69. x == m ||// m cancelled
  70. !m.casItem(x, e)){// lost CAS
  71. advanceHead(h, m);// dequeue and retry
  72. continue;
  73. }
  74. advanceHead(h, m);// successfully fulfilled
  75. LockSupport.unpark(m.waiter);
  76. return(x !=null)? x : e;
  77. }
  78. }
  79. }

 

http://blog.csdn.net/cloudeagle_bupt/article/details/9066115

分享到:
评论

相关推荐

    java 同步器SynchronousQueue详解及实例

    2. 定时生产者消费者问题:使用 SynchronousQueue 可以实现定时生产者消费者问题,生产者将元素放入队列中,并指定超时时间,而消费者则从队列中取出元素。 SynchronousQueue 的使用方法: 生产者: ```java ...

    SynchronousQueue实现原理.pdf

    转载的一篇博客资源

    java并发包资源

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    详解Java阻塞队列(BlockingQueue)的实现原理

    "详解Java阻塞队列(BlockingQueue)的实现原理" Java阻塞队列(BlockingQueue)是Java.util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。BlockingQueue的实现原理主要是基于四组不同的方法用于...

    java.util.concurrent 实现线程池队列

    在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...

    Java中的阻塞队列详细介绍

    Java中的阻塞队列是一种特殊的队列数据结构,它在多线程环境下广泛应用于生产者-消费者模式。阻塞队列的主要特点在于当队列为空时,试图从中取出元素的消费者线程会被阻塞,直到有其他生产者线程添加元素;同样,当...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一个元素,如果队列满了,就会抛出异常。 2. offer(E e):添加一个元素,如果队列满了,返回false。 3. offer(E...

    详解java中的阻塞队列

    Java中提供了7种阻塞队列,分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue、LinkedBlockingDeque等。 BlockingQueue接口是阻塞队列...

    java 中 阻塞队列BlockingQueue详解及实例

    Java中的阻塞队列BlockingQueue是一种并发编程中常用的工具,它实现了线程间的同步和通信。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;当队列满时,尝试添加元素的...

    线程池java写的代码

    - `SynchronousQueue`: 同步队列,不存储任务,任务间直接传递,线程安全。 ### 5. 拒绝策略 当线程池无法接受新任务时,可以通过实现`RejectedExecutionHandler`接口自定义拒绝策略: - `AbortPolicy`:默认策略,...

    java线程大总结.pdf

    在Java中,`Semaphore`类提供了一种机制,可以控制同时访问特定资源的线程数量。然而,它并不保证线程安全,也就是说,开发者需要额外处理线程安全问题,确保在并发环境中正确地访问资源。 接着,Java 5引入了阻塞...

    java队列之queue用法实例分析

    实现阻塞接口的Queue包括java.util.concurrent中的BlockingQueue接口和五个阻塞队列类。BlockingQueue接口是一个带有一点扭曲的FIFO数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者...

    Thread基础知识点笔记总结

    CAS 是一种基于硬件级别的指令实现的同步原语,用于.compareAndSwap,是 Java 并发包 java.utile.concurrent 中许多同步类的基础。CAS 的工作原理是首先检查要操作的变量是否是期望的值,如果是,则进行操作,否则不...

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    根据不同的应用场景,可以选择不同类型的队列,如无界队列(LinkedBlockingQueue)、有界队列(ArrayBlockingQueue)或同步队列(SynchronousQueue)。 线程池的工作流程大致如下: 1. 当一个任务被提交到线程池时...

    java线程池工作队列饱和策略代码示例

    在本文中,我们将详细介绍Java线程池工作队列饱和策略的概念、原理和实现。 线程池(Thread Pool)是并行执行任务收集的实用工具。随着CPU引入适合于应用程序并行化的多核体系结构,线程池的作用正日益显现。通过...

    工作队列示例

    在Java中,工作队列的一个常见实现是Java并发库中的`java.util.concurrent`包,特别是`ExecutorService`接口和`ThreadPoolExecutor`类。`ExecutorService`提供了一种管理线程池和调度任务的方式,而`...

Global site tag (gtag.js) - Google Analytics