`
yychao
  • 浏览: 98400 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ScheduledThreadPoolExecutor源码分析

 
阅读更多

ScheduledThreadPoolExecutor实现:

 

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {

    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;



    /**
     * Creates a new ScheduledThreadPoolExecutor with the given core
     * pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
     * @throws NullPointerException if threadFactory is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                             ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
     * @throws NullPointerException if handler is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                              RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
}
  

ScheduledThreadPoolExecutor构造参数:

 

  1. corePoolSize: 任务执行线程池大小
  2. RejectedExecutionHandler: 任务拒绝策略,当线程池shutdown时,任务处理策略
  3. DelayedWorkQueue: 无界延迟队列,提交任务都加入队列中,由队列实现延迟执行功能
  4. MaximumPoolSize:  由于DelayedWorkQueue为无界队列,所以该值没有意义

提交任务:

 

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

    // Override AbstractExecutorService methods

    public Future<?> submit(Runnable task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result),
                        0, TimeUnit.NANOSECONDS);
    }

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    }

 ExecutorService的任务提交方式都有schedule(task,0,TimeUnit. NANOSECONDS)以延迟时间为0任务实现

 

任务计划执行方法:

 

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));

        delayedExecute(t);
        return t;
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
	   			       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Boolean>(command,
                                             null,
                                             triggerTime(initialDelay, unit),
                                             unit.toNanos(-delay)));
        delayedExecute(t);
        return t;
    }

 任务计划执行步骤:

 

  1. 封装任务:decorateTask()方法封装后,返回RunnableScheduledFuture
  2. 延迟执行:delayedExecute()延迟任务执行

 

    /**
     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
     */
    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

        super.getQueue().add(command);
    }

 延迟执行步骤:

 

  1. 线程池关闭(runState!=RUNNING),由拒绝策略决定
  2. 否则,线程池小于核心线程池大小,启动新线程
  3. 添加任务到等待队列

 

ScheduledThreadPoolExecutor线程池关闭

 

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted. If the
     * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
     * been set <tt>false</tt>, existing delayed tasks whose delays
     * have not yet elapsed are cancelled. And unless the
     * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
     * been set <tt>true</tt>, future executions of existing periodic
     * tasks will be cancelled.
     */
    public void shutdown() {
        cancelUnwantedTasks();
        super.shutdown();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution.  Each
     * element of this list is a {@link ScheduledFuture},
     * including those tasks submitted using <tt>execute</tt>, which
     * are for scheduling purposes used as the basis of a zero-delay
     * <tt>ScheduledFuture</tt>.
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

shutdownNow()方式与ThreadPoolExecutor一样

 

shutdown()方法:不在接收新提交任务

 

  1. 先调用cancelUnwantedTasks(),决定未执行任务时候继续执行
  2. 调用父类ThreadPoolExecutor方法

 

    /**
     * Cancels and clears the queue of all tasks that should not be run
     * due to shutdown policy.
     */
    private void cancelUnwantedTasks() {
        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
        if (!keepDelayed && !keepPeriodic)
            super.getQueue().clear();
        else if (keepDelayed || keepPeriodic) {
            Object[] entries = super.getQueue().toArray();
            for (int i = 0; i < entries.length; ++i) {
                Object e = entries[i];
                if (e instanceof RunnableScheduledFuture) {
                    RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
                        t.cancel(false);
                }
            }
            entries = null;
            purge();
        }
    }

等待队列中任务处理取决于:

 

 

  1. executeExistingDelayedTasksAfterShutdown : 执行已经存在与延迟队列任务
  2. continueExistingPeriodicTasksAfterShutdown : 继续执行周期任务

 

 

 

ScheduledFutureTask任务实现:

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;
        /** The time the task is enabled to execute in nanoTime units */
        private long time;
        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
         */
        private final long period;

        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }
     //...
    }

任务继承自FutureTask,同时实现了RunnableScheduledFuture接口(任务延迟执行时间getDelay()/任务周期之执行isPeriodic())  

 

实例变量:

 

 

  1. time:任务延迟执行时间
  2. period:任务重复执行周期;正值:固定频率执行;负值:定时延迟执行;零: 不重复执行

任务执行:

 

        public void run() {
            if (isPeriodic())
                runPeriodic();
            else
                ScheduledFutureTask.super.run();
        }

非周期任务:直接调用FutureTask.run()方法执行 

周期任务:调用runPeriodic()周期执行

 

        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)
                    time += p;
                else
                    time = triggerTime(-p);
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

 runPeriodic实现:

 

  1. 首先调用runAndReset(),保证任务重复执行(任务执行后,重置state=0)
  2. 根据runAndReset返回状态,以及线程池是否关闭或者线程池shutdown且getContinueExistingPeriodicTasksAfterShutdownPolicy()是否继续执行已有中期任务策略;将任务放入任务队列
  3. 否则,若是最后延迟任务,则唤醒空闲线程检查

 

 

DelayedWorkQueue实现:

 

包装了DelayQueue作为实现

 

 

 

分享到:
评论

相关推荐

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    jdk1.8-source:JDK1.8源码分析包

    JDK1.8源码分析 相关的原始码分析结果会以注解的形式体现到原始码中 已完成部分: ReentrantLock CountDownLatch Semaphore HashMap TreeMap LinkedHashMap ConcurrentHashMap 执行器 ...

    java技术指南

    文档内容丰富,既包括了Java的基本语法、源码分析、多线程处理、IO流操作、设计模式、常用框架、数据库技术、数据结构与算法、JVM原理、Web开发技术,也包括了Linux操作系统、Redis数据库、UML绘图以及JDK的新特性等...

    JAVA课程学习笔记.doc

    7. 异步结果源码分析 异步结果通常通过 `Future` 接口获取,它提供了获取任务结果、判断任务是否完成、取消任务等功能。`ScheduledThreadPoolExecutor` 的 `delayedExecute()` 方法中,如果任务被取消,会调用 `task...

    JUC+课程源码+线程操作

    常见的实现类有`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`,前者用于执行提交的任务,后者则支持定时及周期性任务。 2. `Semaphore`:信号量,用于控制同时访问特定资源的线程数量,通过acquire()获取一...

    JAVA定时任务

    在实际开发中,源码分析对于理解定时任务的工作原理至关重要。例如,通过阅读`ScheduledThreadPoolExecutor`的源码,我们可以了解其调度策略和线程池的管理方式。同时,利用IDE(如IntelliJ IDEA或Eclipse)的调试...

    java并发编程实战源码,java并发编程实战pdf,Java源码.zip

    通过阅读本书的源码,你可以更深入地理解并发编程的实现细节,而PDF版则提供了清晰的理论解释和实例分析。无论你是初学者还是经验丰富的开发者,这个压缩包都是一份宝贵的参考资料,能助你在Java并发编程的道路上...

    java精通程度应该会什么

    #### 六、JDK源码分析 深入理解JDK源码可以帮助开发者更好地理解Java语言的底层实现。 - **重要源码分析**: - List、Map、Set等集合类的实现细节。 - 多线程相关的类如`Thread`、`Lock`等。 - I/O相关的类如`...

    优秀博文汇总1

    【Tomcat源码分析】 Tomcat是Java应用服务器,其源码剖析有助于理解类加载器的工作原理以及如何处理HTTP请求。了解Tomcat的内部机制对于优化Web应用性能和排查问题非常有帮助。 【NIO与Netty】 非阻塞I/O (NIO) ...

    注释java源码-jconcurrency:&lt;>本书由BrianGoetz/TimPeierls/JoshuaBloch..编写,源代码用中文

    这本书结合了丰富的源码分析,旨在帮助读者理解和掌握Java多线程与并发机制的核心概念。 在Java编程中,并发处理是构建高性能、可扩展应用的关键技术之一。Java并发库(java.util.concurrent)提供了一系列高效、...

    java面试题

    9. **源码分析**: - 对于面试中常问的开源库,如Spring、MyBatis等,理解其核心原理和工作流程。 10. **工具**: - Maven或Gradle构建工具的使用,理解它们的项目管理和依赖管理功能。 - JUnit测试框架,单元...

    Java性能调优指南.pptx

    了解JDK源码对于优化至关重要,例如,通过研究`ScheduledThreadPoolExecutor`的源码,可以发现其任务取消机制的细节,进而理解为何任务未从队列中移除。 **结论** Java性能调优是一个涉及多个层面的过程,需要结合...

    java 手术任务(线程池)

    在Java编程中,线程池是一种管理线程的机制,它可以帮助我们更有效地调度和执行并发任务。...结合`SurgeryThreadPool.java`源码分析和`Java.jpg`中的示例,我们可以进一步理解线程池在实际项目中的具体实现和应用。

    Java进阶知识点汇总.pdf

    ### Java进阶知识点详解 #### 第一章:基本语法 ##### 关键字 - **static**:用于定义...以上是对Java进阶知识点的详细解析,覆盖了基本语法、JDK源码分析等多个方面,有助于深入理解Java语言的核心机制及高级特性。

    Java线程编程学习笔记(二)

    在Java编程中,线程是...这可能包括分析线程安全的代码示例,例如使用Collections.synchronizedList()、ConcurrentHashMap等线程安全的数据结构,或者理解线程池的工作原理,如ExecutorService、ThreadPoolExecutor和...

    SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)推荐

    源码分析显示,Spring Task默认使用了`ScheduledThreadPoolExecutor`,并设置核心线程数为1,这意味着所有任务会在同一线程中依次执行。`ScheduledThreadPoolExecutor`结合`DelayedWorkQueue`实现了任务的延迟或定时...

    java并发编程实战.zip

    7. **并发集合**:分析了并发环境下集合类的使用,如ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue等,它们是如何实现线程安全的。 8. **并发设计模式**:介绍了生产者消费者模式、读写锁模式、...

    任务管理

    在IT行业中,任务管理是一项至关重要的技能,尤其是在软件开发领域。Java作为一种广泛应用的...在`task-management-main`项目中,很可能包含了这些概念的实际应用,通过源码分析可以深入理解如何在Java中实现任务管理。

    多线程

    而“工具”可能是指一些辅助进行多线程调试、分析性能的工具,如JConsole或VisualVM,它们可以帮助开发者监控线程状态,找出死锁、竞态条件等问题。 从压缩包内的文件名来看,我们涵盖了多个与Java多线程相关的主题...

Global site tag (gtag.js) - Google Analytics