论坛首页 Java企业应用论坛

请各位给个JAVA缓存队列实现思路

浏览 5905 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-12-18  
我想做一个队列,当队列里面达到一定量后批量写入文件或者数据库,同时也有多套线程在消费这个队列,类似生产者和消费者,生产能力大于消费能力,要在多线程环境下运行。
下面是一段实现,运行起来出现丢失的情况,请各位看看问题。
package com.pb.test.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class MessageQueue {

	private BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);

	private BlockingQueue<Object> bufferQueue = new LinkedBlockingQueue<Object>();

	// 模拟数据库
	private BlockingQueue<Object> dbQueue = new LinkedBlockingQueue<Object>();

	private final AtomicBoolean isQueueAdded = new AtomicBoolean(true);

	private final ReentrantLock putLock = new ReentrantLock();

	private final ReentrantLock takeLock = new ReentrantLock();

	public void add(Object o) {
		if (isQueueAdded.get()) {

			final ReentrantLock putLock = this.putLock;
			putLock.lock();

			try {
				if (!queue.offer(o)) {
					addBuffer(o);
					isQueueAdded.set(false);
				}
			} finally {
				putLock.unlock();
			}
		} else {
			addBuffer(o);
		}
	}

	private void addBuffer(Object o) {
		bufferQueue.offer(o);

		final ReentrantLock putLock = this.putLock;
		putLock.lock();
		try {
			// 模拟向数据库写入
			if (bufferQueue.size() >= 20) {
				List<Object> ary = new ArrayList<Object>();
				bufferQueue.drainTo(ary);

				for (Object i : ary) {
					dbQueue.offer(i);
				}
			}
		} finally {
			putLock.unlock();
		}
	}

	/**
	 * 先从queue开始消费,消费完了开始消费数据库缓存的,然后在消费bufferQueue队列中的
	 * 
	 * @return
	 */
	public Object poll() {
		final ReentrantLock takeLock = this.takeLock;
		takeLock.lock();

		try {
			Object o = queue.poll();
			if (o != null) {
				return o;
			}

			List<Object> ary = new ArrayList<Object>(10);
			int s = dbQueue.drainTo(ary, 10);

			if (s == 0) {
				Object e = bufferQueue.poll();

				if (e != null) {
					return e;
				} else {
					isQueueAdded.set(true);
					try {
						e = queue.take();
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					return e;
				}

			} else {
				for (Object i : ary) {
					queue.offer(i);
				}

				return queue.poll();
			}

		} finally {
			takeLock.unlock();
		}
	}

	public void display() {
		System.out.println(queue.size() + "\t" + bufferQueue.size() + "\t" + dbQueue.size() + "\t" + (queue.size() + bufferQueue.size() + dbQueue.size()));
	}
}

测试代码
package com.pb.test.queue;

public class TestMessageQueue {

	private MessageQueue mq = new MessageQueue();

	public static void main(String[] args) throws Exception {
		new TestMessageQueue().m();
	}

	public void m() {
		new Consumer().start();
		for (int i = 0; i < 5000; i++) {
			new Producers().start();
		}

	}

	public void d() {
		for (;;) {
			mq.display();
		}
	}

	class Consumer extends Thread {
		@Override
		public void run() {
			int count = 0;
			while (true) {
				Object o = mq.poll();
				if (o != null) {
					count++;
					System.out.println(count);
				} else {
					System.err.println(count + "\tnull");
				}
			}
		}
	}

	class Producers extends Thread {
		@Override
		public void run() {
			for (int i = 0; i < 100; i++) {
				mq.add(i);
			}
		}
	}
}
   发表时间:2011-12-18  
有没有第三方缓存框架能实现这个呢?不要Key-Value那种的。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics