[介绍]
去年(2014),对我们产品中的一个模块,通过使用BlockingQueue,性能提升很多。我觉得有些借鉴意义,这里分享给大家。可以说,此更改是所有java对block q有所了解的人都能够做到的,但是实际工作中确实可能碰到类似的情况。
简而言之:用BlockingQueue替换原有SynchronizeQueue块。更改后,该模块的性能从50msg/second, 提升到700 msg/second!
[代码]
直接看代码比较清楚。看完如下代码,你可能会莞尔一笑,so easy. 那就结束了吧。不过我下面将给出如何发现这个问题,以及自己做的一个小例子来验证性能提升。
修改前
private final Queue<ProtocolMessageEvent> queue = new LinkedList<ProtocolMessageEvent>(); while (true) { synchronized (this.queue) { final ProtocolMessageEvent event = this.queue.poll(); // TODO does the sync have to be held during message processing? if (event != null) { if (event.getProtocolMessage().getType().equals("5")) { break; } handleProtocolMessageEvent(event); } } Thread.sleep(1); }修改后
private final LinkedBlockingQueue<ProtocolMessageEvent> queue = new LinkedBlockingQueue<ProtocolMessageEvent>(); while (true) { ProtocolMessageEvent event = queue.take(); if (event != null) { if (event.getProtocolMessage().getType().equals("5")) { break; } handleProtocolMessageEvent(event); } }[如何发现]
有几种方式都可以发现这个问题
1) 作为一个老程序员,review代码的时候就可以发现这个问题。其实这个实在是太简单直接了
2) 做自动化性能测试时,发现性能一直上不去,在30 msg/second徘徊。通过jvisualvm,看各个线程状态,会发现此线程sleep时间比较长。
注意:应用中有很多线程,要发现bottleneck线程,很多时候比较困难,因为要查看所有线程
[扩展]
1. 学习新知识很重要,尤其是jdk的重要新feature.此类问题就永远不会出现。因为此应用使用的jdk1.6. concurrent framework 是1.5就引入了的。
2. 要有自动化的performance测试,这样修改程序以后,可以很容易知道性能提升有多少。
[简单例子]
为了加深认识,写了一个简化的验证程序。该程序有278行,解释了两种的性能差别。
使用synchronizeQ块的方式,最快不到1000msg/s。而使用blockingQ到了20,000msg/s还完全没问题。
运行结果如下
$$$$ synchronize Q test $$$$ stats result -- acturely duration:9.951 throughput:502.4620641141594 sending speed:500.0 stats result -- acturely duration:10.207 throughput:979.7198001371607 sending speed:999.30048965724 stats result -- acturely duration:20.362 throughput:982.2217856792064 sending speed:1998.201618543311 $$$$ blocking Q test $$$$ stats result -- acturely duration:9.905 throughput:504.7955577990914 sending speed:499.7501249375312 stats result -- acturely duration:9.975 throughput:5012.5313283208025 sending speed:5002.501250625313 stats result -- acturely duration:9.993 throughput:10007.004903432402 sending speed:9997.000899730081 stats result -- acturely duration:9.926 throughput:20149.10336490026 sending speed:20128.824476650563
note:作为测试代码,200+行,略长
note:它引用了我写的一个batch发送的框架程序,极大方便了设定发送速度。此程序非常方便,有空我会分享出来。
代码贴在下面 - 有点冗长:
package baoying.perf.trtnfix; import java.util.Date; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import baoying.util.PerfInvoker; import baoying.util.PerfInvoker.PerfInvokerCallback; /** result: 20150711 $$$$ synchronize Q test $$$$ stats result -- acturely duration:9.951 throughput:502.4620641141594 sending speed:500.0 stats result -- acturely duration:10.207 throughput:979.7198001371607 sending speed:999.30048965724 stats result -- acturely duration:20.362 throughput:982.2217856792064 sending speed:1998.201618543311 $$$$ blocking Q test $$$$ stats result -- acturely duration:9.905 throughput:504.7955577990914 sending speed:499.7501249375312 stats result -- acturely duration:9.975 throughput:5012.5313283208025 sending speed:5002.501250625313 stats result -- acturely duration:9.993 throughput:10007.004903432402 sending speed:9997.000899730081 stats result -- acturely duration:9.926 throughput:20149.10336490026 sending speed:20128.824476650563 * */ public class PollingVSBlockingQ { /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { PollingVSBlockingQ vs = new PollingVSBlockingQ(); System.out.println("$$$$ synchronize Q test $$$$"); final LogicInterface pollingLogic = new PollingLogic(); vs.runTest(pollingLogic, 500, 10); //500 msg per second, 10 seconds vs.runTest(pollingLogic, 1000, 10); vs.runTest(pollingLogic, 2000, 10); System.out.println("$$$$ blocking Q test $$$$"); final LogicInterface blockingLogic = new BlockingQLogic(); vs.runTest(blockingLogic, 500, 10); vs.runTest(blockingLogic, 5000, 10); vs.runTest(blockingLogic, 10000, 10); vs.runTest(blockingLogic, 20000, 10); } public void runTest(final LogicInterface logic, final int ratePerSec, final int duarationInSec) throws InterruptedException{ Thread feedThread = new Thread(new Runnable() { public void run() { try { logic.feedQ(ratePerSec, duarationInSec); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }, "feedThread"); Thread consumeThread = new Thread(new Runnable() { public void run() { try { logic.consumeQ(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }, "consumeThread"); feedThread.start(); consumeThread.start(); feedThread.join(); consumeThread.join(); } } interface LogicInterface{ public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException; public void consumeQ() throws InterruptedException; } class PollingLogic implements LogicInterface{ int ratePerSec; int duarationInSec; Date start = null; Date endSending = null; Date recEnd = null; private final Queue<ProtocolMessageEvent> queue = new LinkedList<ProtocolMessageEvent>(); public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException { this.ratePerSec = ratePerSec; this.duarationInSec=duarationInSec; PerfInvokerCallback c = new PerfInvokerCallback() { @Override public void execute(long seq) { synchronized (queue) { ProtocolMessageEvent e = new ProtocolMessageEvent(new Type( "3")); queue.add(e); } } }; PerfInvoker ferfInvoker = new PerfInvoker(ratePerSec, duarationInSec, c); start = new java.util.Date(); ferfInvoker.execute(); endSending = new java.util.Date(); } public void consumeQ() throws InterruptedException { int iReceived = 0; while (true) { synchronized (this.queue) { final ProtocolMessageEvent event = this.queue.poll(); // TODO does the sync have to be held during message processing? if (event != null) { iReceived++; if (iReceived == ratePerSec * duarationInSec) { recEnd = new java.util.Date(); if (endSending == null) { Thread.sleep(1000); } SimpleStatsHelper stats = new SimpleStatsHelper( ratePerSec, duarationInSec, start, endSending, recEnd); stats.calcStatsResult(); break; // System.exit(0); } if (event.getProtocolMessage().getType().equals("5")) { break; } handleProtocolMessageEvent(event); } } Thread.sleep(1); } } private void handleProtocolMessageEvent(ProtocolMessageEvent event) { int x = 6; if (!(x * System.currentTimeMillis() > 200)) { System.out.println("impossible"); } } } class BlockingQLogic implements LogicInterface{ private final BlockingQueue<ProtocolMessageEvent> queue = new LinkedBlockingQueue<ProtocolMessageEvent>(); int ratePerSec; int duarationInSec; Date start = null; Date endSending = null; Date recEnd = null; public void feedQ(final int ratePerSec, final int duarationInSec) throws InterruptedException { this.ratePerSec = ratePerSec; this.duarationInSec=duarationInSec; PerfInvokerCallback c = new PerfInvokerCallback() { @Override public void execute(long seq) { ProtocolMessageEvent e = new ProtocolMessageEvent(new Type("3")); queue.add(e); } }; PerfInvoker ferfInvoker = new PerfInvoker(ratePerSec, duarationInSec, c); start = new java.util.Date(); ferfInvoker.execute(); endSending = new java.util.Date(); } public void consumeQ() throws InterruptedException { int iReceived = 0; while (true) { final ProtocolMessageEvent event = this.queue.take(); // TODO does the sync have to be held during message processing? if (event != null) { iReceived++; if (iReceived == ratePerSec * duarationInSec) { recEnd = new java.util.Date(); //TODO refactor - bad code to wait endSeding be assigned value in sending thread. //how to refactor if (endSending == null) { Thread.sleep(1000); } SimpleStatsHelper stats = new SimpleStatsHelper(ratePerSec, duarationInSec, start, endSending, recEnd); stats.calcStatsResult(); //TODO as bad as above, too. endSending =null; break; // System.exit(0); } if (event.getProtocolMessage().getType().equals("5")) { break; } handleProtocolMessageEvent(event); } } } private void handleProtocolMessageEvent(ProtocolMessageEvent event) { int x = 6; if (!(x * System.currentTimeMillis() > 200)) { System.out.println("impossible"); } } } class ProtocolMessageEvent { Type _t; ProtocolMessageEvent(Type t) { _t = t; } public Type getProtocolMessage() { return _t; } } class Type { String _t; Type(String t) { _t = t; } String getType() { return _t; } }
相关推荐
BlockingQueue是Java并发编程中非常重要的一个数据结构,它是一个具有阻塞特性的队列,主要用于线程...通过理解和实践BlockingQueue的使用,开发者可以更好地掌握Java并发编程的核心技术,提高系统的并发性能和稳定性。
《C++实现的跨平台BlockingQueue详解》 在软件开发中,线程间的通信和同步是必不可少的部分。...在理解其工作原理和实现细节后,我们可以灵活地应用到各种并发场景中,提升软件的性能和可维护性。
### 线程与BlockingQueue知识点详解 #### 1. BlockingQueue简介 `BlockingQueue`是一种特殊类型的队列,主要用于多线程环境中的任务管理。它具有以下特性:当队列为空时,从队列中获取元素的操作会被阻塞;同样地,...
本文将深入探讨BlockingQueue的工作原理、常见实现、使用场景以及代码示例。 在Java并发编程中,BlockingQueue是一个非常重要的接口,它提供了线程安全的队列操作,特别是在生产者-消费者模式中发挥着核心作用。本文...
- **提高并发效率**:避免了因频繁同步带来的性能开销。 **5. 图形化理解** - 当队列为空时,所有试图从中取出元素的线程将会被阻塞,直至其他线程向队列中添加了元素。 - 当队列为满时,所有试图向队列中添加...
BlockingQueue是Java并发编程中非常重要的一个接口,它在`java.util.concurrent`包下,是线程安全的队列,主要用于解决...理解其工作原理,并能够熟练运用到实际项目中,可以显著提升代码的并发处理能力和系统性能。
BlockingQueue java 的工具类,初次要用于消费者,生产者的同步问题。
在Java编程中,`BlockingQueue`是一个非常重要的并发工具类,它主要用于线程间的数据通信。`newFixedThreadPool`是`java.util.concurrent`包中的一个线程池工厂方法,用于创建固定数量线程的线程池。`FutureTask`则...
在Java开发中,Spring Boot框架以其便捷的配置和...通过合理配置和使用,可以显著提升Java应用的并发性能和可扩展性。在实际项目中,应根据业务需求选择适当的队列实现,并注意处理好异常情况,确保系统的稳定运行。
在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util....正确使用`BlockingQueue`可以显著提升程序的并发性能和稳定性。
在"生产者-消费者 测试.txt"文件中,很可能是对以上所述的`BlockingQueue`的使用示例或者性能测试代码。这个文件可能包含了如何创建`BlockingQueue`实例,以及如何在生产者和消费者线程之间正确使用它的代码片段。...
在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文简要介绍下BlockingQueue...
Java中的`BlockingQueue`是一个非常重要的并发工具类,它提供了线程安全的队列操作,主要用于生产者-消费者模式。这个接口定义了一种在多线程环境...在设计高性能、低延迟的系统时,`BlockingQueue`经常被用作构建块。
**基于Java中的BlockingQueue使用介绍** Java的并发编程框架提供了多种高级并发工具,其中BlockingQueue是一种非常实用的数据结构,它实现了生产者-消费者模式。在多线程环境下,BlockingQueue可以高效地处理线程间...
标题提及的"Log4j的AsyncAppender能否提升性能"是一个关键的问题,因为传统的Log4j Appender在记录日志时会阻塞应用程序的执行线程,导致性能下降。AsyncAppender的出现就是为了应对这个问题,通过异步处理日志,它...
BlockingQueue的一些常见实现包括ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue等,每种实现都有其特定的性能特性和使用场景。例如,ArrayBlockingQueue是基于数组的,提供了固定容量,而...
阻塞队列(BlockingQueue)是一种特殊的队列,它支持两个附加操作:阻塞的插入方法put和阻塞的移除方法take。BlockingQueue继承了Queue接口,是Java 5中加入的。 BlockingQueue常用方法示例: 1. add(E e):添加一...
在多核环境下,Java的并发处理能力是性能提升的关键。Java提供了丰富的并发工具,如线程池(ExecutorService)、Future、Callable接口等,这些工具能够帮助我们更好地控制线程的创建和执行,避免过度创建线程导致的...
Java多线程编程中,`Queue`和`BlockingQueue`是两种重要的数据结构,它们在并发处理和线程间通信中扮演着关键角色。...了解并熟练掌握`BlockingQueue`的使用,对于提升Java并发程序的设计和实现能力至关重要。
Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...