`

java定时任务之ScheduledThreadPoolExecutor

阅读更多

Java定时任务

 

定时任务:在日常开发中,经常会遇到延迟任务和周期性任务问题。解决这类问题,一般有以下几种途径:spring+Quartzjava Timerspring3.0以后的scheduled task

 

首先来看java Timerjava ScheduledThreadPoolExecutor,在jdk1.5推出ScheduledThreadPoolExecutor后,Timer几乎被完全替代。主要原因是Timer有三个大的缺陷:

1、不能捕获任务执行中的异常,如果出现一次异常整个定时任务执行将停止执行;

2Timer是基于Date时钟的,在分布式环境下由于机器的时钟不一致会导致一些致命的问题;

3Timer只会创建一个线程,存在任务丢失问题(任务执行时间超过一个周期时)。而ScheduledThreadPoolExecutor巧好能解决上述3个问题:

1ScheduledThreadPoolExecutor可以捕获异常,当然如果不捕获异常 该线程会中止;

2ScheduledThreadPoolExecutor是基于相对时间;

3ScheduledThreadPoolExecutor本质上是线程池,并可以保证每次任务都会执行。

ScheduledThreadPoolExecutor也是本次分享的主角。

 

再来看下跟spring相关的两种方式:spring+ Quartzspring scheduled task。其实在这两种方式中,spring只是起到整合作用,真实做事的是Quartz框架、以及java ScheduledThreadPoolExecutor。与spring整合Quartz一样,spring scheduled task只是对ScheduledThreadPoolExecutor进行了包装,简单的说就是可以实现配置化或注解方式 透明的使用ScheduledThreadPoolExecutor

 

对于Quartz框架,个人觉得有点鸡肋,相对于ScheduledThreadPoolExecutor 它的优势是能实现分布式集群下的定时任务调度。但使用起来确实不是很方便,需要创建10多张数据库表。建议分布式环境下可以直接使用一些开源的分布式调度框架,比如当当的elastic-job,当然也可以选择更轻量的实现SkySchedule(关于SkySchedule相关原理以及使用方法,可以点击这里)。

 

如果不需要分布式调度,只是简单的实现定时任务,使用spring scheduled task会轻量很多,它本质上是基于java ScheduledThreadPoolExecutor实现无需引入第三方包。而且还支持注解使用,非常简单方便:

Spring xml配置:

    <task:executor id="testExecutor" pool-size="5" /> //创建核心线程数为5的线程池
<task:annotation-driven executor="testExecutor" />//启动线程池
 

 

Java注解:

@Component
public class TestBussiness {
    //@Scheduled(cron = "* * 1 * * *")
    @Scheduled(fixedRate=10000)//每隔10秒
    public void doJob(){
        System.out.println("1111111");
    }
}
 

 

综上所述:java Timer几乎被遗弃,spring+ Quartz略显鸡肋(不排除有特殊使用场景);

使用ScheduledThreadPoolExecutor相对来说是最好的选择,如果项目中使用的spring 3.0以上建议直接使用spring scheduled task,但它本质上只是对ScheduledThreadPoolExecutor进行了简单的包装。所以要了解java中定时任务的实现原理,还得从ScheduledThreadPoolExecutor说起。

 

ScheduledThreadPoolExecutor简单介绍

 

首先来看ScheduledThreadPoolExecutor的类定义:

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {
//省略实现代码
}

 

可以看到它继承了ThreadPoolExecutor,说明它本质上也是一个线程池(想了解ThreadPoolExecutor详细介绍,请点击这里)。另它还实现了ScheduledExecutorService接口,这个接口中的方法 其实就是ScheduledThreadPoolExecutor相对于父类ScheduledExecutorService来说,要自己定制的个性化方法,也就是ScheduledThreadPoolExecutor实现“延迟任务和周期性任务”的核心方法,一共4个方法:

//延迟任务 执行方法,参数为Runnable类型对象
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
 
    //延迟任务 执行方法,参数为Callable类型对象
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    //固定频率 执行方法,时间到了以后就执行
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
 
    //固定延迟周期 执行方法,上次执行完成后固定等待延迟时间后 再执行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

 

4个方法都是提交任务方法,相当于ThreadPoolExecutorexecute、submit方法;并且都有返回值,类型为ScheduledFuture,相当于普通线程池的Future,可以用于控制任务生命周期。

 

12个是延迟任务,即延迟固定period时间后,执行任务。区别是参数不同;

34个是周期性任务,scheduleAtFixedRate是固定频率,任务时间到后就立即执行,等待周期分两种情况:如果程序的执行时间大于间隔时间,等待周期为执行时间,如果程序的执行时间小于间隔时间等待周期为间隔时间;scheduleWithFixedDelay是固定周期,不过执行任务需要花多长时间,下次执行必须等待固定delay时间。

 

了解了这几个方法的作用,使用ScheduledThreadPoolExecutor就已经不成问题了。下面来看具体实现原理。

 

ScheduledThreadPoolExecutor实现原理

 

构造方法

ScheduledThreadPoolExecutor4个构造方法,本质上都是调用父类ThreadPoolExecutor的构造方法创建线程池,只是使用的任务队列是DelayedWorkQueue而已:

 

//指定核心线程数,最大线程数不做限制
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
}
 
//指定核心线程数 和拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
}
 
//指定核心线程数,创建线程工厂类
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
}
 
//指定核心线程数,创建线程工厂类,拒绝策略
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
}

 

构造方法都很简单,只是需要注意的是最大线程数都没有做线程。

 

提交Callable任务schedule方法

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        //把Callable对象包装成ScheduledFutureTask对象
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        //提交任务
        delayedExecute(t);
        return t;
    }
 

 

这个方法与上次总结FutureTask(点击这里http://moon-walker.iteye.com/blog/2407394)中,AbstractExecutorServicesubmit方法类似。首先把callable对象包装成ScheduledFutureTask对象,ScheduledFutureTask继承自FutureTask,同时实现了RunnableScheduledFuture接口。所以这里可以把ScheduledFutureTask对象作为一个任务对象提交(Runnable),同时也可以作为返回值(ScheduledFuture),关系有点复杂 上个类图有助于理解:



 

 

关系有点复杂,但我们只需要知道ScheduledFutureTask从根本上实现了三个接口:FutureRunnableDelayed,即上图标黄的部分。另外还持有一个Callable的成员变量对象,即上面说的把Callable对象包装成ScheduledFutureTask对象。

 

ScheduledFutureTask相对于FutureTask而言,多实现了一个Delayed接口,这个是关键。因为ScheduledThreadPoolExecutor 的任务队列是DelayedWorkQueue,该队列要求放入的对象必须是实现了Delayed接口的RunnableScheduledFuture类型,而ScheduledFutureTask刚好实现了RunnableScheduledFuture。放入队列的入口就是delayedExecute方法。

 

delayedExecute方法

schedule方法包装好任务对象后,会调用delayedExecute方法,把任务放入任务队列,下面看下这个方法实现:

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//线程池已关闭,执行拒绝策略
            reject(task);
        else {
            super.getQueue().add(task);//调用任务队列的add方法加入队列
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false); //取消任务
            else
                ensurePrestart();//是否需要创建新线程
        }
    }

 

这里主要的操作就是super.getQueue().add(task),调用“任务队列”的add方法把任务放入队列。这个队列在构造方法中,已经可以看出来了,是DelayedWorkQueue。如何实现延迟任务,核心操作都在DelayedWorkQueue中实现。

 

内部类DelayedWorkQueue

DelayedWorkQueue主要维护了一个由二叉堆算法实现的数组,关于二叉堆算法可以参考这篇文章https://www.cnblogs.com/skywang12345/p/3610390.html(最小堆),这里就不细讲。简单的理解就是在调用add方法插入队列时,采用二叉堆算法保证第一个元素是最小值。在这里其实就是Delay时间最短的值。

 

线程池获取任务执行时,是调用的take方法,这是一个阻塞方法,由于二叉堆算法的缘故,每次取头结点即可,该任务是延时最小的任务。具体要阻塞多久呢,调用的是任务对象的getDelay方法。来看下take方法实现:

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //取出头结点
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();//队列为空就阻塞
                    else {
                        //获取头结点的,延时时长
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);//取出任务,并重写计算二叉堆
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();//阻塞等待其他线程take完成
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay); //阻塞等待delay时间
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();//继续唤醒其他线程take
                lock.unlock();
            }
        }

 

结合给出的注释,应该很好理解了。任务对象的getDelay方法又是在什么地方定义呢?任务类型为RunnableScheduledFuture,前面提到过具体的实现是ScheduledFutureTaskgetDelay实际就是Delayed接口定义的方法,Delayed又继承了Comparable接口,该接口中定义了compareTo方法,二叉堆算法就是利用这个方法比较进行排序的。也就是说ScheduledFutureTask应该同时实现了getDelay方法和compareTo方法。

 

ScheduledFutureTask实现原理

 

ScheduledFutureTask有几个重载的构造方法,这里就不列出来了,主要作用就是初始化两个重要的成员变量:

//任务下一次执行间隔时间
//初始延迟时间 和 period计算得到
private long time;
//传入间隔时间
private final long period;
 

 

getDelay方法

上面已经讲过,主要用于计算执行下次任务阻塞时间:

public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
 

 

compareTo方法

上面讲过,用于二叉堆算法计算时进行排序:

public int compareTo(Delayed other) {
            //省略其他代码
 
            //这句是关键,比较的其实就是getDelay方法返回的大小
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

 

compareTo本质上比较的就是getDelay方法的返回值。

 

run方法

线程池中执行任务时,最终会调用任务的run方法,看下ScheduledFutureTaskrun方法:

public void run() {
            boolean periodic = isPeriodic();//判断是不是 周期性任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)//如果只是延时任务,只需执行一次
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//周期性任务,需要计算下个周期执行时间
                reExecutePeriodic(outerTask);//把任务重新放入队列
            }
        }
 
private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }
 

 

run方法主要的逻辑就是执行延时任务,或者执行周期性任务。如果是周期性任务,需要调用setNextRunTime方法设置下次执行时间,并且把任务重新放入队列。至此周期性任务就可以一直循环执行下去了。

 

ScheduleThreadPoolExecutor的另外三个提交任务方法

前面只讲了scheduleCallable参数类型方法,另外scheduleRunnable参数类型方法只是包装成ScheduledFutureTask对象有点细微的区别,这里就不列出来了。

 

另外scheduleAtFixedRatescheduleWithFixedDelay方法,的区别也只是在包装ScheduledFutureTask对象时,构造period成员变量有区别:scheduleAtFixedRate使用的是unit.toNanos(period)scheduleWithFixedDelay使用的是unit.toNanos(-delay)。在执行setNextRunTime方法获取下一次执行间隔时间时,就有差别了。

 

另外 延时任务和周期性任务的区别其实就在run方法中,前面已经讲过。至此ScheduleThreadPoolExecutor的核心实现代码分析完毕。

 

总结

 

 

ScheduleThreadPoolExecutor本质上还是线程池,只是使用了DelayedWorkQueue作为任务队列。对应周期性任务而已,只是每次执行完成后,重新计算下一次执行时间,然后再重新放入队列而已。如果出现异常,没有捕获的话,就无法再次放入队列,周期性任务也就无法继续执行了。

 

另外我们可以通过Executors框架提供的静态方法快速的创建ScheduleThreadPoolExecutor定时任务线程池:Executors.newScheduledThreadPool(int coresize);

 

 

 

 

  • 大小: 40.2 KB
0
0
分享到:
评论

相关推荐

    java定时任务调度

    Java定时任务调度是Java开发中常见的一种功能,用于在特定时间执行特定的任务,例如数据同步、日志清理、报表生成等。在Java中,有多种实现定时任务调度的方式,包括但不限于Java内置的`java.util.Timer`类、Spring...

    java定时任务开源案例

    Java定时任务是软件开发中一个不可或缺的特性,它允许程序在特定时间执行预定的任务,而无需用户干预。在Java世界里,实现定时任务的方式多种多样,包括但不限于使用Java内置的`java.util.Timer`和`java.util....

    java定时任务

    Java定时任务是Java开发中一个重要的功能模块,它允许程序按照预定的时间间隔或者特定时间点执行特定的任务。在企业级应用开发中,定时任务被广泛用于数据同步、报表生成、邮件发送、缓存清理等场景。Java提供了多种...

    Java自带定时任务ScheduledThreadPoolExecutor实现定时器和延时加载功能

    总的来说,`ScheduledThreadPoolExecutor`是Java中实现定时任务和延时加载的强大工具,具有高度的灵活性和控制力,是现代Java应用中的首选定时任务组件。合理利用它可以提升系统的自动化程度和效率。

    Java定时任务实现解析.pptx.pptx

    Java定时任务是编程领域中一个重要的概念,它允许开发者安排任务在特定时间点或按预设周期执行。这种功能在各种应用中都有广泛用途,比如系统维护、数据处理和用户行为分析。本篇将深入解析Java定时任务的原理、常见...

    JAVA定时任务

    Java定时任务是Java编程中一个重要的概念,它允许开发者在特定的时间间隔或特定时间执行某项任务,这对于系统维护、数据备份、定时提醒等场景非常有用。本篇将围绕Java定时任务这一主题展开,深入探讨相关知识。 1....

    java 定时重启服务工具

    在提供的压缩包文件中,"RbootTool.zip"可能是服务重启工具的主程序,它可能包含了实现定时任务的核心逻辑,如使用上述的Java定时机制,并且具备服务监控和重启的接口。而"TimerJob.rar"可能包含了具体的定时任务...

    java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    通过灵活的构造函数和调度方法,开发者可以轻松创建复杂的定时任务执行逻辑,从而满足各种应用场景的需求。理解`ScheduledThreadPoolExecutor`的工作原理,对于提升Java并发编程的能力至关重要。

    java定时器(定时任务)

    Java定时器,也被称为Java定时任务,是Java编程语言中用于执行预定任务的重要工具。它允许程序员设置在未来某个时间点或按照一定周期执行特定代码,从而实现自动化操作。在Java中,有两个主要类用于实现定时任务:`...

    定时程序-java版

    `ScheduledThreadPoolExecutor`是`ScheduledExecutorService`的一个实现,可以创建一个线程池来执行定时任务。 8. **优缺点**:`Timer`简单易用,但当多个任务相互冲突时,其调度可能会出现问题。而`...

    Java语言定时调度任务之实现.pdf

    线程池调度在Java中并不是直接提供的一种定时调度方式,但可以通过自定义线程池配合定时任务来实现。线程池是一种资源池化技术,用来管理一组同构的工作线程,可以有效地执行异步任务。Java的Executor框架提供了...

    WebService 定时任务 任务预警

    对于定时任务,Java提供了一套标准API,名为ScheduledExecutorService,它属于java.util.concurrent包。开发者可以创建一个ScheduledThreadPoolExecutor实例,然后通过其scheduleAtFixedRate或...

    Java实现终止线程池中正在运行的定时任务

    Java实现终止线程池中正在运行的定时任务 Java中实现终止线程池中正在运行的定时任务是Java多线程编程中一个常见的问题。本篇文章将详细介绍如何实现终止线程池中正在运行的定时任务,并提供相应的代码示例。 首先...

    定时自动关机 JAVA

    总结起来,实现Java定时自动关机涉及到以下关键点: 1. 使用`ScheduledExecutorService`来定时执行任务。 2. 通过`Runtime.exec()`调用操作系统命令实现关机功能。 3. 考虑跨平台兼容性,为不同操作系统编写不同的...

    定时的爱-完成定时任务的加载和发送

    在Java编程领域,定时任务是不可或缺的一部分,它允许开发者安排任务在特定时间点或周期性地执行。"定时的爱-完成定时任务的加载和发送"这个标题暗示了我们正在探讨的是如何在Java环境中实现定时任务的加载和调度,...

    定时任务与线程池:并发编程的关键

    深入分析了Java中的并发编程,特别是关于定时任务和定时线程池的使用。首先,文档详细介绍了ScheduledThreadPoolExecutor类的结构和工作原理。这个类用于处理延时任务或定时任务,通过三种不同的任务提交方式:...

    SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)推荐

    在Java开发领域,有多种方法可以实现定时任务,这些方法在不同场景下各有优缺点。 1. **JDK自带的定时工具**: - `Timer`:是JDK提供的一个基础定时调度类,适合简单的按频率执行的任务,功能较为有限。 - `...

    线程池、任务、任务组、任务池,定时任务的类库-hy.common.tpool.zip

    Java的ScheduledExecutorService提供了定时任务的支持,可以通过它的scheduleAtFixedRate()或scheduleWithFixedDelay()方法来安排周期性的任务。 在`hy.common.tpool`这个库中,可能包含了一套自定义的线程池实现,...

    yyblog-master.zip

    【标题】"yyblog-master.zip" 是一个包含Java定时任务示例代码的压缩包,它可能是一个博客系统项目,其中的“master”分支被压缩为了这个文件。在Java开发中,定时任务通常用于自动化执行一些周期性的后台任务,如...

Global site tag (gtag.js) - Google Analytics