



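In the Java 6 concurrency package, SynchronousQueue is a BlockingQueue with no internal buffer: a producer thread's insert operation, put, must wait for a consumer's remove operation, take, and vice versa.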
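To make the handoff behaviour concrete, here is a minimal sketch using the real `java.util.concurrent.SynchronousQueue`. The class name `HandoffDemo` and its two helper methods are hypothetical names chosen for illustration, and the lambda syntax assumes Java 8 or later rather than Java 6.

```java
import java.util.concurrent.SynchronousQueue;

class HandoffDemo {
    /** Hands one value from a producer thread to the calling (consumer) thread. */
    static String handOff(String value) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(value);               // blocks until a consumer takes the value
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = queue.take();     // blocks until the producer arrives
            producer.join();
            return received;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    /** With no consumer waiting, a non-blocking offer has nowhere to put the element. */
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));        // prints "hello"
        System.out.println(offerWithoutConsumer());  // prints "false"
    }
}
```

Because nothing is buffered, `offer` fails immediately when no consumer is waiting, whereas `put` would block until one arrives.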







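    public class NativeSynchronousQueue<E> {
        boolean putting = false;   // true while a producer is in the middle of a handoff
        E item = null;             // the element currently being handed off

        public synchronized E take() throws InterruptedException {
            while (item == null)
                wait();            // wait for a producer to publish an item
            E e = item;
            item = null;
            notifyAll();           // wake the producer waiting for its item to be taken
            return e;
        }

        public synchronized void put(E e) throws InterruptedException {
            if (e == null) return; // null is used as the "no item" marker, so it is ignored
            while (putting)
                wait();            // only one producer may hand off at a time
            putting = true;
            item = e;
            notifyAll();           // wake consumers waiting for an item
            while (item != null)
                wait();            // block until a consumer has taken the item
            putting = false;
            notifyAll();           // let the next producer proceed
        }
    }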



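    import java.util.concurrent.Semaphore;

    public class SemaphoreSynchronousQueue<E> {
        E item = null;
        Semaphore sync = new Semaphore(0);  // tells the producer its item was taken
        Semaphore send = new Semaphore(1);  // admits one producer at a time
        Semaphore recv = new Semaphore(0);  // tells a consumer an item is ready

        public E take() throws InterruptedException {
            recv.acquire();                 // wait for an item
            E x = item;
            sync.release();                 // release the producer blocked in put()
            send.release();                 // admit the next producer
            return x;
        }

        public void put(E x) throws InterruptedException {
            send.acquire();                 // wait for our turn to publish
            item = x;
            recv.release();                 // signal that the item is ready
            sync.acquire();                 // wait until a consumer has taken it
        }
    }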


Java 5 implementation

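    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch only: Queue stands for a simple internal linked queue of Node objects
    // (not java.util.Queue). pop() returns null when the queue is empty and
    // push() returns the node it created. The wait methods are elided.
    public class Java5SynchronousQueue<E> {
        ReentrantLock qlock = new ReentrantLock();
        Queue waitingProducers = new Queue();
        Queue waitingConsumers = new Queue();

        // Non-static so that the node can refer to the element type E.
        class Node extends AbstractQueuedSynchronizer {
            E item;
            Node next;
            Node(E x) { item = x; }
            void waitForTake() { /* (uses AQS) */ }
            E waitForPut() { /* (uses AQS) */ }
        }

        public E take() {
            Node node;
            boolean mustWait;
            qlock.lock();
            node = waitingProducers.pop();          // is a producer already waiting?
            if (mustWait = (node == null))
                node = waitingConsumers.push(null); // no: enqueue ourselves as a consumer
            qlock.unlock();
            if (mustWait)
                return node.waitForPut();           // block until a producer fills the node
            else
                return node.item;
        }

        public void put(E e) {
            Node node;
            boolean mustWait;
            qlock.lock();
            node = waitingConsumers.pop();          // is a consumer already waiting?
            if (mustWait = (node == null))
                node = waitingProducers.push(e);    // no: enqueue ourselves as a producer
            qlock.unlock();
            if (mustWait)
                node.waitForTake();                 // block until a consumer takes the item
            else
                node.item = e;                      // hand the item to the waiting consumer
        }
    }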

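The Java 5 implementation is somewhat better optimized. It uses a single lock, and replacing the semaphores with queues lets a producer publish its data directly instead of first having to be woken up from blocking on a semaphore.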


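The Java 6 implementation of SynchronousQueue adopts a better-performing lock-free algorithm, an extension of the "dual stack and dual queue" algorithms, and is considerably faster than the Java 5 implementation. It supports both fair and non-fair contention: non-fair mode uses a last-in, first-out stack (LIFO stack), while fair mode uses a first-in, first-out queue (FIFO queue). The two perform comparably. In general, FIFO supports higher throughput under contention, while LIFO preserves more thread locality.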
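The fairness mode is selected with the `SynchronousQueue(boolean fair)` constructor. The sketch below parks two producers one after the other and then reports the order in which a consumer receives their values. `FairnessDemo` and `serveOrder` are hypothetical names, and polling `Thread.getState()` until the producer is `WAITING` is only a demo trick that assumes a blocked `put` parks the thread. It is not something production code should rely on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;

class FairnessDemo {
    /** Parks producers 1 and 2 in that order, then returns the order in which they are served. */
    static List<Integer> serveOrder(boolean fair) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>(fair);
        List<Integer> order = new ArrayList<>();
        try {
            for (int id = 1; id <= 2; id++) {
                final int value = id;
                Thread producer = new Thread(() -> {
                    try {
                        queue.put(value);   // blocks until the consumer below takes it
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });
                producer.setDaemon(true);
                producer.start();
                // Wait until this producer is parked inside put() before starting the next.
                while (producer.getState() != Thread.State.WAITING) {
                    Thread.sleep(1);
                }
            }
            order.add(queue.take());
            order.add(queue.take());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("fair:     " + serveOrder(true));
        System.out.println("non-fair: " + serveOrder(false));
    }
}
```

In fair mode the longest-waiting producer is served first, so the values come back in arrival order. In non-fair mode the most recently arrived producer is usually served first, though that order is not something the API documents as a guarantee.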

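The dual queue and dual stack in the implementation are built internally as linked lists (hand-written nodes, not java.util.LinkedList). At any given time the structure is in one of three states:

  1. Holding data: elements supplied by put() calls
  2. Holding requests: slots representing take() calls
  3. Empty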
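The states listed above can be illustrated with a deliberately simplified, single-threaded model. It is not concurrent and does not block, and it only shows that a dual structure never holds data and requests at the same time. Every name in it (`DualQueueModel`, `State`, `waitingData`, `waitingRequests`) is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DualQueueModel<E> {
    enum State { EMPTY, HOLDING_DATA, HOLDING_REQUESTS }

    private final Deque<E> waitingData = new ArrayDeque<>(); // items left by put()
    private int waitingRequests = 0;                         // unmatched take() calls

    /** Models put(): returns true if the item was matched with a waiting request. */
    boolean put(E item) {
        if (waitingRequests > 0) {      // complementary mode: fulfil the oldest request
            waitingRequests--;
            return true;
        }
        waitingData.addLast(item);      // same mode or empty: enqueue the data
        return false;
    }

    /** Models take(): returns a waiting item, or null after recording a request. */
    E take() {
        if (!waitingData.isEmpty()) {   // complementary mode: consume the oldest item
            return waitingData.pollFirst();
        }
        waitingRequests++;              // same mode or empty: enqueue a request
        return null;
    }

    State state() {
        if (!waitingData.isEmpty()) return State.HOLDING_DATA;
        return waitingRequests > 0 ? State.HOLDING_REQUESTS : State.EMPTY;
    }
}
```

In the real class a thread that enqueues a node then waits on it until a complementary operation arrives. The model only tracks which kind of node the structure currently holds.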



    /**
     * Shared internal API for dual stacks and queues.
     */
    static abstract class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e if non-null, the item to be handed to a consumer;
         *          if null, requests that transfer return an item
         *          offered by producer.
         * @param timed if this operation should timeout
         * @param nanos the timeout, in nanoseconds
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
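The single `transfer` method serves every public operation: a non-null `e` means "put", a null `e` means "take", and `timed`/`nanos` distinguish blocking, non-blocking and timed variants. The sketch below shows that mapping as I understand it from the JDK 6 source. `TransferMappingSketch` and `RecordingTransferer` are hypothetical, and the stub only records its arguments instead of blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class TransferMappingSketch {
    /** Same shape as the Transferer shown above. */
    abstract static class Transferer {
        abstract Object transfer(Object e, boolean timed, long nanos);
    }

    /** Hypothetical stub: records each call instead of blocking or matching. */
    static class RecordingTransferer extends Transferer {
        final List<String> calls = new ArrayList<>();

        @Override
        Object transfer(Object e, boolean timed, long nanos) {
            calls.add((e != null ? "data" : "request") + " timed=" + timed + " nanos=" + nanos);
            return e; // the stub echoes e back, so a request (null e) always reports failure
        }
    }

    final RecordingTransferer transferer = new RecordingTransferer();

    // Blocking forms: untimed, so the timeout argument is ignored.
    void put(Object e) { transferer.transfer(e, false, 0); }
    Object take()      { return transferer.transfer(null, false, 0); }

    // Non-blocking forms: timed with a zero timeout, so they never wait.
    boolean offer(Object e) { return transferer.transfer(e, true, 0) != null; }
    Object poll()           { return transferer.transfer(null, true, 0); }

    // Timed form: the timeout is converted to nanoseconds.
    boolean offer(Object e, long timeout, TimeUnit unit) {
        return transferer.transfer(e, true, unit.toNanos(timeout)) != null;
    }
}
```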

TransferQueue is implemented as follows (excerpted from the Java 6 source code). Both enqueueing and dequeueing are built on spinning and CAS:

    /**
     * Puts or takes an item.
     */
    Object transfer(Object e, boolean timed, long nanos) {
        /* Basic algorithm is to loop trying to take either of
         * two actions:
         *
         * 1. If queue apparently empty or holding same-mode nodes,
         *    try to add node to queue of waiters, wait to be
         *    fulfilled (or cancelled) and return matching item.
         *
         * 2. If queue apparently contains waiting items, and this
         *    call is of complementary mode, try to fulfill by CAS'ing
         *    item field of waiting node and dequeuing it, and then
         *    returning matching item.
         *
         * In each case, along the way, check for and try to help
         * advance head and tail on behalf of other stalled/slow
         * threads.
         *
         * The loop starts off with a null check guarding against
         * seeing uninitialized head or tail values. This never
         * happens in current SynchronousQueue, but could if
         * callers held non-volatile/final ref to the
         * transferer. The check is here anyway because it places
         * null checks at top of loop, which is usually faster
         * than having them implicitly interspersed.
         */
        QNode s = null; // constructed/reused as needed
        boolean isData = (e != null);
        for (;;) {
            QNode t = tail;
            QNode h = head;
            if (t == null || h == null)         // saw uninitialized value
                continue;                       // spin
            if (h == t || t.isData == isData) { // empty or same-mode
                QNode tn = t.next;
                if (t != tail)                  // inconsistent read
                    continue;
                if (tn != null) {               // lagging tail
                    advanceTail(t, tn);
                    continue;
                }
                if (timed && nanos <= 0)        // can't wait
                    return null;
                if (s == null)
                    s = new QNode(e, isData);
                if (!t.casNext(null, s))        // failed to link in
                    continue;
                advanceTail(t, s);              // swing tail and wait
                Object x = awaitFulfill(s, e, timed, nanos);
                if (x == s) {                   // wait was cancelled
                    clean(t, s);
                    return null;
                }
                if (!s.isOffList()) {           // not already unlinked
                    advanceHead(t, s);          // unlink if head
                    if (x != null)              // and forget fields
                        s.item = s;
                    s.waiter = null;
                }
                return (x != null) ? x : e;
            } else {                            // complementary-mode
                QNode m = h.next;               // node to fulfill
                if (t != tail || m == null || h != head)
                    continue;                   // inconsistent read
                Object x = m.item;
                if (isData == (x != null) ||    // m already fulfilled
                    x == m ||                   // m cancelled
                    !m.casItem(x, e)) {         // lost CAS
                    advanceHead(h, m);          // dequeue and retry
                    continue;
                }
                advanceHead(h, m);              // successfully fulfilled
                LockSupport.unpark(m.waiter);
                return (x != null) ? x : e;
            }
        }
    }
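The retry pattern used throughout `transfer` (read the current state, compute the update, attempt a CAS, loop if another thread won) is easier to see in isolation. The following is a minimal sketch of that spin-and-CAS pattern on a much simpler structure, a lock-free stack built on `AtomicReference`. It is not the JDK's TransferStack or TransferQueue, and `CasStack` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicReference;

class CasStack<E> {
    private static final class Node<E> {
        final E item;
        final Node<E> next;
        Node(E item, Node<E> next) { this.item = item; this.next = next; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Pushes an item, retrying until the CAS on head succeeds. */
    void push(E item) {
        for (;;) {
            Node<E> h = head.get();                 // read the current state
            Node<E> n = new Node<>(item, h);        // build the update against it
            if (head.compareAndSet(h, n))           // publish only if head is unchanged
                return;
            // Lost the race with another thread: spin and retry with a fresh read.
        }
    }

    /** Pops the most recently pushed item, or returns null if the stack is empty. */
    E pop() {
        for (;;) {
            Node<E> h = head.get();
            if (h == null)
                return null;
            if (head.compareAndSet(h, h.next))      // unlink only if head is unchanged
                return h.item;
        }
    }

    public static void main(String[] args) {
        CasStack<Integer> stack = new CasStack<>();
        stack.push(1);
        stack.push(2);
        System.out.println(stack.pop()); // prints 2
        System.out.println(stack.pop()); // prints 1
        System.out.println(stack.pop()); // prints null
    }
}
```

No thread ever holds a lock here. A thread that loses a CAS simply re-reads the state and tries again, which is the same shape as the `continue` statements after failed `casNext` and `casItem` calls in `transfer`.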





