传统队列是生产者线程和消费者线程从同一个队列中存取数据,必然需要互斥访
问,在互相同步等待中浪费了宝贵的时间,使队列吞吐量受影响。双缓冲队使用两个队列,将读写分离,一个队列专门用来读,另一个专门用来写,当读队列空或写
队列满时将两个队列互换。这里为了保证队列的读写顺序,当读队列为空且写队列不为空时候才允许两个队列互换。
经过测试性能较JDK自带的queue的确有不小提高。
测试是和JDK6中性能最高的阻塞Queue:java.util.concurrent.ArrayBlockingQueue做比较,这个队列是环形队列的实现方式,性能还算不错,不过我们的目标是没有最好,只有更好。
测试场景:
起若干个生产者线程,往Queue中放数据,起若干个消费者线程从queue中取数据,统计每个消费者线程取N个数据的平均时间。
数据如下:
场景1
生产者线程数:1
消费者线程数:1
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为: 5,302,938,177纳秒
双缓冲队列用时平均为: 5,146,302,116纳秒
相差大概160毫秒
场景2:
生产者线程数:5
消费者线程数:4
Queue容量:5w
取元素个数:1000w
JDK ArrayBlockingQueue用时平均为: 32,824,744,868纳秒
双缓冲队列用时平均为: 20,508,495,221纳秒
相差大概12.3秒
可见在生产者消费者都只有一个的时候存和取的同步冲突比较小,双缓冲队列
优势不是很大,当存取线程比较多的时候优势就很明显了。
队列主要方法如下:
/**
*
* CircularDoubleBufferedQueue.java
* 囧囧有神
* @param <E>2010-6-12
*/
public class CircularDoubleBufferedQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
{
private static final long serialVersionUID = 1L;
private Logger logger = Logger.getLogger(CircularDoubleBufferedQueue.class.getName());
/** The queued items */
private final E[] itemsA;
private final E[] itemsB;
private ReentrantLock readLock, writeLock;
private Condition notEmpty;
private Condition notFull;
private Condition awake;
private E[] writeArray, readArray;
private volatile int writeCount, readCount;
private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;
public CircularDoubleBufferedQueue(int capacity)
{
if(capacity<=0)
{
throw new IllegalArgumentException("Queue initial capacity can't less than 0!");
}
itemsA = (E[])new Object[capacity];
itemsB = (E[])new Object[capacity];
readLock = new ReentrantLock();
writeLock = new ReentrantLock();
notEmpty = readLock.newCondition();
notFull = writeLock.newCondition();
awake = writeLock.newCondition();
readArray = itemsA;
writeArray = itemsB;
}
private void insert(E e)
{
writeArray[writeArrayTP] = e;
++writeArrayTP;
++writeCount;
}
private E extract()
{
E e = readArray[readArrayHP];
readArray[readArrayHP] = null;
++readArrayHP;
--readCount;
return e;
}
/**
*switch condition:
*read queue is empty && write queue is not empty
*
*Notice:This function can only be invoked after readLock is
* grabbed,or may cause dead lock
* @param timeout
* @param isInfinite: whether need to wait forever until some other
* thread awake it
* @return
* @throws InterruptedException
*/
private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException
{
writeLock.lock();
try
{
if (writeCount <= 0)
{
logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");
try
{
logger.debug("Queue is empty, need wait....");
if(isInfinite && timeout<=0)
{
awake.await();
return -1;
}
else
{
return awake.awaitNanos(timeout);
}
}
catch (InterruptedException ie)
{
awake.signal();
throw ie;
}
}
else
{
E[] tmpArray = readArray;
readArray = writeArray;
writeArray = tmpArray;
readCount = writeCount;
readArrayHP = 0;
readArrayTP = writeArrayTP;
writeCount = 0;
writeArrayHP = readArrayHP;
writeArrayTP = 0;
notFull.signal();
logger.debug("Queue switch successfully!");
return -1;
}
}
finally
{
writeLock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
{
if(e == null)
{
throw new NullPointerException();
}
long nanoTime = unit.toNanos(timeout);
writeLock.lockInterruptibly();
try
{
for (;;)
{
if(writeCount < writeArray.length)
{
insert(e);
if (writeCount == 1)
{
awake.signal();
}
return true;
}
//Time out
if(nanoTime<=0)
{
logger.debug("offer wait time out!");
return false;
}
//keep waiting
try
{
logger.debug("Queue is full, need wait....");
nanoTime = notFull.awaitNanos(nanoTime);
}
catch(InterruptedException ie)
{
notFull.signal();
throw ie;
}
}
}
finally
{
writeLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanoTime = unit.toNanos(timeout);
readLock.lockInterruptibly();
try
{
for(;;)
{
if(readCount>0)
{
return extract();
}
if(nanoTime<=0)
{
logger.debug("poll time out!");
return null;
}
nanoTime = queueSwitch(nanoTime, false);
}
}
finally
{
readLock.unlock();
}
}
}
附带队列类代码和测试类代码如下:
欢迎大家提意见!
Ps:测试时候要把queue类中debug关掉,否则打印debug日志会对queue性能有不小的影响。
分享到:
相关推荐
免杀破解利器-OEP查找工具-OepFinder汉化版免杀破解利器-OEP查找工具-OepFinder汉化版
【编程利器--ed】是一款强大的文本编辑工具,它在功能上超越了EdiPlus 3.0,成为了许多程序员和Web开发者的心头好。这款编辑器不仅具备了基础的文本编辑功能,还针对多种编程语言提供了丰富的支持,包括但不限于C/...
ip-guard网络监控-管理利器--完美破解406用户[2
视频学习利器--复读机。
ip-guard网络监控-管理利器--完美破解406用户[1].part3
电子书制作利器--友益文书 V6.5.1 该软件是一款集资料管理、电子图书制作、多媒体课件制
ip-guard网络监控-管理利器--完美破解406用户. 本人用了一年
ip-guard网络监控-管理利器--完美破解406用户[1]
GIS开发利器-MapXtreme 2008
将PCB的GERBER文件转换成G代码的利器---coppercam软件下载
Excel交互利器 - 切片器
Windows下访问LINUX的利器-SSH
扫鸡利器 - 自动整理ip.zip
网管利器-破解版聚生网络管理软件,有效监控网络,分时段分机器限制流量
WebSocket:实时通信的利器-pdf
08-05-01 Windows2000 服务器端应用程序开发设计指南-安全连接(3) 08-05-01 Windows2000 服务器端应用程序开发设计指南-附录 08-04-30 Windows Sockets 规范及应用 08-04-30 C/C++内存问题检查利器——Purify 08-...
【标题】:“自己写的必备利器--消息狂刷器---有源代码”揭示了这是一个由个人开发者编写的软件工具,主要用于大量发送消息。该工具可能是为了测试、自动化或某种特定目的而设计,它提供了源代码,意味着用户可以...
思考的利器-21个进发灵感的考具-21个进发灵感的考具
之前发布的SNS利器--通过邮箱获取通讯录近期发现问题。现在已经确认sohu无法获取其通讯录。126获取的问题已经修正。特发布该1.1版本,希望不要耽误大家的项目!已经下载过1.0版本的请下载专门针对126的补丁。没有...
网站利器-海纳采集器, 比火车头好用多了