本文由黄文海首次发布在infoq中文站上:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part1 。转载请注明作者: 黄文海 出处:http://viscent.iteye.com。
Active Object模式简介
Active Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于 System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc() 方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子 线程),而JVM的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc()这个方法所代表的动作(其所定义的功能)的 调用方和执行方是运行在不同的线程中的,从而提高了并发性。
再进一步介绍Active Object模式,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图1所示。
图 1. ActiveObject对象示例
doSomething方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething方法会被多个线程调用。这时所需的线程安 全控制封装在doSomething方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active Object对象的方法与调用普通Java对象的方法并无太大差别。如清单1所示。
清单 1. Active Object方法调用示例
ActiveObject ao=...; Future future = ao.doSomething("data"); //执行其它操作 String result = future.get(); System.out.println(result);
Active Object模式的架构
当Active Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会 被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用Active Object模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。
Active Object模式的主要参与者有以下几种。其类图如图2所示。
图 2. Active Object模式的类图
(点击图像放大)
- Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相 应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装 对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。
- MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。
- ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。
- Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调 度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据 MethodRequest中包含的信息所定的优先级等。
- Servant:负责对Proxy所暴露的异步方法的具体实现。
- Future:负责存储和返回Active Object异步方法的执行结果。
Active Object模式的序列图如图3所示。
图 3. Active Object模式的序列图
(点击图像放大)
第1步:调用方代码调用Proxy的异步方法doSomething。
第2~7步:doSomething方法创建Future实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest 对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲 区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将 MethodRequest对象存入缓冲区。
第8步:doSomething返回其所创建的Future实例。
第9步:Scheduler实例采用专门的工作线程运行dispatch方法。
第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法
第13~16步:MethodRequest对象的call方法调用与其关联的Servant实例的相应方法doSomething。并将Servant.doSomething方法的返回值设置到Future实例上。
第17步:MethodRequest对象的call方法返回。
上述步骤中,第1~8步是运行在Active Object的调用者线程中的,这几个步骤实现了将调用方代码对Active Object所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17步是运行在Active Object的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object对外暴露的异步方法的调用与执行的分离。
如果调用方代码关心Active Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获得异步方法的真正执行结果。
Active Object模式实战案例
某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776,而非其真实的号码。
该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整 个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到 数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过Java的对象序列化API,将表示下发彩信的对象序列化到磁盘文件中从而实 现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及Active Object模式如何帮助我们满足这些考虑。
首先,请求消息缓存到磁盘中涉及文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作线程)中执行。因为这样会使该模块 的响应延时增大,降低系统的响应性。并使得Web服务器的工作线程因等待文件I/O而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在Web服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘 文件中等操作)则是在Active Object工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响 应。而此时,当前请求消息可能正在被Active Object线程缓存到文件中。如图4所示。
图 4 .异步实现缓存
其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存 储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也 存满,依此类推。当这些子目录的个数到达指定数量(如100个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是 固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而Active Object模式中的Proxy参与者可以帮助我们封装并发访问控制。
下面,我们看该案例的相关代码通过应用Active Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的Active Object模式的客调用方代码。如清单2所示。
清单 2. 彩信下发请求处理的入口类
public class MMSDeliveryServlet extends HttpServlet { private static final long serialVersionUID = 5886933373599895099L; @Override public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //将请求中的数据解析为内部对象 MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream()); Recipient shortNumberRecipient = mmsDeliverReq.getRecipient(); Recipient originalNumberRecipient = null; try { // 将接收方短号转换为长号 originalNumberRecipient = convertShortNumber(shortNumberRecipient); } catch (SQLException e) { // 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存 AsyncRequestPersistence.getInstance().store(mmsDeliverReq); // 继续对当前请求的其它处理,如给客户端响应 resp.setStatus(202); } } private MMSDeliverRequest parseRequest(InputStream reqInputStream) { MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest(); //省略其它代码 return mmsDeliverReq; } private Recipient convertShortNumber(Recipient shortNumberRecipient) throws SQLException { Recipient recipent = null; //省略其它代码 return recipent; } }
清单2中的doPost方法在侦测到短号转换过程中发生的数据库异常后,通过调用AsyncRequestPersistence类的store方 法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence类相当于Active Object模式中的Proxy参与者。尽管本案例涉及的是一个并发环境,但从清单2中的代码可见,AsyncRequestPersistence类的 调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在AsyncRequestPersistence类之后。
AsyncRequestPersistence类的代码如清单3所示。
清单 3. 彩信下发请求缓存入口类(Active Object模式的Proxy)
// ActiveObjectPattern.Proxy public class AsyncRequestPersistence implements RequestPersistence { private static final long ONE_MINUTE_IN_SECONDS = 60; private final Logger logger; private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0); private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0); // ActiveObjectPattern.Servant private final DiskbasedRequestPersistence delegate = new DiskbasedRequestPersistence(); // ActiveObjectPattern.Scheduler private final ThreadPoolExecutor scheduler; private static class InstanceHolder { final static RequestPersistence INSTANCE = new AsyncRequestPersistence(); } private AsyncRequestPersistence() { logger = Logger.getLogger(AsyncRequestPersistence.class); scheduler = new ThreadPoolExecutor(1, 3, 60 * ONE_MINUTE_IN_SECONDS, TimeUnit.SECONDS, // ActiveObjectPattern.ActivationQueue new LinkedBlockingQueue(200), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t; t = new Thread(r, "AsyncRequestPersistence"); return t; } }); scheduler.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy()); // 启动队列监控定时任务 Timer monitorTimer = new Timer(true); monitorTimer.scheduleAtFixedRate( new TimerTask() { @Override public void run() { if (logger.isInfoEnabled()) { logger.info("task count:" + requestSubmittedPerIterval + ",Queue size:" + scheduler.getQueue().size() + ",taskTimeConsumedPerInterval:" + taskTimeConsumedPerInterval.get() + " ms"); } taskTimeConsumedPerInterval.set(0); requestSubmittedPerIterval.set(0); } }, 0, ONE_MINUTE_IN_SECONDS * 1000); } public static RequestPersistence getInstance() { return InstanceHolder.INSTANCE; } @Override public void store(final MMSDeliverRequest request) { /* * 将对store方法的调用封装成MethodRequest对象, 并存入缓冲区。 */ // ActiveObjectPattern.MethodRequest Callable methodRequest = new Callable() { @Override public Boolean call() throws Exception { long start = System.currentTimeMillis(); try { delegate.store(request); } finally { taskTimeConsumedPerInterval.addAndGet( System.currentTimeMillis() - start); } return Boolean.TRUE; } }; scheduler.submit(methodRequest); requestSubmittedPerIterval.incrementAndGet(); } }
AsyncRequestPersistence类所实现的接口RequestPersistence定义了Active Object对外暴露的异步方法:store方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单4所示。
清单 4. RequestPersistence接口源码
public interface RequestPersistence { void store(MMSDeliverRequest request); }
AsyncRequestPersistence类的实例变量scheduler相当于Active Object模式中的Scheduler参与者实例。这里我们直接使用了JDK1.5引入的Executor Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类的实例化时,其构造器的第5个参数 (BlockingQueue<Runnable> workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue<Runnable>(200)。该队列相当于Active Object模式中的ActivationQueue参与者实例。
AsyncRequestPersistence类的实例变量delegate相当于Active Object模式中的Servant参与者实例。
AsyncRequestPersistence类的store方法利用匿名类生成一个 java.util.concurrent.Callable实例methodRequest。该实例相当于Active Object模式中的MethodRequest参与者实例。利用闭包(Closure),该实例封装了对store方法调用的上下文信息(包括调用参 数、所调用的方法对应的操作信息)。AsyncRequestPersistence类的store方法通过调用scheduler的submit方法, 将methodRequest送入ThreadPoolExecutor所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor 是Scheduler参与者的一个“近似”实现。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方 法,该方法用于接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量小于其核心线程数量 时,submit方法所接收的任务会直接被新建的线程执行。当ThreadPoolExecutor当前使用的线程数量大于其核心线程数时,submit 方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor的这种任务处理机制,并不妨碍我们将它用作 Scheduler的实现。
methodRequest的call方法会调用delegate的store方法来真正实现请求缓存功能。delegate实例对应的类DiskbasedRequestPersistence是请求消息缓存功能的真正实现者。其代码如清单5所示。
清单 5. DiskbasedRequestPersistence类的源码
public class DiskbasedRequestPersistence implements RequestPersistence { // 负责缓存文件的存储管理 private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage(); private final Logger logger = Logger .getLogger(DiskbasedRequestPersistence.class); @Override public void store(MMSDeliverRequest request) { // 申请缓存文件的文件名 String[] fileNameParts = storage.apply4Filename(request); File file = new File(fileNameParts[0]); try { ObjectOutputStream objOut = new ObjectOutputStream( new FileOutputStream(file)); try { objOut.writeObject(request); } finally { objOut.close(); } } catch (FileNotFoundException e) { storage.decrementSectionFileCount(fileNameParts[1]); logger.error("Failed to store request", e); } catch (IOException e) { storage.decrementSectionFileCount(fileNameParts[1]); logger.error("Failed to store request", e); } } class SectionBasedDiskStorage { private Deque sectionNames = new LinkedList(); /* * Key->value: 存储子目录名->子目录下缓存文件计数器 */ private Map sectionFileCountMap = new HashMap(); private int maxFilesPerSection = 2000; private int maxSectionCount = 100; private String storageBaseDir = System.getProperty("user.dir") + "/vpn"; private final Object sectionLock = new Object(); public String[] apply4Filename(MMSDeliverRequest request) { String sectionName; int iFileCount; boolean need2RemoveSection = false; String[] fileName = new String[2]; synchronized (sectionLock) { //获取当前的存储子目录名 sectionName = this.getSectionName(); AtomicInteger fileCount; fileCount = sectionFileCountMap.get(sectionName); iFileCount = fileCount.get(); //当前存储子目录已满 if (iFileCount >= maxFilesPerSection) { if (sectionNames.size() >= maxSectionCount) { need2RemoveSection = true; } //创建新的存储子目录 sectionName = this.makeNewSectionDir(); fileCount = sectionFileCountMap.get(sectionName); } iFileCount = fileCount.addAndGet(1); } fileName[0] = storageBaseDir + "/" + sectionName + "/" + new DecimalFormat("0000").format(iFileCount) + "-" + request.getTimeStamp().getTime() / 1000 + "-" + request.getExpiry() + ".rq"; fileName[1] = sectionName; if (need2RemoveSection) { //删除最老的存储子目录 String oldestSectionName = sectionNames.removeFirst(); this.removeSection(oldestSectionName); } return fileName; } public void decrementSectionFileCount(String sectionName) { AtomicInteger fileCount = sectionFileCountMap.get(sectionName); if (null != fileCount) { fileCount.decrementAndGet(); } } private boolean removeSection(String sectionName) { boolean result = true; File dir = new File(storageBaseDir + "/" + sectionName); for (File file : dir.listFiles()) { result = result && file.delete(); } result = result && dir.delete(); return result; } private String getSectionName() { String sectionName; if (sectionNames.isEmpty()) { sectionName = this.makeNewSectionDir(); } else { sectionName = sectionNames.getLast(); } return sectionName; } private String makeNewSectionDir() { String sectionName; SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss"); sectionName = sdf.format(new Date()); File dir = new File(storageBaseDir + "/" + sectionName); if (dir.mkdir()) { sectionNames.addLast(sectionName); sectionFileCountMap.put(sectionName, new AtomicInteger(0)); } else { throw new RuntimeException( "Cannot create section dir " + sectionName); } return sectionName; } } }
methodRequest的call方法的调用者代码是运行在ThreadPoolExecutor所维护的工作者线程中,这就保证了store 方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor所维护的工作线程负责 将请求消息序列化到磁盘文件中。
DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的 apply4Filename方法包含了一些多线程同步控制代码(见清单5)。这部分控制由于是封装在 DiskbasedRequestPersistence的内部类中,对于该类之外的代码是不可见的。因 此,AsyncRequestPersistence的调用方代码无法知道该细节,这体现了Active Object模式对并发访问控制的封装。
小结
本篇介绍了Active Object模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对Active Object模式进行评价,并结合本文案例介绍实际运用Active Object模式时需要注意的一些事项。
作者简介
黄文海,著有《Java多线程编程实战指南(设计模式篇)》。有多年敏捷项目管理经验和丰富的技术指导经验。关注敏捷开发、Java多线程编程和Web开发。在InfoQ中文站和IBM DeveloperWorks上发表过多篇文章。其博客:http://viscent.iteye.com/
相关推荐
《Java多线程编程实战指南》这本书深入浅出地讲解了Java多线程的核心概念和实战技巧,分为核心篇和设计模式篇,旨在帮助开发者掌握并应用多线程技术。 1. **线程基础** - **线程的创建**:Java提供了两种创建线程...
《Java多线程编程实战指南-核心篇》是一本深入探讨Java并发编程的书籍,旨在帮助读者掌握在Java环境中创建、管理和同步线程的核心技术。Java的多线程能力是其强大之处,使得开发者能够在同一时间执行多个任务,提高...
在Java编程中,多线程是一项关键技能,它允许程序同时执行多个任务,极大地提高了程序的...通过阅读"Java多线程编程实战指南 设计模式篇.pdf",你将获得更深入的理论知识和实践技巧,为你的编程事业奠定坚实的基础。
总之,“Java多线程编程实战指南+设计模式篇(全部)”是一份宝贵的资源,它将帮助读者深入理解Java多线程编程的各个方面,以及如何利用设计模式解决实际问题。学习并掌握这些知识,对于提升Java程序员的专业技能和...
《汪文君JAVA多线程编程实战》是一本专注于Java多线程编程的实战教程,由知名讲师汪文君倾力打造。这本书旨在帮助Java开发者深入理解和熟练掌握多线程编程技术,提升软件开发的效率和质量。在Java平台中,多线程是...
《Java多线程编程实战指南(设计模式篇)》由黄文海撰写,是一本深入探讨Java多线程编程和设计模式的专业书籍。书中详细介绍了如何在Java环境中利用多线程来实现高效的并发处理,同时结合设计模式,帮助开发者更好地...
本书以理论结合示例的方式介绍了多线程常见设计模式。
本资源“Java多线程编程实战指南+设计模式篇”深入探讨了这两个主题,旨在帮助开发者提升其在并发编程和软件设计上的技能。 **一、Java多线程** 1. **线程基础**:Java中的线程是程序执行的最小单位。Java通过`...
《Java多线程编程实战指南(核心篇)》以基本概念、原理与方法为主线,辅以丰富的实战案例和生活化实例,并从Java虚拟机、操作系统和硬件多个层次与角度出发,循序渐进、系统地介绍Java平台下的多线程编程核心技术及...
《Java多线程编程实战指南(设计模式篇)》采用Java(JDK1.6)语言和UML 为描述语言,并结合作者多年工作经历的相关实战案例,介绍了多线程环境下常用设计模式的来龙去脉:各个设计模式是什么样的及其典型的实际应用...
《Java多线程编程实战指南(设计篇模式)》源码这是国内首部Java多线程设计模式原创作品《Java多线程编程实战指南(设计模式篇)》一书的源码。新书《Java多线程编程实战指南(设计模式篇)》,张龙老师作推荐序。书...
《Java多线程编程实战指南》是一本深入探讨Java并发编程的书籍,涵盖了核心篇与设计模式篇。这本书旨在帮助开发者理解和掌握Java平台上的多线程编程,提升系统性能和可扩展性。在Java世界中,多线程是实现并发处理、...
《Java多线程编程实战指南 设计模式篇》是一本深度探讨Java并发编程与设计模式融合的书籍。在Java编程中,多线程是提升系统性能、实现并行计算的关键技术,而设计模式则是解决常见问题的最佳实践。本书旨在帮助...
Java多线程是Java编程中的重要概念,尤其在如今的多核处理器环境下,理解并熟练掌握多线程技术对于提高程序性能和响应速度至关重要。本资料详细讲解了Java多线程的原理,并提供了丰富的实战代码,非常适合Java初学者...
本资源"《C#多线程编程实战》完整源码"提供了丰富的实例,适用于学习和实践C#中的多线程概念。 在C#中,多线程允许应用程序同时执行多个独立的任务,提高系统利用率并优化性能。.NET框架为开发者提供了强大的支持,...
Java多线程编程实战指南设计模式篇.mobi
Java多线程编程是Java开发中的重要组成部分,它允许程序同时执行多个任务,极大地提高了程序的效率和响应性。在Java中,多线程主要通过`Thread`类和并发工具来实现,接下来我们将深入探讨这些关键知识点。 1. **...
Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...
读书笔记:Java多线程编程实战指南设计模式篇 之读书笔记