介绍
Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
实现原理
同步队列的实现方法有许多:
阻塞算法实现
阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。
publicclassNativeSynchronousQueue<E>{
boolean putting =false;
E item =null;
publicsynchronized E take()throwsInterruptedException{
while(item ==null)
wait();
E e = item;
item =null;
notifyAll();
return e;
}
publicsynchronizedvoid put(E e)throwsInterruptedException{
if(e==null)return;
while(putting)
wait();
putting =true;
item = e;
notifyAll();
while(item!=null)
wait();
putting =false;
notifyAll();
}
}
信号量实现
经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:
publicclassSemaphoreSynchronousQueue<E>{
E item =null;
Semaphore sync =newSemaphore(0);
Semaphore send =newSemaphore(1);
Semaphore recv =newSemaphore(0);
public E take()throwsInterruptedException{
recv.acquire();
E x = item;
sync.release();
send.release();
return x;
}
publicvoid put (E x)throwsInterruptedException{
send.acquire();
item = x;
recv.release();
sync.acquire();
}
}
在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。
Java 5实现
publicclassJava5SynchronousQueue<E>{
ReentrantLock qlock =newReentrantLock();
Queue waitingProducers =newQueue();
Queue waitingConsumers =newQueue();
staticclassNodeextendsAbstractQueuedSynchronizer{
E item;
Nodenext;
Node(Object x){ item = x;}
void waitForTake(){/* (uses AQS) */}
E waitForPut(){/* (uses AQS) */}
}
public E take(){
Node node;
boolean mustWait;
qlock.lock();
node = waitingProducers.pop();
if(mustWait =(node ==null))
node = waitingConsumers.push(null);
qlock.unlock();
if(mustWait)
return node.waitForPut();
else
return node.item;
}
publicvoid put(E e){
Node node;
boolean mustWait;
qlock.lock();
node = waitingConsumers.pop();
if(mustWait =(node ==null))
node = waitingProducers.push(e);
qlock.unlock();
if(mustWait)
node.waitForTake();
else
node.item = e;
}
}
Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。
Java6实现
Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 – 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。
代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:
- 持有数据 - put()方法的元素
- 持有请求 - take()方法
- 空
这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。
其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。
/**
* Shared internal API for dual stacks and queues.
*/
staticabstractclassTransferer{
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstractObject transfer(Object e,boolean timed,long nanos);
}
TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:
/**
* Puts or takes an item.
*/
Object transfer(Object e,boolean timed,long nanos){
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s =null;// constructed/reused as needed
boolean isData =(e !=null);
for(;;){
QNode t = tail;
QNode h = head;
if(t ==null|| h ==null)// saw uninitialized value
continue;// spin
if(h == t || t.isData == isData){// empty or same-mode
QNode tn = t.next;
if(t != tail)// inconsistent read
continue;
if(tn !=null){// lagging tail
advanceTail(t, tn);
continue;
}
if(timed && nanos <=0)// can't wait
returnnull;
if(s ==null)
s =newQNode(e, isData);
if(!t.casNext(null, s))// failed to link in
continue;
advanceTail(t, s);// swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if(x == s){// wait was cancelled
clean(t, s);
returnnull;
}
if(!s.isOffList()){// not already unlinked
advanceHead(t, s);// unlink if head
if(x !=null)// and forget fields
s.item = s;
s.waiter =null;
}
return(x !=null)? x : e;
}else{// complementary-mode
QNode m = h.next;// node to fulfill
if(t != tail || m ==null|| h != head)
continue;// inconsistent read
Object x = m.item;
if(isData ==(x !=null)||// m already fulfilled
x == m ||// m cancelled
!m.casItem(x, e)){// lost CAS
advanceHead(h, m);// dequeue and retry
continue;
}
advanceHead(h, m);// successfully fulfilled
LockSupport.unpark(m.waiter);
return(x !=null)? x : e;
}
}
}
http://blog.csdn.net/cloudeagle_bupt/article/details/9066115
相关推荐
SynchronousQueue是Java中的一个阻塞队列实现,它在Java并发编程中扮演着重要的角色。SynchronousQueue特别之处在于它没有进行数据存储的能力,也就是说它的容量为0。其核心功能是用于线程之间的直接传递数据,实现...
2. 定时生产者消费者问题:使用 SynchronousQueue 可以实现定时生产者消费者问题,生产者将元素放入队列中,并指定超时时间,而消费者则从队列中取出元素。 SynchronousQueue 的使用方法: 生产者: ```java ...
论文中详细介绍了这两种同步队列的设计原理、实现细节以及性能评估,并将其与Java SE 5.0中的`SynchronousQueue`类进行了对比。 #### 同步队列的重要性 同步队列作为一种特殊类型的队列,它在并发编程中扮演着重要...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
阻塞队列在Java并发包java.util.concurrent中提供了多种实现,如ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue和SynchronousQueue等,每种阻塞队列都根据其特性适用于不同的场景。...
同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
"详解Java阻塞队列(BlockingQueue)的实现原理" Java阻塞队列(BlockingQueue)是Java.util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。BlockingQueue的实现原理主要是基于四组不同的方法用于...
在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...
在Java中,`Queue`接口定义了一系列队列的操作: - `boolean add(E e)`:添加一个元素到队列,若队列已满则抛出异常。 - `boolean offer(E e)`:尝试添加一个元素到队列,若队列已满则返回`false`。 - `E remove()`...
Java中的阻塞队列是一种特殊的队列数据结构,它在多线程环境下广泛应用于生产者-消费者模式。阻塞队列的主要特点在于当队列为空时,试图从中取出元素的消费者线程会被阻塞,直到有其他生产者线程添加元素;同样,当...
BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一个元素,如果队列满了,就会抛出异常。 2. offer(E e):添加一个元素,如果队列满了,返回false。 3. offer(E...
SynchronousQueue的这种特性使其在并发编程中有独特应用,比如用于线程间的任务传递,或者实现Fork/Join框架中的工作窃取算法。然而,由于其零容量的特性,如果有一个线程未能及时处理另一个线程的请求,可能导致...
Java中提供了7种阻塞队列,分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue、LinkedBlockingDeque等。 BlockingQueue接口是阻塞队列...
在这个大总结中,我们将重点关注Java中的两种重要并发工具:信号量(Semaphore)和阻塞队列(BlockingQueue),以及相关的阻塞栈(BlockingDeque)。 首先,信号量是一个用于控制对共享资源访问的计数器。它允许...
在Java中,为了高效地处理并发问题,引入了一些高级数据结构,如信号量(Semaphore)和阻塞队列(BlockingQueue)。这些工具在并发编程中起到了关键作用,能够帮助开发者有效地控制对共享资源的访问。 信号量是一种...
Java中的阻塞队列BlockingQueue是一种并发编程中常用的工具,它实现了线程间的同步和通信。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;当队列满时,尝试添加元素的...
这种设计模式非常适合于实现生产者-消费者模型,即一个或多个线程(生产者)向队列添加元素,而另一个或多个线程(消费者)从队列中移除元素。 ##### 3.2 阻塞队列方法 `BlockingQueue` 提供了几种主要的方法来...