`
Donald_Draper
  • 浏览: 987936 次
社区版块
存档分类
最新评论

Quartz的job、触发器的暂停与恢复源码分析

阅读更多
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartzs的job,trriger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132]

由于前面QuartzScheduler已经介绍过,我们这里直接看代码
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
//暂停触发器
 public void pauseTrigger(TriggerKey triggerKey)
        throws SchedulerException
    {
        //检查调度器状态,是否关闭
        validateState();
	//暂停triggerKey
        resources.getJobStore().pauseTrigger(triggerKey);
	//通知调度器下一刻调度时间 
        notifySchedulerThread(0L);
	//添加trigger暂定事件到调度监听器  
        notifySchedulerListenersPausedTrigger(triggerKey);
    }
//恢复触发器
     public void resumeTrigger(TriggerKey triggerKey)
        throws SchedulerException
    {
       //检查调度器状态,是否关闭
        validateState();
	//恢复触发器triggerKey
        resources.getJobStore().resumeTrigger(triggerKey);
	////通知调度器下一刻调度时间 
        notifySchedulerThread(0L);
	//添加trigger恢复事件到调度监听器  
        notifySchedulerListenersResumedTrigger(triggerKey);
    }
    //暂定job
    public void pauseJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
	//暂定job,关键
        resources.getJobStore().pauseJob(jobKey);
        notifySchedulerThread(0L);
	添加job暂定事件到调度监听器 
        notifySchedulerListenersPausedJob(jobKey);
    }
    //恢复job
    public void resumeJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
	//恢复job,关键
        resources.getJobStore().resumeJob(jobKey);
        notifySchedulerThread(0L);
	//添加job恢复事件到调度监听器  
        notifySchedulerListenersResumedJob(jobKey);
    }
    //检查调度器状态,是否关闭
     public void validateState()
        throws SchedulerException
    {
        if(isShutdown())
            throw new SchedulerException("The Scheduler has been shutdown.");
        else
            return;
    }
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    private QuartzSchedulerResources resources;
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;//监听管理器
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
}

//job存储器
public class RAMJobStore
    implements JobStore
{
//暂停触发器
 public void pauseTrigger(TriggerKey triggerKey)
    {
        TriggerWrapper tw;
label0:
        {
	    //获取锁
            synchronized(lock)
            {
                tw = (TriggerWrapper)triggersByKey.get(triggerKey);
                if(tw != null && tw.trigger != null)
                    break label0;
            }
            return;
        }
        if(tw.state != 3)
            break MISSING_BLOCK_LABEL_44;
        obj;
        JVM INSTR monitorexit ;
        return;
	//如果处于阻塞状态,则triggerKey为暂停阻塞,否则置为暂停状态
        if(tw.state == 5)
            tw.state = 6;
        else
            tw.state = 4;
	//并从timeTriggers移除
        timeTriggers.remove(tw);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }
     public void resumeTrigger(TriggerKey triggerKey)
    {
        TriggerWrapper tw;
label0:
        {
            synchronized(lock)
            {
                tw = (TriggerWrapper)triggersByKey.get(triggerKey);
                if(tw != null && tw.trigger != null)
                    break label0;
            }
            return;
        }
        OperableTrigger trig;
        trig = tw.getTrigger();
        if(tw.state == 4 || tw.state == 6)
            break MISSING_BLOCK_LABEL_59;
        obj;
        JVM INSTR monitorexit ;
        return;
	//如果blockedJobs包含取法任务,则则triggerKey为阻塞,否则置为等待状态
        if(blockedJobs.contains(trig.getJobKey()))
            tw.state = 5;
        else
            tw.state = 0;
        applyMisfire(tw);
	//如果调度器状态为等待,则加入到调度容器中
        if(tw.state == 0)
            timeTriggers.add(tw);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }
    //暂定job
     public void pauseJob(JobKey jobKey)
    {
        synchronized(lock)
        {
	    //获取触发任务
            List triggersOfJob = getTriggersForJob(jobKey);
            OperableTrigger trigger;
	    //关键调用pauseTrigger(TriggerKey triggerKey)
            for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext(); pauseTrigger(trigger.getKey()))
                trigger = (OperableTrigger)i$.next();

        }
    }
    //恢复job
    public void resumeJob(JobKey jobKey)
    {
        synchronized(lock)
        {   
	    //获取触发任务
            List triggersOfJob = getTriggersForJob(jobKey);
            OperableTrigger trigger;
	    //调resumeTrigge()
            for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext(); resumeTrigger(trigger.getKey()))
                trigger = (OperableTrigger)i$.next();

        }
    }
    //获取jobkey的触发任务
    public List getTriggersForJob(JobKey jobKey)
    {
        ArrayList trigList = new ArrayList();
        synchronized(lock)
        {
            Iterator i$ = triggers.iterator();
            do
            {
                if(!i$.hasNext())
                    break;
                TriggerWrapper tw = (TriggerWrapper)i$.next();
                if(tw.jobKey.equals(jobKey))
                    trigList.add((OperableTrigger)tw.trigger.clone());
            } while(true);
        }
        return trigList;
    }

    protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper>  
    protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper>  
    protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group  
    protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group  
    protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树  
    protected HashMap calendarsByName;  
    protected ArrayList triggers; //List<TriggerWrapper>  
    protected final Object lock = new Object();  
    protected HashSet pausedTriggerGroups;  
    protected HashSet pausedJobGroups;  
    protected HashSet blockedJobs;  
    protected long misfireThreshold;  
    protected SchedulerSignaler signaler;  
    private final Logger log = LoggerFactory.getLogger(getClass());  
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());  
}  

//TriggerKey,JobKey包装类
class TriggerWrapper
{

    TriggerWrapper(OperableTrigger trigger)
    {
        state = 0;
        if(trigger == null)
        {
            throw new IllegalArgumentException("Trigger cannot be null!");
        } else
        {
            this.trigger = trigger;
            key = trigger.getKey();
            jobKey = trigger.getJobKey();
            return;
        }
    }

    public boolean equals(Object obj)
    {
        if(obj instanceof TriggerWrapper)
        {
            TriggerWrapper tw = (TriggerWrapper)obj;
            if(tw.key.equals(key))
                return true;
        }
        return false;
    }

    public int hashCode()
    {
        return key.hashCode();
    }

    public OperableTrigger getTrigger()
    {
        return trigger;
    }

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;
    public int state;
    public static final int STATE_WAITING = 0;//等待
    public static final int STATE_ACQUIRED = 1;//就绪
    public static final int STATE_EXECUTING = 2;//执行
    public static final int STATE_COMPLETE = 3;//完成
    public static final int STATE_PAUSED = 4;//暂停
    public static final int STATE_BLOCKED = 5;//阻塞
    public static final int STATE_PAUSED_BLOCKED = 6;//暂停阻塞
    public static final int STATE_ERROR = 7;//错误
}

总结:
调度器暂停和恢复trriger,是通过改变triggersByKey中TriggerWrapper的TrrgerKey为trrigerKey的状态,暂定和恢复job是通过暂停和恢复triggers容器中JobKey为jobKey的TriggerWrapper。
0
0
分享到:
评论

相关推荐

    Quartz 开发指南(附源码)

    本指南将深入探讨 Quartz 的核心概念、配置与使用方法,并提供源码分析,帮助开发者更高效地利用这一强大工具。 1. **Quartz 基本概念** - **Job**:Job 是实际要执行的任务,是一个实现了 `org.quartz.Job` 接口...

    quartz-1.4.5以及源码

    1. **任务(Job)与触发器(Trigger)**:在Quartz中,任务是需要执行的工作单元,定义了具体的业务逻辑;触发器则决定任务何时启动。常见的触发器类型有简单触发器、cron触发器等,可以按照时间间隔或者自定义的...

    Quartz Job Scheduling Framework第11章翻译初稿

    4. **监控和管理**:可以通过Quartz的API或Web界面来查看、暂停、恢复或删除已调度的作业。 五、案例分析 在本章中,可能会有一个详细的案例,展示如何创建一个简单的定时发送邮件的Job,以及如何使用CronTrigger使...

    Quartz.NET定时任务源码(转)

    6. **源码分析**:解压后的文件包括解决方案文件(sln)和库文件,例如`Quartz.2003.sln`、`Quartz.2008.sln`,这些都是Visual Studio的项目文件,用于在开发环境中打开和编译源码。`bin`目录包含编译后的库和程序,...

    quartz source包

    Quartz 是一个开源的作业调度框架,用于在 Java 应用程序中安排任务。它提供了丰富的 API 和配置选项,使得开发者能够轻松地实现定时任务的...同时,源码分析也可以帮助你发现潜在的性能优化点,或者为社区贡献代码。

    quartz-master.zip

    源码分析: 1. **Job接口**:Job是所有任务的基础,它定义了`execute`方法,这是任务实际执行的地方。开发者可以创建自己的Job类,实现这个接口,然后在Scheduler中注册。 2. **Trigger接口**:Trigger接口及其...

    Quartz Job Scheduling Framework第7章翻译初稿

    5. **事件类型**:Quartz调度器生成多种事件,包括作业开始、结束、触发器错过、暂停、恢复等。理解这些事件类型对于有效地设计和实现监听器至关重要。 6. **多线程考虑**:由于Quartz是多线程的,开发者在实现监听...

    quartz例子

    关于源码分析,Quartz的源代码结构清晰,易于理解。关键组件如JobStore(用于存储Job和Trigger)、ThreadPool(处理Job执行的线程池)等模块的设计都是值得学习的。通过阅读源码,可以深入理解Quartz内部的工作机制...

    quartz 时间调度器的使用

    Quartz 源码分析 了解 Quartz 的内部工作机制可以帮助你更好地利用这个库。Quartz 的源码提供了丰富的注释,你可以查看 `org.quartz.impl.StdSchedulerFactory`、`org.quartz.Scheduler`、`org.quartz.Trigger` 和 ...

    作业调度Quartz.net源代码

    Quartz.NET是一个强大的开源作业调度框架,用于在.NET环境中创建和执行计划任务。...同时,源码分析也有助于理解任务调度领域的设计模式和最佳实践,对于提升自身编程技能和解决实际问题的能力大有裨益。

    Quartz

    3. **Quartz的源码分析** - **org.quartz.Scheduler**: Scheduler是Quartz的核心类,它管理所有的Job和Trigger,并负责启动、停止以及暂停任务调度。 - **org.quartz.impl.StdSchedulerFactory**: 用于创建...

    Spring-Quartz

    Spring的`SchedulerFactoryBean`还提供了管理Quartz Scheduler的方法,如暂停、恢复、删除Job或Trigger,以及查看当前调度状态。 8. **错误处理和事务支持**: Spring可以集成Quartz的错误处理机制,例如通过`...

    定时器quartz的应用

    5. **源码分析**:标签中的"源码"可能意味着文章涉及了Quartz的内部工作原理。Quartz的源码结构清晰,适合学习和扩展,开发者可以通过阅读源码了解其调度算法、存储策略等核心细节。 6. **工具化应用**:作为一款...

    Quartz/J2EE (定时服务)

    源码分析是理解Quartz工作原理的关键,通过阅读源码,我们可以深入理解Job、Trigger和Scheduler之间的协作机制,以及Quartz如何调度和执行任务。例如,Quartz使用了PriorityBlockingQueue来存储待执行的任务,保证了...

    Quartz任务调度

    4. **Quartz的源码分析** - **Scheduler接口**:调度器的主要接口,定义了调度、暂停、恢复和删除作业的方法,以及与其他组件交互的接口。 - **JobStore**:负责存储和检索作业和触发器的接口,如`RAMJobStore`...

    Introduction to Quartz(2)

    6. **源码分析**: - 对于源码感兴趣的开发者,可以通过阅读Quartz的源码了解其内部实现机制,包括作业调度算法、线程池管理等。 7. **集成到项目**: - 集成Quartz到Java项目通常涉及添加依赖、配置调度器、注册...

    quartz系列之九:存储

    在Quartz调度框架中,"存储"扮演着至关重要的角色,它是Quartz能够可靠地管理和执行作业与触发器的基础。Quartz提供了多种存储选项,包括内存存储、关系数据库存储以及JDBC作业存储等,以满足不同场景的需求。这篇...

    quartz

    它允许开发者定义定时任务(Jobs)并将其与触发器(Triggers)关联,以便在指定时间执行这些任务。Quartz 提供了高度可配置性,支持多种持久化策略,如 JDBC 存储,使得在分布式环境中也能进行可靠的作业调度。 在...

    Spring中使用Quartz(二)

    - 分析`SchedulerFactoryBean`的源码,理解其如何初始化和管理Quartz调度器。 - 查看`Job`和`Trigger`的实现细节,了解Quartz如何执行和调度任务。 8. **实战应用** - 示例场景:定时备份数据库、清理缓存、发送...

Global site tag (gtag.js) - Google Analytics