任务管理
线程创建
线程最普遍的一个应用程序是创建一个或多个线程,以执行特定类型的任务。Timer 类创建线程来执行 TimerTask 对象,Swing 创建线程来处理 UI 事件。在这两种情况中,在单独线程中执行的任务都假定是短期的,这些线程是为了处理大量短期任务而存在的。
在其中每种情况中,这些线程一般都有非常简单的结构:
while (true) { if (no tasks) wait for a task; execute the task; }
通过例示从 Thread 获得的对象并调用 Thread.start() 方法来创建线程。可以用两种方法创建线程:通过扩展 Thread 和覆盖 run() 方法,或者通过实现 Runnable 接口和使用 Thread(Runnable) 构造函数:
class WorkerThread extends Thread { public void run() { /* do work */ } } Thread t = new WorkerThread(); t.start();
或者:
Thread t = new Thread(new Runnable() { public void run() { /* do work */ } } t.start();
重新使用线程
因为多个原因,类似 Swing GUI 的框架为事件任务创建单一线程,而不是为每项任务创建新的线程。首先是因为创建线程会有间接成本,所以创建线程来执行简单任务将是一种资源浪费。通过重新使用事件线程来处理多个事件,启动和拆卸成本(随平台而变)会分摊在多个事件上。
Swing 为事件使用单一后台线程的另一个原因是确保事件不会互相干涉,因为直到前一事件结束,下一事件才开始处理。该方法简化了事件处理程序的编写。使用多个线程,将要做更多的工作来确保一次仅一个线程地执行线程相关的代码。
如何不对任务进行管理
大多数服务器应用程序(如 Web 服务器、POP 服务器、数据库服务器或文件服务器)代表远程客户机处理请求,这些客户机通常使用 socket 连接到服务器。对于每个请求,通常要进行少量处理(获得该文件的代码块,并将其发送回 socket),但是可能会有大量(且不受限制)的客户机请求服务。
用于构建服务器应用程序的简单化模型会为每个请求创建新的线程。下列代码段实现简单的 Web 服务器,它接受端口 80 的 socket 连接,并创建新的线程来处理请求。不幸的是,该代码不是实现 Web 服务器的好方法,因为在重负载条件下它将失败,停止整台服务器。
class UnreliableWebServer { public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; // Don't do this! new Thread(r).start(); } } }
当服务器被请求吞没时,UnreliableWebServer 类不能很好地处理这种情况。每次有请求时,就会创建新的类。根据操作系统和可用内存,可以创建的线程数是有限的。不幸的是,您通常不知道限制是多少 —— 只有当应用程序因为 OutOfMemoryError 而崩溃时才发现。
如果足够快地在这台服务器上抛出请求的话,最终其中一个线程创建将失败,生成的 Error 会关闭整个应用程序。当一次仅能有效支持很少线程时,没有必要创建上千个线程,无论如何,这样使用资源可能会损害性能。创建线程会使用相当一部分内存,其中包括有两个堆栈(Java 和 C),以及每线程数据结构。如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源。
使用线程池解决问题
为任务创建新的线程并不一定不好,但是如果创建任务的频率高,而平均任务持续时间低,我们可以看到每项任务创建一个新的线程将产生性能(如果负载不可预知,还有稳定性)问题。
如果不是每项任务创建一个新的线程,则服务器应用程序必须采取一些方法来限制一次可以处理的请求数。这意味着每次需要启动新的任务时,它不能仅调用下列代码。
new Thread(runnable).start()
管理一大组小任务的标准机制是组合工作队列和线程池。工作队列就是要处理的任务的队列,前面描述的 Queue 类完全适合。线程池是线程的集合,每个线程都提取公用工作队列。当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理。如果有,它会转移到下一个任务,并开始处理。
线程池为线程生命周期间接成本问题和资源崩溃问题提供了解决方案。通过对多个任务重新使用线程,创建线程的间接成本将分布到多个任务中。作为一种额外好处,因为请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟。因此,可以立即处理请求,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。
Executor 框架
java.util.concurrent 包中包含灵活的线程池实现,但是更重要的是,它包含用于管理实现 Runnable 的任务的执行的整个框架。该框架称为 Executor 框架。
Executor 接口相当简单。它描述将运行 Runnable 的对象:
public interface Executor { void execute(Runnable command); }
任务运行于哪个线程不是由该接口指定的,这取决于使用的 Executor 的实现。它可以运行于后台线程,如 Swing 事件线程,或者运行于线程池,或者调用线程,或者新的线程,它甚至可以运行于其他 JVM!通过同步的 Executor 接口提交任务,从任务执行策略中删除任务提交。Executor 接口独自关注任务提交 —— 这是 Executor 实现的选择,确定执行策略。这使在部署时调整执行策略(队列限制、池大小、优先级排列等等)更加容易,更改的代码最少。
java.util.concurrent 中的大多数 Executor 实现还实现 ExecutorService 接口,这是对 Executor 的扩展,它还管理执行服务的生命周期。这使它们更易于管理,并向生命可能比单独 Executor 的生命更长的应用程序提供服务。
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit); // other convenience methods for submitting tasks }
Executor
java.util.concurrent 包包含多个 Executor 实现,每个实现都实现不同的执行策略。什么是执行策略?执行策略定义何时在哪个线程中运行任务,执行任务可能消耗的资源级别(线程、内存等等),以及如果执行程序超载该怎么办。
执行程序通常通过工厂方法例示,而不是通过构造函数。Executors 类包含用于构造许多不同类型的 Executor 实现的静态工厂方法:
• Executors.newCachedThreadPool() 创建不限制大小的线程池,但是当以前创建的线程可以使用时将重新使用那些线程。如果没有现有线程可用,将创建新的线程并将其添加到池中。使用不到 60 秒的线程将终止并从缓存中删除。
• Executors.newFixedThreadPool(int n) 创建线程池,其重新使用在不受限制的队列之外运行的固定线程组。在关闭前,所有线程都会因为执行过程中的失败而终止,如果需要执行后续任务,将会有新的线程来代替这些线程。
• Executors.newSingleThreadExecutor() 创建 Executor,其使用在不受限制的队列之外运行的单一工作线程,与 Swing 事件线程非常相似。保证顺序执行任务,在任何给定时间,不会有多个任务处于活动状态。
更可靠的 Web 服务器 —— 使用 Executor
前面 如何不对任务进行管理 中的代码显示了如何不用编写可靠服务器应用程序。幸运的是,修复这个示例非常简单,只需将 Thread.start() 调用替换为向 Executor 提交任务即可:
class ReliableWebServer { Executor pool = Executors.newFixedThreadPool(7); public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; pool.execute(r); } } }
注意,本例与前例之间的区别仅在于 Executor 的创建以及如何提交执行的任务。
定制 ThreadPoolExecutor
Executors 中的 newFixedThreadPool 和 newCachedThreadPool 工厂方法返回的 Executor 是类 ThreadPoolExecutor 的实例,是高度可定制的。
通过使用包含 ThreadFactory 变量的工厂方法或构造函数的版本,可以定义池线程的创建。ThreadFactory 是工厂对象,其构造执行程序要使用的新线程。使用定制的线程工厂,创建的线程可以包含有用的线程名称,并且这些线程是守护线程,属于特定线程组或具有特定优先级。
下面是线程工厂的例子,它创建守护线程,而不是创建用户线程:
public class DaemonThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); return thread; } }
有时,Executor 不能执行任务,因为它已经关闭或者因为 Executor 使用受限制队列存储等待任务,而该队列已满。在这种情况下,需要咨询执行程序的 RejectedExecutionHandler 来确定如何处理任务 —— 抛出异常(默认情况),放弃任务,在调用者的线程中执行任务,或放弃队列中最早的任务以为新任务腾出空间。ThreadPoolExecutor.setRejectedExecutionHandler 可以设置拒绝的执行处理程序。
还可以扩展 ThreadPoolExecutor,并覆盖方法 beforeExecute 和 afterExecute,以添加装置,添加记录,添加计时,重新初始化线程本地变量,或进行其他执行定制。
需要特别考虑的问题
使用 Executor 框架会从执行策略中删除任务提交,一般情况下,人们希望这样,那是因为它允许我们灵活地调整执行策略,不必更改许多位置的代码。然而,当提交代码暗含假设特定执行策略时,存在多种情况,在这些情况下,重要的是选择的 Executor 实现一致的执行策略。
这类情况中的其中的一种就是一些任务同时等待其他任务完成。在这种情况下,当线程池没有足够的线程时,如果所有当前执行的任务都在等待另一项任务,而该任务因为线程池已满不能执行,那么线程池可能会死锁。
另一种相似的情况是一组线程必须作为共同操作组一起工作。在这种情况下,需要确保线程池能够容纳所有线程。
如果应用程序对特定执行程序进行了特定假设,那么应该在 Executor 定义和初始化的附近对这些进行说明,从而使善意的更改不会破坏应用程序的正确功能。
调整线程池
创建 Executor 时,人们普遍会问的一个问题是“线程池应该有多大?”。当然,答案取决于硬件和将执行的任务类型(它们是受计算限制或是受 IO 的限制?)。
如果线程池太小,资源可能不能被充分利用,在一些任务还在工作队列中等待执行时,可能会有处理器处于闲置状态。
另一方面,如果线程池太大,则将有许多有效线程,因为大量线程或有效任务使用内存,或者因为每项任务要比使用少量线程有更多上下文切换,性能可能会受损。
所以假设为了使处理器得到充分使用,线程池应该有多大?如果知道系统有多少处理器和任务的计算时间和等待时间的近似比率,Amdahl 法则提供很好的近似公式。
用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有 N*(1+WT/ST) 个线程。
好的消息是您不必精确估计 WT/ST。“合适的”池大小的范围相当大;只需要避免“过大”和“过小”的极端情况即可。
Future 接口
Future 接口允许表示已经完成的任务、正在执行过程中的任务或者尚未开始执行的任务。通过 Future 接口,可以尝试取消尚未完成的任务,查询任务已经完成还是取消了,以及提取(或等待)任务的结果值。
FutureTask 类实现了 Future,并包含一些构造函数,允许将 Runnable 或 Callable(会产生结果的 Runnable)和 Future 接口封装。因为 FutureTask 也实现 Runnable,所以可以只将 FutureTask 提供给 Executor。一些提交方法(如 ExecutorService.submit())除了提交任务之外,还将返回 Future 接口。
Future.get() 方法检索任务计算的结果(或如果任务完成,但有异常,则抛出 ExecutionException)。如果任务尚未完成,那么 Future.get() 将被阻塞,直到任务完成;如果任务已经完成,那么它将立即返回结果。
使用 Future 构建缓存
该示例代码与 java.util.concurrent 中的多个类关联,突出显示了 Future 的功能。它实现缓存,使用 Future 描述缓存值,该值可能已经计算,或者可能在其他线程中“正在构造”。
它利用 ConcurrentHashMap 中的原子 putIfAbsent() 方法,确保仅有一个线程试图计算给定关键字的值。如果其他线程随后请求同一关键字的值,它仅能等待(通过 Future.get() 的帮助)第一个线程完成。因此两个线程不会计算相同的值。
public class Cache<K, V> { ConcurrentMap<K, FutureTask<V>> map = new ConcurrentHashMap(); Executor executor = Executors.newFixedThreadPool(8); public V get(final K key) { FutureTask<V> f = map.get(key); if (f == null) { Callable<V> c = new Callable<V>() { public V call() { // return value associated with key } }; f = new FutureTask<V>(c); FutureTask old = map.putIfAbsent(key, f); if (old == null) executor.execute(f); else f = old; } return f.get(); } }
CompletionService
CompletionService 将执行服务与类似 Queue 的接口组合,从任务执行中删除任务结果的处理。CompletionService 接口包含用来提交将要执行的任务的 submit() 方法和用来询问下一完成任务的 take()/poll() 方法。
CompletionService 允许应用程序结构化,使用 Producer/Consumer 模式,其中生产者创建任务并提交,消费者请求完成任务的结果并处理这些结果。CompletionService 接口由 ExecutorCompletionService 类实现,该类使用 Executor 处理任务并从 CompletionService 导出 submit/poll/take 方法。
下列代码使用 Executor 和 CompletionService 来启动许多“solver”任务,并使用第一个生成非空结果的任务的结果,然后取消其余任务:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch(ExecutionException ignore) {} } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); }
分享到:
相关推荐
在JDK5中,对多线程编程进行了大量改进,包括引入了`java.util.concurrent`包,其中包含了许多高效的并发工具类,如Executor框架、Future接口、Semaphore信号量等。 10. **NIO(New Input/Output)** NIO提供了非...
Java中的Executor框架是多线程编程的一个重要组成部分,它始于JDK5,目的是为了更好地管理和控制线程的执行。Executor框架的设计理念是将任务(工作单元)与执行机制分离,从而提高了程序的可扩展性和灵活性。 1. *...
- **与Executor框架比较**:虽然`Callable`结合`FutureTask`可以独立使用,但在实际开发中更多地会结合`Executor`框架来使用,这样可以更好地管理线程资源,避免资源浪费。 #### 二、新增方式2:使用线程池 ##### ...
4. **并发编程**:讲解了JDK中的并发工具类,如Executor框架、Fork/Join框架以及并发集合,让读者能够在多线程环境下写出高效、可靠的代码。 5. **错误与异常处理**:详细介绍Java的异常处理机制,如何正确使用try-...
7. **线程并发API**:引入了java.util.concurrent包,为多线程编程提供了更高级别的抽象,如Executor框架。 **JDK 1.5(Java 5)** JDK 1.5于2004年发布,是Java的一个重大更新,引入了许多重要语言和库改进,例如...
3. **并发编程支持**:Java 6引入了并发工具包`java.util.concurrent`,包含了许多线程管理的实用类,如Executor框架,Semaphore,CyclicBarrier等,这些工具极大地简化了多线程编程。 4. **改进的垃圾回收机制**:...
9. **并发编程**:Java并发API在JDK1.6中进一步完善,包括Executor框架、Future接口和并发集合,帮助开发者更好地编写多线程应用。 10. **改进的性能**:JDK1.6在内存管理、垃圾回收和类加载等方面进行了优化,提高...
在并发编程方面,JDK 1.5引入了并发工具类(java.util.concurrent package),如Executor框架、Semaphore、CountDownLatch和CyclicBarrier等,这些工具大大简化了多线程编程,提高了并发应用的效率和可靠性。...
10. **并发编程改进**:包括`java.util.concurrent`包的引入,提供了线程安全的数据结构和高级并发工具,如Executor框架。 11. **内存模型的改进**:确保多线程环境中的可见性和一致性,增强了Java内存模型(JMM)...
Java的并发库在JDK 1.6中得到了显著增强,包括Executor框架、Future接口、CountDownLatch、CyclicBarrier等。源码解析有助于掌握这些并发工具类的实现原理,从而编写更高效的多线程程序。 4. **I/O流** JDK 1.6对...
10. **并发改进**:添加了`java.util.concurrent`包,包含许多线程管理和同步工具类,如Executor框架,Semaphore,CountDownLatch等。 11. **内存模型的增强**:JDK 1.5对Java内存模型(JMM)进行了改进,确保了多...
JDK 1.5引入了`java.util.concurrent`包,提供了线程安全的数据结构和并发编程工具,如`Executor`框架、`Future`接口、`Semaphore`、`CyclicBarrier`等,极大地简化了多线程编程。 ### 10. **Synchronized关键字...
3. **使用ExecutorService和Callable接口**:JDK 1.5引入的Executor框架允许更高级别的线程管理。Callable接口与Runnable类似,但可以返回结果。使用ExecutorService可以创建线程池,有效管理线程生命周期,提高性能...
同时,Executor框架的引入简化了线程池的管理和任务调度,使得并发编程更加优雅和易用。 除此之外,JDK1.5还引入了自动装箱拆箱、枚举(enum)类型、变量赋值的增强(如for-each循环)、静态导入等新特性,极大地...
7. **并发与多线程**: Java提供了丰富的并发工具类,如`Thread`、`Runnable`、`Executor`等,API文档会详细介绍如何创建和管理线程,以及如何使用同步机制如`synchronized`关键字和`Lock`接口。 8. **Java内存模型*...
2. **并发编程增强**:增加了`java.util.concurrent`包,包含了许多并发工具类,如Executor框架、Future接口、CountDownLatch、CyclicBarrier等,帮助开发者更高效地编写多线程程序。 3. **动态代理**:`java.lang....
- **Executor框架**:提供了一个高级抽象来管理线程池和执行任务。 - **原子变量类**:如`AtomicInteger`,`AtomicLong`等,用于实现无锁编程。 - **并发容器**:如`ConcurrentHashMap`,`ConcurrentLinkedQueue`等...
- `java.util.concurrent`包引入了大量的并发工具类,如Executor框架、Future接口、BlockingQueue等,使得多线程编程更加高效和安全。 5. **注解(Annotations):** - 注解是一种元数据,可以用于提供编译器或...
- **并发工具库**:JDK 1.6引入了java.util.concurrent包,提供了更高级别的并发抽象,如Executor框架、锁和信号量等,简化了多线程编程。 - **动态语言支持**:增加了对动态语言的支持,使得Java能够更好地与其他...