`
Donald_Draper
  • 浏览: 987935 次
社区版块
存档分类
最新评论

Quartzs的job存储,触发器、job删除源码分析

阅读更多
前言:unscheduleJob针对TriggerKey,而deleteJob针对jobKey下面我们进一步从源码来分析,从job,trriger
调度,再到unscheduleJob,deleteJob
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException
    {
        validateState();
        if(jobDetail == null)
            throw new SchedulerException("JobDetail cannot be null");
        if(trigger == null)
            throw new SchedulerException("Trigger cannot be null");
        if(jobDetail.getKey() == null)
            throw new SchedulerException("Job's key cannot be null");
        if(jobDetail.getJobClass() == null)
            throw new SchedulerException("Job's class cannot be null");
        OperableTrigger trig = (OperableTrigger)trigger;
        if(trigger.getJobKey() == null)
            //设置触发器的jobKey
            trig.setJobKey(jobDetail.getKey());
        else
        if(!trigger.getJobKey().equals(jobDetail.getKey()))
            throw new SchedulerException("Trigger does not reference given job!");
        trig.validate();
        Calendar cal = null;
        if(trigger.getCalendarName() != null)
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        Date ft = trig.computeFirstFireTime(cal);
        if(ft == null)
        {
            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
        } else
        {
	    //存储jobDetail, trig到RAMJobStore
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
	    //下面三个现在先不说以后再说
            notifySchedulerListenersJobAdded(jobDetail);
            notifySchedulerThread(trigger.getNextFireTime().getTime());
            notifySchedulerListenersSchduled(trigger);
            return ft;
        }
    public boolean unscheduleJob(TriggerKey triggerKey)
        throws SchedulerException
    {
        validateState();
	//从容器中的RAMJobStore的triggersByKey Map中将triggerKey移除
        if(resources.getJobStore().removeTrigger(triggerKey))
        {
            notifySchedulerThread(0L);
            notifySchedulerListenersUnscheduled(triggerKey);
        } else
        {
            return false;
        }
        return true;
    }

    public boolean deleteJob(JobKey jobKey)
        throws SchedulerException
    {
        validateState();
        boolean result = false;
	//获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>
        List triggers = getTriggersOfJob(jobKey);
        for(Iterator i$ = triggers.iterator(); i$.hasNext();)
        {
            Trigger trigger = (Trigger)i$.next();
	    //移除触发器
            if(!unscheduleJob(trigger.getKey()))
            {
                StringBuilder sb = (new StringBuilder()).append("Unable to unschedule trigger [").append(trigger.getKey()).append("] while deleting job [").append(jobKey).append("]");
                throw new SchedulerException(sb.toString());
            }
            result = true;
        }
        //从容器中的RAMJobStore的triggersByKey,trriger,
        //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除
        //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息
        result = resources.getJobStore().removeJob(jobKey) || result;
        if(result)
        {
            notifySchedulerThread(0L);
            notifySchedulerListenersJobDeleted(jobKey);
        }
        return result;
    }

    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    private QuartzSchedulerResources resources;
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
    private final Logger log = LoggerFactory.getLogger(getClass());

    static 
    {
        Properties props;
        InputStream is;
        VERSION_MAJOR = "UNKNOWN";
        VERSION_MINOR = "UNKNOWN";
        VERSION_ITERATION = "UNKNOWN";
        props = new Properties();
        is = null;
        is = org/quartz/core/QuartzScheduler.getResourceAsStream("quartz-build.properties");
        if(is != null)
        {
            props.load(is);
            String version = props.getProperty("version");
            if(version != null)
            {
                String versionComponents[] = version.split("\\.");
                VERSION_MAJOR = versionComponents[0];
                VERSION_MINOR = versionComponents[1];
                if(versionComponents.length > 2)
                    VERSION_ITERATION = versionComponents[2];
                else
                    VERSION_ITERATION = "0";
            } else
            {
                LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Can't parse Quartz version from quartz-build.properties");
            }
        }
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        break MISSING_BLOCK_LABEL_181;
        Exception e;
        e;
        LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Error loading version info from quartz-build.properties.", e);
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        break MISSING_BLOCK_LABEL_181;
        Exception exception;
        exception;
        if(is != null)
            try
            {
                is.close();
            }
            catch(Exception ignore) { }
        throw exception;
    }
}

//Job存储的实现
public class RAMJobStore
    implements JobStore
{
//获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>
 public List getTriggersForJob(JobKey jobKey)
    {
        ArrayList trigList = new ArrayList();
        synchronized(lock)
        {
	    //从triggers中获取jobKey对应的TriggerWrapper
            Iterator i$ = triggers.iterator();
            do
            {
                if(!i$.hasNext())
                    break;
                TriggerWrapper tw = (TriggerWrapper)i$.next();
                if(tw.jobKey.equals(jobKey))
                    trigList.add((OperableTrigger)tw.trigger.clone());
            } while(true);
        }
        return trigList;
    }
    public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger)
        throws JobPersistenceException
    {
        storeJob(newJob, false);
        storeTrigger(newTrigger, false);
    }
    //存储newJob到jobsByKey,jobsByGroup
    public void storeJob(JobDetail newJob, boolean replaceExisting)
        throws ObjectAlreadyExistsException
    {
        JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
        boolean repl = false;
        synchronized(lock)
        {
            if(jobsByKey.get(jw.key) != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newJob);
                repl = true;
            }
            if(!repl)
            {
                HashMap grpMap = (HashMap)jobsByGroup.get(newJob.getKey().getGroup());
                if(grpMap == null)
                {
                    grpMap = new HashMap(100);
                    jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
                }
		 //存储job到jobsByGroup
                grpMap.put(newJob.getKey(), jw);
		//存储job到jobsByKey
                jobsByKey.put(jw.key, jw);
            } else
            {
                JobWrapper orig = (JobWrapper)jobsByKey.get(jw.key);
                orig.jobDetail = jw.jobDetail;
            }
        }
    }
    //将newTrigger存储到triggersByKey,trriger,triggersByGroup,timeTriggers
     public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting)
        throws JobPersistenceException
    {
        TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());
        synchronized(lock)
        {
            if(triggersByKey.get(tw.key) != null)
            {
                if(!replaceExisting)
                    throw new ObjectAlreadyExistsException(newTrigger);
                removeTrigger(newTrigger.getKey(), false);
            }
            if(retrieveJob(newTrigger.getJobKey()) == null)
                throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") referenced by the trigger does not exist.").toString());
            //存储trriger到triggers
	    triggers.add(tw);
            HashMap grpMap = (HashMap)triggersByGroup.get(newTrigger.getKey().getGroup());
            if(grpMap == null)
            {
                grpMap = new HashMap(100);
                triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
            }
	     //存储trriger到triggersByGroup
            grpMap.put(newTrigger.getKey(), tw);
	    //存储trriger到triggersByKey
            triggersByKey.put(tw.key, tw);
            if(pausedTriggerGroups.contains(newTrigger.getKey().getGroup()) || pausedJobGroups.contains(newTrigger.getJobKey().getGroup()))
            {
                tw.state = 4;
                if(blockedJobs.contains(tw.jobKey))
                    tw.state = 6;
            } else
            if(blockedJobs.contains(tw.jobKey))
                tw.state = 5;
            else
	       // 将trriger添加到timeTriggers
                timeTriggers.add(tw);
        }
    }
     public boolean removeTrigger(TriggerKey triggerKey)
    {
        return removeTrigger(triggerKey, true);
    }
//从triggersByKey,triggers,timeTriggers移除key
    private boolean removeTrigger(TriggerKey key, boolean removeOrphanedJob)
    {
        boolean found;
        synchronized(lock)
        {
	   //从triggersByKey移除
            found = triggersByKey.remove(key) != null;
            if(found)
            {
                TriggerWrapper tw = null;
                HashMap grpMap = (HashMap)triggersByGroup.get(key.getGroup());
                if(grpMap != null)
                {
                    ////从triggersByGroup移除 
                    grpMap.remove(key);
                    if(grpMap.size() == 0)
                        triggersByGroup.remove(key.getGroup());
                }
                Iterator tgs = triggers.iterator();
                do
                {
                    if(!tgs.hasNext())
                        break;
                    tw = (TriggerWrapper)tgs.next();
                    if(!key.equals(tw.key))
                        continue;
		    //从triggers移除
                    tgs.remove();
                    break;
                } while(true);
		//从timeTriggers移除
                timeTriggers.remove(tw);
                if(removeOrphanedJob)
                {
                    JobWrapper jw = (JobWrapper)jobsByKey.get(tw.jobKey);
                    List trigs = getTriggersForJob(tw.jobKey);
                    if((trigs == null || trigs.size() == 0) && !jw.jobDetail.isDurable() && removeJob(jw.key))
                        signaler.notifySchedulerListenersJobDeleted(jw.key);
                }
            }
        }
        return found;
    }
//从容器中的RAMJobStore的triggersByKey,trriger,
//triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除
//同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息
    public boolean removeJob(JobKey jobKey)
    {
        boolean found = false;
        synchronized(lock)
        {
	    //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>
            List triggersOfJob = getTriggersForJob(jobKey);
            for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext();)
            {
                OperableTrigger trig = (OperableTrigger)i$.next();
		//从triggersByKey,triggers,timeTriggers移除key
                removeTrigger(trig.getKey());
                found = true;
            }
            //从jobsByKey移除jobKey
            found = (jobsByKey.remove(jobKey) != null) | found;
            if(found)
            {
                HashMap grpMap = (HashMap)jobsByGroup.get(jobKey.getGroup());
                if(grpMap != null)
                {
		    //从jobsByGroup对应的group总移除jobKey
                    grpMap.remove(jobKey);
                    if(grpMap.size() == 0)
                        jobsByGroup.remove(jobKey.getGroup());
                }
            }
        }
        return found;
    }
    protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper>
    protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper>
    protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group
    protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group
    protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树
    protected HashMap calendarsByName;
    protected ArrayList triggers; //List<TriggerWrapper>
    protected final Object lock = new Object();
    protected HashSet pausedTriggerGroups;
    protected HashSet pausedJobGroups;
    protected HashSet blockedJobs;
    protected long misfireThreshold;
    protected SchedulerSignaler signaler;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis());
}

TriggerKey与JobKey包装类
class TriggerWrapper
{

    TriggerWrapper(OperableTrigger trigger)
    {
        state = 0;
        if(trigger == null)
        {
            throw new IllegalArgumentException("Trigger cannot be null!");
        } else
        {
            this.trigger = trigger;
            key = trigger.getKey();
            jobKey = trigger.getJobKey();
            return;
        }
    }

    public boolean equals(Object obj)
    {
        if(obj instanceof TriggerWrapper)
        {
            TriggerWrapper tw = (TriggerWrapper)obj;
            if(tw.key.equals(key))
                return true;
        }
        return false;
    }

    public int hashCode()
    {
        return key.hashCode();
    }

    public OperableTrigger getTrigger()
    {
        return trigger;
    }

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;
    public int state;
    public static final int STATE_WAITING = 0;
    public static final int STATE_ACQUIRED = 1;
    public static final int STATE_EXECUTING = 2;
    public static final int STATE_COMPLETE = 3;
    public static final int STATE_PAUSED = 4;
    public static final int STATE_BLOCKED = 5;
    public static final int STATE_PAUSED_BLOCKED = 6;
    public static final int STATE_ERROR = 7;
}

JobKey与JobDetail包装类
class JobWrapper
{

    JobWrapper(JobDetail jobDetail)
    {
        this.jobDetail = jobDetail;
        key = jobDetail.getKey();
    }

    public boolean equals(Object obj)
    {
        if(obj instanceof JobWrapper)
        {
            JobWrapper jw = (JobWrapper)obj;
            if(jw.key.equals(key))
                return true;
        }
        return false;
    }

    public int hashCode()
    {
        return key.hashCode();
    }

    public JobKey key;
    public JobDetail jobDetail;
}

//JobDetail
public class JobDetailImpl
    implements Cloneable, Serializable, JobDetail
{
 private static final long serialVersionUID = -6069784757781506897L;
    private String name;
    private String group;
    private String description;
    private Class jobClass;
    private JobDataMap jobDataMap;//拥有共享数据,就是一个Map
    private boolean durability;
    private boolean shouldRecover;
    private transient JobKey key;
}

//JobKey
public final class JobKey extends Key
{

    public JobKey(String name)
    {
        super(name, null);
    }

    public JobKey(String name, String group)
    {
        super(name, group);
    }

    public static JobKey jobKey(String name)
    {
        return new JobKey(name, null);
    }

    public static JobKey jobKey(String name, String group)
    {
        return new JobKey(name, group);
    }

    private static final long serialVersionUID = -6073883950062574010L;
}

//TriggerKey
public final class TriggerKey extends Key
{

    public TriggerKey(String name)
    {
        super(name, null);
    }

    public TriggerKey(String name, String group)
    {
        super(name, group);
    }

    public static TriggerKey triggerKey(String name)
    {
        return new TriggerKey(name, null);
    }

    public static TriggerKey triggerKey(String name, String group)
    {
        return new TriggerKey(name, group);
    }

    private static final long serialVersionUID = 8070357886703449660L;
}

//Key
package org.quartz.utils;

import java.io.Serializable;
import java.util.UUID;

public class Key
    implements Serializable, Comparable
{

    public Key(String name, String group)
    {
        if(name == null)
            throw new IllegalArgumentException("Name cannot be null.");
        this.name = name;
        if(group != null)
            this.group = group;
        else
            this.group = "DEFAULT";
    }

    public String getName()
    {
        return name;
    }

    public String getGroup()
    {
        return group;
    }
 
    private static final long serialVersionUID = -7141167957642391350L;
    public static final String DEFAULT_GROUP = "DEFAULT";
    private final String name;
    private final String group;
}


总结:
首先从Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAM中的jobsByGroup,jobsByKey对应的Map中, 将trigger存储到
triggers(List),triggersByKey,triggersByGroup对应的Map中,及timeTriggers的Treeset中。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey从triggersByKey,triggersByGroup,
triggers,timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
除了从容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>,并unscheduleJob
(TriggerKey triggerKey)列表 List<TriggerWrapper>中的所有TriggerWrapper,同时从jobsByKey,jobsByGroup
的移除对应jobKey的相关信息
0
0
分享到:
评论

相关推荐

    java job_触发器例子

    Java Job和触发器是Java应用程序中用于定时任务处理的关键组件,尤其在企业级应用和大数据处理中非常常见。本文将深入探讨Java中的Job和触发器,以及如何通过示例进行应用。 首先,让我们理解什么是Java Job。在...

    oracle job调度存储过程 触发器 定时更新数据库

    总的来说,Oracle Job调度存储过程和触发器是数据库管理中不可或缺的工具,它们允许数据库管理员自动化许多常规任务,提高效率,减少人为错误,并确保数据的一致性和准确性。通过灵活地配置和组合这些功能,可以实现...

    oracle中job调度存储过程 触发器 定时更新数据库.rar

    在Oracle数据库系统中,"Job调度存储过程"和"触发器"是两种强大的工具,用于自动化数据库维护和管理任务。本教程将深入探讨这两个概念以及它们如何协同工作以实现定时更新数据库。 首先,我们来理解"Job调度存储...

    python基于51job数据可视化图表展示源码.zip

    python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...

    xxl job源码分析

    xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...

    xxl-job源码下载

    - **调度中心(XXL-JOB Admin)**:管理任务调度的组件,存储任务信息,执行策略等,并负责触发任务执行。 - **作业(Job)**:需要被调度的任务,可以是任何业务逻辑,例如数据同步、报表生成等。 - **分片策略**...

    elastic-job-1.0.5相关源码

    - **Registry**:注册中心是Elastic-Job中用于协调各个节点的重要组件,如Zookeeper或Redis,存储作业的配置和运行状态信息。 - **Job Execution**:作业执行器负责实际的任务执行,根据分片策略在不同节点上运行...

    elastic-job spring 源码和控制台

    本文将深入探讨如何将Elastic-Job Lite与Spring进行集成,以及如何利用其提供的控制台进行任务管理和监控,并对源码进行深度分析。 一、Elastic-Job Lite简介 Elastic-Job Lite是Elastic-Job的轻量级版本,适用于...

    传参数给job及job状态学习

    标题中的“传参数给job及job状态学习”指的是在使用Quartz Scheduler进行任务调度时,如何传递参数给Job以及理解Job的状态管理。Quartz Scheduler是一个开源的Java作业调度框架,它允许程序创建、安排和执行重复的...

    sql触发器语法兼实例

    以下示例创建了一个名为`tri_delete`的触发器,它会在删除`jobs`表中的记录时触发,并检查`job_id`是否为1。如果是,则引发错误并回滚事务。 ```sql CREATE TRIGGER tri_delete ON jobs FOR DELETE AS BEGIN ...

    将 xxl-job-admin、xxl-job-core、xxl-job-executor 的源码引入项目中.docx

    ### 将xxl-job-admin、xxl-job-core、xxl-job-executor的源码引入项目中的实践 在探讨如何将这三个组件(xxl-job-admin、xxl-job-core、xxl-job-executor)有效地引入到项目中之前,我们需要先了解它们各自的作用及...

    存储过程、触发器、定时器例子(oracle)

    在Oracle数据库系统中,存储过程、触发器和定时器是三个关键的数据库管理工具,它们在数据处理和业务逻辑执行中扮演着重要角色。本文将详细介绍这三个概念,并结合实际例子来帮助理解它们的工作原理和应用。 1. **...

    锁表进程和触发器开关

    触发器是数据库中的一个特殊类型的存储过程,它会在特定事件发生时自动执行。触发器可以用来实现复杂的业务逻辑或数据完整性规则。 **查询所有用户的表:** ```sql SELECT * FROM user_tables; ``` **查询所有表:...

    xxl-job实战源码

    本篇将深入探讨XXL-JOB的核心概念、架构设计以及源码分析。 **一、XXL-JOB简介** 1. **核心概念** - **调度中心(Admin)**: 负责任务的注册、管理、监控,提供Web界面供用户操作。 - **执行器(Executor)**: ...

    job定时器操作存储过程

    本文将深入解析如何利用Oracle JOB定时器来操作存储过程,包括创建、执行、查询、停止、启动以及删除JOB的全过程,并详细阐述定时器执行时间间隔的设置方法。 ### 创建表和存储过程 首先,为了演示JOB定时器的操作...

    xxl-job-2.3.0.tar.gz

    11. **源码分析**:解压“xxl-job-2.3.0”后,我们可以看到源码结构,包括Admin和Executor的模块,以及相关的配置文件和文档。通过阅读源码,可以深入了解其内部设计原理,例如任务调度算法、通信协议、数据库模型等...

    xxl-job-2.2.0.zip

    总之,XXL-JOB-2.2.0源码的分析可以帮助我们理解分布式任务调度的实现细节,了解任务的分发、执行、监控等核心流程,为自定义任务处理和扩展提供指导。深入学习这些源码,不仅可以提升开发技能,也能提高解决实际...

    练习利用PLSQL Developer编写和管理存储过程、存储函数和触发器等

    PLSQL Developer中存储过程、存储函数和触发器的编写和管理 PLSQL Developer是一种功能强大的Oracle数据库开发工具,提供了许多功能来帮助开发者快速编写和管理存储过程、存储函数和触发器等高级数据库对象。本文将...

    plsql创建存储过程并创建job定时任务执行-详细笔记文档总结

    plsql创建存储过程并创建job定时任务执行详细笔记文档总结 在 Oracle 中,plsql 是一种强大的编程语言,可以用来创建存储过程和定时任务执行。在本文中,我们将详细介绍如何使用 plsql 创建存储过程并创建 job 定时...

Global site tag (gtag.js) - Google Analytics