介绍
Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
实现原理
同步队列的实现方法有许多:
阻塞算法实现
阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。
publicclassNativeSynchronousQueue<E>{
boolean putting =false;
E item =null;
publicsynchronized E take()throwsInterruptedException{
while(item ==null)
wait();
E e = item;
item =null;
notifyAll();
return e;
}
publicsynchronizedvoid put(E e)throwsInterruptedException{
if(e==null)return;
while(putting)
wait();
putting =true;
item = e;
notifyAll();
while(item!=null)
wait();
putting =false;
notifyAll();
}
}
信号量实现
经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:
publicclassSemaphoreSynchronousQueue<E>{
E item =null;
Semaphore sync =newSemaphore(0);
Semaphore send =newSemaphore(1);
Semaphore recv =newSemaphore(0);
public E take()throwsInterruptedException{
recv.acquire();
E x = item;
sync.release();
send.release();
return x;
}
publicvoid put (E x)throwsInterruptedException{
send.acquire();
item = x;
recv.release();
sync.acquire();
}
}
在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。
Java 5实现
publicclassJava5SynchronousQueue<E>{
ReentrantLock qlock =newReentrantLock();
Queue waitingProducers =newQueue();
Queue waitingConsumers =newQueue();
staticclassNodeextendsAbstractQueuedSynchronizer{
E item;
Nodenext;
Node(Object x){ item = x;}
void waitForTake(){/* (uses AQS) */}
E waitForPut(){/* (uses AQS) */}
}
public E take(){
Node node;
boolean mustWait;
qlock.lock();
node = waitingProducers.pop();
if(mustWait =(node ==null))
node = waitingConsumers.push(null);
qlock.unlock();
if(mustWait)
return node.waitForPut();
else
return node.item;
}
publicvoid put(E e){
Node node;
boolean mustWait;
qlock.lock();
node = waitingConsumers.pop();
if(mustWait =(node ==null))
node = waitingProducers.push(e);
qlock.unlock();
if(mustWait)
node.waitForTake();
else
node.item = e;
}
}
Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。
Java6实现
Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 – 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。
代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:
- 持有数据 - put()方法的元素
- 持有请求 - take()方法
- 空
这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。
其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。
/**
* Shared internal API for dual stacks and queues.
*/
staticabstractclassTransferer{
/**
* Performs a put or take.
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanoseconds
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstractObject transfer(Object e,boolean timed,long nanos);
}
TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:
/**
* Puts or takes an item.
*/
Object transfer(Object e,boolean timed,long nanos){
/* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for and try to help
* advance head and tail on behalf of other stalled/slow
* threads.
*
* The loop starts off with a null check guarding against
* seeing uninitialized head or tail values. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicitly interspersed.
*/
QNode s =null;// constructed/reused as needed
boolean isData =(e !=null);
for(;;){
QNode t = tail;
QNode h = head;
if(t ==null|| h ==null)// saw uninitialized value
continue;// spin
if(h == t || t.isData == isData){// empty or same-mode
QNode tn = t.next;
if(t != tail)// inconsistent read
continue;
if(tn !=null){// lagging tail
advanceTail(t, tn);
continue;
}
if(timed && nanos <=0)// can't wait
returnnull;
if(s ==null)
s =newQNode(e, isData);
if(!t.casNext(null, s))// failed to link in
continue;
advanceTail(t, s);// swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if(x == s){// wait was cancelled
clean(t, s);
returnnull;
}
if(!s.isOffList()){// not already unlinked
advanceHead(t, s);// unlink if head
if(x !=null)// and forget fields
s.item = s;
s.waiter =null;
}
return(x !=null)? x : e;
}else{// complementary-mode
QNode m = h.next;// node to fulfill
if(t != tail || m ==null|| h != head)
continue;// inconsistent read
Object x = m.item;
if(isData ==(x !=null)||// m already fulfilled
x == m ||// m cancelled
!m.casItem(x, e)){// lost CAS
advanceHead(h, m);// dequeue and retry
continue;
}
advanceHead(h, m);// successfully fulfilled
LockSupport.unpark(m.waiter);
return(x !=null)? x : e;
}
}
}
http://blog.csdn.net/cloudeagle_bupt/article/details/9066115
相关推荐
2. 定时生产者消费者问题:使用 SynchronousQueue 可以实现定时生产者消费者问题,生产者将元素放入队列中,并指定超时时间,而消费者则从队列中取出元素。 SynchronousQueue 的使用方法: 生产者: ```java ...
转载的一篇博客资源
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...
7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...
"详解Java阻塞队列(BlockingQueue)的实现原理" Java阻塞队列(BlockingQueue)是Java.util.concurrent包下重要的数据结构,提供了线程安全的队列访问方式。BlockingQueue的实现原理主要是基于四组不同的方法用于...
在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...
Java中的阻塞队列是一种特殊的队列数据结构,它在多线程环境下广泛应用于生产者-消费者模式。阻塞队列的主要特点在于当队列为空时,试图从中取出元素的消费者线程会被阻塞,直到有其他生产者线程添加元素;同样,当...
BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一个元素,如果队列满了,就会抛出异常。 2. offer(E e):添加一个元素,如果队列满了,返回false。 3. offer(E...
Java中提供了7种阻塞队列,分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue、LinkedBlockingDeque等。 BlockingQueue接口是阻塞队列...
Java中的阻塞队列BlockingQueue是一种并发编程中常用的工具,它实现了线程间的同步和通信。阻塞队列的核心特性在于当队列为空时,尝试获取元素的线程会被阻塞,直到其他线程添加元素;当队列满时,尝试添加元素的...
- `SynchronousQueue`: 同步队列,不存储任务,任务间直接传递,线程安全。 ### 5. 拒绝策略 当线程池无法接受新任务时,可以通过实现`RejectedExecutionHandler`接口自定义拒绝策略: - `AbortPolicy`:默认策略,...
在Java中,`Semaphore`类提供了一种机制,可以控制同时访问特定资源的线程数量。然而,它并不保证线程安全,也就是说,开发者需要额外处理线程安全问题,确保在并发环境中正确地访问资源。 接着,Java 5引入了阻塞...
实现阻塞接口的Queue包括java.util.concurrent中的BlockingQueue接口和五个阻塞队列类。BlockingQueue接口是一个带有一点扭曲的FIFO数据结构。不是立即从队列中添加或者删除元素,线程执行操作阻塞,直到有空间或者...
CAS 是一种基于硬件级别的指令实现的同步原语,用于.compareAndSwap,是 Java 并发包 java.utile.concurrent 中许多同步类的基础。CAS 的工作原理是首先检查要操作的变量是否是期望的值,如果是,则进行操作,否则不...
根据不同的应用场景,可以选择不同类型的队列,如无界队列(LinkedBlockingQueue)、有界队列(ArrayBlockingQueue)或同步队列(SynchronousQueue)。 线程池的工作流程大致如下: 1. 当一个任务被提交到线程池时...
在本文中,我们将详细介绍Java线程池工作队列饱和策略的概念、原理和实现。 线程池(Thread Pool)是并行执行任务收集的实用工具。随着CPU引入适合于应用程序并行化的多核体系结构,线程池的作用正日益显现。通过...
在Java中,工作队列的一个常见实现是Java并发库中的`java.util.concurrent`包,特别是`ExecutorService`接口和`ThreadPoolExecutor`类。`ExecutorService`提供了一种管理线程池和调度任务的方式,而`...