`
Donald_Draper
  • 浏览: 979771 次
社区版块
存档分类
最新评论

ScheduledThreadPoolExecutor解析一(调度任务,任务队列)

    博客分类:
  • JUC
阅读更多
Executor接口的定义:http://donald-draper.iteye.com/blog/2365625
ExecutorService接口定义:http://donald-draper.iteye.com/blog/2365738
Future接口定义:http://donald-draper.iteye.com/blog/2365798
FutureTask解析:http://donald-draper.iteye.com/blog/2365980
CompletionService接口定义:http://donald-draper.iteye.com/blog/2366239
ExecutorCompletionService解析:http://donald-draper.iteye.com/blog/2366254
AbstractExecutorService解析:http://donald-draper.iteye.com/blog/2366348
ScheduledExecutorService接口定义:http://donald-draper.iteye.com/blog/2366436
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
package java.util.concurrent;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * A {@link ThreadPoolExecutor} that can additionally schedule
 * commands to run after a given delay, or to execute
 * periodically. This class is preferable to {@link java.util.Timer}
 * when multiple worker threads are needed, or when the additional
 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
 * this class extends) are required.
 *
 ScheduledThreadPoolExecutor可以在指定的时延后,调度一个任务,或间歇性
 地执行任务。当需要多线程执行任务或需要ThreadPoolExecutor的灵活性和功能性,
ScheduledThreadPoolExecutor是一个比java.util.Timer更优的选择。
 * <p>Delayed tasks execute no sooner than they are enabled, but
 * without any real-time guarantees about when, after they are
 * enabled, they will commence. Tasks scheduled for exactly the same
 * execution time are enabled in first-in-first-out (FIFO) order of
 * submission.
 *
延时任务在启动后,不久后将执行,但不能保证具体的开始执行的真实时间。
执行时间完全相同的调度任务,将会以提交的顺序FIFO启动。
 * <p>When a submitted task is cancelled before it is run, execution
 * is suppressed. By default, such a cancelled task is not
 * automatically removed from the work queue until its delay
 * elapses. While this enables further inspection and monitoring, it
 * may also cause unbounded retention of cancelled tasks. To avoid
 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
 * causes tasks to be immediately removed from the work queue at
 * time of cancellation.
 *
 当一个提交任务在执行前被取消,则将取消执行。默认情况下,取消的任务不会自动
的从任务队列移除,除非延时时间过期。如果检查和监视功能开启,可能引起取消任务
无限保留的问题。为了避免这种事情,我们可以设置#setRemoveOnCancelPolicy为true,
可以保证当任务被取消时,立刻从任务队列移除。
 * <p>Successive executions of a task scheduled via
 * {@code scheduleAtFixedRate} or
 * {@code scheduleWithFixedDelay} do not overlap. While different
 * executions may be performed by different threads, the effects of
 * prior executions <a
 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those of subsequent ones.
 *
 通过scheduleAtFixedRate和scheduleWithFixedDelay调度的任务不会重复。
 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
 * of the inherited tuning methods are not useful for it. In
 * particular, because it acts as a fixed-sized pool using
 * {@code corePoolSize} threads and an unbounded queue, adjustments
 * to {@code maximumPoolSize} have no useful effect. Additionally, it
 * is almost never a good idea to set {@code corePoolSize} to zero or
 * use {@code allowCoreThreadTimeOut} because this may leave the pool
 * without threads to handle tasks once they become eligible to run.
 *
调度线程池执行器继承ThreadPoolExecutor,有一些继承的调校方法,实际上没有什么用处。
在特殊情况下,因为用corePoolSize的数量,创建一个固定的线程池和无界队列,
调整maximumPoolSize,将没有什么用处。另外不将以将核心线程池数量设置0和
用allowCoreThreadTimeOut,因为这也许导致在任务延时时间过期时,线程池没有线程可以用于
执行任务。

 * <p><b>Extension notes:</b> This class overrides the
 * {@link ThreadPoolExecutor#execute execute} and
 * {@link AbstractExecutorService#submit(Runnable) submit}
 * methods to generate internal {@link ScheduledFuture} objects to
 * control per-task delays and scheduling.  To preserve
 * functionality, any further overrides of these methods in
 * subclasses must invoke superclass versions, which effectively
 * disables additional task customization.  However, this class
 * provides alternative protected extension method
 * {@code decorateTask} (one version each for {@code Runnable} and
 * {@code Callable}) that can be used to customize the concrete task
 * types used to execute commands entered via {@code execute},
 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
 * and {@code scheduleWithFixedDelay}.  By default, a
 * {@code ScheduledThreadPoolExecutor} uses a task type extending
 * {@link FutureTask}. However, this may be modified or replaced using
 * subclasses of the form:
 *
 调度线程池执行器重写了ThreadPoolExecutor#execute和AbstractExecutorService#submit(Runnable)
 方法,用内部的ScheduledFuture控制每个任务的延时和调度。为了保护功能性,
 重写的方法都要调用父类的对应方法,这可以有效的使定制的任务功能失效。另外
 调度线程池执行器提供了一个protected的方法decorateTask(Runnable和Callable两个版本),
 可以定制具体通过execute,submit,schedule,scheduleAtFixedRate,
 scheduleWithFixedDelay执行的任务。默认调度线程池执行器用一个扩展FutureTask来表示一个任务。
 *  <pre> {@code
 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
 *
 *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
 *
 *   protected <V> RunnableScheduledFuture<V> decorateTask(
 *                Runnable r, RunnableScheduledFuture<V> task) {
 *       return new CustomTask<V>(r, task);
 *   }
 *
 *   protected <V> RunnableScheduledFuture<V> decorateTask(
 *                Callable<V> c, RunnableScheduledFuture<V> task) {
 *       return new CustomTask<V>(c, task);
 *   }
 *   // ... add constructors, etc.
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
	 /*
     * This class specializes ThreadPoolExecutor implementation by
     *
     调度线程池执行器是一个特殊的线程池执行器实现,具体如下:
     * 1. Using a custom task type, ScheduledFutureTask for
     *    tasks, even those that don't require scheduling (i.e.,
     *    those submitted using ExecutorService execute, not
     *    ScheduledExecutorService methods) which are treated as
     *    delayed tasks with a delay of zero.
     *
     1.用ScheduledFutureTask表示一个调度任务,如果用ExecutorService的
     方法execute,而不是ScheduledExecutorService方法提价的任务,将会被
     对待为一个0延时的任务。
     * 2. Using a custom queue (DelayedWorkQueue), a variant of
     *    unbounded DelayQueue. The lack of capacity constraint and
     *    the fact that corePoolSize and maximumPoolSize are
     *    effectively identical simplifies some execution mechanics
     *    (see delayedExecute) compared to ThreadPoolExecutor.
     *
     2.用一个DelayedWorkQueue(无界DelayQueue的变种),容量和,corePoolSize
     和maximumPoolSize将会影响具体的任务执行机制(delayedExecute)。
     * 3. Supporting optional run-after-shutdown parameters, which
     *    leads to overrides of shutdown methods to remove and cancel
     *    tasks that should NOT be run after shutdown, as well as
     *    different recheck logic when task (re)submission overlaps
     *    with a shutdown.
     *
     3.支持run-after-shutdown选择参数,用于重写移除和取消在关闭之后没有执行的任务的shutdown方法
     当任务在关闭时,重复提交的任务会有重新检查。
     * 4. Task decoration methods to allow interception and
     *    instrumentation, which are needed because subclasses cannot
     *    otherwise override submit methods to get this effect. These
     *    don't have any impact on pool control logic though.
     */
      4.decoration方法运行监控任务,这个方法是需要的,因为重写submit方法,
      不能实现这样的效果。这个方法不会影响线程池的控制逻辑。
    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     如果应该在关闭时,取消间歇性的任务,则为false
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     如果应该在线程池关闭时取消非间歇性任务,则为false
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * True if ScheduledFutureTask.cancel should remove from queue
     当任务取消时,是否应该将调度任务从队列中移除
     */
    private volatile boolean removeOnCancel = false;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     调度任务break任务的序列号,为了保证任务FIFO的特性
     */
    private static final AtomicLong sequencer = new AtomicLong(0);
     /**
     * Returns current nanosecond time.系统当前时间
     */
    final long now() {
        return System.nanoTime();
    }
    //调度任务
    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** Sequence number to break ties FIFO 任务序列号*/
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units 任务执行的系统时间*/
        private long time;

        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
	 间歇性调度任务的间隔时间,正数表示 fixed-rate执行,负数表示fixed-delay执行,0表示非重复调度任务。
         */
        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic 实际任务*/
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.延时队列索引
         */
        int heapIndex;

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
	 创建一个只执行一次的延时任务
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
	 创建一个间歇性执行的延时任务
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
	 创建一个只执行一次的延时任务
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
        //获取任务延时时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }
        //比较任务的延时时间
        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

        /**
         * Returns true if this is a periodic (not a one-shot) action.
         *
	 是否是间歇性执行的任务
         * @return true if periodic
         */
        public boolean isPeriodic() {
            return period != 0;
        }

        /**
         * Sets the next time to run for a periodic task.
	 设置间歇性任务下一次执行的时间
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
        //取消调度任务
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);//从任务队列移除任务
            return cancelled;
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
	 如果是间歇性任务,重新设置下一次执行的时间,并重新入任务队列
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);//如果在线程池关闭时可以取消任务,则以不可中断方式取消任务
            else if (!periodic)
                ScheduledFutureTask.super.run();//如果非间歇性任务,则直接运行
            else if (ScheduledFutureTask.super.runAndReset()) {//执行调度任务
	        //设置下一次执行的时间
                setNextRunTime();
		//重新入任务队列
                reExecutePeriodic(outerTask);
            }
        }
    }
 }

ScheduledFutureTask有两个方法需要关注一下,我们分别来看,
/**
  * Sets the next time to run for a periodic task.
 设置间歇性任务下一次执行的时间,大于零直接添加,小于零,则需要重新计算触发时间
  */
 private void setNextRunTime() {
     long p = period;
     if (p > 0)
         time += p;
     else
         time = triggerTime(-p);
 }

    /**
     * Returns the trigger time of a delayed action.返回触发时间延时
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

/**
     * Constrains the values of all delays in the queue to be within
     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
     * This may occur if a task is eligible to be dequeued, but has
     * not yet been, while some other task is added with a delay of
     * Long.MAX_VALUE.
     */
    private long overflowFree(long delay) {
        Delayed head = (Delayed) super.getQueue().peek();
        if (head != null) {
            long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
            if (headDelay < 0 && (delay - headDelay < 0))
	        //如果延时时间小于队列头任务的延时时间,则delay为Long的最大值+队列头任务延时
                delay = Long.MAX_VALUE + headDelay;
        }
        return delay;
    }

再来看执行:
/**
  * Overrides FutureTask version so as to reset/requeue if periodic.
 如果是间歇性任务,重新设置下一次执行的时间,并重新入任务队列
  */
 public void run() {
     boolean periodic = isPeriodic();
     if (!canRunInCurrentRunState(periodic))
         //如果在线程池关闭时可以取消任务,则以不可中断方式取消任务,此关闭非立即关闭
	 //等待已经执行的任务执行完
         cancel(false);
     else if (!periodic)
         ScheduledFutureTask.super.run();//如果非间歇性任务,则直接运行
     else if (ScheduledFutureTask.super.runAndReset()) {//执行调度任务
        //设置下一次执行的时间
         setNextRunTime();
	//重新入任务队列
         reExecutePeriodic(outerTask);
     }
 }

需要关注的是重新入任务队列
/**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *
     * @param task the task
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);//添加任务到队列
            if (!canRunInCurrentRunState(true) && remove(task))
	        //如果当前线程状态不接受任务,则移除任务,并取消
                task.cancel(false);
            else
	        //保证有一个空闲工作线程,等待任务
                ensurePrestart();
        }
    }

上面看了调度任务的包装类ScheduledFutureTask,
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度
按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为
任务在队列的索引。
我们再来看一下任务队列:

   
/**
     * Specialized delay queue. To mesh with TPE declarations, this
     * class must be declared as a BlockingQueue<Runnable> even though
     * it can only hold RunnableScheduledFutures.
     */
    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

        /*
         * A DelayedWorkQueue is based on a heap-based data structure
         * like those in DelayQueue and PriorityQueue, except that
         * every ScheduledFutureTask also records its index into the
         * heap array. This eliminates the need to find a task upon
         * cancellation, greatly speeding up removal (down from O(n)
         * to O(log n)), and reducing garbage retention that would
         * otherwise occur by waiting for the element to rise to top
         * before clearing. But because the queue may also hold
         * RunnableScheduledFutures that are not ScheduledFutureTasks,
         * we are not guaranteed to have such indices available, in
         * which case we fall back to linear search. (We expect that
         * most tasks will not be decorated, and that the faster cases
         * will be much more common.)
         *
	 DelayedWorkQueue是一个基于堆数据结构的队列,如延时队列和有限级队列一样,
	 除此之外,ScheduledFutureTask同时记录任务在堆数组上的索引index。
	 记录索引的目的是当要取消任务时,快速地定位到取消任务,提高了移除
	 的时间复杂度从O(n),降到O(log n)),减少了等待任务在在清除前旋转到顶部
	 的产生的内存垃圾。因为队列可能存放非调度任务的RunnableScheduledFutures,
	 我们不能保证这个索引index可用,在这种情况下,我们将退回到线性的时间复杂度搜索。
	 (我们希望大部分的任务不要包装,以防退回到线性的时间复杂度搜索)

         * All heap operations must record index changes -- mainly
         * within siftUp and siftDown. Upon removal, a task's
         * heapIndex is set to -1. Note that ScheduledFutureTasks can
         * appear at most once in the queue (this need not be true for
         * other kinds of tasks or work queues), so are uniquely
         * identified by heapIndex.
	 所有的堆操作必须记录索引的改变,主要为堆的上旋(右旋)与下旋(左旋)。
	 在任务移除后,heapIndex将会被被设置为-1.ScheduledFutureTasks任务
	 可能出现在队列不止一次,所以heapIndex必须唯一。
         */

        private static final int INITIAL_CAPACITY = 16;//初始容量
        private RunnableScheduledFuture[] queue =
            new RunnableScheduledFuture[INITIAL_CAPACITY];//存放任务的堆数组
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;//队列size

        /**
         * Thread designated to wait for the task at the head of the
         * queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with a
         * task with an earlier expiration time, the leader field is
         * invalidated by being reset to null, and some waiting
         * thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         leader为等待队列头的任务的线程。这是Leader-Follower服务模式的一个变种,
	 可以减少不必要的时间等待。当一个线程变为leader时,将会等待下一个延时时间过期的
	 任务,但是其他线程的等待时不确定性的。leader线程必须在take或poll方法返回时,
	 必须通知其他线程,除非其他线程在take或poll的操作过程中成为了Leader。当队列的头被一个
	 更早过期时间的任务替换时,leader线程将会被设置为null,一些等待的线程,不需要
	 当前leader唤醒。在等待的过程中,线程必须时刻准备获取或者失去leader权限。

         */
        private Thread leader = null;

        /**
         * Condition signalled when a newer task becomes available at the
         * head of the queue or a new thread may need to become leader.
	 当一个任务的任务在队列头部可以用或一个新线程成为leader时,触发available。
         */
        private final Condition available = lock.newCondition();
	 /**
         * Set f's heapIndex if it is a ScheduledFutureTask.
	 设置ScheduledFutureTask的在数组堆中的索引
         */
        private void setIndex(RunnableScheduledFuture f, int idx) {
            if (f instanceof ScheduledFutureTask)
                ((ScheduledFutureTask)f).heapIndex = idx;
        }
	 public void put(Runnable e) {
            offer(e);
        }

        public boolean add(Runnable e) {
            return offer(e);
        }

        public boolean offer(Runnable e, long timeout, TimeUnit unit) {
            return offer(e);
        }
}

put,add,超时offer操作都是通过offer操作来实现,下面来看一下offer
public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
		//如果当前size大于队列的长度,扩容
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
		    //队列为空,则任务入队列
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
		    //否则右旋,使二叉树堆平衡
                    siftUp(i, e);
                }
                if (queue[0] == e) {
		    //如果添加的任务为队列头,触发条件available
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

offer方法有两点要关注,
1.
//如果当前size大于队列的长度,扩容
if (i >= queue.length)
    grow();


2.
if (i == 0) {
    //队列为空,则任务入队列
    queue[0] = e;
    setIndex(e, 0);
} else {
    //否则右旋,使二叉树堆平衡
    siftUp(i, e);
}

下面分别来看;
1.
//如果当前size大于队列的长度,扩容
if (i >= queue.length)
    grow();

 /**
 * Resize the heap array.  Call only when holding lock.
 扩容数组堆容量为原来的1.5倍
 */
private void grow() {
    int oldCapacity = queue.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    if (newCapacity < 0) // overflow
        newCapacity = Integer.MAX_VALUE;
    queue = Arrays.copyOf(queue, newCapacity);
}

2.
if (i == 0) {
    //队列为空,则任务入队列
    queue[0] = e;
    setIndex(e, 0);
} else {
    //否则右旋,使二叉树数组堆平衡
    siftUp(i, e);
}

/**
  * Sift element added at bottom up to its heap-ordered spot.
  * Call only when holding lock.
  当向堆底添加任务时,右旋使其处于堆中的应有顺序点
  */
 private void siftUp(int k, RunnableScheduledFuture key) {
     while (k > 0) {
         int parent = (k - 1) >>> 1;
         RunnableScheduledFuture e = queue[parent];
         if (key.compareTo(e) >= 0)
             break;
         queue[k] = e;
         setIndex(e, k);
         k = parent;
     }
     queue[k] = key;
     setIndex(key, k);
 }

从上面来看,offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。

再来看peek操作:直接返回队列头任务
public RunnableScheduledFuture peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return queue[0];
    } finally {
        lock.unlock();
    }
}

来看take操作:
public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//以可中断方式获取锁
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null)
	        //如果队列头为空,则等待队列可用条件available
                available.await();
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
		    //如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡
                    return finishPoll(first);
                else if (leader != null)
		    //如果延时不为0,且leader不为null,则等待队列可用条件available
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
		     //如果leader为null,则设置当前线程为leader
                    leader = thisThread;
                    try {
		        //超时等待available条件
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
			    //释放leader
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
	    //leader为null,且队列中有任务可用,则唤醒available
            available.signal();
        lock.unlock();
    }
}

take操作,有一个地方是我们要看的
 if (delay <= 0)
     //如果队列头的延时时间小于0,即过期
     return finishPoll(first);


 
/**
   * Performs common bookkeeping for poll and take: Replaces
   * first element with last and sifts it down.  Call only when
   * holding lock.
   用最后一个任务替换第一个任务,左旋使二叉树数组堆平衡
   * @param f the task to remove and return
   */
  private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
      int s = --size;
      RunnableScheduledFuture x = queue[s];
      queue[s] = null;
      if (s != 0)
          //将最后一个任务放在第一个位置上,左旋使二叉树数组堆平衡
          siftDown(0, x);
      setIndex(f, -1);
      return f;
  }
/**
 * Sift element added at top down to its heap-ordered spot.
 * Call only when holding lock.
 左旋,这里我们就不说了,在Delay队列中有说
 */
private void siftDown(int k, RunnableScheduledFuture key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。
再来看超时poll
public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
      throws InterruptedException {
      long nanos = unit.toNanos(timeout);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//以可中断方式获取锁
      try {
          for (;;) {
              RunnableScheduledFuture first = queue[0];
              if (first == null) {
                  if (nanos <= 0)
		      //如果队列为空,且超时时间小于0,则返回null
                      return null;
                  else
		      //否则超时等待available条件
                      nanos = available.awaitNanos(nanos);
              } else {
                  long delay = first.getDelay(TimeUnit.NANOSECONDS);
                  if (delay <= 0)
		      //如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡
                      return finishPoll(first);
                  if (nanos <= 0)
		      //如果队列为空,但超时时间小于0,则返回null
                      return null;
                  if (nanos < delay || leader != null)
		      //如果超时时间小于队列头任务延时时间,或leader不为null,则超时等待available
                      nanos = available.awaitNanos(nanos);
                  else {
                      Thread thisThread = Thread.currentThread();
                      leader = thisThread;
                      try {
                          long timeLeft = available.awaitNanos(delay);
                          nanos -= delay - timeLeft;//nanos =nanos - delay + timeLeft,需要等待的时间
                      } finally {
                          if (leader == thisThread)
                              leader = null;
                      }
                  }
              }
          }
      } finally {
          if (leader == null && queue[0] != null)
              available.signal();
          lock.unlock();
      }
  }

超时poll与take不同的是在等待available条件上为超时等待。
再看poll操作:
 public RunnableScheduledFuture poll() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         RunnableScheduledFuture first = queue[0];
         if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
	     //如果队列为空,或队列头任务延时时间大于0,则返回null,
             return null;
         else
	     //否则,返回队头任务,并左旋使二叉树数组堆平衡
             return finishPoll(first);
     } finally {
         lock.unlock();
     }
 }

poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,
如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡
再来看remove操作:
public boolean remove(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
	        //定位任务位置
                int i = indexOf(x);
                if (i < 0)
		    //不存在指定任务,返回false
                    return false;
                setIndex(queue[i], -1);
                int s = --size;
                RunnableScheduledFuture replacement = queue[s];
                queue[s] = null;
                if (s != i) {
                    siftDown(i, replacement);
                    if (queue[i] == replacement)
                        siftUp(i, replacement);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

有了前面的操作,remove没有什么多说的,唯一要说的是定位任务位置
//定位任务位置
int i = indexOf(x);

/**
  * Find index of given object, or -1 if absent
  */
 private int indexOf(Object x) {
     if (x != null) {
         if (x instanceof ScheduledFutureTask) {
             int i = ((ScheduledFutureTask) x).heapIndex;
             // Sanity check; x could conceivably be a
             // ScheduledFutureTask from some other pool.
             if (i >= 0 && i < size && queue[i] == x)
	         //根据调度任务在数组堆中的索引,直接定位,并判断任务是否相等
                 return i;
         } else {
	     //否则遍历队列,找与x相等任务
             for (int i = 0; i < size; i++)
                 if (x.equals(queue[i]))
                     return i;
         }
     }
     return -1;
 }

从DelayedWorkQueue的Remove操作中,我们可以看到ScheduledFutureTask的heapIndex
的作用,如果移除的任务为ScheduledFutureTask,我们可以根据heapIndex直接移除任务,
否则遍历队列,找与x相等任务并移除。
再看其他操作:
//判断是否包含指定任务
 public boolean contains(Object x) {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         //委托给indexOf
         return indexOf(x) != -1;
     } finally {
         lock.unlock();
     }
 }
 //清空操作
 public void clear() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
        //遍历队列,置null堆数组任务
         for (int i = 0; i < size; i++) {
             RunnableScheduledFuture t = queue[i];
             if (t != null) {
                 queue[i] = null;
                 setIndex(t, -1);
             }
         }
         size = 0;
     } finally {
         lock.unlock();
     }
 }
//drainTo操作:
  public int drainTo(Collection<? super Runnable> c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                RunnableScheduledFuture first;
                int n = 0;
                while ((first = pollExpired()) != null) {
		    //遍历队列,如果队头任务不为空,且以过期,添加到集合
                    c.add(first);
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
//排空指定数量的任务
 public int drainTo(Collection<? super Runnable> c, int maxElements) {
     if (c == null)
         throw new NullPointerException();
     if (c == this)
         throw new IllegalArgumentException();
     if (maxElements <= 0)
         return 0;
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         RunnableScheduledFuture first;
         int n = 0;
         while (n < maxElements && (first = pollExpired()) != null) {
             c.add(first);
             ++n;
         }
         return n;
     } finally {
         lock.unlock();
     }
 }
/**
 * Return and remove first element only if it is expired.
 * Used only by drainTo.  Call only when holding lock.
 */
private RunnableScheduledFuture pollExpired() {
    RunnableScheduledFuture first = queue[0];
    if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
        //如队列为空或队列头任务延时时间大于0,则返回为null
        return null;
    return finishPoll(first);
}
//实际容量
 public int size() {
     final ReentrantLock lock = this.lock;
     lock.lock();
     try {
         return size;
     } finally {
         lock.unlock();
     }
 }
//是否为空
 public boolean isEmpty() {
     return size() == 0;
 }
//剩余容量
 public int remainingCapacity() {
     return Integer.MAX_VALUE;
 }

小节一下DelayedWorkQueue:
put,add,超时offer操作都是通过offer操作来实现;
offer操作首先判断当前队列size,如果当前size大于队列的长度,
扩容数组堆容量为原来的1.5倍,然后任务入队列,如果新添加的任务为队列头,
释放leader为null,触发条件available。take操作,首先判断队列是否为空,空则等待available条件,不为空,则获取队列头任务的延时时间,如果队列头的延时时间小于0,即过期,返回队头任务,并左旋使二叉树数组堆平衡,否则判断leader是否为null,不为null,则等待available条件,为null,设置当前线程为leader。超时poll与take不同的是在等待available条件上为超时等待。poll操作,首先判断队列是否为空或或队列头任务延时时间是否大于0,如果队列都为空或或或队列头任务延时时间大于0,则返回null,否则,返回队头任务,并左旋使二叉树数组堆平衡

总结:
ScheduledFutureTask用一个序列号标识延时任务的执行编号,以保证任务的调度按照FIFO的顺序,用time记录任务执行的系统时间,period是任务执行间隔时间,
用于计算下一次任务执行系统时间,outerTask为实际的调度任务,heapIndex为任务在队列的索引。调度线程池执行器用DelayedWorkQueue来存储调度任务,DelayedWorkQueue与
延时队列DelayedQueue有点像,一个可重入锁控制队列的并发访问,一个available条件控制队列中是否有任务可用,leader为当前正在等待队列头任务可用(队列不为空,队列头任务过期)的线程,当队列不为空或leader被释放,才会触发available条件。DelayedWorkQueue是特为存放ScheduledFutureTask调度任务而定制的。


ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593

0
0
分享到:
评论

相关推荐

    C语言实现多级反馈队列调度算法

    在计算机操作系统中,任务调度是核心功能之一,用于决定如何在多任务环境下分配CPU时间片。多级反馈队列调度算法(Multi-Level Feedback Queue Scheduling)是一种高效且灵活的调度策略,广泛应用于现代操作系统中。...

    Android中的线程池与任务队列

    任务队列通过调度策略来决定任务的执行顺序,如先入先出(FIFO)、优先级排序等。 线程池的常见类型: 1. `SingleThreadExecutor`:只有一个工作线程,确保任务按顺序执行,适合用于需要保持顺序的场景。 2. `...

    任务调度,任务调度,任务调度

    在这个场景中,"任务调度,任务调度,任务调度"的标题强调了对这一主题的重视,而描述提到的"简单的任务调度池"暗示了我们正在讨论一种能够处理大量任务(例如300万个)的高效机制,而不会导致系统崩溃或性能严重...

    多级反馈队列调度算法实现

    多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种常见的操作系统进程调度策略,它结合了时间片轮转和优先级调度的优点,以优化系统的响应时间和周转时间。在深入探讨MLFQ的实现之前,我们...

    多级反馈队列调度算法 C语言模拟实现

    多级反馈队列(Multilevel Feedback Queue, MFQ)调度算法是一种在现代操作系统中常用的进程调度策略。该算法通过将进程分配到不同优先级的就绪队列中,并为每个队列设定不同的时间片大小来实现公平性和响应性之间的...

    多级反馈队列调度算法

    多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种在操作系统中用于进程调度的策略,其目标是优化系统的整体性能,兼顾各种类型的任务,确保响应时间和吞吐量的平衡。该算法的核心思想是将...

    操作系统课程设计报告-多级反馈队列调度算法模拟

    在操作系统中,进程调度是一项至关重要的任务,它决定了哪个进程能够在CPU上执行以及执行的时间长度。本报告主要关注多级反馈队列调度算法(Multi-Level Feedback Queue Scheduling,MLFQ)的模拟,这是一种广泛应用...

    深入解析多级反馈队列调度算法

    ### 深入解析多级反馈队列调度算法 #### 引言 在现代操作系统中,进程调度算法是核心组成部分之一,它对于确保系统高效、公平地分配计算资源至关重要。多级反馈队列调度(Multi-Level Feedback Queue Scheduling, ...

    C#定时调度任务服务

    在IT行业中,定时调度任务服务是不可或缺的一部分,它使得系统能够按预设的时间间隔自动执行某些任务,提高了工作效率并降低了人为操作的复杂性。本文将深入探讨如何利用C#语言,结合Quartz.NET任务调度库以及Log4...

    Android-TimeTask是一个轻量简洁的定时任务队列框架

    `Android-TimeTask`就是这样一款专为Android设计的轻量级、简洁的定时任务队列框架。它旨在简化多组任务的分发和管理工作,使开发者能够更加专注于业务逻辑,而不是底层的调度细节。 `TimeTask`框架的核心思想是...

    C#任务队列的实现

    它通过将任务放入队列中,然后由一个或多个工作线程按顺序取出并执行,有效地实现了任务的异步处理和调度。下面将详细讨论如何实现C#任务队列及其相关知识点。 1. **线程安全**: - 在多线程环境下,对任务队列的...

    多级反馈队列调度算法的实现

    多级反馈队列调度算法是一种在操作系统中广泛使用的CPU调度策略,旨在平衡系统中不同类型进程的需求,确保高优先级进程能够及时响应,同时又能让短进程快速完成。这种算法的特点在于它能够动态调整进程的优先级和...

    多级反馈队列调度C语言

    多级反馈队列调度C语言设计进程控制块PCB表结构,适用于多级反馈队列调度算法。PCB结构通常包括以下信息:进程名,进程优先数,轮转时间片,进程已占用的CPU时间,进程还需要的CPU时间,进程的状态,当前队列指针等...

    多级反馈队列调度算法C语言源代码

    用C语言实现的多级反馈队列调度算法,操作系统课程作业。用VC6.0调试通过。

    JAVA-计算机操作系统 多级反馈队列调度算法

    多级反馈队列调度算法(Multilevel Feedback Queue Scheduling,MLFQ)是一种高效且灵活的调度策略,广泛应用于现代操作系统中。本项目通过Java语言实现了这一算法,为学习者提供了实际操作和理解该算法的机会。 ...

    IBM_TSM_调度任务解析

    ### IBM TSM 调度任务解析 #### 一、TSM调度任务基本概念 IBM Tivoli Storage Manager (简称TSM) 是一款强大的数据保护和存储管理解决方案,旨在简化数据备份、恢复以及数据归档的过程。TSM的调度任务功能是其核心...

    Python-huey小型多线程任务队列

    Python中的huey是一个轻量级、灵活且功能强大的任务队列库,主要设计用于处理异步任务和后台作业。huey提供了一种简单的方法来在Python应用中引入多线程和多进程,使得复杂的任务可以被分解为可独立执行的小单元,...

    C++ 简单的任务队列实例

    在IT领域,任务队列是一种常见的编程概念,尤其在多线程和并发编程中发挥着重要作用。本实例将深入探讨C++中实现简单任务队列的方法。任务队列,顾名思义,是用于存储待执行任务的数据结构,通常采用先进先出(FIFO...

    基于三支队列的实时云任务节能调度算法.pdf

    总的来说,【基于三支队列的实时云任务节能调度算法】是一项创新性的研究,它将三支决策理论应用于云任务调度,旨在通过精细化的任务管理和智能的队列策略,以达到降低能耗、提高资源利用率的目标。这种算法对于优化...

Global site tag (gtag.js) - Google Analytics