锁定老帖子 主题:服务器端利器--双缓冲队列
精华帖 (0) :: 良好帖 (6) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-06-22
最后修改:2010-06-22
传统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访 问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写 队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。 经过测试性能较JDK自带的queue的确有不小提高。
队列主要方法如下: /**
*
* CircularDoubleBufferedQueue.java
* 囧囧有神
* @param <E>2010-6-12
*/
public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
{
private static final long serialVersionUID = 1L;
private Logger logger = Logger.getLogger(CircularDoubleBufferedQueue.class.getName());
/** The queued items */
private final E[] itemsA;
private final E[] itemsB;
private ReentrantLock readLock, writeLock;
private Condition notEmpty;
private Condition notFull;
private Condition awake;
private E[] writeArray, readArray;
private volatile int writeCount, readCount;
private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;
public CircularDoubleBufferedQueue(int capacity)
{
if(capacity<=0)
{
throw new IllegalArgumentException("Queue initial capacity can't less than 0!");
}
itemsA = (E[])new Object[capacity];
itemsB = (E[])new Object[capacity];
readLock = new ReentrantLock();
writeLock = new ReentrantLock();
notEmpty = readLock.newCondition();
notFull = writeLock.newCondition();
awake = writeLock.newCondition();
readArray = itemsA;
writeArray = itemsB;
}
private void insert(E e)
{
writeArray[writeArrayTP] = e;
++writeArrayTP;
++writeCount;
}
private E extract()
{
E e = readArray[readArrayHP];
readArray[readArrayHP] = null;
++readArrayHP;
--readCount;
return e;
}
/**
*switch condition:
*read queue is empty && write queue is not empty
*
*Notice:This function can only be invoked after readLock is
* grabbed,or may cause dead lock
* @param timeout
* @param isInfinite: whether need to wait forever until some other
* thread awake it
* @return
* @throws InterruptedException
*/
private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException
{
writeLock.lock();
try
{
if (writeCount <= 0)
{
logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");
try
{
logger.debug("Queue is empty, need wait....");
if(isInfinite && timeout<=0)
{
awake.await();
return -1;
}
else
{
return awake.awaitNanos(timeout);
}
}
catch (InterruptedException ie)
{
awake.signal();
throw ie;
}
}
else
{
E[] tmpArray = readArray;
readArray = writeArray;
writeArray = tmpArray;
readCount = writeCount;
readArrayHP = 0;
readArrayTP = writeArrayTP;
writeCount = 0;
writeArrayHP = readArrayHP;
writeArrayTP = 0;
notFull.signal();
logger.debug("Queue switch successfully!");
return -1;
}
}
finally
{
writeLock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
{
if(e == null)
{
throw new NullPointerException();
}
long nanoTime = unit.toNanos(timeout);
writeLock.lockInterruptibly();
try
{
for (;;)
{
if(writeCount < writeArray.length)
{
insert(e);
if (writeCount == 1)
{
awake.signal();
}
return true;
}
//Time out
if(nanoTime<=0)
{
logger.debug("offer wait time out!");
return false;
}
//keep waiting
try
{
logger.debug("Queue is full, need wait....");
nanoTime = notFull.awaitNanos(nanoTime);
}
catch(InterruptedException ie)
{
notFull.signal();
throw ie;
}
}
}
finally
{
writeLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanoTime = unit.toNanos(timeout);
readLock.lockInterruptibly();
try
{
for(;;)
{
if(readCount>0)
{
return extract();
}
if(nanoTime<=0)
{
logger.debug("poll time out!");
return null;
}
nanoTime = queueSwitch(nanoTime, false);
}
}
finally
{
readLock.unlock();
}
}
}
附带队列类代码和测试类代码如下:
欢迎大家提意见! 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-06-22
这主意不错,学习了。我在想,按照楼主的这个思路,如果增加一个中间队列,当读队列空或写队列满时都和中间队列交换,性能会不会更好一些。拙见,见笑~
|
|
返回顶楼 | |
发表时间:2010-06-23
最后修改:2010-06-23
sky3380 写道 这主意不错,学习了。我在想,按照楼主的这个思路,如果增加一个中间队列,当读队列空或写队列满时都和中间队列交换,性能会不会更好一些。拙见,见笑~
再增加一个中间队列等效于给队列扩容,空间换时间,性能肯定能有所提高, 不过还是避免不了读写线程相互等待的,当读队列还没读空,写队列已写满,而中间队列也已满的时候, 写线程就得等待。 |
|
返回顶楼 | |
发表时间:2010-06-23
最后修改:2010-06-23
囧囧有神 写道 再增加一个中间队列等效于给队列扩容,空间换时间,性能肯定能有所提高, 不过还是避免不了读写线程相互等待的,当读队列还没读空,写队列已写满,而中间队列也已满的时候, 写线程就得等待。 我的意思是可能出现这种情况:在读写的速度不一样的情况下,写队列满了,而读队列还没空,这时写队列与中间队列交换,等读队列空了,再把读队列与中间队列交换,这样可以减少读写线程的相互等待了。 |
|
返回顶楼 | |
发表时间:2010-06-23
楼主我有个问题, 说的不一定对,请回答一下。
就是insert和extract操作, 里面步骤很多, 而且++ --本身也不是原子操作。 这种情况下,这个blockingqueue在并发访问时,会不会出现问题? 我的意思是你的insert和extract操作好像都有计算错误的可能性。 |
|
返回顶楼 | |
发表时间:2010-06-23
还有能否列出内存消耗情况的对比?
|
|
返回顶楼 | |
发表时间:2010-06-23
这个思路和我以前在oracle上做定时job时,用两个相同的表一个读数据、一个写数据然后切换表的别名的思路和像。不过性能的确很好。
|
|
返回顶楼 | |
发表时间:2010-06-23
berlou 写道 楼主我有个问题, 说的不一定对,请回答一下。
就是insert和extract操作, 里面步骤很多, 而且++ --本身也不是原子操作。 这种情况下,这个blockingqueue在并发访问时,会不会出现问题? 我的意思是你的insert和extract操作好像都有计算错误的可能性。 这个 inser和extract操作本身是不支持同步的,而是要求调用这两个 操作的方法解决同步问题,就像类中poll或offer方法自己实现了 读写同步了。 |
|
返回顶楼 | |
发表时间:2010-06-23
berlou 写道 还有能否列出内存消耗情况的对比?
这里的缓冲队列都是定长队列,双缓冲队列自然需要申请两倍大小的单缓冲队列的内存 |
|
返回顶楼 | |
发表时间:2010-06-23
sky3380 写道 囧囧有神 写道 再增加一个中间队列等效于给队列扩容,空间换时间,性能肯定能有所提高, 不过还是避免不了读写线程相互等待的,当读队列还没读空,写队列已写满,而中间队列也已满的时候, 写线程就得等待。 我的意思是可能出现这种情况:在读写的速度不一样的情况下,写队列满了,而读队列还没空,这时写队列与中间队列交换,等读队列空了,再把读队列与中间队列交换,这样可以减少读写线程的相互等待了。 同样大小的缓冲区,比如双缓冲队列,每队长N,读写队列一共是2N, 把这2N个空间分成三个队列应该能够提高一定的吞吐量,特别如你所说写速度大于读速度时候, 谁有时间可以帮忙测下这个场景,对于三个队列间的切换要做的灵巧些,不要产生额外的切换开销 |
|
返回顶楼 | |