论坛首页 Java企业应用论坛

山寨同步队列 VS 官方BT的ArrayBlockingQueue ,结果官方落马!!!

浏览 5651 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
作者 正文
   发表时间:2011-09-29   最后修改:2011-09-29
官方的java.util.concurrent.ArrayBlockingQueue的性能是很BT的,我下午无聊然后就想去测试下到底有多BT就写了如下测试代码,也不知道是我的代码写的有问题还是怎么的啦,测试结果和我想的完全不一样。

条件:20个线程,存取线程各半,队列大小是30W,其他电脑配置啥的啊很大众化就不描述了。

耗时:
山寨版的:2400左右
官方版的:3400左右

--------------------------------------------------------------------------------------------
官方的java.util.concurrent.ArrayBlockingQueue 生产者消费者代码如下:
package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ProducerCustomerQueue2<E> {
	public static void main(String[] args) throws InterruptedException {
		int total = 300000, threadCount = 20,queueSize = 100;
		ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
		final long start = System.currentTimeMillis();
		CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {
			public void run() {
				System.out.println("统计时间:"
						+ (System.currentTimeMillis() - start));
			}
		});
		for (int i = 0; i < threadCount >>> 1; i++) {
			new Thread(new ProducerThread2(queue, barrier, total)).start();
			new Thread(new CustomerThread2(queue, barrier, total)).start();
		}
	}

}

// 生产者线程
class ProducerThread2 implements Runnable {
	ArrayBlockingQueue<Integer> queue;
	CyclicBarrier barrier;
	int total;

