本文由黄文海首次发布在infoq中文站上:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part2。转载请注明作者: 黄文海 出处:http://viscent.iteye.com。
Active Object模式的评价与实现考量
Active Object模式通过将方法的调用与执行分离,实现了异步编程。有利于提高并发性,从而提高系统的吞吐率。
Active Object模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离。任务的执行策略被封装在Scheduler的实现类之内,因此它对外是不“可见”的,一旦需要变动也不会影响其它代码,降低了系统的 耦合性。任务的执行策略可以反映以下一些问题:
- 采用什么顺序去执行任务,如FIFO、LIFO、或者基于任务中包含的信息所定的优先级?
- 多少个任务可以并发执行?
- 多少个任务可以被排队等待执行?
- 如果有任务由于系统过载被拒绝,此时哪个任务该被选中作为牺牲品,应用程序该如何被通知到?
- 任务执行前、执行后需要执行哪些操作?
这意味着,任务的执行顺序可以和任务的提交顺序不同,可以采用单线程也可以采用多线程去执行任务等等。
当然,好处的背后总是隐藏着代价,Active Object模式实现异步编程也有其代价。该模式的参与者有6个之多,其实现过程也包含了不少中间的处理:MethodRequest对象的生成、 MethodRequest对象的移动(进出缓冲区)、MethodRequest对象的运行调度和线程上下文切换等。这些处理都有其空间和时间的代价。 因此,Active Object模式适合于分解一个比较耗时的任务(如涉及I/O操作的任务):将任务的发起和执行进行分离,以减少不必要的等待时间。
虽然模式的参与者较多,但正如本文案例的实现代码所展示的,其中大部分的参与者我们可以利用JDK自身提供的类来实现,以节省编码时间。如表1所示。
表 1. 使用JDK现有类实现Active Object的一些参与者
参与者名称 |
可以借用的JDK类 |
备注 |
Scheduler |
Java Executor Framework中的java.util.concurrent.ExecutorService接口的相关实现类,如java.util.concurrent.ThreadPoolExecutor。 |
ExecutorService接口所定义的submit(Callable<T> task)方法相当于图2中的enqueue方法。 |
ActivationQueue |
java.util.concurrent.LinkedBlockingQueue |
若Scheduler采用java.util.concurrent.ThreadPoolExecutor,则java.util.concurrent.LinkedBlockingQueue实例作为ThreadPoolExecutor构造器的参数。 |
MethodRequest |
java.util.concurrent.Callable接口的匿名实现类。 |
Callable接口比起Runnable接口的优势在于它定义的call方法有返回值,便于将该返回值传递给Future实例。 |
Future |
java.util.concurrent.Future |
ExecutorService接口所定义的submit(Callable<T> task)方法的返回值类型就是java.util.concurrent.Future。 |
错误隔离
错误隔离指一个任务的处理失败不影响其它任务的处理。每个MethodRequest实例可以看作一个任务。那么,Scheduler的实现类在执 行MethodRequest时需要注意错误隔离。选用JDK中现成的类(如ThreadPoolExecutor)来实现Scheduler的一个好处 就是这些类可能已经实现了错误隔离。而如果自己编写代码实现Scheduler,用单个Active Object工作线程逐一执行所有任务,则需要特别注意线程的run方法的异常处理,确保不会因为个别任务执行时遇到一些运行时异常而导致整个线程终止。 如清单6的示例代码所示。
清单 6. 自己动手实现Scheduler的错误隔离示例代码
public class CustomScheduler implements Runnable { private LinkedBlockingQueue<Runnable> activationQueue = new LinkedBlockingQueue<Runnable>(); @Override public void run() { dispatch(); } public <T> Future<T> enqueue(Callable<T> methodRequest) { final FutureTask<T> task = new FutureTask<T>(methodRequest) { @Override public void run() { try { super.run(); //捕获所以可能抛出的对象,避免该任务运行失败而导致其所在的线程终止。 } catch (Throwable t) { this.setException(t); } } }; try { activationQueue.put(task); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return task; } public void dispatch() { while (true) { Runnable methodRequest; try { methodRequest = activationQueue.take(); //防止个别任务执行失败导致线程终止的代码在run方法中 methodRequest.run(); } catch (InterruptedException e) { // 处理该异常 } } } }
缓冲区监控
如果ActivationQueue是有界缓冲区,则对缓冲区的当前大小进行监控无论是对于运维还是测试来说都有其意义。从测试的角度来看,监控缓 冲区有助于确定缓冲区容量的建议值(合理值)。清单3所示的代码,即是通过定时任务周期性地调用ThreadPoolExecutor的getQueue 方法对缓冲区的大小进行监控。当然,在监控缓冲区的时候,往往只需要大致的值,因此在监控代码中要避免不必要的锁。
缓冲区饱和处理策略
当任务的提交速率大于任务的执行数率时,缓冲区可能逐渐积压到满。这时新提交的任务会被拒绝。无论是自己编写代码还是利用JDK现有类来实现 Scheduler,对于缓冲区满时新任务提交失败,我们需要一个处理策略用于决定此时哪个任务会成为“牺牲品”。若使用 ThreadPoolExecutor来实现Scheduler有个好处是它已经提供了几个缓冲区饱和处理策略的实现代码,应用代码可以直接调用。如清单 3的代码所示,本文案例中我们选择了抛弃最老的任务作为处理策略。 java.util.concurrent.RejectedExecutionHandler接口是ThreadPoolExecutor对缓冲区饱和 处理策略的抽象,JDK中提供的具体实现如表2所示。
表 2. JDK提供的缓冲区饱和处理策略实现类
实现类 |
所实现的处理策略 |
ThreadPoolExecutor.AbortPolicy |
直接抛出异常。 |
ThreadPoolExecutor.DiscardPolicy |
放弃当前被拒绝的任务(而不抛出任何异常)。 |
ThreadPoolExecutor.DiscardOldestPolicy |
将缓冲区中最老的任务放弃,然后重新尝试接纳被拒绝的任务。 |
ThreadPoolExecutor.CallerRunsPolicy |
在任务的提交方线程中运行被拒绝的任务。 |
当然,对于ThreadPoolExecutor而言,其工作队列满不一定就意味着新提交的任务会被拒绝。当其最大线程池大小大于其核心线程池大小 时,工作队列满的情况下,新提交的任务会用所有核心线程之外的新增线程来执行,直到工作线程数达到最大线程数时,新提交的任务会被拒绝。
Scheduler空闲工作线程清理
如果Scheduler采用多个工作线程(如采用ThreadPoolExecutor这样的线程池)来执行任务。则可能需要清理空闲的线程以节约 资源。清单3的代码就是直接使用了ThreadPoolExecutor的现有功能,在初始化其实例时通过指定其构造器的第3、4个参数( long keepAliveTime, TimeUnit unit),告诉ThreadPoolExecutor对于核心工作线程以外的线程若其已经空闲了指定时间,则将其清理掉。
可复用的Active Object模式实现
尽管利用JDK中的现成类可以极大地简化Active Object模式的实现。但如果需要频繁地在不同场景下使用Active Object模式,则需要一套更利于复用的代码,以节约编码的时间和使代码更加易于理解。清单7展示一段基于Java动态代理的可复用的Active Object模式的Proxy参与者的实现代码。
清单 7. 可复用的Active Object模式Proxy参与者实现
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; public abstract class ActiveObjectProxy { private static class DispatchInvocationHandler implements InvocationHandler { private final Object delegate; private final ExecutorService scheduler; public DispatchInvocationHandler(Object delegate, ExecutorService executorService) { this.delegate = delegate; this.scheduler = executorService; } private String makeDelegateMethodName(final Method method, final Object[] arg) { String name = method.getName(); name = "do" + Character.toUpperCase(name.charAt(0)) + name.substring(1); return name; } @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { Object returnValue = null; final Object delegate = this.delegate; final Method delegateMethod; //如果拦截到的被调用方法是异步方法,则将其转发到相应的doXXX方法 if (Future.class.isAssignableFrom(method.getReturnType())) { delegateMethod = delegate.getClass().getMethod( makeDelegateMethodName(method, args), method.getParameterTypes()); final ExecutorService scheduler = this.scheduler; Callable<Object> methodRequest = new Callable<Object>() { @Override public Object call() throws Exception { Object rv = null; try { rv = delegateMethod.invoke(delegate, args); } catch (IllegalArgumentException e) { throw new Exception(e); } catch (IllegalAccessException e) { throw new Exception(e); } catch (InvocationTargetException e) { throw new Exception(e); } return rv; } }; Future<Object> future = scheduler.submit(methodRequest); returnValue = future; } else { //若拦截到的方法调用不是异步方法,则直接转发 delegateMethod = delegate.getClass() .getMethod(method.getName(),method.getParameterTypes()); returnValue = delegateMethod.invoke(delegate, args); } return returnValue; } } /** * 生成一个实现指定接口的Active Object proxy实例。 * 对interf所定义的异步方法的调用会被装发到servant的相应doXXX方法。 * @param interf 要实现的Active Object接口 * @param servant Active Object的Servant参与者实例 * @param scheduler Active Object的Scheduler参与者实例 * @return Active Object的Proxy参与者实例 */ public static <T> T newInstance(Class<T> interf, Object servant, ExecutorService scheduler) { @SuppressWarnings("unchecked") T f = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class[] { interf }, new DispatchInvocationHandler(servant, scheduler)); return f; } }
清单7的代码实现了可复用的Active Object模式的Proxy参与者ActiveObjectProxy。ActiveObjectProxy通过使用Java动态代理,动态生成指定接 口的代理对象。对该代理对象的异步方法(即返回值类型为java.util.concurrent.Future的方法)的调用会被 ActiveObjectProxy实现InvocationHandler(DispatchInvocationHandler)所拦截,并转发给 ActiveObjectProxy的newInstance方法中指定的Servant处理。
清单8所示的代码展示了通过使用ActiveObjectProxy快速Active Object模式。
清单 8. 基于可复用的API快速实现Active Object模式
public static void main(String[] args) throws InterruptedException, ExecutionException { SampleActiveObject sao = ActiveObjectProxy.newInstance( SampleActiveObject.class, new SampleActiveObjectImpl(), Executors.newCachedThreadPool()); Future<String> ft = sao.process("Something", 1); Thread.sleep(500); System.out.println(ft.get()); }
从清单8的代码可见,利用可复用的Active Object模式Proxy实现,应用开发人员只要指定Active Object模式对外保留的接口(对应ActiveObjectProxy.newInstance方法的第1个参数),并提供一个该接口的实现类(对应 ActiveObjectProxy.newInstance方法的第2个参数),再指定一个 java.util.concurrent.ExecutorService实例(对应ActiveObjectProxy.newInstance方法 的第3个参数)即可以实现Active Object模式。
总结
本文介绍了Active Object模式的意图及架构。并提供了一个实际的案例用于展示使用Java代码实现Active Object模式,在此基础上对该模式进行了评价并分享了在实际运用该模式时需要注意的事项。
参考资源
- 本文的源代码在线阅读:https://github.com/Viscent/JavaConcurrencyPattern/
- 维基百科Active Object模式词条:http://en.wikipedia.org/wiki/Active_object
- Douglas C. Schmidt对Active Object模式的定义:http://www.laputan.org/pub/sag/act-obj.pdf。
- Schmidt, Douglas et al. Pattern-Oriented Software Architecture Volume 2: Patterns for Concurrent and Networked Objects. Volume 2. Wiley, 2000
- Java theory and practice: Decorating with dynamic proxies:http://www.ibm.com/developerworks/java/library/j-jtp08305/index.html
作者简介
黄文海,著有《Java多线程编程实战指南(设计模式篇)》。有多年敏捷项目管理经验和丰富的技术指导经验。关注敏捷开发、Java多线程编程和Web开发。在InfoQ中文站和IBM DeveloperWorks上发表过多篇文章。其博客:http://viscent.iteye.com/
感谢张龙对本文的审校。
相关推荐
《Java多线程编程实战指南》这本书深入浅出地讲解了Java多线程的核心概念和实战技巧,分为核心篇和设计模式篇,旨在帮助开发者掌握并应用多线程技术。 1. **线程基础** - **线程的创建**:Java提供了两种创建线程...
《Java多线程编程实战指南-核心篇》是一本深入探讨Java并发编程的书籍,旨在帮助读者掌握在Java环境中创建、管理和同步线程的核心技术。Java的多线程能力是其强大之处,使得开发者能够在同一时间执行多个任务,提高...
在Java编程中,多线程是一项关键技能,它允许程序同时执行多个任务,极大地提高了程序的...通过阅读"Java多线程编程实战指南 设计模式篇.pdf",你将获得更深入的理论知识和实践技巧,为你的编程事业奠定坚实的基础。
总之,“Java多线程编程实战指南+设计模式篇(全部)”是一份宝贵的资源,它将帮助读者深入理解Java多线程编程的各个方面,以及如何利用设计模式解决实际问题。学习并掌握这些知识,对于提升Java程序员的专业技能和...
《Java多线程编程实战指南(设计模式篇)》由黄文海撰写,是一本深入探讨Java多线程编程和设计模式的专业书籍。书中详细介绍了如何在Java环境中利用多线程来实现高效的并发处理,同时结合设计模式,帮助开发者更好地...
《汪文君JAVA多线程编程实战》是一本专注于Java多线程编程的实战教程,由知名讲师汪文君倾力打造。这本书旨在帮助Java开发者深入理解和熟练掌握多线程编程技术,提升软件开发的效率和质量。在Java平台中,多线程是...
本书以理论结合示例的方式介绍了多线程常见设计模式。
本资源“Java多线程编程实战指南+设计模式篇”深入探讨了这两个主题,旨在帮助开发者提升其在并发编程和软件设计上的技能。 **一、Java多线程** 1. **线程基础**:Java中的线程是程序执行的最小单位。Java通过`...
《Java多线程编程实战指南(核心篇)》以基本概念、原理与方法为主线,辅以丰富的实战案例和生活化实例,并从Java虚拟机、操作系统和硬件多个层次与角度出发,循序渐进、系统地介绍Java平台下的多线程编程核心技术及...
《Java多线程编程实战指南(设计模式篇)》采用Java(JDK1.6)语言和UML 为描述语言,并结合作者多年工作经历的相关实战案例,介绍了多线程环境下常用设计模式的来龙去脉:各个设计模式是什么样的及其典型的实际应用...
《Java多线程编程实战指南(设计篇模式)》源码这是国内首部Java多线程设计模式原创作品《Java多线程编程实战指南(设计模式篇)》一书的源码。新书《Java多线程编程实战指南(设计模式篇)》,张龙老师作推荐序。书...
《Java多线程编程实战指南》是一本深入探讨Java并发编程的书籍,涵盖了核心篇与设计模式篇。这本书旨在帮助开发者理解和掌握Java平台上的多线程编程,提升系统性能和可扩展性。在Java世界中,多线程是实现并发处理、...
《Java多线程编程实战指南 设计模式篇》是一本深度探讨Java并发编程与设计模式融合的书籍。在Java编程中,多线程是提升系统性能、实现并行计算的关键技术,而设计模式则是解决常见问题的最佳实践。本书旨在帮助...
Java多线程是Java编程中的重要概念,尤其在如今的多核处理器环境下,理解并熟练掌握多线程技术对于提高程序性能和响应速度至关重要。本资料详细讲解了Java多线程的原理,并提供了丰富的实战代码,非常适合Java初学者...
本资源"《C#多线程编程实战》完整源码"提供了丰富的实例,适用于学习和实践C#中的多线程概念。 在C#中,多线程允许应用程序同时执行多个独立的任务,提高系统利用率并优化性能。.NET框架为开发者提供了强大的支持,...
Java多线程编程实战指南设计模式篇.mobi
Java多线程编程是Java开发中的重要组成部分,它允许程序同时执行多个任务,极大地提高了程序的效率和响应性。在Java中,多线程主要通过`Thread`类和并发工具来实现,接下来我们将深入探讨这些关键知识点。 1. **...
Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式上传文件Java多线程设计模式...
《深入学习:Java多线程编程》是一本专注于Java并发技术的专业书籍,旨在帮助开发者深入理解和熟练运用Java中的多线程编程。Java多线程是Java编程中的核心部分,尤其在现代高性能应用和分布式系统中不可或缺。理解并...