什么是线程池
线程池:管理一组工作线程的资源池。
为什么使用线程池
1.避免反复创建回收线程,降低资源消耗。
2.提供线程的可管理性。
3.提高响应速度
如何创建线程池
ThreadPoolExecutor是jdk提供的线程池的服务类,基于ThreadPoolExecutor可以很容易将一个实现Runnable接口的任务放入线程池中执行,下面是ThreadPoolExecutor实现:
//构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
创建一个线程池需要以下几个参数:
· corePoolSize:线程池中线程的个数。
· maximumPoolSize:线程池最大容量。
· keepAliveTime:线程池中空闲存活时间(单位:TimeUnit定)
· workQueue:存放任务的工作队列。
· threadFactory:线程工厂
· handler:当任务数超过maximumPoolSize+corePoolSize后,任务交给handler处理。
1.corePoolSize
当客户端提交一个新任务时,如果此时线程池中线程的个数小于corePoolSize,则线程池会新建一个线程来处理该任务,即时此时有空闲线程。
2.maximumPoolSize
客户端提交新任务,此时线程池的任务队列也已经满了,那么如果maximumPoolSize < corePoolSize,就新建线程执行该任务。
如果线程池用的是无界队列,那么这个参数也就不起任何作用了。
3.keepAliveTime
线程池中超过corePoolSize的线程会在空闲keepAliveTime时间后被关闭,keepAliveTime单位由TimeUnit指定,如果allowCoreThreadTimeOut=true,即核心线程如果空闲时间超过keepAliveTime时间后同样会被关闭。
4.workQueue
当核心线程池已满,即当前worker数=corePoolSize时,新提交的任务会被放入工作队列中。此时一旦有worker完成手头的任务就会到workQueue中领取一个新任务继续执行。
工作队列可以有以下几种选择:
(1).ArrayBlockingQueue:基于数组的有界阻塞队列
(2).LinkedBlockingQueue:基于链表的阻塞队列,可以不指定队列大小,默认Integer.MAX_VALUE。性能高于ArrayBlockingQueue。
(3).SynchronousQueue
(4).PriorityBlockingQueue:具有优先级的阻塞列,基于数组实现,内部实际上实现了一个最小堆,每次offer、poll,都需要进行堆调整操作O(logn)。队列中元素需要实现Comparable接口或初始化队列时传入一个Comparator对象。虽然初始化队列时需指定队列大小,但PrioriityBlockingQueue支持动态扩容,所以可以认为是无限阻塞队列。
5.threadFactory
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。
我们可以通过实现ThreadFactory接口,来定制线程工厂。样例如下:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String name){
this.poolName = name;
}
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
return new MyAppThread(r,poolName);
}
}
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
public MyAppThread(Runnable r){
this(r,DEFAULT_NAME);
}
public MyAppThread(Runnable r,String name){
super(r,name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("UNCATGHT in Thread " + t.getName());
}
});
}
public void run(){
try {
alive.incrementAndGet();
super.run();
} finally{
alive.decrementAndGet();
}
}
public static int getThreadsCreated(){return created.get();}
public static int getThreadAlive(){return alive.get();}
public static boolean getDebug(){return debugLifecycle;}
}
6.RejectedExecutionHandler
当线程池达到最大容量,饱和策略就发挥作用,ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler方法来修改。
JDK提供几种不同的RejectedExecutionHandler的实现:
· AbortPolicy:终止策略是默认饱和策略,直接抛出RejectedExecutionException。调用者可以捕获这个异常,并做相应处理。
· CallerRunsPolicy:“调用者运行”策略实现了一种调用机制,该策略不会抛弃任务,也不抛出异常,而是将任务退回调用者,从而降低新任务的流量
· DiscardPolicy:新任务无法入队列时,直接抛弃该任务。
· DiscardOldestPolicy:抛弃下一个将要被执行的任务,然后尝试重新提交新任务。(若工作队列是一个优先队列,那么“抛弃最旧的”策略导致抛弃优先级最高的任务。)
饱和策略实现类都需要实现RejectedExecutionHandler接口,下面是四种jdk内置的四种饱和策略的源码:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 直接抛出RejectedExecutionException异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//不做操作,直接丢弃了
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
//抛弃队头的任务,即最早提交且未执行的任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
创建线程池
jdk为我们提供了一个工厂类Executors,其中提供了几个静态工厂方法用于新建不同特性的线程池。如下:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
· newFixedThreadPool:将创建一个固定长度的线程池,每提交一个任务时就创建一个线程,直到达到线程池的最大数量,此时线程池的规模不再变化(如果某个线程发生Exception而结束,那么线程池会补充一个新的线程)
· newCachedThreadPool:创建一个可缓存的线程池,如果当前线程池中线程的个数超过了处理需求时,那么空闲线程将被回收,而当需要增加线程时,则可以添加新的线程,线程池中个数不受限制(使用时格外注意,防止内存溢出)
· newSingleThreadPool:这是一个单线程的Executor,它创建单个工作线程来执行任务,如果线程异常结束,会创建一个新的线程来替代。newSingleThreadPool能确保依照任务在队列中的顺序串行执行。
· newScheduledThreadPool:创建一个固定长度的线程池,而且可以延时或定时方式来执行任务。
注意事项
1.newFixedThreadPool和newSingleThreadExecutor都是用了LinkedBlockingQueue(),默认capacity=Integer.MAX_VALUE,线程池工作队列可以认为是无限大的,所以线程池中的线程数不会超过CorePoolSize,maximumPoolSize可以认为是一个无效参数,且饱和策略不可能执行,这几点需要注意。
2.newFixedThreadPool(1)和newSingleThreadPool区别?
newSingleThreadPool返回的是一个代理对象,屏蔽了ThreadPoolExecutor的一些set方法,即newSingleThreadPool一旦返回,就无法在重新配置线程池的参数了。
3.CachedThreadPool的corePoolSize=0,即核心线程池默认为空,maximumPoolSize=Integer.MAX_VALUE,最大线程池为无限大的。且空闲线程等待新任务超过60秒即被终止。
Executor生命周期
由于Executor以异步方式来执行任务,因此在任意时刻,之前提交的任务的状态是无法立刻得到的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。
为了解决执行任务的生命周期问题,ExecutorService扩展了Executor接口,添加了一些生命周期管理的方法。如下:
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ...还有用于提交任务的一些方法
线程池的生命周期有以下几种状态
· RUNNING:接受新task,并且处理工作队列中的任务。
· SHUTDOWN:不接受新task,但是继续处理工作队列中的任务。
· STOP:不接受新task,不处理工作队列中的任务,并且中断运行中的线程。
· TIDYING:所有任务已被终止,线程池已清空(workerCount=0),此时线程池状态变为TIDYING,并且准备执行terminated()方法。
· TERMINATED:以完成terminated()方法。
线程池如何存储自身状态的?
线程池的状态信息是用一个AtomicInteger类型的变量ctl存储的,定义如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl中除了存放状态信息,还存放了线程池当前工作线程的个数信息。下图展示这两个信息在ctl中的存储形式:
下面是状态相关信息的源码,结合上图应该就不难理解了
private static final int COUNT_BITS = Integer.SIZE - 3;//为啥减3
//0-28位全为1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// RUNNING : 111。。。
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN : 000。。。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP : 001。。。
private static final int STOP = 1 << COUNT_BITS;
// TIDYING : 010。。。
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED :110。。。
private static final int TERMINATED = 3 << COUNT_BITS;
// 通过简单的位运算获取ctl中的信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
runState有5种状态,所以最少需要3bit来表示。这就是为什么COUNT_BITS=32-3
各种状态之间的转换时机:
· RUNNING -> SHUTDOWN:调用shutdown()方法
· (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
· SHUTDOWN -> TIDYING:线程池和工作队列都为空时
· STOP -> TIDYING:线程池为空时
· TIDYING -> TERMINATED:调用terminated()后
关闭线程池方法
· shutdown():不再接受新任务,同时已提交的任务执行完成,包括那些还在队列中等待,未开始执行的任务。
· shutdownNow():将取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。之后再提交任务则抛出异常:java.util.concurrent.RejectedExecutionException。
线程池工作流程
线程池处理新提交任务的流程如下:
1. 如果当前运行的线程数小于配置的corePoolSize,就新建一个线程执行该command任务,即时此时线程池中有空闲线程。
2. 如果线程池中线程个数达到corePoolSize,新提交的任务就被放入workQueue,等待线程池任务调度。
3. 当workQueue满后,且maximumPoolSize > corePoolSize,新提交任务会创建新线程执行任务
4. 当workQueue满后,且线程池个数达到maximumPoolSize,则提交的任务交由RejectedExecutionHandler处理。
5. 超过corePoolSize的线程,在空闲keepAliveTime时间后,将被关闭。
6. 当allowCoreThreadTimeOut=true时,corePoolSize个数内的线程空闲时间达到keepAliveTime后,也将被关闭。
ThreadPoolExecutor 中execute源码:如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//新建线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//提交到工作队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//
reject(command);
}
线程池监控
//线程池已完成任务数量
private long completedTaskCount;
//当前运行的线程数
public int getActiveCount()
此外,ThreadPoolExecutor还提供了以下几个钩子函数用于扩展它的行为,我们可以在子类中实现自己的逻辑,在每个任务执行的前、后以及worker退出时进行定制处理。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
线程的生命周期
下面从线程池中的某个线程的角度出发,分析一下线程从被创建一直到被销毁,整个生命周期里的工作流程
1. 线程的创建时机
· 提交任务时被创建(即客户端调用submit方法)。不过提交任务未必一定会创建线程,这在前面线程池的工作流程里已经提到。
· 预先启动线程池中核心线程池。(如:调用prestartCoreThread、prestartAllCoreThreads()等方法),下面是prestartAllCoreThreads的源码
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))//firstTask为null
++n;
return n;
}
2. 线程在线程池中的数据结构
线程池中的工作线程是被封装到一个Worker类中,部分源码如下:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
/** worker关联的线程 */
final Thread thread;
/** worker的第一个任务,可能为null */
Runnable firstTask;
/** worker完成的任务数量 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 将任务代理给runWorker方法 */
public void run() {
runWorker(this);
}
。。。。
}
//下面是ThreadPoolExecutor中的runWorker方法源码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//钩子函数
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//钩子函数
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
当firstTassk为null的情况下,线程的执行流程如下图
对于Executor框架,需要明白以下两点
Executor框架基于生产者-消费者模式:提交任务的执行者是生成者,执行任务的线程是消费者。
Executor是异步执行任务,这是通过队列来实现的。
相关推荐
- **Executor框架**:Java 5引入的ExecutorService,它管理线程池,提供了更灵活的线程管理方式。 - **ThreadPoolExecutor**:最常见的线程池实现,包括核心线程数、最大线程数、线程空闲时间等参数。 - **...
- **Executor框架**: Java中的`Executor`框架提供了创建和管理线程池的能力,它可以帮助我们有效地管理线程生命周期并提高系统的整体性能。 - **ThreadPoolExecutor**: 是`Executor`框架的核心类之一,提供了创建...
Java通过Executor框架支持线程池,常用的线程池有: - newFixedThreadPool:创建固定大小的线程池。 - newCachedThreadPool:创建可缓存线程池。 - newScheduledThreadPool:创建一个支持定时及周期性任务执行的...
Java的Executor框架是处理并发任务的重要组件,它抽象了线程池的概念,使得线程的创建、管理和复用更加高效。ThreadPoolExecutor是核心类,提供了灵活的配置选项,如核心线程数、最大线程数、任务队列等。通过分析...
1. **线程池**:通过`Executor`框架实现,常见的有`ExecutorService`、`ThreadPoolExecutor`等,能够有效管理和控制线程的生命周期。 2. **锁**:除了基本的`synchronized`关键字外,JUC还提供了更灵活的锁机制,如`...
1. Executor框架:基于任务队列的线程池管理,如ThreadPoolExecutor,它可以有效地管理和控制线程的生命周期,提高系统资源利用率。 2. CountDownLatch:用于多线程协调,允许一个或多个线程等待其他线程完成操作。...
- **Executor框架**:`ExecutorService`、`ThreadPoolExecutor`和`ScheduledExecutorService`是实现线程池的关键类,它们可以帮助我们更好地管理和控制线程的生命周期。 - **Future和Callable**:`Future`接口表示...
4. **Executor框架**:介绍ExecutorService、ThreadPoolExecutor和ScheduledExecutorService等,理解任务调度和线程池的管理,学习如何合理配置线程池参数以优化系统性能。 5. **并发工具类**:包括CountDownLatch...
1. **Executor框架**: - Executor接口:它是并发处理的核心,定义了执行任务的方法,如`execute()`。 - ExecutorService:Executor接口的主要实现,提供了管理和控制线程池的方法,如`submit()`,`shutdown()`和`...
Executor框架是JUC中的核心组件,它管理线程池并提供了一种优雅的处理并发任务的方式。ThreadPoolExecutor是最常用的线程池实现,可以通过调整参数来优化性能,例如设置核心线程数、最大线程数、任务队列和拒绝策略...
1. **Executor框架**:这是JUC的核心,它定义了任务执行的基本接口和类,如Executor、ExecutorService和ScheduledExecutorService。ExecutorService管理线程池,可以提交Runnable或Callable任务,并控制任务的执行...
1. **线程池**:JUC通过Executor框架实现了线程池,允许开发者根据需求创建和管理线程。线程池可以减少线程创建和销毁的开销,提高系统的响应速度和资源利用率。线程池的核心组件包括ExecutorService、...
2. **线程池**:Executor框架是JUC的核心,它提供ThreadPoolExecutor和Executors类来创建和管理线程池。线程池可以有效地重用已存在的线程,减少创建和销毁线程的开销,提高系统性能。 3. **并发工具类**:如...
1. Executor框架:核心接口Executor,用于执行任务。ExecutorService是Executor的子接口,提供了更丰富的管理功能,如shutdown()和shutdownNow()。 2. 线程池:ThreadPoolExecutor是ExecutorService的实现,允许...
- **Executor框架**:是Java中用于管理和控制线程的核心框架。 - **ThreadPoolExecutor**:实现了ExecutorService接口,是Java中最常用的线程池实现类。 - **ScheduledThreadPoolExecutor**:用于调度执行任务,支持...
4. Executor框架:它包含ExecutorService、ThreadPoolExecutor和Executors等类,提供了线程池管理,可以有效地管理和控制线程的生命周期,避免大量创建和销毁线程带来的性能开销。 5. Future和Callable接口:...
1. **Executor框架**:Executor框架是JUC的核心,它基于任务和执行器的概念,简化了线程的创建和管理。ExecutorService接口是其主要入口,实现了ThreadPoolExecutor和ScheduledThreadPoolExecutor等不同类型的线程池...
根据提供的文件信息,我们可以归纳出以下几个关键知识点:Java 8的新特性、Shiro框架的应用、Java并发编程(JUC)、Java I/O新特性(JIO)、高级MySQL操作以及Redis数据库的使用。 ### Java 8的新特性 #### Lambda...
1. Executor框架:它提供了一种创建、管理和控制线程的统一方式,避免了直接使用Thread类的繁琐。ExecutorService、ThreadPoolExecutor和Executors是其核心组件。 2. Future和Callable:Future接口代表异步计算的...
5. **Executor框架**: Java通过`Executor`框架提供了一种更高级的线程管理方式,它允许用户创建线程池来管理线程的生命周期。 - `ThreadPoolExecutor`是`Executor`框架的核心类,它提供了创建固定大小线程池的能力...