	public ProducerThread2(ArrayBlockingQueue<Integer> queue,
			CyclicBarrier barrier, int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("ProducerThread启动...");
		for (int i = 0; i < total; i++) {
			try {
				queue.put(i);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("ProducerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

// 消费者线程
class CustomerThread2 implements Runnable {
	ArrayBlockingQueue<Integer> queue;
	CyclicBarrier barrier;
	int total;

	public CustomerThread2(ArrayBlockingQueue<Integer> queue,
			CyclicBarrier barrier, int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("CustomerThread启动...");
		for (int i = 0; i < total; i++) {
			Object o;
			try {
				o = queue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("CustomerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}




自己实现的同步队列如下:
package thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ProducerCustomerQueue<E> {
	public static void main(String[] args) throws InterruptedException {
		/**
		 * 20个线程,存取线程各半,队列大小是30W,耗时:2400左右
		 */
		int total = 300000, threadCount = 20, queueSize = 100;
		ProducerCustomerQueue<Integer> queue = new ProducerCustomerQueue<Integer>(
				queueSize);
		final long start = System.currentTimeMillis();
		CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {
			public void run() {
				System.out.println("统计时间:"
						+ (System.currentTimeMillis() - start));
			}
		});
		for (int i = 0; i < threadCount >>> 1; i++) {
			new Thread(new ProducerThread(queue, barrier, total)).start();
			new Thread(new CustomerThread(queue, barrier, total)).start();
		}
	}

	private Object lock = new Object();
	private final E[] array;
	private int putIndex;
	private int takeIndex;
	private int count;

	public synchronized int size() {
		return count;
	}

	public ProducerCustomerQueue(int n) {
		array = (E[]) new Object[n];
	}

	/**
	 * 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
	 * 
	 * @param o
	 */
	public boolean offer(E o) {
		if (o == null) {
			throw new NullPointerException();
		}
		synchronized (lock) {
			if (count == array.length) {// 队列已满
				return false;
			} else {
				insert(o);
				return true;
			}
		}
	}

	/**
	 * 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
	 * 
	 * @param o
	 * @throws InterruptedException
	 */
	public boolean offer(E o, long timeout) throws InterruptedException {
		if (o == null) {
			throw new NullPointerException();
		}
		synchronized (lock) {
			if (count == array.length) {// 队列已满
				lock.wait(timeout);
			}

			if (count == array.length) {
				return false;
			} else {
				insert(o);
			}
		}
		return false;
	}

	/**
	 * 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
	 * 
	 * @param o
	 * @throws InterruptedException
	 */
	public void put(E o) throws InterruptedException {
		synchronized (lock) {
			while (count == array.length) {// 队列已满
				// System.out.println("put wait");
				lock.wait();
			}
			insert(o);
		}
	}

	private void insert(E o) {
		array[putIndex] = o;
		putIndex = (++putIndex == array.length) ? 0 : putIndex;
		count++;
		lock.notify();
	}

	/**
	 * 获取并移除此队列的头,如果此队列为空,则返回 null。
	 */
	public E poll() {
		synchronized (lock) {
			if (count == 0) {// 空队列
				return null;
			}
			return extract();
		}
	}

	/**
	 * 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
	 * 
	 * @throws InterruptedException
	 */
	public E poll(long timeout) throws InterruptedException {
		synchronized (lock) {
			if (count == 0) {// 空队列
				lock.wait(timeout);
			}
			if (count == 0) {
				return null;
			} else {
				return extract();
			}
		}
	}

	/**
	 * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
	 * 
	 * @return
	 * @throws InterruptedException
	 */
	public E take() throws InterruptedException {
		synchronized (lock) {
			for (;;) {
				if (count == 0) {// 空队列
					lock.wait();
				} else {
					return extract();
				}
			}
		}
	}

	private E extract() {
		E o = array[takeIndex];
		takeIndex = (++takeIndex == array.length) ? 0 : takeIndex;
		count--;
		lock.notify();
		return o;
	}
}

// 生产者线程
class ProducerThread implements Runnable {
	ProducerCustomerQueue queue;
	CyclicBarrier barrier;
	int total;

	public ProducerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,
			int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("ProducerThread启动...");
		for (int i = 0; i < total; i++) {
			try {
				// System.out.println("ProducerThread.." + queue.size());
				queue.put(i);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("ProducerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

}

// 消费者线程
class CustomerThread implements Runnable {
	ProducerCustomerQueue queue;
	CyclicBarrier barrier;
	int total;

	public CustomerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,
			int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("CustomerThread启动...");
		for (int i = 0; i < total; i++) {
			Object o;
			try {
				o = queue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("CustomerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

   发表时间:2011-09-30  
lz你好好测测你的程序,我在机子上跑的你写的比官方的落后好几倍呢!


我感觉你写的东西,基本上停留在初学者的水平,任何一个初学者,基本上都能写出这样的代码,没有任何优化的代码,你跟官方比,简直是自取其辱。

你懂什么叫java的current的包得真正含义嘛?
0 请登录后投票
   发表时间:2011-09-30  
呵呵,我用你的代码测试了一下,3000 * 1000的队列长度
官方的:22765 ms
你的:39998 ms

你自己的测试数量太小了,很容易受到Thread启动顺序不一致的影响。严格一点的测试,应该是确保Tread都起来了这个时间点,到任务做完Thread销毁之前的时间。

0 请登录后投票
   发表时间:2011-09-30  
bao231 写道
lz你好好测测你的程序,我在机子上跑的你写的比官方的落后好几倍呢!


我感觉你写的东西,基本上停留在初学者的水平,任何一个初学者,基本上都能写出这样的代码,没有任何优化的代码,你跟官方比,简直是自取其辱。

你懂什么叫java的current的包得真正含义嘛?



本着学习与分享的态度把这个问题拿出来讨论却受到这样的语言讽刺。。。

我不懂? 你怎么知道我不懂 java的current的包得真正含义?
初学者?别老把别人比成初学者以显示自己比别人强!!!没看到我文字写的是 “是不是我的代码写的有问题”。

最后:看来这个问题还是由我自己来回答好了。昨天晚上换了下JDK,原来测试的是1.6的,换成1.5的后在我破电脑上测试速度比官方的慢了接近9倍多。想不明白与是网上查了查,查到几个前辈的博客说是1.6的synchronized速度基本和ReentrantLock速度差不多,但是1.5的synchronized就慢的多了。

0 请登录后投票
   发表时间:2011-09-30   最后修改:2011-09-30
543089122 写道
bao231 写道
lz你好好测测你的程序,我在机子上跑的你写的比官方的落后好几倍呢!


我感觉你写的东西,基本上停留在初学者的水平,任何一个初学者,基本上都能写出这样的代码,没有任何优化的代码,你跟官方比,简直是自取其辱。

你懂什么叫java的current的包得真正含义嘛?



本着学习与分享的态度把这个问题拿出来讨论却受到这样的语言讽刺。。。

我不懂? 你怎么知道我不懂 java的current的包得真正含义?
初学者?别老把别人比成初学者以显示自己比别人强!!!没看到我文字写的是 “是不是我的代码写的有问题”。

最后:看来这个问题还是由我自己来回答好了。昨天晚上换了下JDK,原来测试的是1.6的,换成1.5的后在我破电脑上测试速度比官方的慢了接近9倍多。想不明白与是网上查了查,查到几个前辈的博客说是1.6的synchronized速度基本和ReentrantLock速度差不多,但是1.5的synchronized就慢的多了。


我感觉你这片博文应该这样写:synchronized速度和ReentrantLock速度到底有多大?比较合适
上边的言语有点过激,还请谅解。

有的时候发现lz说的东西跟实际测试的东西不一致的时候,感觉就像骗了我们一样,所以有点气愤,呵呵。
current的包得lock跟synchronized还是有本质上的区别的,有很多cas的相关优化的东西在里边。不能一概而论吧。
0 请登录后投票
   发表时间:2011-09-30  
concurrent
0 请登录后投票
   发表时间:2011-09-30  

我觉得尊重是相互的,如今iteye活跃的群体大多是毕业不久的,很多的人心浮气躁,互相嘲讽总认为自己多牛。我们应该好好沉静下来思考思考了。做为新手自己曾发表一帖子,却得到楼主的冷嘲热讽http://www.iteye.com/topic/1115828#2250991。

0 请登录后投票
   发表时间:2011-09-30  
543089122 写道
bao231 写道
lz你好好测测你的程序,我在机子上跑的你写的比官方的落后好几倍呢!


我感觉你写的东西,基本上停留在初学者的水平,任何一个初学者,基本上都能写出这样的代码,没有任何优化的代码,你跟官方比,简直是自取其辱。

你懂什么叫java的current的包得真正含义嘛?



本着学习与分享的态度把这个问题拿出来讨论却受到这样的语言讽刺。。。

我不懂? 你怎么知道我不懂 java的current的包得真正含义?
初学者?别老把别人比成初学者以显示自己比别人强!!!没看到我文字写的是 “是不是我的代码写的有问题”。

最后:看来这个问题还是由我自己来回答好了。昨天晚上换了下JDK,原来测试的是1.6的,换成1.5的后在我破电脑上测试速度比官方的慢了接近9倍多。想不明白与是网上查了查,查到几个前辈的博客说是1.6的synchronized速度基本和ReentrantLock速度差不多,但是1.5的synchronized就慢的多了。


反正我初学的时候,对多线程、队列之类的东西无从下手,羡慕嫉妒恨啊。
楼主别灰心,负面的言论有益于你的进步。我看你写的不错。
如果你的标题不把官方扯进去,还是一篇不错的技术参考贴的。
我给你YES
0 请登录后投票
   发表时间:2011-09-30  
lantian_123 写道

我觉得尊重是相互的,如今iteye活跃的群体大多是毕业不久的,很多的人心浮气躁,互相嘲讽总认为自己多牛。我们应该好好沉静下来思考思考了。做为新手自己曾发表一帖子,却得到楼主的冷嘲热讽http://www.iteye.com/topic/1115828#2250991。

不好意思,我觉得我没嘲讽吧。。。我当时突然有一种感叹,是为以前看到好的文章被别人投隐感叹。

 

另,过激的言论总是突然就来突然就消失,上头那位看来我们都是彼此彼此啊

0 请登录后投票
   发表时间:2011-09-30  
能动手就已经慢慢不是新手了
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics