`
tanliwei
  • 浏览: 49582 次
  • 性别: Icon_minigender_1
  • 来自: 中国
社区版块
存档分类
最新评论

通过源码分析Java开源任务调度框架Quartz的主要流程

    博客分类:
  • java
 
阅读更多

通过源码分析Java开源任务调度框架Quartz的主要流程

从使用效果、调用链路跟踪、E-R图、循环调度逻辑几个方面分析Quartz。

可下载github项目地址一起参照学习: https://github.com/tanliwei/spring-quartz-cluster-sample , 在项目中增加了日志输出

 

系统说明:

IDE: IntelliJ

JDK:1.8

Quartz:2.2.1

 

使用效果

1.从github项目https://github.com/tanliwei/spring-quartz-cluster-sample中,拉取项目到本地,导入IDEA。

2.本文采用Mysql数据库。

    请执行 resources/scripts/tables_mysql_innodb.sql

3.修改jdbc.properties中数据库配置

 

4.通过IDEA, Edit Configurations -> Add Tomcat Server, 部署到Tomcat

 

 

暴露的Restful 接口 /say-hello.do 以及添加好任务后的调用效果:


 

 

添加任务

在tomcat启动成功后,在首页点击“添加任务”,添加如下任务:


 

 

代码执行逻辑在SyncJobFactory类中,从Output中可以看到执行的输出信息,

调用链跟踪的最后会回到这个类来。

 

现在开始跟踪调用链路。 

 

IDEA 快捷键:
进入方法:  Ctrl + 鼠标左键
光标前进/后退: Ctrl + Shirt + 右方向键/左方向键
 
一、 调用链路跟踪

从配置文件applicationContext.xml配置中找到任务调度核心类SchedulerFactoryBean

 resources/applicationContext.xml

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
...
</bean>
 

 

 

使用IDEA快捷键,点击进入SchedulerFactoryBean类,它实现了InitializingBean接口,

在Spring中凡是实现了InitializingBean接口的Bean,都会在Bean属性都设置完成后调用afterPropertiesSet()方法.

 SchedulerFactoryBean.java

//---------------------------------------------------------------------
// Implementation of InitializingBean interface
// 实现 InitializingBean 接口
//---------------------------------------------------------------------
public void afterPropertiesSet() throws Exception {
	//...
	// Create SchedulerFactory instance.
	// 创建 SchedulerFactory 调度器工厂实例
	SchedulerFactory schedulerFactory = (SchedulerFactory)
			BeanUtils.instantiateClass(this.schedulerFactoryClass);
	initSchedulerFactory(schedulerFactory);
	//...
	// Get Scheduler instance from SchedulerFactory.
	// 通过调度器工厂 获取 调度器实例
	try {
		this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
	//...
}

 

 

 SchedulerFactoryBean.java

/**
 * Create the Scheduler instance for the given factory and scheduler name.
 * 通过制定工厂和调度器名称创建调度器实例
 * Called by {@link #afterPropertiesSet}.
 * <p>The default implementation invokes SchedulerFactory's <code>getScheduler</code>
 * method. Can be overridden for custom Scheduler creation.
 */
protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
		throws SchedulerException {
	//...
	try {
		SchedulerRepository repository = SchedulerRepository.getInstance();
		synchronized (repository) {
			Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
			Scheduler newScheduler = schedulerFactory.getScheduler();
			if (newScheduler == existingScheduler) {
				throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
						"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
			}
			//...
}

 

 这个项目走的逻辑是 StdSchedulerFactory.getScheduler()方法,可自行debug。

 StdSchedulerFactory.java

/**
 * Returns a handle to the Scheduler produced by this factory.
 * 返回该工厂创造的调度器的句柄
 */
public Scheduler getScheduler() throws SchedulerException {
	if (cfg == null) {
		initialize();
	}

	SchedulerRepository schedRep = SchedulerRepository.getInstance();

	Scheduler sched = schedRep.lookup(getSchedulerName());
	//...
	sched = instantiate();
	return sched;
}

 

StdSchedulerFactory.java

private Scheduler instantiate() throws SchedulerException {
    //...
    //大量的配置初始化、实例化代码
    //...
    //第1298行代码
    qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
    //...
}

 

QuartzScheduler.java

/**
 * Create a <code>QuartzScheduler</code> with the given configuration
 * 根据给定的配置 创建Quartz调度器
 */
public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }
        //private QuartzSchedulerThread schedThread;
        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        //通过线程池执行 Quartz调度器线程
        schedThreadExecutor.execute(this.schedThread);
        //...
}

 

 QuartzSchedulerThread.java

/**
 * <p>
 * The main processing loop of the <code>QuartzSchedulerThread</code>.
 * Quartz调度器线程的主循环逻辑
 * </p>
 */
@Override
public void run() {
	//while循环执行,只要调度器为被暂停
	while(!halted.get()){
						JobRunShell shell = null;
						try {
							shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
							shell.initialize(qs);
						}
						if (qsRsrcs.getThreadPool().runInThread(shell) == false){}

	}
}

 

 

 JobRunShell.java

public void run() {
		//...
		Job job = jec.getJobInstance();
		//...
		try {
			log.debug("Calling execute on job " + jobDetail.getKey());
			//执行
			job.execute(jec);
			endTime = System.currentTimeMillis();
		}
		//...
		//更新Trigger触发器状态,删除FIRED_TRIGGERS触发记录
		instCode = trigger.executionComplete(jec, jobExEx);
		//...
}

 

QuartzJobBean.java

/**
 * This implementation applies the passed-in job data map as bean property
 * values, and delegates to <code>executeInternal</code> afterwards.
 * 这个实现 把传入的map数据作为bean属性值,然后委托给 executeInternal 方法
 */
public final void execute(JobExecutionContext context) throws JobExecutionException {
	try {
	//执行
	executeInternal(context);
}

 

  SyncJobFactory.java

//回到了我们的业务类SyncJobFactory的executeInternal方法,
//里面执行我们的业务代码
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
	try {
		LOG.info("SyncJobFactory execute" + IPAddressKowalski.getIpAddressAndPort() + " port:"+IPAddressKowalski.getTomcatPort());
	}
	//...
	System.out.println("jobName:" + scheduleJob.getJobName() + "  " + scheduleJob);
	//...
}

 

 二、E-R图

梳理6张主要的Quartz表:

 

 

 

QRTZ_TRIGGERS 触发器表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_JOB_DETAILS表SCHED_NAME外键

    JOB_NAME,任务名。自定义值。 联合主键,QRTZ_JOB_DETAILS表JOB_NAME外键

    JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_JOB_DETAILS表JOB_GROUP外键

    TRIGGER_STATE,触发器状态: WAITING , ACQUIRED, BLOCKING

    NEXT_FIRE_TIME, 下次触发时间:

    MISFIRE_INSTR,执行失败后的指令,

        非失败策略 MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1; 

        失败策略 MISFIRE_INSTRUCTION_SMART_POLICY = 0;

    TRIGGER_TYPE, 触发器类型,例如CRON,cron表达式类型的触发器

    PRIORITY,优先级

 

QRTZ_CRON_TRIGGERS cron类型触发器表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。 联合主键,QRTZ_TRIGGERS表SCHED_NAME外键

    JOB_NAME,任务名。自定义值。 联合主键,QRTZ_TRIGGERS表JOB_NAME外键

    JOB_GROUP,任务组。 自定义值。联合主键,QRTZ_TRIGGERS表JOB_GROUP外键

    CRON_EXPRESSION, cron表达式, 例如每30秒执行一次, 0/30 * * * * ?

 

QRTZ_JOB_DETAILS 任务详细表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    JOB_NAME,任务名。自定义值。 联合主键

    JOB_GROUP,任务组。 自定义值。联合主键

    JOB_DATA,blob类型,任务参数

 

QRTZ_FIRED_TRIGGERS 任务触发表

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    ENTRY_ID,entry id,联合主键

    JOB_NAME,任务名。自定义值。 

    JOB_GROUP,任务组。 自定义值。

    FIRED_TIME, 任务触发时间

    STATE,状态

    INSTANCE_NAME, 服务器实例名

    PRIORITY,优先级

 

QRTZ_SCHEDULER_STATE 

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    INSTANCE_NAME,服务器实例名。联合主键

    LAST_CHECKIN_TIME,上次检查时间

    CHECKIN_INTERVAL,检查间隔

 

QRTZ_LOCKS 全局锁

    SCHED_NAME,调度器名称,集群时为常量值:“ClusterScheduler”。联合主键

    LOCK_NAME,锁名称,例如,TRIGGER_ACCESS。联合主键

  

 

三、业务分析 (执行SQL)

 

    主要流程图如下:



 

 

 

    对应代码:

QuartzSchedulerThread.java

 public void run() {
        //...
        while (!halted.get()) {
            try {
                //合理休眠
                //...
                        //获取接下来的触发器
                        //1.状态为WAITING
                        //2.触发时间在30秒内
                        //3.不是错过执行的或者错过了但是时间不超过两分钟
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                       
                                //... 
                                //触发任务
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                //...
                            JobRunShell shell = null;
                            //...
                            //执行代码
                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                            //...
        } // while (!halted)
        //..
    }

 

 JobRunShell.java

    protected QuartzScheduler qs = null;
	
    public void run() {
        qs.addInternalSchedulerListener(this);
        try {
            //...
            do {
                Job job = jec.getJobInstance();
                // execute the job
                try {
                    //执行任务代码
                    job.execute(jec);
                //更新触发器,删除触发记录
                qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
                break;
            } while (true);
        } 
    //...
    }

 

 

四、扩展

 

除了对主线程 QuartzSchedulerThread 的分析

继续分析JobStoreSupport类的两个线程 ClusterManager 和 MisfireHandler 的分析, 它们维护触发器的MISFIRE_INSTR状态,和调度器状态QRTZ_SCHEDULER_STATE。

 

  • 大小: 22.8 KB
  • 大小: 54.9 KB
  • 大小: 22.3 KB
  • 大小: 80.9 KB
  • 大小: 36.8 KB
  • 大小: 63.8 KB
分享到:
评论

相关推荐

    任务调度开源框架Quartz

    通过阅读《任务调度框架Quartz.doc》文档,你将能够深入理解Quartz的内部机制,学习如何创建和配置作业与触发器,以及如何在实际项目中集成和使用Quartz。文档可能还会涵盖Quartz的源码分析,这对于想要定制或优化...

    ASP.NET使用Quartz.NET实现定时任务调度

    Quartz.NET是一个开源的作业调度框架,它允许在.NET环境中创建和执行计划任务,而TopShelf则是一个轻量级的服务宿主,使得将应用程序作为Windows服务运行变得更加简单。 Quartz.NET是基于Java的Quartz项目的.NET...

    java 定时任务 quartz

    Java定时任务Quartz是企业级应用中广泛使用的调度框架,它允许开发者定义各种定时任务,以实现周期性或一次性的工作。Quartz的核心在于其灵活的任务调度机制,它能够与Java应用程序无缝集成,使得开发者可以方便地...

    ssm框架+quartz2.2Jar包

    **Quartz 2.2** 是一个开源的作业调度框架,可以用来执行定时任务。它允许开发者创建、调度、管理和执行任务。Quartz支持CRON表达式、触发器、作业和集群等多种功能,使得在Java应用中实现复杂的定时任务变得非常...

    quartz动态任务管理

    Quartz是一款开源的作业调度框架,它允许开发者创建、调度和执行各种类型的任务。这个"quartz动态任务管理"源码包很可能是针对Quartz框架的实现,旨在帮助开发者更方便地管理和控制任务的生命周期。 Quartz的核心...

    Spring quartz任务调度

    Quartz是一款开源的作业调度框架,它允许开发者安排任务在特定时间执行,而Spring框架则提供了与Quartz的无缝集成,使得在Spring应用中管理定时任务变得更加便捷。 在Spring中使用Quartz,首先需要引入相关的依赖库...

    Quartz 定时任务web使用

    Quartz 是一个开源的作业调度框架,常用于Java应用程序中实现定时任务的管理。它提供了丰富的API和功能,使得开发者可以灵活地定义和控制任务的执行。本篇将重点介绍如何在Web环境中集成并使用Quartz,以及相关的...

    quartz1.6.0源码

    Quartz是Java领域的一款强大的开源任务调度框架,广泛应用于企业级应用系统中,用于执行定时任务。源码分析有助于深入理解其内部工作原理,从而更好地利用它来满足各种复杂的定时需求。Quartz 1.6.0源码包提供了一个...

    quartz-2.1.7 官方jar包源码

    Quartz是Java领域一个广泛应用的开源任务调度框架,它的版本为2.1.7。这个官方jar包包含了Quartz的核心库,使得开发者可以方便地在自己的应用中集成定时任务的管理功能。Quartz的设计目标是提供一个灵活、可扩展的...

    spring 任务调度

    - **Quartz简介**:Quartz是一个开源的作业调度框架,支持复杂的调度策略,如按日期、时间间隔或CRON表达式调度任务。 - **Spring与Quartz集成**:Spring通过`org.springframework.scheduling.quartz`包提供了一种...

    quartz 1.6 源码

    Quartz是Java领域的一款强大的开源任务调度框架,它允许开发者创建、组织和执行定时任务。在版本1.6中,Quartz提供了稳定的性能和丰富的功能。这个压缩包包含的资源可以帮助我们深入理解Quartz的工作机制,包括源码...

    Spring整合Quartz定时发送邮件

    首先,Quartz是一个开源的作业调度框架,它允许开发者定义、调度和执行任务。在Java应用程序中,Quartz可以用来在特定时间执行重复或一次性任务,如定时发送邮件。Quartz-1.8.3.jar和quartz-all-1.8.3.jar是Quartz库...

    Quartz-1.8.4官方

    Quartz是一款广泛应用于Java平台的开源任务调度框架,它的核心功能是实现定时任务的调度。在1.8.4这个版本中,用户可以获取到官方的原始代码、编译后的jar包以及官方提供的示例,这为开发者提供了深入理解Quartz工作...

    quartz 时间调度器的使用

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中安排任务。它提供了丰富的 API 和配置选项,使得开发者能够轻松地定义、安排和执行各种类型的任务。本文将深入探讨 Quartz 的核心概念、使用方法以及如何在...

    Quartz集群配置和示例源码

    Quartz是一款开源的作业调度框架,它允许开发者在Java应用程序中定义定时任务,实现复杂的调度逻辑。集群配置是Quartz为了提高系统可用性和任务处理能力而设计的一种模式,它可以确保在一个集群环境中,即使某个节点...

    Quartz 开发指南(附源码)

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中实现复杂的时间调度任务。它允许开发者定义作业(Jobs)和触发器(Triggers),以自动化执行特定任务,如定时发送邮件、数据同步或者执行任何其他业务逻辑...

    java开源项目资源

    在文档中,我们可以期待看到各种类型的项目,比如Web框架(如Spring Boot)、数据库连接池(如HikariCP)、日志库(如Logback)、测试框架(如JUnit)、任务调度系统(如Quartz)等。 此外,开源项目也经常伴随着...

    quartz-3.0.3.1_quartes_源码.zip

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中实现复杂的时间调度任务。这个压缩包 "quartz-3.0.3.1_quartes_源码.zip" 包含了 Quartz 框架的源代码,版本为 3.0.3.1,对于学习和理解 Quartz 的工作原理...

    xxl job源码分析

    xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...

Global site tag (gtag.js) - Google Analytics