`
543089122
  • 浏览: 153805 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

山寨同步队列 VS 官方BT的ArrayBlockingQueue ,结果官方落马!!!

    博客分类:
  • java
阅读更多
官方的java.util.concurrent.ArrayBlockingQueue的性能是很BT的,我下午无聊然后就想去测试下到底有多BT就写了如下测试代码,也不知道是我的代码写的有问题还是怎么的啦,测试结果和我想的完全不一样。

条件:20个线程,存取线程各半,队列大小是30W,其他电脑配置啥的啊很大众化就不描述了。

耗时:
山寨版的:2400左右
官方版的:3400左右

--------------------------------------------------------------------------------------------
官方的java.util.concurrent.ArrayBlockingQueue 生产者消费者代码如下:
package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ProducerCustomerQueue2<E> {
	public static void main(String[] args) throws InterruptedException {
		/**
		 * 20个线程,存取线程各半,队列大小是30W,耗时:2400左右
		 */
		int total = 300000, threadCount = 20,queueSize = 100;
		ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
		final long start = System.currentTimeMillis();
		CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {
			public void run() {
				System.out.println("统计时间:"
						+ (System.currentTimeMillis() - start));
			}
		});
		for (int i = 0; i < threadCount >>> 1; i++) {
			new Thread(new ProducerThread2(queue, barrier, total)).start();
			new Thread(new CustomerThread2(queue, barrier, total)).start();
		}
	}

}

// 生产者线程
class ProducerThread2 implements Runnable {
	ArrayBlockingQueue<Integer> queue;
	CyclicBarrier barrier;
	int total;

	public ProducerThread2(ArrayBlockingQueue<Integer> queue,
			CyclicBarrier barrier, int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("ProducerThread启动...");
		for (int i = 0; i < total; i++) {
			try {
				queue.put(i);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("ProducerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

// 消费者线程
class CustomerThread2 implements Runnable {
	ArrayBlockingQueue<Integer> queue;
	CyclicBarrier barrier;
	int total;

	public CustomerThread2(ArrayBlockingQueue<Integer> queue,
			CyclicBarrier barrier, int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("CustomerThread启动...");
		for (int i = 0; i < total; i++) {
			Object o;
			try {
				o = queue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("CustomerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}




自己实现的同步队列如下:
package thread;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ProducerCustomerQueue<E> {
	public static void main(String[] args) throws InterruptedException {
		/**
		 * 20个线程,存取线程各半,队列大小是30W,耗时:2400左右
		 */
		int total = 300000, threadCount = 20, queueSize = 100;
		ProducerCustomerQueue<Integer> queue = new ProducerCustomerQueue<Integer>(
				queueSize);
		final long start = System.currentTimeMillis();
		CyclicBarrier barrier = new CyclicBarrier(threadCount, new Runnable() {
			public void run() {
				System.out.println("统计时间:"
						+ (System.currentTimeMillis() - start));
			}
		});
		for (int i = 0; i < threadCount >>> 1; i++) {
			new Thread(new ProducerThread(queue, barrier, total)).start();
			new Thread(new CustomerThread(queue, barrier, total)).start();
		}
	}

	private Object lock = new Object();
	private final E[] array;
	private int putIndex;
	private int takeIndex;
	private int count;

	public synchronized int size() {
		return count;
	}

	public ProducerCustomerQueue(int n) {
		array = (E[]) new Object[n];
	}

	/**
	 * 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
	 * 
	 * @param o
	 */
	public boolean offer(E o) {
		if (o == null) {
			throw new NullPointerException();
		}
		synchronized (lock) {
			if (count == array.length) {// 队列已满
				return false;
			} else {
				insert(o);
				return true;
			}
		}
	}

	/**
	 * 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
	 * 
	 * @param o
	 * @throws InterruptedException
	 */
	public boolean offer(E o, long timeout) throws InterruptedException {
		if (o == null) {
			throw new NullPointerException();
		}
		synchronized (lock) {
			if (count == array.length) {// 队列已满
				lock.wait(timeout);
			}

			if (count == array.length) {
				return false;
			} else {
				insert(o);
			}
		}
		return false;
	}

	/**
	 * 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
	 * 
	 * @param o
	 * @throws InterruptedException
	 */
	public void put(E o) throws InterruptedException {
		synchronized (lock) {
			while (count == array.length) {// 队列已满
				// System.out.println("put wait");
				lock.wait();
			}
			insert(o);
		}
	}

	private void insert(E o) {
		array[putIndex] = o;
		putIndex = (++putIndex == array.length) ? 0 : putIndex;
		count++;
		lock.notify();
	}

	/**
	 * 获取并移除此队列的头,如果此队列为空,则返回 null。
	 */
	public E poll() {
		synchronized (lock) {
			if (count == 0) {// 空队列
				return null;
			}
			return extract();
		}
	}

	/**
	 * 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
	 * 
	 * @throws InterruptedException
	 */
	public E poll(long timeout) throws InterruptedException {
		synchronized (lock) {
			if (count == 0) {// 空队列
				lock.wait(timeout);
			}
			if (count == 0) {
				return null;
			} else {
				return extract();
			}
		}
	}

	/**
	 * 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
	 * 
	 * @return
	 * @throws InterruptedException
	 */
	public E take() throws InterruptedException {
		synchronized (lock) {
			for (;;) {
				if (count == 0) {// 空队列
					lock.wait();
				} else {
					return extract();
				}
			}
		}
	}

	private E extract() {
		E o = array[takeIndex];
		takeIndex = (++takeIndex == array.length) ? 0 : takeIndex;
		count--;
		lock.notify();
		return o;
	}
}

// 生产者线程
class ProducerThread implements Runnable {
	ProducerCustomerQueue queue;
	CyclicBarrier barrier;
	int total;

	public ProducerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,
			int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("ProducerThread启动...");
		for (int i = 0; i < total; i++) {
			try {
				// System.out.println("ProducerThread.." + queue.size());
				queue.put(i);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("ProducerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}

}

// 消费者线程
class CustomerThread implements Runnable {
	ProducerCustomerQueue queue;
	CyclicBarrier barrier;
	int total;

	public CustomerThread(ProducerCustomerQueue queue, CyclicBarrier barrier,
			int total) {
		super();
		this.queue = queue;
		this.barrier = barrier;
		this.total = total;
	}

	public void run() {
		System.out.println("CustomerThread启动...");
		for (int i = 0; i < total; i++) {
			Object o;
			try {
				o = queue.take();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.out.println("CustomerThread完毕!");
			barrier.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}



补充:
为什么synchronized反而比ReentrantLock还要快?这个问题终于自己找到了答案。

jdk.16下synchronized和ReentrantLock的速度差不多,似乎还快一些,但是synchronized的缺点很多 ,参考http://houlinyan.iteye.com/blog/1112535两者的对比。
jdk1.5下ReentrantLock快synchronized接近9-10倍(非专业测试仅供参考)。

分享到:
评论

相关推荐

    ArrayBlockingQueue源码分析.docx

    `ArrayBlockingQueue` 是 Java 中实现并发编程时常用的一个线程安全的数据结构,它是一个有界的阻塞队列。在 `java.util.concurrent` 包下,`ArrayBlockingQueue` 继承自 `java.util.concurrent.BlockingQueue` 接口...

    26不让我进门,我就在门口一直等!—BlockingQueue和ArrayBlockingQueue.pdf

    【标题】:“26不让我进门,我就在门口一直等...理解BlockingQueue和ArrayBlockingQueue的工作原理对于优化Java并发程序至关重要,尤其是在处理高并发、大量数据交换的场景下,它们能提供高效、低开销的线程同步机制。

    Java源码解析阻塞队列ArrayBlockingQueue功能简介

    Java源码解析阻塞队列ArrayBlockingQueue功能简介 ArrayBlockingQueue是Java中一个重要的阻塞队列实现,它基于数组实现了有界阻塞队列,提供FIFO(First-In-First-Out)功能。该队列的头元素是最长时间呆在队列中的...

    Java源码解析阻塞队列ArrayBlockingQueue常用方法

    Java中的ArrayBlockingQueue是一个高效的并发数据结构,它是Java并发包`java.util.concurrent`下的一个阻塞队列。本文将深入解析ArrayBlockingQueue的常用方法及其内部实现机制。 ArrayBlockingQueue的核心是一个...

    并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

    ArrayBlockingQueue和LinkedBlockingQueue是Java并发容器中两个常用的阻塞队列实现,分别基于数组和链表存储元素。它们都继承自AbstractQueue类,并实现了BlockingQueue接口,提供了线程安全的队列操作。 ...

    Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java源码解析阻塞队列ArrayBlockingQueue介绍是Java中的一种阻塞队列实现,使用ReentrantLock和Condition来实现同步和阻塞机制。本文将对ArrayBlockingQueue的源码进行详细分析,介绍其主要方法和实现机制。 1. ...

    Java可阻塞队列-ArrayBlockingQueue

    在前面的的文章,写了一个带有缓冲区的队列,是用JAVA的Lock下的Condition实现的,但是JAVA类中提供了这项功能,是ArrayBlockingQueue,  ArrayBlockingQueue是由数组支持的有界阻塞队列,次队列按照FIFO(先进先...

    详细分析Java并发集合ArrayBlockingQueue的用法

    ArrayBlockingQueue使用数组来实现队列,使用四个变量:Object[] array来存储队列中元素,headIndex和tailIndex分别记录队列头和队列尾,count记录队列的个数。这样可以实现队列的基本操作,如添加元素、删除元素等...

    ArrayBlockingQueue源码解析-动力节点共

    总的来说,ArrayBlockingQueue是一个强大的并发工具,它的实现细节涉及到线程同步、锁机制以及阻塞队列的设计。理解并熟练运用ArrayBlockingQueue可以帮助开发者构建出更加稳定、高效的并发程序。在阅读其源码时,...

    java并发之ArrayBlockingQueue详细介绍

    ArrayBlockingQueue是Java并发编程中常用的线程安全队列,经常被用作任务队列在线程池中。它是基于数组实现的循环队列,具有线程安全的实现。 ArrayBlockingQueue的实现 ArrayBlockingQueue使用ReentrantLock和两...

    Java concurrency集合之ArrayBlockingQueue_动力节点Java学院整理

    ArrayBlockingQueue是Java并发编程中一个重要的集合类,它属于 BlockingQueue 接口的一个实现,主要特点是线程安全、有界以及基于数组的阻塞队列。线程安全得益于其内部使用了Java并发包中的ReentrantLock(可重入锁...

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    数组阻塞队列ArrayBlockingQueue,延迟队列DelayQueue, 链阻塞队列 LinkedBlockingQueue,具有优先级的阻塞队列 PriorityBlockingQueue, 同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 ...

    java中LinkedBlockingQueue与ArrayBlockingQueue的异同

    Java中的`LinkedBlockingQueue`和`ArrayBlockingQueue`都是`java.util.concurrent`包下的线程安全队列,它们都实现了`BlockingQueue`接口,提供了一种高效、线程安全的数据同步方式。这两种队列在很多方面都有相似之...

    多线程 队列利用

    3. **并发容器**:Java的`java.util.concurrent`包提供了多种并发队列,如`ArrayBlockingQueue`、`LinkedBlockingQueue`和`ConcurrentLinkedQueue`等,它们为多线程环境提供了高效的队列操作。 4. **工作窃取算法**...

    java队列

    `LinkedBlockingQueue`和`ArrayBlockingQueue`是两个常见的阻塞队列实现。 3. **并发工具**:Java并发库提供了一些工具类,如`ExecutorService`和`ThreadPoolExecutor`,它们使用队列来管理待执行的任务。这些工具...

    java并发工具包详解

    3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. 阻塞双端队列 BlockingDeque 9...

    Using_Java_Queue.zip_java队列

    Java队列是Java集合框架中的一个关键组成部分,主要用于在多个线程之间同步数据传输或实现异步处理。队列遵循先进先出(FIFO)的原则,即最早添加到队列中的元素将首先被处理。本教程将深入探讨如何在Java中使用队列...

Global site tag (gtag.js) - Google Analytics