`
freish
  • 浏览: 83859 次
  • 性别: Icon_minigender_1
  • 来自: 摄影帝国
社区版块
存档分类
最新评论

ScheduledThreadPoolExecutor实现原理

 
阅读更多

自jdk1.5开始,Java开始提供ScheduledThreadPoolExecutor类来支持周期性任务的调度,在这之前,这些工作需要依靠Timer/TimerTask或者其它第三方工具来完成。但Timer有着不少缺陷,如Timer是单线程模式,调度多个周期性任务时,如果某个任务耗时较久就会影响其它任务的调度;如果某个任务出现异常而没有被catch则可能导致唯一的线程死掉而所有任务都不会再被调度。ScheduledThreadPoolExecutor解决了很多Timer存在的缺陷。

先来看看ScheduledThreadPoolExecutor的实现模型,它通过继承ThreadPoolExecutor来重用线程池的功能,里面做了几件事情:

  • 为线程池设置了一个DelayedWorkQueue,该queue同时具有PriorityQueue(优先级大的元素会放到队首)和DelayQueue(如果队列里第一个元素的getDelay返回值大于0,则take调用会阻塞)的功能
  • 将传入的任务封装成ScheduledFutureTask,这个类有两个特点,实现了java.lang.Comparable和java.util.concurrent.Delayed接口,也就是说里面有两个重要的方法:compareTo和getDelay。ScheduledFutureTask里面存储了该任务距离下次调度还需要的时间(使用的是基于System#nanoTime实现的相对时间,不会因为系统时间改变而改变,如距离下次执行还有10秒,不会因为将系统时间调前6秒而变成4秒后执行)。getDelay方法就是返回当前时间(运行getDelay的这个时刻)距离下次调用之间的时间差;compareTo用于比较两个任务的优先关系,距离下次调度间隔较短的优先级高。那么,当有任务丢进上面说到的DelayedWorkQueue时,因为它有DelayQueue(DelayQueue的内部使用PriorityQueue来实现的)的功能,所以新的任务会与队列中已经存在的任务进行排序,距离下次调度间隔短的任务排在前面,也就是说这个队列并不是先进先出的;另外,在调用DelayedWorkQueue的take方法的时候,如果没有元素,会阻塞,如果有元素而第一个元素的getDelay返回值大于0(前面说过已经排好序了,第一个元素的getDelay不会大于后面元素的getDelay返回值),也会一直阻塞。
  • ScheduledFutureTask提供了一个run的实现,线程池执行的就是这个run方法。看看run的源码(本文的代码取自hotspot1.5.0_22,jdk后续版本的代码可能已经不一样了,如jdk1.7中使用了自己实现的DelayedWorkQueue,而不再使用PriorityQueue作为存储,不过从外面看它们的行为还是一样的,所以并不影响对ScheduledThreadPoolExecutor调度机制的理解):

     

    public void run() {
        if (isPeriodic())
            runPeriodic();
        else
            ScheduledFutureTask.super.run();
    }

    如果不是周期性任务就直接执行任务(也就是else部分),这个主要是用于实现ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),后面会讲到它们的实现,这里先关注周期任务的执行方式。周期性任务执行的是runPeriodic(),看下它的实现:

    private void runPeriodic() {
        boolean ok = ScheduledFutureTask.super.runAndReset();
        boolean down = isShutdown();
        // Reschedule if not cancelled and not shutdown or policy allows
        if (ok && (!down ||
                   (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                    !isTerminating()))) {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
            ScheduledThreadPoolExecutor.super.getQueue().add(this);
        }
        // This might have been the final executed delayed
        // task.  Wake up threads to check.
        else if (down)
            interruptIdleWorkers();
    }

    这里可以看到,先执行了任务本身(ScheduledFutureTask.super.runAndReset),这个调用有一个返回值,来看下它的实现:

    protected boolean runAndReset() {
        return sync.innerRunAndReset();
    }

    跟进去看下innerRunAndReset():

    boolean innerRunAndReset() {
        if (!compareAndSetState(0, RUNNING))
            return false;
        try {
            runner = Thread.currentThread();
            callable.call(); // don't set result
            runner = null;
            return compareAndSetState(RUNNING, 0);
        } catch(Throwable ex) {
            innerSetException(ex);
            return false;
        }
    }

    可以发现,这里需要关注的是第三个return,也就是如果任务执行出现了异常,会被catch且返回false.

    继续看runPeriodic()方法,if里面,如果刚才任务执行的返回值是true且线程池还在运行就在if块中的操作,如果线程池被关闭了就做else if里的操作。也就是说,如果之前的任务执行出现的异常返回了false,那么if里以及else if里的代码都不会执行了,那有什么影响?接下来看看if里做了什么。

    if里的代码很简单,分为两部分,一是计算这个任务下次调度的间隔,二是将任务重新放回队列中。回到出现异常的情况,如果刚才的任务执行出现了异常,就不会将任务再放回队列中,换而言之,也就是这个任务再也得不到调度了!但是,这并不影响其它周期任务的调度。

综上,可以看到,ScheduledThreadPoolExecutor执行周期性任务的模型就是:调度一次任务,计算并设置该任务下次间隔,将任务放回队列中供线程池执行。这里的队列起了很大的作用,且有一些特点:距离下次调度间隔短的任务总是在队首,队首的任务若距离下次调度的间隔时间大于0就无法从该队列的take()方法中拿到任务。

接下来看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)这两个非周期性任务的实现方式,先看看它们的源码:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    ScheduledFutureTask<?> t =
        new ScheduledFutureTask<Boolean>(command, null, triggerTime(delay, unit));
    delayedExecute(t);
    return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    ScheduledFutureTask<V> t =
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));
    delayedExecute(t);
    return t;
}
private void delayedExecute(Runnable command) {
    if (isShutdown()) {
        reject(command);
        return;
    }
    // Prestart a thread if necessary. We cannot prestart it
    // running the task because the task (probably) shouldn't be
    // run yet, so thread will just idle until delay elapses.
    if (getPoolSize() < getCorePoolSize())
        prestartCoreThread();
 
    super.getQueue().add(command);
}

实现方式也很简单,在创建ScheduledThreadPoolExecutor内部任务(即ScheduledFutureTask)的时候就将调度间隔计算并设置好,如果当前线程数小于设置的核心线程数,就启动一个线程(可能是线程池刚启动里面还没有线程,也可能是里面的线程执行任务时挂掉了。如果线程池中的线程挂掉了而又没有调用这些schedule方法谁去补充挂掉的线程?不用担心,线程池自己会处理的)去监听队列里的任务,然后将任务放到队列里,在任务执行间隔不大于0的时候,线程就可以拿到这个任务并执行。

周期性任务的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))与非周期性任务是类似的,它们处理方式不同的地方在于前文说到的ScheduledFutureTask#run()中。

 

更多文章在我的博客:http://www.ticmy.com/

0
0
分享到:
评论

相关推荐

    java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    Java定时器线程池,即`ScheduledThreadPoolExecutor`,是Java并发编程中一个重要的工具,它继承自`ThreadPoolExecutor`并扩展了...理解`ScheduledThreadPoolExecutor`的工作原理,对于提升Java并发编程的能力至关重要。

    Java并发包源码分析(JDK1.8)

    AQS相关应用(CountDownLatch、CyclicBarrier、Semaphore等),executor(ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask等),collection(ConcurrentHashMap、CopyOnWriteArrayList等), ...

    Java多线程学习基础原理

    常用的线程池实现类是ThreadPoolExecutor和ScheduledThreadPoolExecutor。 7. 高级并发API 随着Java的发展,新增加了更多的并发API来简化并增强并发编程能力,比如CompletableFuture、Stream API中的并行流等,这些...

    Java定时任务实现解析.pptx.pptx

    二、定时任务的实现原理 1. **Java的Timer类**:Timer类是Java基础库中的一个工具类,它允许开发者安排任务在未来某个时刻运行或定期执行。通过创建Timer对象和TimerTask子类,可以设置任务的执行时间和周期。 2. ...

    JAVA线程池原理以及几种线程池类型介绍

    3. **ScheduledThreadPoolExecutor**:支持定时任务执行的线程池,可以指定任务执行的延迟时间或周期。 #### 六、线程池的工作队列 在实现线程池时,通常会使用一个工作队列来存储等待执行的任务。这个队列可以是...

    【2018最新最详细】并发多线程教程

    20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之ScheduledThreadPoolExecutor 23.FutureTask基本操作总结 24.Java中atomic包中的原子操作类...

    JAVA课程学习笔记.doc

    - `java.util.concurrent.ScheduledThreadPoolExecutor`:调度线程池的核心实现,除了执行任务外,还能按预定的时间间隔执行。 4. 辅助类 - `java.util.concurrent.Executors`:提供了一些静态工厂方法,用于创建...

    优秀博文汇总1

    本资源摘要信息涵盖了 Java 线程池的多个方面,包括线程池的实现、ThreadPoolExecutor、ScheduledThreadPoolExecutor 等。 AbstractQueuedSynchronizer AbstractQueuedSynchronizer(AQS)是 Java 语言中的一种...

    定时任务与线程池:并发编程的关键

    首先,文档详细介绍了ScheduledThreadPoolExecutor类的结构和工作原理。这个类用于处理延时任务或定时任务,通过三种不同的任务提交方式:schedule、scheduledAtFixedRate、和scheduledWithFixedDelay。重点介绍了...

    java schedule

    2. `ScheduledExecutorService`和`ScheduledThreadPoolExecutor`的工作原理及优势。 3. 如何使用`java.time`包进行精确的时间日期操作。 4. 如何设计和实现周期性任务,以及如何处理任务取消和中断。 5. 理解线程...

    JavaGuide面试突击版(全).rar

    - volatile关键字:理解其作用和实现原理,以及与synchronized的区别。 - 并发容器:并发集合如ConcurrentHashMap、CopyOnWriteArrayList的使用场景和内部实现。 6. **Spring框架** - IoC与DI:理解控制反转和...

    Executor框架使用详解

    标签中提到的“源码”意味着深入理解Executor框架的实现原理是重要的。例如,理解`ThreadPoolExecutor`如何管理线程、如何处理工作队列溢出、如何应用不同的拒绝策略等,都有助于优化系统性能和调试问题。 至于...

    Java并发编程学习笔记

    1. **线程安全与锁 Synchronized 底层实现原理**: 线程安全是指在多线程环境下,对共享数据的操作始终保持一致性。Synchronized是Java中的关键字,用于实现线程同步,它基于Java虚拟机的Monitor机制,通过对象头的...

    Java面试题以及答案.zip

    理解它们的特性、使用场景及底层实现原理非常重要。面试中可能会要求你分析不同集合的性能差异,并设计相关解决方案。 5. **JVM内存模型**:理解堆、栈、方法区、本地方法栈、程序计数器等区域的分配与回收,以及...

    Java 面试题大全,面试必备

    3. **集合框架**:ArrayList、LinkedList、HashSet、HashMap等集合类的使用及其底层实现原理,了解并发环境下对集合的操作,比如ConcurrentHashMap的线程安全特性。 4. **多线程**:线程的创建方式(实现Runnable...

    最全的JAVA面试题汇总 JAVA面试题精华 内置答案

    - **动态代理**:理解JDK动态代理和CGLIB动态代理的实现原理。 8. **I/O与NIO** - **传统I/O**:掌握FileInputStream、OutputStream、BufferedReader、Writer等流的使用。 - **NIO(New I/O)**:理解非阻塞I/O...

    JAVA并发容器代码随读1

    Delayed 接口用于表示具有延迟属性的元素,ScheduledFutureTask 就是其一个实现,常用于 ScheduledThreadPoolExecutor 的工作队列(DelayedWorkQueue),用于实现定时任务。DelayQueue 的核心在于通过条件变量...

    线程池、任务、任务组、任务池,定时任务的类库-hy.common.tpool.zip

    ExecutorService是线程池的基本服务接口,ThreadPoolExecutor是其具体实现,用于管理一组可重用的工作线程,而ScheduledThreadPoolExecutor则支持定时及周期性执行任务。 任务(Task)是在线程池中执行的工作单元,...

    Android Executor线程池

    Android开发者通常会使用`ThreadPoolExecutor`或`ScheduledThreadPoolExecutor`这两个实现了`ExecutorService`的类。 三、`ThreadPoolExecutor`详解 `ThreadPoolExecutor`是`ExecutorService`的一个具体实现,它...

Global site tag (gtag.js) - Google Analytics