`
jiagyao
  • 浏览: 99370 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Quartz源码分析(一)------ 以线程等待的方式实现按时间调度 (转)

阅读更多
Quartz是运用最广的任务调度框架,它最核心的组成部分是Scheduler、Trigger、JobDetail,然后给Scheduler配置个线程QuartzSchedulerThread,此线程在Scheduler初始化时启动,等待Scheduler start,然后从JobStore里拿到最近要触发的Trigger,以线程等待的方式等到trigger触发时间点,之后就是执行trigger所关联的JobDetail,最后清扫战场。Scheduler初始化、start和trigger执行的时序图如下所示:



其中,最核心的地方是QuartzSchedulerThread运行机制。下面解析一下它的run方法:

view plaincopy to clipboardprint?
public void run() {   
        boolean lastAcquireFailed = false;   
           
        while (!halted) {   
            try {   
                // check if we're supposed to pause...   
                synchronized (pauseLock) {   
                    while (paused && !halted) {   
                        try {   
                            // wait until togglePause(false) is called...   
                            pauseLock.wait(100L);   
                        } catch (InterruptedException ignore) {   
                        }   
                    }   
       
                    if (halted) {   
                        break;   
                    }   
                }   
           ......   
      }   
}  
public void run() {
        boolean lastAcquireFailed = false;
        
        while (!halted) {
            try {
                // check if we're supposed to pause...
                synchronized (pauseLock) {
                    while (paused && !halted) {
                        try {
                            // wait until togglePause(false) is called...
                            pauseLock.wait(100L);
                        } catch (InterruptedException ignore) {
                        }
                    }
    
                    if (halted) {
                        break;
                    }
                }
           ......
      }
} 

以上是run的最开头的一段,不难看出这是在等待scheduler的start,实际上Quartz就是通过线程的wait或sleep来实现时间调度。继续看代码:
view plaincopy to clipboardprint?
Trigger trigger = null;   
long now = System.currentTimeMillis();   
signaled = false;   
try {   
    trigger = qsRsrcs.getJobStore().acquireNextTrigger(   
            ctxt, now + idleWaitTime);   
    lastAcquireFailed = false;   
} catch (JobPersistenceException jpe) {   
    if(!lastAcquireFailed) {   
        qs.notifySchedulerListenersError(   
            "An error occured while scanning for the next trigger to fire.",   
            jpe);   
    }   
    lastAcquireFailed = true;   
} catch (RuntimeException e) {   
    if(!lastAcquireFailed) {   
        getLog().error("quartzSchedulerThreadLoop: RuntimeException "  
                +e.getMessage(), e);   
    }   
    lastAcquireFailed = true;   
}  
Trigger trigger = null;
long now = System.currentTimeMillis();
signaled = false;
try {
    trigger = qsRsrcs.getJobStore().acquireNextTrigger(
            ctxt, now + idleWaitTime);
    lastAcquireFailed = false;
} catch (JobPersistenceException jpe) {
    if(!lastAcquireFailed) {
        qs.notifySchedulerListenersError(
            "An error occured while scanning for the next trigger to fire.",
            jpe);
    }
    lastAcquireFailed = true;
} catch (RuntimeException e) {
    if(!lastAcquireFailed) {
        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                +e.getMessage(), e);
    }
    lastAcquireFailed = true;
} 


这段代码是从jobStore里拿到下一个要执行的trigger,一般情况下jobStore使用的是RAMJobStore,即trigger等相关信息存放在内存里,如果需要把任务持久化就得使用可持久化JobStore。继续看代码:

 view plaincopy to clipboardprint?
now = System.currentTimeMillis();   
long triggerTime = trigger.getNextFireTime().getTime();   
long timeUntilTrigger = triggerTime - now;   
long spinInterval = 10;   
int numPauses = (int) (timeUntilTrigger / spinInterval);   
while (numPauses >= 0 && !signaled) {   
    try {   
        Thread.sleep(spinInterval);   
    } catch (InterruptedException ignore) {   
    }   
    now = System.currentTimeMillis();   
    timeUntilTrigger = triggerTime - now;   
    numPauses = (int) (timeUntilTrigger / spinInterval);   
}   
if (signaled) {   
    try {   
        qsRsrcs.getJobStore().releaseAcquiredTrigger(   
                ctxt, trigger);   
    } catch (JobPersistenceException jpe) {   
        qs.notifySchedulerListenersError(   
                "An error occured while releasing trigger '"  
                        + trigger.getFullName() + "'",   
                jpe);   
        // db connection must have failed... keep   
        // retrying until it's up...   
        releaseTriggerRetryLoop(trigger);   
    } catch (RuntimeException e) {   
        getLog().error(   
            "releaseTriggerRetryLoop: RuntimeException "  
            +e.getMessage(), e);   
        // db connection must have failed... keep   
        // retrying until it's up...   
        releaseTriggerRetryLoop(trigger);   
    }   
    signaled = false;   
    continue;   
}  
now = System.currentTimeMillis();
long triggerTime = trigger.getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
long spinInterval = 10;
int numPauses = (int) (timeUntilTrigger / spinInterval);
while (numPauses >= 0 && !signaled) {
    try {
        Thread.sleep(spinInterval);
    } catch (InterruptedException ignore) {
    }
    now = System.currentTimeMillis();
    timeUntilTrigger = triggerTime - now;
    numPauses = (int) (timeUntilTrigger / spinInterval);
}
if (signaled) {
    try {
        qsRsrcs.getJobStore().releaseAcquiredTrigger(
                ctxt, trigger);
    } catch (JobPersistenceException jpe) {
        qs.notifySchedulerListenersError(
                "An error occured while releasing trigger '"
                        + trigger.getFullName() + "'",
                jpe);
        // db connection must have failed... keep
        // retrying until it's up...
        releaseTriggerRetryLoop(trigger);
    } catch (RuntimeException e) {
        getLog().error(
            "releaseTriggerRetryLoop: RuntimeException "
            +e.getMessage(), e);
        // db connection must have failed... keep
        // retrying until it's up...
        releaseTriggerRetryLoop(trigger);
    }
    signaled = false;
    continue;
} 


此段代码是计算下一个trigger的执行时间和现在系统时间的差,然后通过循环线程sleep的方式暂停住此线程,一直等到trigger的执行时间点。继续看代码:

view plaincopy to clipboardprint?
import org.quartz.core.JobRunShell;   
JobRunShell shell = null;   
try {   
    shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();   
    shell.initialize(qs, bndle);   
} catch (SchedulerException se) {   
    try {   
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,   
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);   
    } catch (SchedulerException se2) {   
        qs.notifySchedulerListenersError(   
                "An error occured while placing job's triggers in error state '"  
                        + trigger.getFullName() + "'", se2);   
        // db connection must have failed... keep retrying   
        // until it's up...   
        errorTriggerRetryLoop(bndle);   
    }   
    continue;   
}   
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {   
    try {   
        getLog().error("ThreadPool.runInThread() return false!");   
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,   
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);   
    } catch (SchedulerException se2) {   
        qs.notifySchedulerListenersError(   
                "An error occured while placing job's triggers in error state '"  
                        + trigger.getFullName() + "'", se2);   
        // db connection must have failed... keep retrying   
        // until it's up...   
        releaseTriggerRetryLoop(trigger);   
    }   
}  
import org.quartz.core.JobRunShell;
JobRunShell shell = null;
try {
    shell = qsRsrcs.getJobRunShellFactory().borrowJobRunShell();
    shell.initialize(qs, bndle);
} catch (SchedulerException se) {
    try {
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
    } catch (SchedulerException se2) {
        qs.notifySchedulerListenersError(
                "An error occured while placing job's triggers in error state '"
                        + trigger.getFullName() + "'", se2);
        // db connection must have failed... keep retrying
        // until it's up...
        errorTriggerRetryLoop(bndle);
    }
    continue;
}
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    try {
        getLog().error("ThreadPool.runInThread() return false!");
        qsRsrcs.getJobStore().triggeredJobComplete(ctxt,
                trigger, bndle.getJobDetail(), Trigger.INSTRUCTION_SET_ALL_JOB_TRIGGERS_ERROR);
    } catch (SchedulerException se2) {
        qs.notifySchedulerListenersError(
                "An error occured while placing job's triggers in error state '"
                        + trigger.getFullName() + "'", se2);
        // db connection must have failed... keep retrying
        // until it's up...
        releaseTriggerRetryLoop(trigger);
    }
} 


此段代码就是包装trigger,然后通过以JobRunShell为载体,在threadpool里执行trigger所关联的jobDetail。

之后的代码就是清扫战场,就不在累述。



本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/cutesource/archive/2009/12/08/4965520.aspx
分享到:
评论

相关推荐

    quartz源码解析(一)

    这篇博客“quartz源码解析(一)”可能是博主对Quartz核心原理、设计模式以及其实现细节的一个初步探讨。 Quartz的源码分析可以从以下几个方面入手: 1. **设计模式**: - **工厂模式**:Quartz中Job和Trigger的...

    quartz-1.4.5以及源码

    Quartz是Java平台上的一款强大的作业调度框架,常用于在企业级应用中实现任务调度。版本1.4.5是该框架的一个稳定版本,虽然现在已经有更新的版本,但对于某些老系统来说,它仍然具有重要的维护价值。这个压缩包包含...

    quartz-1.6.4.源码

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中实现复杂的时间调度任务。版本 1.6.4 提供了稳定的调度服务,适用于各种企业级应用。源码分析可以帮助我们深入理解其内部机制,从而更好地利用或定制这个...

    quartz-2.1.7 官方jar包源码

    源码分析对于理解Quartz的工作原理至关重要。在Quartz-2.1.7源码中,你可以看到如下关键组件: 1. `org.quartz.core` 包:这是Quartz的核心模块,包含Scheduler、JobStore和ThreadPool的主要实现。SchedulerImpl是...

    quartz1.6.0源码

    源码分析有助于深入理解其内部工作原理,从而更好地利用它来满足各种复杂的定时需求。Quartz 1.6.0源码包提供了一个宝贵的资源,帮助开发者探索其设计模式、线程管理以及任务调度的机制。 1. **Quartz核心概念** -...

    quartz-3.0.3.1_quartes_源码.zip

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中实现复杂的时间调度任务。这个压缩包 "quartz-3.0.3.1_quartes_源码.zip" 包含了 Quartz 框架的源代码,版本为 3.0.3.1,对于学习和理解 Quartz 的工作原理...

    quartz-2.2 需要的jar以及src源代码

    Quartz是Java领域一个广泛应用的开源任务调度框架,它的核心功能是允许开发者安排任务在特定时间执行,或者按一定间隔重复执行。这个压缩包“quartz-2.2.3”包含了Quartz库的jar文件以及源代码,这对于开发者理解和...

    quartz-master.zip

    Quartz是一款开源的作业调度框架,它允许程序创建和管理定时任务。在Java开发中,Quartz被广泛用于...此外,对于希望参与开源项目或者对任务调度有深入需求的开发者,理解并可能贡献Quartz源码将是一次宝贵的学习经历。

    任务调度开源框架Quartz

    Quartz是一款广泛应用于Java环境中的开源任务调度框架,它提供了高度可配置的作业调度系统,使得开发者能够轻松地在应用程序中实现定时任务的管理。Quartz的核心特性包括但不限于以下几点: 1. **灵活的调度**:...

    Quartz集群配置和示例源码

    Quartz是一款开源的作业调度框架,它允许开发者在Java应用程序中定义定时任务,实现复杂的调度逻辑。集群配置是Quartz为了提高系统可用性和任务处理能力而设计的一种模式,它可以确保在一个集群环境中,即使某个节点...

    作业调度Quartz.net源代码

    通过阅读源码,开发者可以了解到Quartz.NET的实现细节,例如作业调度算法、线程管理策略以及如何与不同存储机制交互。 总之,这个资源对于想要深入研究和使用Quartz.NET的人来说非常有价值。通过学习源码和DEMO,你...

    Quartz.NET 官方源码

    此外,Quartz.NET还支持集群模式,允许多个调度器实例共享同一份作业和触发器的状态,以实现高可用性和故障转移。 Quartz.NET的持久化机制也很关键,它允许在应用程序重启后恢复之前的调度状态。默认情况下,Quartz...

    xxl job源码分析

    xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...

    FreePay_Quartz_aug_2008-源码.rar

    开发者可以设置触发器类型,如SimpleTrigger(简单触发器)或CronTrigger(基于Cron表达式的时间触发器),以实现不同的调度策略。 4. **持久化机制**:Quartz支持数据库持久化,这意味着即使服务器重启,调度信息...

    quartz-master.zip 2.3.2源码

    Quartz是一款开源的作业调度框架,它允许开发者在Java应用程序中安排任务的执行。Quartz 2.3.2是该框架的一...对于希望深入学习任务调度和Java后台服务开发的开发者来说,Quartz 2.3.2的源码分析是一份宝贵的学习资源。

    Quartz 定时任务web使用

    Quartz 是一个开源的作业调度框架,常用于Java应用程序中实现定时任务的管理。它提供了丰富的API和功能,使得开发者可以灵活地定义和控制任务的执行。本篇将重点介绍如何在Web环境中集成并使用Quartz,以及相关的...

    任务调度源码.net

    此外,源码分析可以帮助你理解Quartz.NET如何实现线程安全、任务调度算法、触发器的解析与管理,以及如何优雅地处理任务异常和调度冲突。通过阅读源码,你还可以学习到多线程编程、事件驱动编程和设计模式的应用。 ...

    Quartz任务调度

    Quartz任务调度是一款开源的Java定时任务框架,它允许开发者精确地控制任务的执行时间,提供了丰富的调度功能,被广泛应用于各种系统中的定时任务管理。Quartz的核心是基于作业(Job)和触发器(Trigger)的概念,...

    quartz定时器源码jar包下载

    6. **并发与线程管理**:Quartz在多线程环境下运行,分析`org.quartz.core.QuartzSchedulerThread`,了解其内部调度机制。 7. **插件系统**:Quartz有一个强大的插件系统,如`org.quartz.plugins.history....

    spring 任务调度

    - **Quartz简介**:Quartz是一个开源的作业调度框架,支持复杂的调度策略,如按日期、时间间隔或CRON表达式调度任务。 - **Spring与Quartz集成**:Spring通过`org.springframework.scheduling.quartz`包提供了一种...

Global site tag (gtag.js) - Google Analytics