`
winnerbao
  • 浏览: 10441 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

20_[实践]用BlockingQueue替换原有SynchronizeQueue块,带来的性能巨大提升

    博客分类:
  • Java
 
阅读更多

[介绍]

去年(2014),对我们产品中的一个模块,通过使用BlockingQueue,性能提升很多。我觉得有些借鉴意义,这里分享给大家。可以说,此更改是所有java对block q有所了解的人都能够做到的,但是实际工作中确实可能碰到类似的情况。
 
简而言之:用BlockingQueue替换原有SynchronizeQueue块。更改后,该模块的性能从50msg/second, 提升到700 msg/second!
 
 
 
[代码]
直接看代码比较清楚。看完如下代码,你可能会莞尔一笑,so easy. 那就结束了吧。不过我下面将给出如何发现这个问题,以及自己做的一个小例子来验证性能提升。
 
 
修改前
      private final Queue<ProtocolMessageEvent> queue  = new LinkedList<ProtocolMessageEvent>();
             while (true)
             {
                 synchronized (this.queue)
                 {
                     final ProtocolMessageEvent event = this.queue.poll();
                     // TODO does the sync have to be held during message processing?
                     if (event != null)
                     {
                         if (event.getProtocolMessage().getType().equals("5"))
                         {
                             break;
                         }
                         handleProtocolMessageEvent(event);
                     }
                 }
                 Thread.sleep(1);
             }
 修改后
    private final LinkedBlockingQueue<ProtocolMessageEvent> queue  = new LinkedBlockingQueue<ProtocolMessageEvent>();
             while (true)
             {
                 ProtocolMessageEvent event = queue.take();
 
                 if (event != null)
                 {
                     if (event.getProtocolMessage().getType().equals("5"))
                     {
                         break;
                     }
                     handleProtocolMessageEvent(event);
                 }
             }
 [如何发现]
有几种方式都可以发现这个问题
1) 作为一个老程序员,review代码的时候就可以发现这个问题。其实这个实在是太简单直接了
2) 做自动化性能测试时,发现性能一直上不去,在30 msg/second徘徊。通过jvisualvm,看各个线程状态,会发现此线程sleep时间比较长。
注意:应用中有很多线程,要发现bottleneck线程,很多时候比较困难,因为要查看所有线程
 
[扩展]
1. 学习新知识很重要,尤其是jdk的重要新feature.此类问题就永远不会出现。因为此应用使用的jdk1.6. concurrent framework 是1.5就引入了的。
2. 要有自动化的performance测试,这样修改程序以后,可以很容易知道性能提升有多少。
 
[简单例子]
为了加深认识,写了一个简化的验证程序。该程序有278行,解释了两种的性能差别。
使用synchronizeQ块的方式,最快不到1000msg/s。而使用blockingQ到了20,000msg/s还完全没问题。
 
运行结果如下
$$$$ synchronize Q test $$$$
stats result -- acturely duration:9.951 throughput:502.4620641141594 sending speed:500.0
stats result -- acturely duration:10.207 throughput:979.7198001371607 sending speed:999.30048965724
stats result -- acturely duration:20.362 throughput:982.2217856792064 sending speed:1998.201618543311
$$$$ blocking Q test $$$$
stats result -- acturely duration:9.905 throughput:504.7955577990914 sending speed:499.7501249375312
stats result -- acturely duration:9.975 throughput:5012.5313283208025 sending speed:5002.501250625313
stats result -- acturely duration:9.993 throughput:10007.004903432402 sending speed:9997.000899730081
stats result -- acturely duration:9.926 throughput:20149.10336490026 sending speed:20128.824476650563
 
 
note:作为测试代码,200+行,略长
note:它引用了我写的一个batch发送的框架程序,极大方便了设定发送速度。此程序非常方便,有空我会分享出来。
代码贴在下面 - 有点冗长:
package baoying.perf.trtnfix;

import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import baoying.util.PerfInvoker;
import baoying.util.PerfInvoker.PerfInvokerCallback;

/**
 result: 20150711
$$$$ synchronize Q test $$$$
stats result -- acturely duration:9.951 throughput:502.4620641141594 sending speed:500.0
stats result -- acturely duration:10.207 throughput:979.7198001371607 sending speed:999.30048965724
stats result -- acturely duration:20.362 throughput:982.2217856792064 sending speed:1998.201618543311
$$$$ blocking Q test $$$$
stats result -- acturely duration:9.905 throughput:504.7955577990914 sending speed:499.7501249375312
stats result -- acturely duration:9.975 throughput:5012.5313283208025 sending speed:5002.501250625313
stats result -- acturely duration:9.993 throughput:10007.004903432402 sending speed:9997.000899730081
stats result -- acturely duration:9.926 throughput:20149.10336490026 sending speed:20128.824476650563
 *
 */
public class PollingVSBlockingQ {


	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {

		
		PollingVSBlockingQ vs = new PollingVSBlockingQ();
		
		System.out.println("$$$$ synchronize Q test $$$$");
		final LogicInterface pollingLogic = new PollingLogic();
		vs.runTest(pollingLogic, 500, 10); //500 msg per second, 10 seconds
		vs.runTest(pollingLogic, 1000, 10);
		vs.runTest(pollingLogic, 2000, 10);

		System.out.println("$$$$ blocking Q test $$$$");
		final LogicInterface blockingLogic = new BlockingQLogic();
		vs.runTest(blockingLogic, 500, 10);
		vs.runTest(blockingLogic, 5000, 10);
		vs.runTest(blockingLogic, 10000, 10);
		vs.runTest(blockingLogic, 20000, 10);

	}
	
	public void runTest(final LogicInterface logic, final int ratePerSec, final int duarationInSec) throws InterruptedException{
		Thread feedThread = new Thread(new Runnable() {
			public void run() {
				try {
					logic.feedQ(ratePerSec,	duarationInSec);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}, "feedThread");

		Thread consumeThread = new Thread(new Runnable() {
			public void run() {
				try {
					logic.consumeQ();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}, "consumeThread");

		feedThread.start();
		consumeThread.start();
		
		feedThread.join();
		consumeThread.join();
		
	}

}

interface LogicInterface{
	public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException;
	public void consumeQ() throws InterruptedException;
}


class PollingLogic implements LogicInterface{

	int ratePerSec;
	int duarationInSec;

	Date start = null;
	Date endSending = null;
	Date recEnd = null;

	private final Queue<ProtocolMessageEvent> queue = new LinkedList<ProtocolMessageEvent>();

	public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException {
		this.ratePerSec = ratePerSec;
		this.duarationInSec=duarationInSec;

		PerfInvokerCallback c = new PerfInvokerCallback() {

			@Override
			public void execute(long seq) {
				synchronized (queue) {

					ProtocolMessageEvent e = new ProtocolMessageEvent(new Type(
							"3"));
					queue.add(e);
				}

			}
		};
		PerfInvoker ferfInvoker = new PerfInvoker(ratePerSec, duarationInSec, c);

		start = new java.util.Date();
		ferfInvoker.execute();
		endSending = new java.util.Date();

	}

	public void consumeQ() throws InterruptedException {

		int iReceived = 0;
		while (true) {
			synchronized (this.queue) {
				final ProtocolMessageEvent event = this.queue.poll();
				// TODO does the sync have to be held during message processing?
				if (event != null) {
					iReceived++;
					if (iReceived == ratePerSec * duarationInSec) {
						recEnd = new java.util.Date();
						if (endSending == null) {
							Thread.sleep(1000);
						}
						SimpleStatsHelper stats = new SimpleStatsHelper(
								ratePerSec, duarationInSec, start, endSending,
								recEnd);
						stats.calcStatsResult();
						break;
						// System.exit(0);

					}
					if (event.getProtocolMessage().getType().equals("5")) {
						break;
					}
					handleProtocolMessageEvent(event);
				}
			}
			Thread.sleep(1);
		}
	}

	private void handleProtocolMessageEvent(ProtocolMessageEvent event) {

		int x = 6;
		if (!(x * System.currentTimeMillis() > 200)) {
			System.out.println("impossible");
		}

	}
}

class BlockingQLogic implements LogicInterface{

	private final BlockingQueue<ProtocolMessageEvent> queue = new LinkedBlockingQueue<ProtocolMessageEvent>();

	int ratePerSec;
	int duarationInSec;

	Date start = null;
	Date endSending = null;
	Date recEnd = null;

	public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException {
		this.ratePerSec = ratePerSec;
		this.duarationInSec=duarationInSec;

		PerfInvokerCallback c = new PerfInvokerCallback() {

			@Override
			public void execute(long seq) {

				ProtocolMessageEvent e = new ProtocolMessageEvent(new Type("3"));
				queue.add(e);
			}

		};
		PerfInvoker ferfInvoker = new PerfInvoker(ratePerSec, duarationInSec, c);

		start = new java.util.Date();

		ferfInvoker.execute();
		endSending = new java.util.Date();
		

	}

	public void consumeQ() throws InterruptedException {

		int iReceived = 0;
		while (true) {
			final ProtocolMessageEvent event = this.queue.take();
			// TODO does the sync have to be held during message processing?
			if (event != null) {
				iReceived++;
				if (iReceived == ratePerSec * duarationInSec) {
					recEnd = new java.util.Date();

					//TODO refactor - bad code to wait endSeding be assigned value in sending thread.
					//how to refactor 
					if (endSending == null) {
						Thread.sleep(1000);
					}
					
					SimpleStatsHelper stats = new SimpleStatsHelper(ratePerSec,
							duarationInSec, start, endSending, recEnd);
					stats.calcStatsResult();
					
					//TODO as bad as above, too.
					endSending =null;
					
					break;
					// System.exit(0);

				}
				if (event.getProtocolMessage().getType().equals("5")) {
					break;
				}
				handleProtocolMessageEvent(event);
			}

		}
	}

	private void handleProtocolMessageEvent(ProtocolMessageEvent event) {

		int x = 6;
		if (!(x * System.currentTimeMillis() > 200)) {
			System.out.println("impossible");
		}

	}
}

class ProtocolMessageEvent {

	Type _t;

	ProtocolMessageEvent(Type t) {
		_t = t;
	}

	public Type getProtocolMessage() {
		return _t;
	}

}

class Type {

	String _t;

	Type(String t) {
		_t = t;
	}

	String getType() {
		return _t;
	}

}
 
 

 

分享到:
评论

相关推荐

    BlockingQueue的使用

    BlockingQueue是Java并发编程中非常重要的一个数据结构,它是一个具有阻塞特性的队列,主要用于线程...通过理解和实践BlockingQueue的使用,开发者可以更好地掌握Java并发编程的核心技术,提高系统的并发性能和稳定性。

    C++写的跨平台BlockingQueue

    《C++实现的跨平台BlockingQueue详解》 在软件开发中,线程间的通信和同步是必不可少的部分。...在理解其工作原理和实现细节后,我们可以灵活地应用到各种并发场景中,提升软件的性能和可维护性。

    线程----BlockingQueue

    ### 线程与BlockingQueue知识点详解 #### 1. BlockingQueue简介 `BlockingQueue`是一种特殊类型的队列,主要用于多线程环境中的任务管理。它具有以下特性:当队列为空时,从队列中获取元素的操作会被阻塞;同样地,...

    Java中的BlockingQueue:深入理解与实践应用

    本文将深入探讨BlockingQueue的工作原理、常见实现、使用场景以及代码示例。 在Java并发编程中,BlockingQueue是一个非常重要的接口,它提供了线程安全的队列操作,特别是在生产者-消费者模式中发挥着核心作用。本文...

    BlockingQueue(阻塞队列)详解

    - **提高并发效率**:避免了因频繁同步带来的性能开销。 **5. 图形化理解** - 当队列为空时,所有试图从中取出元素的线程将会被阻塞,直至其他线程向队列中添加了元素。 - 当队列为满时,所有试图向队列中添加...

    简单实现BlockingQueue,BlockingQueue源码详解

    BlockingQueue是Java并发编程中非常重要的一个接口,它在`java.util.concurrent`包下,是线程安全的队列,主要用于解决...理解其工作原理,并能够熟练运用到实际项目中,可以显著提升代码的并发处理能力和系统性能。

    BlockingQueue

    BlockingQueue java 的工具类,初次要用于消费者,生产者的同步问题。

    BlockingQueue队列自定义超时时间取消线程池任务

    在Java编程中,`BlockingQueue`是一个非常重要的并发工具类,它主要用于线程间的数据通信。`newFixedThreadPool`是`java.util.concurrent`包中的一个线程池工厂方法,用于创建固定数量线程的线程池。`FutureTask`则...

    spring-blockingqueue:用Spring Boot阻止队列

    在Java开发中,Spring Boot框架以其便捷的配置和...通过合理配置和使用,可以显著提升Java应用的并发性能和可扩展性。在实际项目中,应根据业务需求选择适当的队列实现,并注意处理好异常情况,确保系统的稳定运行。

    java中线程队列BlockingQueue的用法

    在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util....正确使用`BlockingQueue`可以显著提升程序的并发性能和稳定性。

    并发容器——BlockingQueue相关类

    在"生产者-消费者 测试.txt"文件中,很可能是对以上所述的`BlockingQueue`的使用示例或者性能测试代码。这个文件可能包含了如何创建`BlockingQueue`实例,以及如何在生产者和消费者线程之间正确使用它的代码片段。...

    阻塞队列BlockingQueue的使用

    在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...

    java线程并发blockingqueue类使用示例

    Java中的`BlockingQueue`是一个非常重要的并发工具类,它提供了线程安全的队列操作,主要用于生产者-消费者模式。这个接口定义了一种在多线程环境...在设计高性能、低延迟的系统时,`BlockingQueue`经常被用作构建块。

    基于java中BlockingQueue的使用介绍

    **基于Java中的BlockingQueue使用介绍** Java的并发编程框架提供了多种高级并发工具,其中BlockingQueue是一种非常实用的数据结构,它实现了生产者-消费者模式。在多线程环境下,BlockingQueue可以高效地处理线程间...

    (转)Log4j的AsyncAppender能否提升性能

    标题提及的"Log4j的AsyncAppender能否提升性能"是一个关键的问题,因为传统的Log4j Appender在记录日志时会阻塞应用程序的执行线程,导致性能下降。AsyncAppender的出现就是为了应对这个问题,通过异步处理日志,它...

    2011.08.30(2)——— java BlockingQueue ExecutorService

    BlockingQueue的一些常见实现包括ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等,每种实现都有其特定的性能特性和使用场景。例如,ArrayBlockingQueue是基于数组的,提供了固定容量,而...

    14-阻塞队列BlockingQueue实战及其原理分析二.pdf

    阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...

    多核多线程下java设计模式性能提升.zip

    在多核环境下,Java的并发处理能力是性能提升的关键。Java提供了丰富的并发工具,如线程池(ExecutorService)、Future、Callable接口等,这些工具能够帮助我们更好地控制线程的创建和执行,避免过度创建线程导致的...

    Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

    Java多线程编程中,`Queue`和`BlockingQueue`是两种重要的数据结构,它们在并发处理和线程间通信中扮演着关键角色。...了解并熟练掌握`BlockingQueue`的使用,对于提升Java并发程序的设计和实现能力至关重要。

    disruptor:Disruptor BlockingQueue

    Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...

Global site tag (gtag.js) - Google Analytics