java并发编程实战-第8章-线程池的使用
重点:配置调优的高级选项,并分析在任务执行框架时需要注意的各种危险
8.1 在任务与执行策略之间的隐性耦合
8.1.1 线程饥饿死锁
条件:任务依赖其他任务,只要线程池中的正在执行的任务的线程需要等待一工作队列中的任务而阻塞。
调整策略:调整线程池的大小
例子:ThreadDeadlock中主任务等待子任务的完成(注意:例子是单线程执行,如果是多线程则不会)
Task that Deadlocks in a Single-threaded Executor Don't Do this.
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
// Will deadlock -- task waiting for result of subtask
return header.get() + page + footer.get();
}
}
}
8.1.2 运行较长时间的任务
条件:任务时间阻塞时间过长,即使不死锁,线程池的响应性也会变的很糟糕
调整策略:1、(缓解)限定任务等待资源时间,用限时版本的阻塞方法
2、如果总是充满被阻塞的任务,那么调大线程池的大小
例如:Thread.join ,BlockingQueue.put、CountDownLatch.await
8.2 设置线程池的大小
必须分析计算机环境,资源预算和任务的特性,要正确的设置线程池的大小,必须估算出任务的等待时间和计算时间的比值
The optimal pool size for keeping the processors at the desired utilization is:
N(线程数)=N(cpu总数)*U(cpu的利用率)*(1+W(等待时间)/C(计算时间))
(和任务的数量没关系,只有cpu 和 任务的等待执行时间有关系)
You can determine the number of CPUs using Runtime:
int N_CPUS = Runtime.getRuntime().availableProcessors();
如果阻塞操作多了。比如i/o 操作,有些任务并不会一直执行,所以要调大线程池的大小
8.3 配置ThreadPoolExcutor
首先,来看看ThreadPoolExcutor的本质与默认配置
本质:ExcutorService
适用:Executors 工厂方法配置
默认配置(Executors 工厂方法提供,它们均为大多数使用场景预定义了设置 ):
Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)
Executors.newSingleThreadExecutor()(单个后台线程)
如果默认执行策略不能满足需求,那么可以通过ThreadPoolExcutor的构造函数来实例化一个对象,根据自己的需求定制
执行策略包括的内容即是构造函数的参数内容:
如下:
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler
)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime:线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
如下蓝色内容来自:http://blog.chinaunix.net/uid-20577907-id-3519578.html
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程
8.3.1 线程的创建和销毁
corePoolSize, maximumPoolSize,keepAliveTime 负责线程的创建和销毁
8.3.2 管理队列任务
8.3.3 饱和策略
ThreadPoolExecutor的饱和策略通过setRejectedExecutionHandler来修改
1、Abort 中止 默认,将抛出RejectedExecutionException,调用者捕获改异常,根据需求处理代码
2、抛弃(Discard)
3、抛弃最旧的(Discard-Oldest)会抛弃下一个需要执行的任务,在FIFO队列中时成立的,但在优先队列中会抛弃最优先的任务,所以该策略不该和优先队列一起使用
4、调用者运行(Caller-Runs),该策略是个调节机制,比如在webServer服务器中,使用有界队列和调用者运行饱和策略,在线程池中所有线程都被调用,工作队列已满,
下一个任务会在调用excute的主线程中运行,此时主线程不会在调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,tcp层同样开始抛弃任务。
当服务器过载时,过载情况会向外蔓延开来-从线程池到工作者队列到应用程序到tcp层,最终达到客户端,导致服务器在过高负载的情况下实现一种平缓的性能降低。
例子:
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS, N_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
那么当任务达到后,如何做到不中止、不抛弃、不被调节,而是阻塞呢?
通过Semaphore(信号量)来控制任务的到达率,来实现
8.3.4 线程工厂
通过实现ThreadFactory接口,可以实现自定义线程
指定name,设置UncaughtExceptionHandler,log等
8.3.5 在调用构造函数后再定制ThreadPoolExcutor(允许指定和不允许指定的处理)
Listing 8.8. Modifying an Executor Created with the Standard Factories.
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops, bad assumption");
补充理解:
ExecutorService 和 ThreadPoolExecutor 的关系 :ThreadPoolExecutor是ExecutorService实现的子类
public interface ExecutorServiceextends Executor
public abstract class AbstractExecutorServiceextends Objectimplements ExecutorService
public class ThreadPoolExecutorextends AbstractExecutorService
如果不让外部代码定制ThreadPoolExcutor,可以使用Excutors中的 unconfigurableExecutorService方法来包裹ExecutorService
static ExecutorService unconfigurableExecutorService(ExecutorService executor)
返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法
改方法仅仅代理了定义在ExecutorService的方法,比如shutdown、submit 但是屏蔽了子类比如ThreadPoolExecutor中的setCorePoolSize,setKeepAliveTime等方法
8.4 扩展ThreadPoolExcutor
override beforeExecute, afterExecute , and terminatethat
Listing 8.9. TimingThreadPool 例子不错
8.5 并行化递归算法
经典应用场景:
谜题框架 (puzzle Framework)
串行执行,深度优先算法,对栈要求高,不一定找到是最短路径
并行执行,广度优先算法,需要存的节点多,所以对内存需求高,能找出最短路径
public class ConcurrentPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
this.exec = initThreadPool();
this.seen = new ConcurrentHashMap<P, Boolean>();
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
}
private ExecutorService initThreadPool() {
return Executors.newCachedThreadPool();
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// block until solution found
PuzzleNode<P, M> solnPuzzleNode = solution.getValue(); // The main thread needs to wait until a solution is found
return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new SolverTask(p, m, n);
}
protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
}
public void run() {
if (solution.isSet()
|| seen.putIfAbsent(pos, true) != null)
return; // already solved or seen this position
if (puzzle.isGoal(pos))
solution.setValue(this);
else
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos, m), m, this));//启动子线程继续搜索,父线程终止,不影响子线程继续执行
}
}
}
小结:
策略:
创建线程策略
关闭线程策略
处理队列任务策略
处理过多任务策略
钩子方法来扩展行为
相关推荐
《Java并发编程实践》是Java开发者深入理解并发编程的重要参考资料,尤其对于想要提升多线程应用设计和性能优化技能的程序员来说,这本书提供了丰富的实践经验和深入的理论知识。以下是根据提供的章节内容概述的一些...
第8章 线程池的使用 第9章 图形用户界面应用程序 第三部分 活跃性、性能与测试 第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第四部分 高级主题 第13章 显式锁 第14章 构建自定义的...
Java并发编程实战,第1章 简介,第2章 线程安全性 第3章 对象的共享 第4章 对象的组合 第5章 基础构建模块 第6章 任务执行 第7章 取消与关闭 第8章 线程池的使用 第9章 图形用户界面应用程序 第10章 避免...
第8章 线程池的使用 第9章 图形用户界面应用程序 第三部分 活跃性、性能与测试 第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第四部分 高级主题 第13章 显式锁 第14章 构建自定义的...
│ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...
第1章介绍Java并发编程的挑战,向读者说明进入并发编程的世界可能会遇到哪些问题,以及如何解决。 第2章介绍Java并发编程的底层实现原理,介绍在CPU和JVM这个层面是如何帮助Java实现并发编程的。 第3章介绍深入介绍...
龙果 java并发编程原理实战 第2节理解多线程与并发的之间的联系与区别 [免费观看] 00:11:59分钟 | 第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四...
这本书涵盖了Java并发编程的核心概念和技术,旨在帮助开发者在实际项目中高效地处理高并发场景。随书附带的代码提供了丰富的示例,以便读者能够更直观地理解并实践这些理论知识。 1. **Java并发基础** - **线程与...
第8章 应用线程池 8.1 任务与执行策略问的隐性耦合 8.2 定制线程池的大小 8.3 配置ThreadPoolExecutor 8.4 扩展ThreadPoolExecutor 8.5 并行递归算法 第9章 GUI应用程序 9.1 为什么GUI是单线程化的 9.2 短期的GUI...
第八章关注于Java提供的并发工具类,这些工具类包括CountDownLatch、CyclicBarrier、Semaphore等,它们是Java并发编程库的重要组成部分。本章不仅详细介绍了这些工具类的功能和用途,还提供了一系列实用的例子,帮助...
│ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...
11. 第11章提供并发编程实战案例,介绍问题排查方法。 本书适合已有一定Java基础和开发经验的读者,特别是Java开发工程师、架构师和并发编程爱好者。初学者可以按顺序阅读并实践书中的例子,有经验的读者则可根据...
《Java并发编程实践》这本书是Java开发者深入理解并发编程的重要参考资料。它涵盖了从基础到高级的并发编程概念,旨在帮助读者提升在多线程环境下的编程能力,优化系统性能,避免并发问题。以下是对该书内容的详细...
10. **并发编程实战**:提供实际案例分析和练习,帮助读者将理论知识应用于实际项目,增强解决并发问题的能力。 11. **分布式并发**:简述在分布式系统中的并发问题,如分布式锁、分布式协调服务(如Zookeeper)...