一、简介
Exchanger是自jdk1.5起开始提供的工具套件,一般用于两个工作线程之间交换数据。在本文中我将采取由浅入深的方式来介绍分析这个工具类。首先我们来看看官方的api文档中的叙述:
在以上的描述中,有几个要点:
- 此类提供对外的操作是同步的;
- 用于成对出现的线程之间交换数据;
- 可以视作双向的同步队列;
- 可应用于基因算法、流水线设计等场景。
接着看api文档,这个类提供对外的接口非常简洁,一个无参构造函数,两个重载的范型exchange方法:
public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
从官方的javadoc可以知道,当一个线程到达exchange调用点时,如果它的伙伴线程此前已经调用了此方法,那么它的伙伴会被调度唤醒并与之进行对象交换,然后各自返回。如果它的伙伴还没到达交换点,那么当前线程将会被挂起,直至伙伴线程到达——完成交换正常返回;或者当前线程被中断——抛出中断异常;又或者是等候超时——抛出超时异常。
二、一个简单的例子
按照某大师的观点,行为知之先,在知道了Exchanger的大致用途并参阅了使用说明后,我们马上动手写个例子来跑一跑:
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; /** * @Title: ExchangerTest * @Description: Test class for Exchanger * @Company: CSAIR * @Author: lixuanbin * @Creation: 2014年12月14日 * @Version:1.0 */ public class ExchangerTest { protected static final Logger log = Logger.getLogger(ExchangerTest.class); private static volatile boolean isDone = false; static class ExchangerProducer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 1; ExchangerProducer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { for (int i = 1; i <= 3; i++) { try { TimeUnit.SECONDS.sleep(1); data = i; System.out.println("producer before: " + data); data = exchanger.exchange(data); System.out.println("producer after: " + data); } catch (InterruptedException e) { log.error(e, e); } } isDone = true; } } } static class ExchangerConsumer implements Runnable { private Exchanger<Integer> exchanger; private static int data = 0; ExchangerConsumer(Exchanger<Integer> exchanger) { this.exchanger = exchanger; } @Override public void run() { while (!Thread.interrupted() && !isDone) { data = 0; System.out.println("consumer before : " + data); try { TimeUnit.SECONDS.sleep(1); data = exchanger.exchange(data); } catch (InterruptedException e) { log.error(e, e); } System.out.println("consumer after : " + data); } } } /** * @param args */ public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<Integer> exchanger = new Exchanger<Integer>(); ExchangerProducer producer = new ExchangerProducer(exchanger); ExchangerConsumer consumer = new ExchangerConsumer(exchanger); exec.execute(producer); exec.execute(consumer); exec.shutdown(); try { exec.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error(e, e); } } }
这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。
我们来看看程序的输出:
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3
输出结果验证了以下两件事情:
- exchange方法真的帮一对线程交换了数据;
- exchange方法真的会阻塞调用方线程直至另一方线程参与交易。
那么在中断和超时两种情况下程序的运行表现会是怎样呢?作为一个小练习,有兴趣的观众可以设想并编写测试用例覆盖验证之。接下来谈谈最近我在生产场景中对Exchanger的应用。
三、实战场景
1.问题描述
最近接到外部项目组向我组提出的接口需求,需要查询我们业务办理量的统计情况。我们系统目前的情况是,有一个日增长十多万、总数据量为千万级别的业务办理明细表(xxx_info),每人次的业务办理结果会实时写入其中。以往对外提供的业务统计接口是在每次被调用时候在明细表中执行SQL查询(select、count、where、group by等),响应时间很长,对原生产业务的使用也有很大的影响。于是我决定趁着这次新增接口的上线机会对系统进行优化。
2.优化思路
首先是在明细表之外再建立一个数据统计(xxx_statistics)表,考虑到目前数据库的压力以及公司内部质管流控等因素,暂没有分库存放,仍旧与原明细表放在同一个库。再设置一个定时任务于每日凌晨对明细表进行查询、过滤、统计、排序等操作,把统计结果插入到统计表中。然后对外暴露统计接口查询统计报表。现在的设计与原来的实现相比,虽然牺牲了统计表所占用的少量额外的存储空间(每日新增的十来万条业务办理明细记录经过处理最终会变成几百条统计表的记录),但是却能把select、count这样耗时的数据统计操作放到凌晨时段执行以避开白天的业务办理高峰,分表处理能够大幅降低对生产业务明细表的性能影响,而对外提供的统计接口的查询速度也将得到几个数量级的提升。当然,还有一个缺点是,不能实时提供当天的统计数据,不过这也是双方可以接受的。
3.设计实现
设计一个定时任务,每日凌晨执行。在定时任务中启动两个线程,一个线程负责对业务明细表(xxx_info)进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表(xxx_statistics)中。
亲,这样的场景是不是听起来很有感觉?没错!两个线程在内存中批量交换数据,这个事情我们可以使用Exchanger去做!我们马上来看看代码如何实现。
生产者线程:
class ExchangerProducer implements Runnable { private Exchanger<Set<XXXStatistics>> exchanger; private Set<XXXStatistics> holder; private Date fltDate; private int threshold; ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger, Set<XXXStatistics> holder, Date fltDate, int threshold) { this.exchanger = exchanger; this.holder = holder; this.fltDate = fltDate; this.threshold = threshold; } @Override public void run() { try { while (!Thread.interrupted() && !isDone) { List<XXXStatistics> temp1 = null; List<XXXStatistics> temp11 = null; for (int i = 0; i < allCities.size(); i++) { try { temp1 = xxxDao .findStatistics1( fltDate, allCities.get(i)); temp11 = xxxDao .findStatistics2( fltDate, allCities.get(i), internationalList); if (temp1 != null && !temp1.isEmpty()) { calculationCounter.addAndGet(temp1.size()); if (temp11 != null && !temp11.isEmpty()) { // merge two lists into temp1 mergeLists(temp1, temp11); temp11.clear(); temp11 = null; } // merge temp1 into holder set mergeListToSet(holder, temp1); temp1.clear(); temp1 = null; } } catch (Exception e) { log.error(e, e); } // Insert every ${threshold} or the last into database. if (holder.size() >= threshold || i == (allCities.size() - 1)) { log.info("data collected: \n" + holder); holder = exchanger.exchange(holder); log.info("data submitted"); } } // all cities are calculated isDone = true; } log.info("calculation job done, calculated: " + calculationCounter.get()); } catch (InterruptedException e) { log.error(e, e); } exchanger = null; holder.clear(); holder = null; fltDate = null; } }
代码说明:
- threshold:缓冲区的容量阀值;
- allCities:城市列表,迭代这个列表作为入参来执行查询统计;
- XXXStatistics:统计数据封装实体类,实现了Serializable和Comparable接口,覆写equals和compareTo方法,以利用TreeSet提供的去重和排序处理;
- isDone:volatile boolean,标识统计任务是否完成;
- holder:TreeSet<XXXStatistics>,存放统计结果的内存缓冲区,容量达到阀值后提交给Exchanger执行exchange操作;
- dao.findStatistics1,dao.findStatistics2:简化的数据库查询统计操作,此处仅供示意;
- calculationCounter:AtomicInteger,标记生产端所提交的记录总数;
- mergeLists,mergeListToSet:内部私有工具方法,把dao查询返回的列表合并到holder中;
消费者线程:
class ExchangerConsumer implements Runnable { private Exchanger<Set<XXXStatistics>> exchanger; private Set<XXXStatistics> holder; ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger, Set<XXXStatistics> holder) { this.exchanger = exchanger; this.holder = holder; } @Override public void run() { try { List<XXXStatistics> tempList; while (!Thread.interrupted() && !isDone) { holder = exchanger.exchange(holder); log.info("got data: \n" + holder); if (holder != null && !holder.isEmpty()) { try { // insert data into database tempList = convertSetToList(holder); insertionCounter.addAndGet(xxxDao .batchInsertXXXStatistics(tempList)); tempList.clear(); tempList = null; } catch (Exception e) { log.error(e, e); } // clear the set holder.clear(); } else { log.info("wtf, got an empty list"); } log.info("data processed"); } log.info("insert job done, inserted: " + insertionCounter.get()); } catch (InterruptedException e) { log.error(e, e); } exchanger = null; holder.clear(); holder = null; } }
代码说明:
- convertSetToList:由于dao接口的限制,需把交换得到的Set转换为List;
- batchInsertXXXStatistics:使用jdbc4的batch update而实现的批量插入dao接口;
- insertionCounter:AtomicInteger,标记消费端插入成功的记录总数;
调度器代码:
public boolean calculateStatistics(Date fltDate) { // initialization calculationCounter.set(0); insertionCounter.set(0); isDone = false; exec = Executors.newCachedThreadPool(); Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>(); Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>(); Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>(); ExchangerProducer producer = new ExchangerProducer(xc, producerSet, fltDate, threshold); ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet); // execution exec.execute(producer); exec.execute(consumer); exec.shutdown(); boolean isJobDone = false; try { // wait for termination isJobDone = exec.awaitTermination(calculationTimeoutMinutes, TimeUnit.MINUTES); } catch (InterruptedException e) { log.error(e, e); } if (!isJobDone) { // force shutdown exec.shutdownNow(); log.error("time elapsed for " + calculationTimeoutMinutes + " minutes, but still not finished yet, shut it down anyway."); } // clean up exec = null; producerSet.clear(); producerSet = null; consumerSet.clear(); consumerSet = null; xc = null; producer = null; consumer = null; System.gc(); // return the result if (isJobDone && calculationCounter.get() > 0 && calculationCounter.get() == insertionCounter.get()) { return true; } return false; }
代码说明:
调度器的代码就四个步骤:初始化、提交任务并等候处理结果、清理、返回。初始化阶段使用了jdk提供的线程池提交生产者和消费者任务,设置了最长等候时间calculationTimeoutMinutes,如果调度器线程被中断或者任务执行超时,awaitTermination会返回false,此时就强行关闭线程池并记录到日志。统计操作每日凌晨执行一次,所以在任务退出前的清理阶段建议jvm执行gc以尽早释放计算时所产生的垃圾对象。在结果返回阶段,如果查询统计出来的记录条数和插入成功的条数相等则返回true,否则返回false。
4.小结
在这个案例中,使用Exchanger进行批量的双向数据交换可谓恰如其分:生产者在执行新的查询统计任务填入数据到缓冲区的同时,消费者正在批量插入生产者换入的上一次产生的数据,系统的吞吐量得到平滑的提升;计算复杂度、内存消耗、系统性能也能通过相关的参数设置而得到有效的控制(在消费端也可以对holder进行再次分割以控制每次批插入的大小,建议参阅数据库厂商以及数据库驱动包的说明文档以确定jdbc的最优batch update size);代码的实现也很简洁易懂。这些优点,是采用有界阻塞队列所难以达到的。
程序的输出结果与业务紧密相关,就不打印出来了。可以肯定的是,经过了一段时间的摸索调优,内存消耗、执行速度和处理结果还是比较满意的。
相关推荐
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
"java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError" 是一个典型的错误提示,它表明在并发执行过程中遇到了内存不足的问题。下面我们将深入探讨这个问题的原因、影响以及如何解决。 内存溢出...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
### Java.util.Date与Java.sql.Date互转及字符串转换为日期时间格式 #### 一、Java.util.Date与Java.sql.Date的基本概念 在Java编程语言中,处理日期和时间时经常使用到`java.util.Date`和`java.sql.Date`这两个类...
### Java.util.logging.Logger 使用详解 #### 一、创建Logger对象 在Java中,`java.util.logging.Logger` 是标准的日志框架之一,它提供了基础的日志记录功能。为了使用这一功能,首先需要获得 `java.util.logging...
Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...
### Java.util.Date与Java.sql.Date相互转换 #### 知识点概述 在Java开发中,经常需要处理日期和时间相关的操作。Java标准库提供了两个重要的日期类:`java.util.Date` 和 `java.sql.Date`。虽然它们名字相似,但...
### 使用 Java.util.zip 包实现数据压缩与解压 在计算机科学领域,数据压缩技术是一项重要的功能,它能够帮助减少存储空间的需求以及提高网络传输效率。本文将通过一系列的示例来详细介绍如何利用 Java 中的 `java....
### Java并发工具包 `java.util.concurrent` 知识点详解 #### 一、引言 随着多核处理器的普及和应用程序复杂度的增加,多线程编程成为了现代软件开发不可或缺的一部分。为了简化并发编程的复杂性,Java 5 引入了 `...
java.util.concurrent总体概览图。 收取资源分3分。需要的同学可以下载一下。 java.util.concurrent主要包括5个部分executor,colletions,locks,atomic,tools。 该图详细的列举了并发包下面的结构,包含所有接口和...
一个高性能的Java线程库,该库是 JDK 1.5 中的 java.util.concurrent 包的补充,可用于基于并发消息机制的应用。该类库不提供远程的消息功能,其设计的宗旨是实现一个内存中的消息传递机制. 主要特点有: * All ...
### Java.util.concurrent.Synchronizer框架详解 #### 一、引言与背景 随着Java技术的发展,多线程编程成为了一项重要的技术需求。为了更好地支持并发编程,Java平台在J2SE 1.5版本中引入了`java.util.concurrent`...
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
标题中提到了“java.util.concurrent.uml.pdf”,这表明文件是一份Java并发编程工具包java.util.concurrent的UML(统一建模语言)类结构图的PDF格式文件。UML图能够帮助开发者理解Java并发包中的类、接口及其关系,...
Java提供日期(Data)类、日历(Calendar)类,随机数(Random)类,堆栈(Stack)、向量(Vector) 、位集合(Bitset)以及哈希表(Hashtable)等类来表示相应的数据结构
`java.util.concurrent.ExecutionException` 是Java并发编程中一个常见的异常,通常在执行Future对象的get()方法时抛出。这个异常表明在异步任务的执行过程中发生了异常。当我们使用ExecutorService提交任务并尝试...
如何启动:以win7系统为例,最好jdk8 1.打开cmd,cd到jdk的path,本机是:cd C:\Java\jdk6\bin ...java -cp D:\javaConcurrentAnimated.jar vgrazi.concurrent.samples.launcher.ConcurrentExampleLauncher
AQS(AbstractQueuedSynchronizer)是Java.util.concurrent包中同步器的基础框架,它的核心设计思想与实现方法在Doug Lea先生的这篇论文中有详细的介绍。论文详细阐述了AQS框架的原理、设计、实现、应用以及性能等...