- 浏览: 980952 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
ScheduledFutureTask有两个方法需要关注一下,我们分别来看,
再来看执行:
需要关注的是重新入任务队列
上面看了调度任务的包装类ScheduledFutureTask,
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度
按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为
任务在队列的索引。
我们再来看一下任务队列:
put,add,超时offer操作都是通过offer操作来实现,下面来看一下offer
offer方法有两点要关注,
1.
2.
下面分别来看;
1.
2.
从上面来看,offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。
再来看peek操作:直接返回队列头任务
来看take操作:
take操作,有一个地方是我们要看的
take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。
再来看超时poll
超时poll与take不同的是在等待available条件上为超时等待。
再看poll操作:
poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,
如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡
再来看remove操作:
有了前面的操作,remove没有什么多说的,唯一要说的是定位任务位置
//定位任务位置
从DelayedWorkQueue的Remove操作中,我们可以看到ScheduledFutureTask的heapIndex
的作用,如果移除的任务为ScheduledFutureTask,我们可以根据heapIndex直接移除任务,
否则遍历队列,找与x相等任务并移除。
再看其他操作:
小节一下DelayedWorkQueue:
put,add,超时offer操作都是通过offer操作来实现;
offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。超时poll与take不同的是在等待available条件上为超时等待。poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡
总结:
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为任务在队列的索引。调度线程池执行器用DelayedWorkQueue来存储调度任务,DelayedWorkQueue与
延时队列DelayedQueue有点像,一个可重入锁控制队列的并发访问,一个available条件控制队列中是否有任务可用,leader为当前正在等待队列头任务可用(队列不为空,队列头任务过期)的线程,当队列不为空或leader被释放,才会触发available条件。DelayedWorkQueue是特为存放ScheduledFutureTask调度任务而定制的。
ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
package java.util.concurrent; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; import java.util.*; /** * A {@link ThreadPoolExecutor} that can additionally schedule * commands to run after a given delay, or to execute * periodically. This class is preferable to {@link java.util.Timer} * when multiple worker threads are needed, or when the additional * flexibility or capabilities of {@link ThreadPoolExecutor} (which * this class extends) are required. * ScheduledThreadPoolExecutor可以在指定的时延后,调度一个任务,或间歇性 地执行任务。当需要多线程执行任务或需要ThreadPoolExecutor的灵活性和功能性, ScheduledThreadPoolExecutor是一个比java.util.Timer更优的选择。 * <p>Delayed tasks execute no sooner than they are enabled, but * without any real-time guarantees about when, after they are * enabled, they will commence. Tasks scheduled for exactly the same * execution time are enabled in first-in-first-out (FIFO) order of * submission. * 延时任务在启动后,不久后将执行,但不能保证具体的开始执行的真实时间。 执行时间完全相同的调度任务,将会以提交的顺序FIFO启动。 * <p>When a submitted task is cancelled before it is run, execution * is suppressed. By default, such a cancelled task is not * automatically removed from the work queue until its delay * elapses. While this enables further inspection and monitoring, it * may also cause unbounded retention of cancelled tasks. To avoid * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which * causes tasks to be immediately removed from the work queue at * time of cancellation. * 当一个提交任务在执行前被取消,则将取消执行。默认情况下,取消的任务不会自动 的从任务队列移除,除非延时时间过期。如果检查和监视功能开启,可能引起取消任务 无限保留的问题。为了避免这种事情,我们可以设置#setRemoveOnCancelPolicy为true, 可以保证当任务被取消时,立刻从任务队列移除。 * <p>Successive executions of a task scheduled via * {@code scheduleAtFixedRate} or * {@code scheduleWithFixedDelay} do not overlap. While different * executions may be performed by different threads, the effects of * prior executions <a * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> * those of subsequent ones. * 通过scheduleAtFixedRate和scheduleWithFixedDelay调度的任务不会重复。 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few * of the inherited tuning methods are not useful for it. In * particular, because it acts as a fixed-sized pool using * {@code corePoolSize} threads and an unbounded queue, adjustments * to {@code maximumPoolSize} have no useful effect. Additionally, it * is almost never a good idea to set {@code corePoolSize} to zero or * use {@code allowCoreThreadTimeOut} because this may leave the pool * without threads to handle tasks once they become eligible to run. * 调度线程池执行器继承ThreadPoolExecutor,有一些继承的调校方法,实际上没有什么用处。 在特殊情况下,因为用corePoolSize的数量,创建一个固定的线程池和无界队列, 调整maximumPoolSize,将没有什么用处。另外不将以将核心线程池数量设置0和 用allowCoreThreadTimeOut,因为这也许导致在任务延时时间过期时,线程池没有线程可以用于 执行任务。 * <p><b>Extension notes:</b> This class overrides the * {@link ThreadPoolExecutor#execute execute} and * {@link AbstractExecutorService#submit(Runnable) submit} * methods to generate internal {@link ScheduledFuture} objects to * control per-task delays and scheduling. To preserve * functionality, any further overrides of these methods in * subclasses must invoke superclass versions, which effectively * disables additional task customization. However, this class * provides alternative protected extension method * {@code decorateTask} (one version each for {@code Runnable} and * {@code Callable}) that can be used to customize the concrete task * types used to execute commands entered via {@code execute}, * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate}, * and {@code scheduleWithFixedDelay}. By default, a * {@code ScheduledThreadPoolExecutor} uses a task type extending * {@link FutureTask}. However, this may be modified or replaced using * subclasses of the form: * 调度线程池执行器重写了ThreadPoolExecutor#execute和AbstractExecutorService#submit(Runnable) 方法,用内部的ScheduledFuture控制每个任务的延时和调度。为了保护功能性, 重写的方法都要调用父类的对应方法,这可以有效的使定制的任务功能失效。另外 调度线程池执行器提供了一个protected的方法decorateTask(Runnable和Callable两个版本), 可以定制具体通过execute,submit,schedule,scheduleAtFixedRate, scheduleWithFixedDelay执行的任务。默认调度线程池执行器用一个扩展FutureTask来表示一个任务。 * <pre> {@code * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { * * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } * * protected <V> RunnableScheduledFuture<V> decorateTask( * Runnable r, RunnableScheduledFuture<V> task) { * return new CustomTask<V>(r, task); * } * * protected <V> RunnableScheduledFuture<V> decorateTask( * Callable<V> c, RunnableScheduledFuture<V> task) { * return new CustomTask<V>(c, task); * } * // ... add constructors, etc. * }}</pre> * * @since 1.5 * @author Doug Lea */ public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /* * This class specializes ThreadPoolExecutor implementation by * 调度线程池执行器是一个特殊的线程池执行器实现,具体如下: * 1. Using a custom task type, ScheduledFutureTask for * tasks, even those that don't require scheduling (i.e., * those submitted using ExecutorService execute, not * ScheduledExecutorService methods) which are treated as * delayed tasks with a delay of zero. * 1.用ScheduledFutureTask表示一个调度任务,如果用ExecutorService的 方法execute,而不是ScheduledExecutorService方法提价的任务,将会被 对待为一个0延时的任务。 * 2. Using a custom queue (DelayedWorkQueue), a variant of * unbounded DelayQueue. The lack of capacity constraint and * the fact that corePoolSize and maximumPoolSize are * effectively identical simplifies some execution mechanics * (see delayedExecute) compared to ThreadPoolExecutor. * 2.用一个DelayedWorkQueue(无界DelayQueue的变种),容量和,corePoolSize 和maximumPoolSize将会影响具体的任务执行机制(delayedExecute)。 * 3. Supporting optional run-after-shutdown parameters, which * leads to overrides of shutdown methods to remove and cancel * tasks that should NOT be run after shutdown, as well as * different recheck logic when task (re)submission overlaps * with a shutdown. * 3.支持run-after-shutdown选择参数,用于重写移除和取消在关闭之后没有执行的任务的shutdown方法 当任务在关闭时,重复提交的任务会有重新检查。 * 4. Task decoration methods to allow interception and * instrumentation, which are needed because subclasses cannot * otherwise override submit methods to get this effect. These * don't have any impact on pool control logic though. */ 4.decoration方法运行监控任务,这个方法是需要的,因为重写submit方法, 不能实现这样的效果。这个方法不会影响线程池的控制逻辑。 /** * False if should cancel/suppress periodic tasks on shutdown. 如果应该在关闭时,取消间歇性的任务,则为false */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * False if should cancel non-periodic tasks on shutdown. 如果应该在线程池关闭时取消非间歇性任务,则为false */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * True if ScheduledFutureTask.cancel should remove from queue 当任务取消时,是否应该将调度任务从队列中移除 */ private volatile boolean removeOnCancel = false; /** * Sequence number to break scheduling ties, and in turn to * guarantee FIFO order among tied entries. 调度任务break任务的序列号,为了保证任务FIFO的特性 */ private static final AtomicLong sequencer = new AtomicLong(0); /** * Returns current nanosecond time.系统当前时间 */ final long now() { return System.nanoTime(); } //调度任务 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO 任务序列号*/ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units 任务执行的系统时间*/ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. 间歇性调度任务的间隔时间,正数表示 fixed-rate执行,负数表示fixed-delay执行,0表示非重复调度任务。 */ private final long period; /** The actual task to be re-enqueued by reExecutePeriodic 实际任务*/ RunnableScheduledFuture<V> outerTask = this; /** * Index into delay queue, to support faster cancellation.延时队列索引 */ int heapIndex; /** * Creates a one-shot action with given nanoTime-based trigger time. 创建一个只执行一次的延时任务 */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. 创建一个间歇性执行的延时任务 */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. 创建一个只执行一次的延时任务 */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } //获取任务延时时间 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } //比较任务的延时时间 public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } /** * Returns true if this is a periodic (not a one-shot) action. * 是否是间歇性执行的任务 * @return true if periodic */ public boolean isPeriodic() { return period != 0; } /** * Sets the next time to run for a periodic task. 设置间歇性任务下一次执行的时间 */ private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } //取消调度任务 public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0) remove(this);//从任务队列移除任务 return cancelled; } /** * Overrides FutureTask version so as to reset/requeue if periodic. 如果是间歇性任务,重新设置下一次执行的时间,并重新入任务队列 */ public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false);//如果在线程池关闭时可以取消任务,则以不可中断方式取消任务 else if (!periodic) ScheduledFutureTask.super.run();//如果非间歇性任务,则直接运行 else if (ScheduledFutureTask.super.runAndReset()) {//执行调度任务 //设置下一次执行的时间 setNextRunTime(); //重新入任务队列 reExecutePeriodic(outerTask); } } } }
ScheduledFutureTask有两个方法需要关注一下,我们分别来看,
/** * Sets the next time to run for a periodic task. 设置间歇性任务下一次执行的时间,大于零直接添加,小于零,则需要重新计算触发时间 */ private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } /** * Returns the trigger time of a delayed action.返回触发时间延时 */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } /** * Constrains the values of all delays in the queue to be within * Long.MAX_VALUE of each other, to avoid overflow in compareTo. * This may occur if a task is eligible to be dequeued, but has * not yet been, while some other task is added with a delay of * Long.MAX_VALUE. */ private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(TimeUnit.NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) //如果延时时间小于队列头任务的延时时间,则delay为Long的最大值+队列头任务延时 delay = Long.MAX_VALUE + headDelay; } return delay; }
再来看执行:
/** * Overrides FutureTask version so as to reset/requeue if periodic. 如果是间歇性任务,重新设置下一次执行的时间,并重新入任务队列 */ public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) //如果在线程池关闭时可以取消任务,则以不可中断方式取消任务,此关闭非立即关闭 //等待已经执行的任务执行完 cancel(false); else if (!periodic) ScheduledFutureTask.super.run();//如果非间歇性任务,则直接运行 else if (ScheduledFutureTask.super.runAndReset()) {//执行调度任务 //设置下一次执行的时间 setNextRunTime(); //重新入任务队列 reExecutePeriodic(outerTask); } }
需要关注的是重新入任务队列
/** * Requeues a periodic task unless current run state precludes it. * Same idea as delayedExecute except drops task rather than rejecting. * * @param task the task */ void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task);//添加任务到队列 if (!canRunInCurrentRunState(true) && remove(task)) //如果当前线程状态不接受任务,则移除任务,并取消 task.cancel(false); else //保证有一个空闲工作线程,等待任务 ensurePrestart(); } }
上面看了调度任务的包装类ScheduledFutureTask,
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度
按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为
任务在队列的索引。
我们再来看一下任务队列:
/** * Specialized delay queue. To mesh with TPE declarations, this * class must be declared as a BlockingQueue<Runnable> even though * it can only hold RunnableScheduledFutures. */ static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { /* * A DelayedWorkQueue is based on a heap-based data structure * like those in DelayQueue and PriorityQueue, except that * every ScheduledFutureTask also records its index into the * heap array. This eliminates the need to find a task upon * cancellation, greatly speeding up removal (down from O(n) * to O(log n)), and reducing garbage retention that would * otherwise occur by waiting for the element to rise to top * before clearing. But because the queue may also hold * RunnableScheduledFutures that are not ScheduledFutureTasks, * we are not guaranteed to have such indices available, in * which case we fall back to linear search. (We expect that * most tasks will not be decorated, and that the faster cases * will be much more common.) * DelayedWorkQueue是一个基于堆数据结构的队列,如延时队列和有限级队列一样, 除此之外,ScheduledFutureTask同时记录任务在堆数组上的索引index。 记录索引的目的是当要取消任务时,快速地定位到取消任务,提高了移除 的时间复杂度从O(n),降到O(log n)),减少了等待任务在在清除前旋转到顶部 的产生的内存垃圾。因为队列可能存放非调度任务的RunnableScheduledFutures, 我们不能保证这个索引index可用,在这种情况下,我们将退回到线性的时间复杂度搜索。 (我们希望大部分的任务不要包装,以防退回到线性的时间复杂度搜索) * All heap operations must record index changes -- mainly * within siftUp and siftDown. Upon removal, a task's * heapIndex is set to -1. Note that ScheduledFutureTasks can * appear at most once in the queue (this need not be true for * other kinds of tasks or work queues), so are uniquely * identified by heapIndex. 所有的堆操作必须记录索引的改变,主要为堆的上旋(右旋)与下旋(左旋)。 在任务移除后,heapIndex将会被被设置为-1.ScheduledFutureTasks任务 可能出现在队列不止一次,所以heapIndex必须唯一。 */ private static final int INITIAL_CAPACITY = 16;//初始容量 private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];//存放任务的堆数组 private final ReentrantLock lock = new ReentrantLock(); private int size = 0;//队列size /** * Thread designated to wait for the task at the head of the * queue. This variant of the Leader-Follower pattern * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to * minimize unnecessary timed waiting. When a thread becomes * the leader, it waits only for the next delay to elapse, but * other threads await indefinitely. The leader thread must * signal some other thread before returning from take() or * poll(...), unless some other thread becomes leader in the * interim. Whenever the head of the queue is replaced with a * task with an earlier expiration time, the leader field is * invalidated by being reset to null, and some waiting * thread, but not necessarily the current leader, is * signalled. So waiting threads must be prepared to acquire * and lose leadership while waiting. leader为等待队列头的任务的线程。这是Leader-Follower服务模式的一个变种, 可以减少不必要的时间等待。当一个线程变为leader时,将会等待下一个延时时间过期的 任务,但是其他线程的等待时不确定性的。leader线程必须在take或poll方法返回时, 必须通知其他线程,除非其他线程在take或poll的操作过程中成为了Leader。当队列的头被一个 更早过期时间的任务替换时,leader线程将会被设置为null,一些等待的线程,不需要 当前leader唤醒。在等待的过程中,线程必须时刻准备获取或者失去leader权限。 */ private Thread leader = null; /** * Condition signalled when a newer task becomes available at the * head of the queue or a new thread may need to become leader. 当一个任务的任务在队列头部可以用或一个新线程成为leader时,触发available。 */ private final Condition available = lock.newCondition(); /** * Set f's heapIndex if it is a ScheduledFutureTask. 设置ScheduledFutureTask的在数组堆中的索引 */ private void setIndex(RunnableScheduledFuture f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; } public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); } }
put,add,超时offer操作都是通过offer操作来实现,下面来看一下offer
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture e = (RunnableScheduledFuture)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; //如果当前size大于队列的长度,扩容 if (i >= queue.length) grow(); size = i + 1; if (i == 0) { //队列为空,则任务入队列 queue[0] = e; setIndex(e, 0); } else { //否则右旋,使二叉树堆平衡 siftUp(i, e); } if (queue[0] == e) { //如果添加的任务为队列头,触发条件available leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
offer方法有两点要关注,
1.
//如果当前size大于队列的长度,扩容 if (i >= queue.length) grow();
2.
if (i == 0) { //队列为空,则任务入队列 queue[0] = e; setIndex(e, 0); } else { //否则右旋,使二叉树堆平衡 siftUp(i, e); }
下面分别来看;
1.
//如果当前size大于队列的长度,扩容 if (i >= queue.length) grow();
/** * Resize the heap array. Call only when holding lock. 扩容数组堆容量为原来的1.5倍 */ private void grow() { int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; queue = Arrays.copyOf(queue, newCapacity); }
2.
if (i == 0) { //队列为空,则任务入队列 queue[0] = e; setIndex(e, 0); } else { //否则右旋,使二叉树数组堆平衡 siftUp(i, e); }
/** * Sift element added at bottom up to its heap-ordered spot. * Call only when holding lock. 当向堆底添加任务时,右旋使其处于堆中的应有顺序点 */ private void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
从上面来看,offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。
再来看peek操作:直接返回队列头任务
public RunnableScheduledFuture peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally { lock.unlock(); } }
来看take操作:
public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//以可中断方式获取锁 try { for (;;) { RunnableScheduledFuture first = queue[0]; if (first == null) //如果队列头为空,则等待队列可用条件available available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) //如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡 return finishPoll(first); else if (leader != null) //如果延时不为0,且leader不为null,则等待队列可用条件available available.await(); else { Thread thisThread = Thread.currentThread(); //如果leader为null,则设置当前线程为leader leader = thisThread; try { //超时等待available条件 available.awaitNanos(delay); } finally { if (leader == thisThread) //释放leader leader = null; } } } } } finally { if (leader == null && queue[0] != null) //leader为null,且队列中有任务可用,则唤醒available available.signal(); lock.unlock(); } }
take操作,有一个地方是我们要看的
if (delay <= 0) //如果队列头的延时时间小于0,即过期 return finishPoll(first);
/** * Performs common bookkeeping for poll and take: Replaces * first element with last and sifts it down. Call only when * holding lock. 用最后一个任务替换第一个任务,左旋使二叉树数组堆平衡 * @param f the task to remove and return */ private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { int s = --size; RunnableScheduledFuture x = queue[s]; queue[s] = null; if (s != 0) //将最后一个任务放在第一个位置上,左旋使二叉树数组堆平衡 siftDown(0, x); setIndex(f, -1); return f; } /** * Sift element added at top down to its heap-ordered spot. * Call only when holding lock. 左旋,这里我们就不说了,在Delay队列中有说 */ private void siftDown(int k, RunnableScheduledFuture key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。
再来看超时poll
public RunnableScheduledFuture poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//以可中断方式获取锁 try { for (;;) { RunnableScheduledFuture first = queue[0]; if (first == null) { if (nanos <= 0) //如果队列为空,且超时时间小于0,则返回null return null; else //否则超时等待available条件 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) //如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡 return finishPoll(first); if (nanos <= 0) //如果队列为空,但超时时间小于0,则返回null return null; if (nanos < delay || leader != null) //如果超时时间小于队列头任务延时时间,或leader不为null,则超时等待available nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft;//nanos =nanos - delay + timeLeft,需要等待的时间 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
超时poll与take不同的是在等待available条件上为超时等待。
再看poll操作:
public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //如果队列为空,或队列头任务延时时间大于0,则返回null, return null; else //否则,返回队头任务,并左旋使二叉树数组堆平衡 return finishPoll(first); } finally { lock.unlock(); } }
poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,
如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡
再来看remove操作:
public boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { //定位任务位置 int i = indexOf(x); if (i < 0) //不存在指定任务,返回false return false; setIndex(queue[i], -1); int s = --size; RunnableScheduledFuture replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } }
有了前面的操作,remove没有什么多说的,唯一要说的是定位任务位置
//定位任务位置
int i = indexOf(x);
/** * Find index of given object, or -1 if absent */ private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { int i = ((ScheduledFutureTask) x).heapIndex; // Sanity check; x could conceivably be a // ScheduledFutureTask from some other pool. if (i >= 0 && i < size && queue[i] == x) //根据调度任务在数组堆中的索引,直接定位,并判断任务是否相等 return i; } else { //否则遍历队列,找与x相等任务 for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; }
从DelayedWorkQueue的Remove操作中,我们可以看到ScheduledFutureTask的heapIndex
的作用,如果移除的任务为ScheduledFutureTask,我们可以根据heapIndex直接移除任务,
否则遍历队列,找与x相等任务并移除。
再看其他操作:
//判断是否包含指定任务 public boolean contains(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { //委托给indexOf return indexOf(x) != -1; } finally { lock.unlock(); } } //清空操作 public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { //遍历队列,置null堆数组任务 for (int i = 0; i < size; i++) { RunnableScheduledFuture t = queue[i]; if (t != null) { queue[i] = null; setIndex(t, -1); } } size = 0; } finally { lock.unlock(); } } //drainTo操作: public int drainTo(Collection<? super Runnable> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first; int n = 0; while ((first = pollExpired()) != null) { //遍历队列,如果队头任务不为空,且以过期,添加到集合 c.add(first); ++n; } return n; } finally { lock.unlock(); } } //排空指定数量的任务 public int drainTo(Collection<? super Runnable> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first; int n = 0; while (n < maxElements && (first = pollExpired()) != null) { c.add(first); ++n; } return n; } finally { lock.unlock(); } } /** * Return and remove first element only if it is expired. * Used only by drainTo. Call only when holding lock. */ private RunnableScheduledFuture pollExpired() { RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) //如队列为空或队列头任务延时时间大于0,则返回为null return null; return finishPoll(first); } //实际容量 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } //是否为空 public boolean isEmpty() { return size() == 0; } //剩余容量 public int remainingCapacity() { return Integer.MAX_VALUE; }
小节一下DelayedWorkQueue:
put,add,超时offer操作都是通过offer操作来实现;
offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。超时poll与take不同的是在等待available条件上为超时等待。poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡
总结:
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为任务在队列的索引。调度线程池执行器用DelayedWorkQueue来存储调度任务,DelayedWorkQueue与
延时队列DelayedQueue有点像,一个可重入锁控制队列的并发访问,一个available条件控制队列中是否有任务可用,leader为当前正在等待队列头任务可用(队列不为空,队列头任务过期)的线程,当队列不为空或leader被释放,才会触发available条件。DelayedWorkQueue是特为存放ScheduledFutureTask调度任务而定制的。
ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593
发表评论
-
Executors解析
2017-04-07 14:38 1245ThreadPoolExecutor解析一(核心线程池数量、线 ... -
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4451ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2117ScheduledThreadPoolExecutor解析一( ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9100Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6081Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3036Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20514Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1502Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1071Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1586Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1061Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1324package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1193/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1159Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1672package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2025线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 920Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1733Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2133Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3048Queue接口定义:http://donald-draper. ...
相关推荐
在计算机操作系统中,任务调度是核心功能之一,用于决定如何在多任务环境下分配CPU时间片。多级反馈队列调度算法(Multi-Level Feedback Queue Scheduling)是一种高效且灵活的调度策略,广泛应用于现代操作系统中。...
任务队列通过调度策略来决定任务的执行顺序,如先入先出(FIFO)、优先级排序等。 线程池的常见类型: 1. `SingleThreadExecutor`:只有一个工作线程,确保任务按顺序执行,适合用于需要保持顺序的场景。 2. `...
在这个场景中,"任务调度,任务调度,任务调度"的标题强调了对这一主题的重视,而描述提到的"简单的任务调度池"暗示了我们正在讨论一种能够处理大量任务(例如300万个)的高效机制,而不会导致系统崩溃或性能严重...
多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种常见的操作系统进程调度策略,它结合了时间片轮转和优先级调度的优点,以优化系统的响应时间和周转时间。在深入探讨MLFQ的实现之前,我们...
多级反馈队列(Multilevel Feedback Queue, MFQ)调度算法是一种在现代操作系统中常用的进程调度策略。该算法通过将进程分配到不同优先级的就绪队列中,并为每个队列设定不同的时间片大小来实现公平性和响应性之间的...
多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种在操作系统中用于进程调度的策略,其目标是优化系统的整体性能,兼顾各种类型的任务,确保响应时间和吞吐量的平衡。该算法的核心思想是将...
在操作系统中,进程调度是一项至关重要的任务,它决定了哪个进程能够在CPU上执行以及执行的时间长度。本报告主要关注多级反馈队列调度算法(Multi-Level Feedback Queue Scheduling,MLFQ)的模拟,这是一种广泛应用...
### 深入解析多级反馈队列调度算法 #### 引言 在现代操作系统中,进程调度算法是核心组成部分之一,它对于确保系统高效、公平地分配计算资源至关重要。多级反馈队列调度(Multi-Level Feedback Queue Scheduling, ...
在IT行业中,定时调度任务服务是不可或缺的一部分,它使得系统能够按预设的时间间隔自动执行某些任务,提高了工作效率并降低了人为操作的复杂性。本文将深入探讨如何利用C#语言,结合Quartz.NET任务调度库以及Log4...
`Android-TimeTask`就是这样一款专为Android设计的轻量级、简洁的定时任务队列框架。它旨在简化多组任务的分发和管理工作,使开发者能够更加专注于业务逻辑,而不是底层的调度细节。 `TimeTask`框架的核心思想是...
它通过将任务放入队列中,然后由一个或多个工作线程按顺序取出并执行,有效地实现了任务的异步处理和调度。下面将详细讨论如何实现C#任务队列及其相关知识点。 1. **线程安全**: - 在多线程环境下,对任务队列的...
多级反馈队列调度算法是一种在操作系统中广泛使用的CPU调度策略,旨在平衡系统中不同类型进程的需求,确保高优先级进程能够及时响应,同时又能让短进程快速完成。这种算法的特点在于它能够动态调整进程的优先级和...
多级反馈队列调度C语言设计进程控制块PCB表结构,适用于多级反馈队列调度算法。PCB结构通常包括以下信息:进程名,进程优先数,轮转时间片,进程已占用的CPU时间,进程还需要的CPU时间,进程的状态,当前队列指针等...
用C语言实现的多级反馈队列调度算法,操作系统课程作业。用VC6.0调试通过。
多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种高效且灵活的调度策略,广泛应用于现代操作系统中。本项目通过Java语言实现了这一算法,为学习者提供了实际操作和理解该算法的机会。 ...
### IBM TSM 调度任务解析 #### 一、TSM调度任务基本概念 IBM Tivoli Storage Manager (简称TSM) 是一款强大的数据保护和存储管理解决方案,旨在简化数据备份、恢复以及数据归档的过程。TSM的调度任务功能是其核心...
Python中的huey是一个轻量级、灵活且功能强大的任务队列库,主要设计用于处理异步任务和后台作业。huey提供了一种简单的方法来在Python应用中引入多线程和多进程,使得复杂的任务可以被分解为可独立执行的小单元,...
在IT领域,任务队列是一种常见的编程概念,尤其在多线程和并发编程中发挥着重要作用。本实例将深入探讨C++中实现简单任务队列的方法。任务队列,顾名思义,是用于存储待执行任务的数据结构,通常采用先进先出(FIFO...
总的来说,【基于三支队列的实时云任务节能调度算法】是一项创新性的研究,它将三支决策理论应用于云任务调度,旨在通过精细化的任务管理和智能的队列策略,以达到降低能耗、提高资源利用率的目标。这种算法对于优化...