Jdk1.6 JUC源码解析(19)-ScheduledThreadPoolExecutor
作者:大飞
- ScheduledThreadPoolExecutor是一种类似Timer的定时器或者说是调度器,和Timer比起来主要有几点好处:1.多线程的定时调度,timer是单线程的,每个timer实例只有一个工作线程。2.由于继承自ThreadPoolExecutor,更具有灵活性和伸缩性。3.没有timer那种线程泄露问题,timer调度的任务如果异常终止,那么整个timer都会被取消,无法执行其他任务。
- ScheduledThreadPoolExecutor继承ThreadPoolExecutor,并实现了ScheduledExecutorService接口,ThreadPoolExecutor之前做过分析,这里就简单看下ScheduledExecutorService接口:
public interface ScheduledExecutorService extends ExecutorService { /** * 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 创建并执行一个一次性任务,这个任务过了延迟时间就会被执行。 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一 * 次被执行,然后会以给定的周期时间执行。 * 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。 * 如果某次执行时长超过了周期时间,那么下一次任务会延迟启动,不会和当前 * 任务并行执行。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 创建并执行一个周期性任务,当任务过了给定的初始延迟时间,会第一 * 次被执行,接下来的任务会在上次任务执行完毕后,延迟给定的时间, * 然后再继续执行。 * 如果某次执行过程中发生了异常,那么任务就停止了(不会执行下一次任务了)。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
接来下看下ScheduledThreadPoolExecutor内部的一些属性定义:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /** * 表示是否应该在关闭时取消或者终止周期性任务。 */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * 表示是否应该在关闭时取消非周期性任务。 */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * 这个序列号的作用是在并列调度(延迟值一样)的情况下保证先入先出的关系。 */ private static final AtomicLong sequencer = new AtomicLong(0); /** Base of nanosecond timings, to avoid wrapping */ private static final long NANO_ORIGIN = System.nanoTime();
继续看下ScheduledThreadPoolExecutor的构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
从构造方法可以看出,ScheduledThreadPoolExecutor内部固定使用DelayedWorkQueue做为任务队列,DelayedWorkQueue是啥呢?看下代码:
/** * An annoying wrapper class to convince javac to use a * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable> */ private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> { private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); ... public boolean add(Runnable x) { return dq.add((RunnableScheduledFuture)x); } public boolean offer(Runnable x) { return dq.offer((RunnableScheduledFuture)x); } public void put(Runnable x) { dq.put((RunnableScheduledFuture)x); } public boolean offer(Runnable x, long timeout, TimeUnit unit) { return dq.offer((RunnableScheduledFuture)x, timeout, unit); }
- 下面从ScheduledExecutorService接口的方法来入手,查看代码实现细节,首先看下schedule方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //将任务包装成一个RunnableScheduledFuture RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //然后延迟执行这个RunnableScheduledFuture delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
两个方法内部基本一致,具体看下将任务包装成RunnableScheduledFuture的过程:
/** * 返回一个延迟动作的触发时间。 */ private long triggerTime(long delay, TimeUnit unit) { //内部要转成纳秒。 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * 返回一个延迟动作的触发时间。 */ long triggerTime(long delay) { //这里一个值得注意的地方是加上了一个now(), //另一个是,如果当前delay很大的话,要调用overflowFree来防止溢出。 return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * Returns nanosecond time offset by origin */ final long now() { return System.nanoTime() - NANO_ORIGIN; } /** * 将队列中所有元素的延迟值彼此的和控制在Long.MAX_VALUE以内,避免 * 在互相比较时溢出。 * 这种情况是可能发生的,比如一个满足条件的任务即将出队,这时来了 * 一个延迟值是Long.MAX_VALUE的任务。 */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(TimeUnit.NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } //下面这两个方法只是简单的实现,可作为钩子方法由子类实现。 protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task; } protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) { return task; }
看下这个ScheduledFutureTask类,首先这个类继承了FutureTask,实现了RunnableScheduledFuture接口。FutureTask之前文章分析过,这里看下后者:
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> { /** * 是否为周期性任务。 */ boolean isPeriodic(); }
看下内部结构:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** 任务序列号 */ private final long sequenceNumber; /** 任务可以执行的时间,单位纳秒 */ private long time; /** * 周期性任务的周期时间,单位纳秒。 * 正数表示固定频率执行,负数表示固定延迟执行,0表示一次性任务。 */ private final long period; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
由于ScheduledThreadPoolExecutor内部使用延迟队列,而延迟队列中放的元素必须实现Delayed接口,ScheduledFutureTask也必然实现了Delayed接口,看下实现接口方法的细节:
public long getDelay(TimeUnit unit) { //注意2点:1.延迟值都是按照纳秒时间单位来算的。2.这里减去了now(),还记得之前算触发时间时候加上了now()。 return unit.convert(time - now(), TimeUnit.NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) //注意这里,如果触发时间相等,那么比较序列号,从而保证顺序。 return -1; else return 1; } //如果要比较的对象不是ScheduledFutureTask,那么按照延迟值进行比较。 long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0)? 0 : ((d < 0)? -1 : 1); }
最后来看一下ScheduledFutureTask类最重要的方法,run方法:
public void run() { //这里首先判断下当前任务是否为周期任务 if (isPeriodic()) runPeriodic(); //如果是周期任务,按照周期任务方式运行。 else ScheduledFutureTask.super.run(); //一次性任务的话,就直接执行run方法。 } public boolean isPeriodic() { return period != 0; } private void runPeriodic() { //这里执行任务并重置异步任务。 boolean ok = ScheduledFutureTask.super.runAndReset(); //判断当前ScheduledThreadPoolExecutor是否关闭。 boolean down = isShutdown(); // 如果任务执行成功, // 并且ScheduledThreadPoolExecutor没有关闭或者策略允许关闭后继续执行周期任务, // 并且ScheduledThreadPoolExecutor没有停止, // 那么重新调度任务。 if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { //重新计算下次触发时间。 long p = period; if (p > 0) time += p; //如果是固定频率,在原有触发时间上加上周期时间。 else time = triggerTime(-p); //如果是固定延迟,直接指定延迟后的触发时间。 //算好下次触发时间后,再将任务本身重新加入任务队列。 ScheduledThreadPoolExecutor.super.getQueue().add(this); } // 这可能是最后执行的延迟任务。执行完毕后, // 如果当前ScheduledThreadPoolExecutor已关闭,那么中断空闲的工作线程。 else if (down) interruptIdleWorkers(); } public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { return continueExistingPeriodicTasksAfterShutdown; }
看完ScheduledFutureTask类的内容,我们回到schedule方法,看下最后的delayedExecute方法:
private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); //如果当前ScheduledThreadPoolExecutor已关闭,拒绝任务。 return; } // 如果当前线程数量小于核心线程数量,那么预启动一个核心线程。 if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); //将任务加入任务队列。 super.getQueue().add(command); }
了解了ScheduledFutureTask内部执行逻辑,我们再回头看下两个schedule方法,它们内部包装的都是一次性的ScheduledFutureTask,继续看scheduleAtFixedRate和scheduleWithFixedDelay:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Boolean>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay))); delayedExecute(t); return t; }
- 最后再看一下和ScheduledThreadPoolExecutor关闭相关的一些特殊处理:
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); } public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); } public void shutdown() { cancelUnwantedTasks(); super.shutdown(); } private void cancelUnwantedTasks() { //关闭后是否继续执行延迟的任务。 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //关闭后是否继续执行周期性的任务。 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) super.getQueue().clear(); //如果都不继续,那么直接清空任务队列。 else if (keepDelayed || keepPeriodic) { //否则会按照相应的策略来取消相应类型的任务。 Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; //最后清理一把任务队列里面被取消的任务。 purge(); } }
ScheduledThreadPoolExecutor的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
logback-cfca-jdk1.6-3.1.0.0.jar
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
Linux64位环境下的jdk6安装包:jdk-6u45-linux-x64.bin。 由于积分无法修改,现提供网盘下载地址: https://pan.baidu.com/s/1BE55ImTxZTQO6T22051P2g 提取码:5wvm
Java编程开发工具包,最新版本,很好用,经典