`
囧囧有神
  • 浏览: 206593 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

服务器端利器--双缓冲队列

阅读更多

传统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访 问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写 队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。

经过测试性能较JDK自带的queue的确有不小提高。


测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。
测试场景:
起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
数据如下:
场景1
生产者线程数:1
消费者线程数:1
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  5,302,938,177纳秒
双缓冲队列用时平均为:                      5,146,302,116纳秒
相差大概160毫秒

场景2:
生产者线程数:5
消费者线程数:4
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为:  32,824,744,868纳秒
双缓冲队列用时平均为:                      20,508,495,221纳秒
相差大概12.3秒

可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列
优势不是很大,当存取线程比较多的时候优势就很明显了。

 

队列主要方法如下:

/**
 * 
 * CircularDoubleBufferedQueue.java
 * 囧囧有神
 * @param <E>2010-6-12
 */
public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
{
	private static final long serialVersionUID = 1L;
	private Logger logger =   Logger.getLogger(CircularDoubleBufferedQueue.class.getName());

    /** The queued items  */
    private final E[] itemsA;
    private final E[] itemsB;
	
	private ReentrantLock readLock, writeLock;
	private Condition notEmpty;
	private Condition notFull;
	private Condition awake;
	
	
    private E[] writeArray, readArray;
    private volatile int writeCount, readCount;
    private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;
	
	
	public CircularDoubleBufferedQueue(int capacity)
	{
		if(capacity<=0)
		{
			throw new IllegalArgumentException("Queue initial capacity can't less than 0!");
		}
		
		itemsA = (E[])new Object[capacity];
		itemsB = (E[])new Object[capacity];

		readLock = new ReentrantLock();
		writeLock = new ReentrantLock();
		
		notEmpty = readLock.newCondition();
		notFull = writeLock.newCondition();
		awake = writeLock.newCondition();
		
		readArray = itemsA;
		writeArray = itemsB;
	}
	
	private void insert(E e)
	{
		writeArray[writeArrayTP] = e;
		++writeArrayTP;
		++writeCount;
	}
	
	private E extract()
	{
		E e = readArray[readArrayHP];
		readArray[readArrayHP] = null;
		++readArrayHP;
		--readCount;
		return e;
	}

	
	/**
	 *switch condition: 
	 *read queue is empty && write queue is not empty
	 * 
	 *Notice:This function can only be invoked after readLock is 
         * grabbed,or may cause dead lock
	 * @param timeout
	 * @param isInfinite: whether need to wait forever until some other
	 * thread awake it
	 * @return
	 * @throws InterruptedException
	 */
	private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException
	{
		writeLock.lock();
		try
		{
			if (writeCount <= 0)
			{
				logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");
				try
				{
					logger.debug("Queue is empty, need wait....");
					if(isInfinite && timeout<=0)
					{
						awake.await();
						return -1;
					}
					else
					{
						return awake.awaitNanos(timeout);
					}
				}
				catch (InterruptedException ie)
				{
					awake.signal();
					throw ie;
				}
			}
			else
			{
				E[] tmpArray = readArray;
				readArray = writeArray;
				writeArray = tmpArray;

				readCount = writeCount;
				readArrayHP = 0;
				readArrayTP = writeArrayTP;

				writeCount = 0;
				writeArrayHP = readArrayHP;
				writeArrayTP = 0;
				
				notFull.signal();
				logger.debug("Queue switch successfully!");
				return -1;
			}
		}
		finally
		{
			writeLock.unlock();
		}
	}

	public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
	{
		if(e == null)
		{
			throw new NullPointerException();
		}
		
		long nanoTime = unit.toNanos(timeout);
		writeLock.lockInterruptibly();
		try
		{
			for (;;)
			{
				if(writeCount < writeArray.length)
				{
					insert(e);
					if (writeCount == 1)
					{
						awake.signal();
					}
					return true;
				}
				
				//Time out
				if(nanoTime<=0)
				{
					logger.debug("offer wait time out!");
					return false;
				}
				//keep waiting
				try
				{
					logger.debug("Queue is full, need wait....");
					nanoTime = notFull.awaitNanos(nanoTime);
				}
				catch(InterruptedException ie)
				{
					notFull.signal();
					throw ie;
				}
			}
		}
		finally
		{
			writeLock.unlock();
		}
	}

	public E poll(long timeout, TimeUnit unit) throws InterruptedException
	{
		long nanoTime = unit.toNanos(timeout);
		readLock.lockInterruptibly();
		
		try
		{
			for(;;)
			{
				if(readCount>0)
				{
					return extract();
				}
				
				if(nanoTime<=0)
				{
					logger.debug("poll time out!");
					return null;
				}
				nanoTime = queueSwitch(nanoTime, false);
			}
		}
		finally
		{
			readLock.unlock();
		}
	}

}
 

 

 

附带队列类代码和测试类代码如下:

欢迎大家提意见!
Ps:测试时候要把queue类中debug关掉,否则打印debug日志会对queue性能有不小的影响。

分享到:
评论
32 楼 wuhuajun 2013-05-15  
whitesock 写道
java.util.concurrent.Exchanger


Exchanger交换 好像到两边条件都满足了才能发生交换 但是这个只要一方满了 就可以发生交换。
31 楼 moon198654 2012-09-11  
囧囧有神 写道
whitesock 写道
为什么和ArrayBlockingQueue比较?
LinkedBlockingQueue是双端队列,只要队列非空,读写互不影响

ArrayBlockingQueue性能比LinkedBLockingQueue高,
双端队列读写也需要互斥的。

ArrayBlockingQueue性能比LinkedBLockingQueue高,不知道楼主是怎么得出这个结论的,双端队列虽然需要互斥,但可以通过中间加一个搬运线程来减少同步次数,从而大大提升效率,根据我测试的效果是LinkedBLockingQueue效率要好一些。而且用LinkedBLockingQueue实现阻塞队列占用的空间理论是没有Array大的,因为基本不会出现两端都满的情况。源码没时间仔细去看,是凭直觉说的,希望楼主赐教。
另外楼主这个双缓冲我测试了下,确实比JDK自带的好一些, 佩服楼主的研究精神,八过要是楼主能研究下源码说一下原因就更完美啦
30 楼 AlwenS 2011-04-22  
想法不错,本质是的思想是锁分离,有点类似于ConcurrentHashmap 的 Segment lock.
并且在生产速率和消费速率相近时表现很不错,因为基本可以保证每次想交换时写队列都不为空。

不过我觉得优化可以更进一步:
当写队列满时,如果读队列空位较多,这时也可以进行交换。这样都可以保证生产者和消费者都免于条件锁堵塞,在消费速率比生产速率突然要快时有一定效果,能适应更多的场景。
29 楼 integergx 2010-09-13  
我测试了下,一边读(take),一边写(put)
大致是每毫秒2000左右的吞吐量
楼主方法跟LinkedBlockingList不相上下,可喜可贺啊!
但确实比ArrayBlockingList快很多。

不过从原理上看,楼主方法跟LinkedBlockingList双锁没有什么太大优势。
28 楼 whitesock 2010-06-24  
java.util.concurrent.Exchanger
27 楼 囧囧有神 2010-06-24  
whitesock 写道
线程数,CPU个数也是重要的参数。

昨天仔细看了下LinkedBlockingQueue,作者把他的方法称为双锁算法,
也是解决读写互斥问题,想法挺好的,和双缓冲队列比还省了个队列,不过
效率不大可能比双缓冲高。我想导致链表方式效率低的原因可能是每次进队出队都是new或者删除
链表节点,如果把链表做成循环队列方式效率应该不会比ArrrayBlockingQueue,我按照
LinkedBlockingQueue想法再改造下,看看效率能否趋近双缓冲。

JDK concurrent包中的队列都是同一作者写的,有意思的是每个队列实现思想都不一样,
ArrayBlockingQueue是用循环数组,LinkedBlockingQueue用所谓的双锁算法避免读写冲突,
LinkedBlockingDQueue用了读写互斥,难道双锁算法对没法应用在这个双向链表中,值得
研究下。
26 楼 whitesock 2010-06-24  
线程数,CPU个数也是重要的参数。
25 楼 trydofor 2010-06-24  
whitesock 写道
http://dennis-zane.iteye.com/blog/238431
这个测试如果把测试用的代码贴上,给出测试方式的话,也许可以评价一下,否则没有任何意义。


2008-09-08 的博客了,测试代码可能没了。
看意思,好像n个线程offer,统计出个时间。同样对poll也是。
不像是,2n个线程,n个offer,n个poll,同时进行的。
如果是读写同时进行的,时间的统计方法必须得可论证。
如果不是同时读写,以上结论毫无意义。
24 楼 whitesock 2010-06-23  
http://dennis-zane.iteye.com/blog/238431
这个测试如果把测试用的代码贴上,给出测试方式的话,也许可以评价一下,否则没有任何意义。
23 楼 sky3380 2010-06-23  
whitesock 写道
为什么和ArrayBlockingQueue比较?
LinkedBlockingQueue是双端队列,只要队列非空,读写互不影响

http://dennis-zane.iteye.com/blog/238431
22 楼 whitesock 2010-06-23  
互斥什么,你去看看源码。
21 楼 囧囧有神 2010-06-23  
whitesock 写道
为什么和ArrayBlockingQueue比较?
LinkedBlockingQueue是双端队列,只要队列非空,读写互不影响

ArrayBlockingQueue性能比LinkedBLockingQueue高,
双端队列读写也需要互斥的。
20 楼 whitesock 2010-06-23  
为什么和ArrayBlockingQueue比较?
LinkedBlockingQueue是双端队列,只要队列非空,读写互不影响
19 楼 seraphim871211 2010-06-23  
sky3380 写道
seraphim871211 写道
囧囧有神 写道

双缓冲队列总的大小是ArrayBlockingQueue的两倍,总的思想是以空间换时间,
不过这个性能和容量设置没多大关系,就算ArrayBlockingQueue容量设成双缓冲
队列两倍性能也不会有多大提高,双缓冲队列要解决的是读写同步开销问题,不能单纯
靠提升容量就解决


啊,明白了,是不是可以这样理解,这种方式不太适合写速度远大于读速度的场景。

如果写速度远大于读速度,就应该增加读线程数了


嗯~~
18 楼 sky3380 2010-06-23  
seraphim871211 写道
囧囧有神 写道

双缓冲队列总的大小是ArrayBlockingQueue的两倍,总的思想是以空间换时间,
不过这个性能和容量设置没多大关系,就算ArrayBlockingQueue容量设成双缓冲
队列两倍性能也不会有多大提高,双缓冲队列要解决的是读写同步开销问题,不能单纯
靠提升容量就解决


啊,明白了,是不是可以这样理解,这种方式不太适合写速度远大于读速度的场景。

如果写速度远大于读速度,就应该增加读线程数了
17 楼 seraphim871211 2010-06-23  
囧囧有神 写道

双缓冲队列总的大小是ArrayBlockingQueue的两倍,总的思想是以空间换时间,
不过这个性能和容量设置没多大关系,就算ArrayBlockingQueue容量设成双缓冲
队列两倍性能也不会有多大提高,双缓冲队列要解决的是读写同步开销问题,不能单纯
靠提升容量就解决


啊,明白了,是不是可以这样理解,这种方式不太适合写速度远大于读速度的场景。
16 楼 beneo 2010-06-23  
囧囧有神 写道
beneo 写道
我觉得想法挺好的。

不过不知道有什么应用场景?你所优化的时间是产品从产生出来后到交付到消费者手里的时间(这个比喻太失败了),这只是占产品从生产到消费整个时间的一小部分。

你自己也看到了,1000W个数据才有2秒的提升。


在高并发的系统中队列作为重要的基础组件,其性能的提高对提高系统整体性能
是有意义的,这里纯粹是基础组件,就像java中的util包里头的类一样,和业务
关系不大,完成1000w个数据的入队出队总共花了30多秒,但却有12秒的提升,你自己算算性能提高了多少


好吧,这玩意的确很棒
15 楼 囧囧有神 2010-06-23  
beneo 写道
我觉得想法挺好的。

不过不知道有什么应用场景?你所优化的时间是产品从产生出来后到交付到消费者手里的时间(这个比喻太失败了),这只是占产品从生产到消费整个时间的一小部分。

你自己也看到了,1000W个数据才有2秒的提升。


在高并发的系统中队列作为重要的基础组件,其性能的提高对提高系统整体性能
是有意义的,这里纯粹是基础组件,就像java中的util包里头的类一样,和业务
关系不大,完成1000w个数据的入队出队总共花了30多秒,但却有12秒的提升,你自己算算性能提高了多少
14 楼 beneo 2010-06-23  
我觉得想法挺好的。

不过不知道有什么应用场景?你所优化的时间是产品从产生出来后到交付到消费者手里的时间(这个比喻太失败了),这只是占产品从生产到消费整个时间的一小部分。

你自己也看到了,1000W个数据才有2秒的提升。
13 楼 囧囧有神 2010-06-23  
seraphim871211 写道
囧囧有神 写道
sky3380 写道
囧囧有神 写道

再增加一个中间队列等效于给队列扩容,空间换时间,性能肯定能有所提高,
不过还是避免不了读写线程相互等待的,当读队列还没读空,写队列已写满,而中间队列也已满的时候,
写线程就得等待。

我的意思是可能出现这种情况:在读写的速度不一样的情况下,写队列满了,而读队列还没空,这时写队列与中间队列交换,等读队列空了,再把读队列与中间队列交换,这样可以减少读写线程的相互等待了。


同样大小的缓冲区,比如双缓冲队列,每队长N,读写队列一共是2N,
把这2N个空间分成三个队列应该能够提高一定的吞吐量,特别如你所说写速度大于读速度时候,
谁有时间可以帮忙测下这个场景,对于三个队列间的切换要做的灵巧些,不要产生额外的切换开销


那请问测试中java.util.concurrent.ArrayBlockingQueue和LZ自己实现的双缓冲队列设置的大小是否是一样的,如果双缓冲队列占用空间是ArrayBlockingQueue的两倍,我觉得得出的测试数据没有太大的可比性。


双缓冲队列总的大小是ArrayBlockingQueue的两倍,总的思想是以空间换时间,
不过这个性能和容量设置没多大关系,就算ArrayBlockingQueue容量设成双缓冲
队列两倍性能也不会有多大提高,双缓冲队列要解决的是读写同步开销问题,不能单纯
靠提升容量就解决

相关推荐

Global site tag (gtag.js) - Google Analytics