浏览 5914 次
锁定老帖子 主题:请各位给个JAVA缓存队列实现思路
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-12-18
下面是一段实现,运行起来出现丢失的情况,请各位看看问题。 package com.pb.test.queue; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; public class MessageQueue { private BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10); private BlockingQueue<Object> bufferQueue = new LinkedBlockingQueue<Object>(); // 模拟数据库 private BlockingQueue<Object> dbQueue = new LinkedBlockingQueue<Object>(); private final AtomicBoolean isQueueAdded = new AtomicBoolean(true); private final ReentrantLock putLock = new ReentrantLock(); private final ReentrantLock takeLock = new ReentrantLock(); public void add(Object o) { if (isQueueAdded.get()) { final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (!queue.offer(o)) { addBuffer(o); isQueueAdded.set(false); } } finally { putLock.unlock(); } } else { addBuffer(o); } } private void addBuffer(Object o) { bufferQueue.offer(o); final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 模拟向数据库写入 if (bufferQueue.size() >= 20) { List<Object> ary = new ArrayList<Object>(); bufferQueue.drainTo(ary); for (Object i : ary) { dbQueue.offer(i); } } } finally { putLock.unlock(); } } /** * 先从queue开始消费,消费完了开始消费数据库缓存的,然后在消费bufferQueue队列中的 * * @return */ public Object poll() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Object o = queue.poll(); if (o != null) { return o; } List<Object> ary = new ArrayList<Object>(10); int s = dbQueue.drainTo(ary, 10); if (s == 0) { Object e = bufferQueue.poll(); if (e != null) { return e; } else { isQueueAdded.set(true); try { e = queue.take(); } catch (InterruptedException e1) { e1.printStackTrace(); } return e; } } else { for (Object i : ary) { queue.offer(i); } return queue.poll(); } } finally { takeLock.unlock(); } } public void display() { System.out.println(queue.size() + "\t" + bufferQueue.size() + "\t" + dbQueue.size() + "\t" + (queue.size() + bufferQueue.size() + dbQueue.size())); } } 测试代码 package com.pb.test.queue; public class TestMessageQueue { private MessageQueue mq = new MessageQueue(); public static void main(String[] args) throws Exception { new TestMessageQueue().m(); } public void m() { new Consumer().start(); for (int i = 0; i < 5000; i++) { new Producers().start(); } } public void d() { for (;;) { mq.display(); } } class Consumer extends Thread { @Override public void run() { int count = 0; while (true) { Object o = mq.poll(); if (o != null) { count++; System.out.println(count); } else { System.err.println(count + "\tnull"); } } } } class Producers extends Thread { @Override public void run() { for (int i = 0; i < 100; i++) { mq.add(i); } } } } 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2011-12-18
有没有第三方缓存框架能实现这个呢?不要Key-Value那种的。
|
|
返回顶楼 | |