- 浏览: 987935 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
前言:unscheduleJob针对TriggerKey,而deleteJob针对jobKey下面我们进一步从源码来分析,从job,trriger
调度,再到unscheduleJob,deleteJob
//Job存储的实现
TriggerKey与JobKey包装类
JobKey与JobDetail包装类
//JobDetail
//JobKey
//TriggerKey
//Key
总结:
首先从Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAM中的jobsByGroup,jobsByKey对应的Map中, 将trigger存储到
triggers(List),triggersByKey,triggersByGroup对应的Map中,及timeTriggers的Treeset中。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey从triggersByKey,triggersByGroup,
triggers,timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
除了从容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>,并unscheduleJob
(TriggerKey triggerKey)列表 List<TriggerWrapper>中的所有TriggerWrapper,同时从jobsByKey,jobsByGroup
的移除对应jobKey的相关信息
调度,再到unscheduleJob,deleteJob
public class QuartzScheduler implements RemotableQuartzScheduler { public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState(); if(jobDetail == null) throw new SchedulerException("JobDetail cannot be null"); if(trigger == null) throw new SchedulerException("Trigger cannot be null"); if(jobDetail.getKey() == null) throw new SchedulerException("Job's key cannot be null"); if(jobDetail.getJobClass() == null) throw new SchedulerException("Job's class cannot be null"); OperableTrigger trig = (OperableTrigger)trigger; if(trigger.getJobKey() == null) //设置触发器的jobKey trig.setJobKey(jobDetail.getKey()); else if(!trigger.getJobKey().equals(jobDetail.getKey())) throw new SchedulerException("Trigger does not reference given job!"); trig.validate(); Calendar cal = null; if(trigger.getCalendarName() != null) cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName()); Date ft = trig.computeFirstFireTime(cal); if(ft == null) { throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString()); } else { //存储jobDetail, trig到RAMJobStore resources.getJobStore().storeJobAndTrigger(jobDetail, trig); //下面三个现在先不说以后再说 notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger); return ft; } public boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException { validateState(); //从容器中的RAMJobStore的triggersByKey Map中将triggerKey移除 if(resources.getJobStore().removeTrigger(triggerKey)) { notifySchedulerThread(0L); notifySchedulerListenersUnscheduled(triggerKey); } else { return false; } return true; } public boolean deleteJob(JobKey jobKey) throws SchedulerException { validateState(); boolean result = false; //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggers = getTriggersOfJob(jobKey); for(Iterator i$ = triggers.iterator(); i$.hasNext();) { Trigger trigger = (Trigger)i$.next(); //移除触发器 if(!unscheduleJob(trigger.getKey())) { StringBuilder sb = (new StringBuilder()).append("Unable to unschedule trigger [").append(trigger.getKey()).append("] while deleting job [").append(jobKey).append("]"); throw new SchedulerException(sb.toString()); } result = true; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 result = resources.getJobStore().removeJob(jobKey) || result; if(result) { notifySchedulerThread(0L); notifySchedulerListenersJobDeleted(jobKey); } return result; } private static String VERSION_MAJOR; private static String VERSION_MINOR; private static String VERSION_ITERATION; private QuartzSchedulerResources resources; private QuartzSchedulerThread schedThread; private ThreadGroup threadGroup; private SchedulerContext context; private ListenerManager listenerManager; private HashMap internalJobListeners; private HashMap internalTriggerListeners; private ArrayList internalSchedulerListeners; private JobFactory jobFactory; ExecutingJobsManager jobMgr; ErrorLogger errLogger; private SchedulerSignaler signaler; private Random random; private ArrayList holdToPreventGC; private boolean signalOnSchedulingChange; private volatile boolean closed; private volatile boolean shuttingDown; private boolean boundRemotely; private QuartzSchedulerMBean jmxBean; private Date initialStart; private final Timer updateTimer; private final Logger log = LoggerFactory.getLogger(getClass()); static { Properties props; InputStream is; VERSION_MAJOR = "UNKNOWN"; VERSION_MINOR = "UNKNOWN"; VERSION_ITERATION = "UNKNOWN"; props = new Properties(); is = null; is = org/quartz/core/QuartzScheduler.getResourceAsStream("quartz-build.properties"); if(is != null) { props.load(is); String version = props.getProperty("version"); if(version != null) { String versionComponents[] = version.split("\\."); VERSION_MAJOR = versionComponents[0]; VERSION_MINOR = versionComponents[1]; if(versionComponents.length > 2) VERSION_ITERATION = versionComponents[2]; else VERSION_ITERATION = "0"; } else { LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Can't parse Quartz version from quartz-build.properties"); } } if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception e; e; LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Error loading version info from quartz-build.properties.", e); if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception exception; exception; if(is != null) try { is.close(); } catch(Exception ignore) { } throw exception; } }
//Job存储的实现
public class RAMJobStore implements JobStore { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> public List getTriggersForJob(JobKey jobKey) { ArrayList trigList = new ArrayList(); synchronized(lock) { //从triggers中获取jobKey对应的TriggerWrapper Iterator i$ = triggers.iterator(); do { if(!i$.hasNext()) break; TriggerWrapper tw = (TriggerWrapper)i$.next(); if(tw.jobKey.equals(jobKey)) trigList.add((OperableTrigger)tw.trigger.clone()); } while(true); } return trigList; } public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws JobPersistenceException { storeJob(newJob, false); storeTrigger(newTrigger, false); } //存储newJob到jobsByKey,jobsByGroup public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException { JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized(lock) { if(jobsByKey.get(jw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newJob); repl = true; } if(!repl) { HashMap grpMap = (HashMap)jobsByGroup.get(newJob.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); jobsByGroup.put(newJob.getKey().getGroup(), grpMap); } //存储job到jobsByGroup grpMap.put(newJob.getKey(), jw); //存储job到jobsByKey jobsByKey.put(jw.key, jw); } else { JobWrapper orig = (JobWrapper)jobsByKey.get(jw.key); orig.jobDetail = jw.jobDetail; } } } //将newTrigger存储到triggersByKey,trriger,triggersByGroup,timeTriggers public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) throws JobPersistenceException { TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone()); synchronized(lock) { if(triggersByKey.get(tw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newTrigger); removeTrigger(newTrigger.getKey(), false); } if(retrieveJob(newTrigger.getJobKey()) == null) throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") referenced by the trigger does not exist.").toString()); //存储trriger到triggers triggers.add(tw); HashMap grpMap = (HashMap)triggersByGroup.get(newTrigger.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap); } //存储trriger到triggersByGroup grpMap.put(newTrigger.getKey(), tw); //存储trriger到triggersByKey triggersByKey.put(tw.key, tw); if(pausedTriggerGroups.contains(newTrigger.getKey().getGroup()) || pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) { tw.state = 4; if(blockedJobs.contains(tw.jobKey)) tw.state = 6; } else if(blockedJobs.contains(tw.jobKey)) tw.state = 5; else // 将trriger添加到timeTriggers timeTriggers.add(tw); } } public boolean removeTrigger(TriggerKey triggerKey) { return removeTrigger(triggerKey, true); } //从triggersByKey,triggers,timeTriggers移除key private boolean removeTrigger(TriggerKey key, boolean removeOrphanedJob) { boolean found; synchronized(lock) { //从triggersByKey移除 found = triggersByKey.remove(key) != null; if(found) { TriggerWrapper tw = null; HashMap grpMap = (HashMap)triggersByGroup.get(key.getGroup()); if(grpMap != null) { ////从triggersByGroup移除 grpMap.remove(key); if(grpMap.size() == 0) triggersByGroup.remove(key.getGroup()); } Iterator tgs = triggers.iterator(); do { if(!tgs.hasNext()) break; tw = (TriggerWrapper)tgs.next(); if(!key.equals(tw.key)) continue; //从triggers移除 tgs.remove(); break; } while(true); //从timeTriggers移除 timeTriggers.remove(tw); if(removeOrphanedJob) { JobWrapper jw = (JobWrapper)jobsByKey.get(tw.jobKey); List trigs = getTriggersForJob(tw.jobKey); if((trigs == null || trigs.size() == 0) && !jw.jobDetail.isDurable() && removeJob(jw.key)) signaler.notifySchedulerListenersJobDeleted(jw.key); } } } return found; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 public boolean removeJob(JobKey jobKey) { boolean found = false; synchronized(lock) { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggersOfJob = getTriggersForJob(jobKey); for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext();) { OperableTrigger trig = (OperableTrigger)i$.next(); //从triggersByKey,triggers,timeTriggers移除key removeTrigger(trig.getKey()); found = true; } //从jobsByKey移除jobKey found = (jobsByKey.remove(jobKey) != null) | found; if(found) { HashMap grpMap = (HashMap)jobsByGroup.get(jobKey.getGroup()); if(grpMap != null) { //从jobsByGroup对应的group总移除jobKey grpMap.remove(jobKey); if(grpMap.size() == 0) jobsByGroup.remove(jobKey.getGroup()); } } } return found; } protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper> protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper> protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树 protected HashMap calendarsByName; protected ArrayList triggers; //List<TriggerWrapper> protected final Object lock = new Object(); protected HashSet pausedTriggerGroups; protected HashSet pausedJobGroups; protected HashSet blockedJobs; protected long misfireThreshold; protected SchedulerSignaler signaler; private final Logger log = LoggerFactory.getLogger(getClass()); private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis()); }
TriggerKey与JobKey包装类
class TriggerWrapper { TriggerWrapper(OperableTrigger trigger) { state = 0; if(trigger == null) { throw new IllegalArgumentException("Trigger cannot be null!"); } else { this.trigger = trigger; key = trigger.getKey(); jobKey = trigger.getJobKey(); return; } } public boolean equals(Object obj) { if(obj instanceof TriggerWrapper) { TriggerWrapper tw = (TriggerWrapper)obj; if(tw.key.equals(key)) return true; } return false; } public int hashCode() { return key.hashCode(); } public OperableTrigger getTrigger() { return trigger; } public final TriggerKey key; public final JobKey jobKey; public final OperableTrigger trigger; public int state; public static final int STATE_WAITING = 0; public static final int STATE_ACQUIRED = 1; public static final int STATE_EXECUTING = 2; public static final int STATE_COMPLETE = 3; public static final int STATE_PAUSED = 4; public static final int STATE_BLOCKED = 5; public static final int STATE_PAUSED_BLOCKED = 6; public static final int STATE_ERROR = 7; }
JobKey与JobDetail包装类
class JobWrapper { JobWrapper(JobDetail jobDetail) { this.jobDetail = jobDetail; key = jobDetail.getKey(); } public boolean equals(Object obj) { if(obj instanceof JobWrapper) { JobWrapper jw = (JobWrapper)obj; if(jw.key.equals(key)) return true; } return false; } public int hashCode() { return key.hashCode(); } public JobKey key; public JobDetail jobDetail; }
//JobDetail
public class JobDetailImpl implements Cloneable, Serializable, JobDetail { private static final long serialVersionUID = -6069784757781506897L; private String name; private String group; private String description; private Class jobClass; private JobDataMap jobDataMap;//拥有共享数据,就是一个Map private boolean durability; private boolean shouldRecover; private transient JobKey key; }
//JobKey
public final class JobKey extends Key { public JobKey(String name) { super(name, null); } public JobKey(String name, String group) { super(name, group); } public static JobKey jobKey(String name) { return new JobKey(name, null); } public static JobKey jobKey(String name, String group) { return new JobKey(name, group); } private static final long serialVersionUID = -6073883950062574010L; }
//TriggerKey
public final class TriggerKey extends Key { public TriggerKey(String name) { super(name, null); } public TriggerKey(String name, String group) { super(name, group); } public static TriggerKey triggerKey(String name) { return new TriggerKey(name, null); } public static TriggerKey triggerKey(String name, String group) { return new TriggerKey(name, group); } private static final long serialVersionUID = 8070357886703449660L; }
//Key
package org.quartz.utils; import java.io.Serializable; import java.util.UUID; public class Key implements Serializable, Comparable { public Key(String name, String group) { if(name == null) throw new IllegalArgumentException("Name cannot be null."); this.name = name; if(group != null) this.group = group; else this.group = "DEFAULT"; } public String getName() { return name; } public String getGroup() { return group; } private static final long serialVersionUID = -7141167957642391350L; public static final String DEFAULT_GROUP = "DEFAULT"; private final String name; private final String group; }
总结:
首先从Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAM中的jobsByGroup,jobsByKey对应的Map中, 将trigger存储到
triggers(List),triggersByKey,triggersByGroup对应的Map中,及timeTriggers的Treeset中。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey从triggersByKey,triggersByGroup,
triggers,timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
除了从容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper>,并unscheduleJob
(TriggerKey triggerKey)列表 List<TriggerWrapper>中的所有TriggerWrapper,同时从jobsByKey,jobsByGroup
的移除对应jobKey的相关信息
发表评论
-
TreeSet在Quartz任务调度过程中的作用
2017-08-24 23:43 740红黑树详解:http://www.cnblogs.com/sk ... -
Quartz使用与Spring集成系列教程
2016-10-26 09:48 511Quartz的使用:http://donald-draper. ... -
Spring与Quartz集成-源码分析
2016-09-13 11:50 2726在阅读以下文章之前,如果对Quartz任务调度不是很熟悉,请看 ... -
Spring与Quartz集成详解
2016-09-09 17:52 2826首先这个所有的依赖包就不需要多讲了,首先下载Quazrt发布包 ... -
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析
2016-09-08 18:24 2338Quartz 任务存储JobStoreTX 持久化之RDB:h ... -
Quartz 任务存储JobStoreTX 持久化之RDB
2016-09-08 11:52 2847Quartz储存方式之JDBC JobStoreTX:http ... -
Quartz任务调度源码分析
2016-09-07 13:12 3580Quartz的使用:http://donald-draper. ... -
Quartz的job调度源码分析
2016-09-06 13:03 8Quartz的使用:http://donald ... -
Quartzs的job,trriger监听器源码分析
2016-09-06 11:15 1494Quartz的使用:http://donald-draper. ... -
Quartz的job、触发器的暂停与恢复源码分析
2016-09-06 09:01 5281Quartz的使用:http://donald-draper. ... -
Quartz的Scheduler初始化源码分析
2016-09-05 16:14 3613Quartz的使用:http://donald ... -
Quartz的使用
2016-09-05 11:39 1796Quartz使用总结:http://www.cnblogs.c ...
相关推荐
Java Job和触发器是Java应用程序中用于定时任务处理的关键组件,尤其在企业级应用和大数据处理中非常常见。本文将深入探讨Java中的Job和触发器,以及如何通过示例进行应用。 首先,让我们理解什么是Java Job。在...
总的来说,Oracle Job调度存储过程和触发器是数据库管理中不可或缺的工具,它们允许数据库管理员自动化许多常规任务,提高效率,减少人为错误,并确保数据的一致性和准确性。通过灵活地配置和组合这些功能,可以实现...
在Oracle数据库系统中,"Job调度存储过程"和"触发器"是两种强大的工具,用于自动化数据库维护和管理任务。本教程将深入探讨这两个概念以及它们如何协同工作以实现定时更新数据库。 首先,我们来理解"Job调度存储...
python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...
xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...
- **调度中心(XXL-JOB Admin)**:管理任务调度的组件,存储任务信息,执行策略等,并负责触发任务执行。 - **作业(Job)**:需要被调度的任务,可以是任何业务逻辑,例如数据同步、报表生成等。 - **分片策略**...
- **Registry**:注册中心是Elastic-Job中用于协调各个节点的重要组件,如Zookeeper或Redis,存储作业的配置和运行状态信息。 - **Job Execution**:作业执行器负责实际的任务执行,根据分片策略在不同节点上运行...
本文将深入探讨如何将Elastic-Job Lite与Spring进行集成,以及如何利用其提供的控制台进行任务管理和监控,并对源码进行深度分析。 一、Elastic-Job Lite简介 Elastic-Job Lite是Elastic-Job的轻量级版本,适用于...
标题中的“传参数给job及job状态学习”指的是在使用Quartz Scheduler进行任务调度时,如何传递参数给Job以及理解Job的状态管理。Quartz Scheduler是一个开源的Java作业调度框架,它允许程序创建、安排和执行重复的...
以下示例创建了一个名为`tri_delete`的触发器,它会在删除`jobs`表中的记录时触发,并检查`job_id`是否为1。如果是,则引发错误并回滚事务。 ```sql CREATE TRIGGER tri_delete ON jobs FOR DELETE AS BEGIN ...
### 将xxl-job-admin、xxl-job-core、xxl-job-executor的源码引入项目中的实践 在探讨如何将这三个组件(xxl-job-admin、xxl-job-core、xxl-job-executor)有效地引入到项目中之前,我们需要先了解它们各自的作用及...
在Oracle数据库系统中,存储过程、触发器和定时器是三个关键的数据库管理工具,它们在数据处理和业务逻辑执行中扮演着重要角色。本文将详细介绍这三个概念,并结合实际例子来帮助理解它们的工作原理和应用。 1. **...
触发器是数据库中的一个特殊类型的存储过程,它会在特定事件发生时自动执行。触发器可以用来实现复杂的业务逻辑或数据完整性规则。 **查询所有用户的表:** ```sql SELECT * FROM user_tables; ``` **查询所有表:...
本篇将深入探讨XXL-JOB的核心概念、架构设计以及源码分析。 **一、XXL-JOB简介** 1. **核心概念** - **调度中心(Admin)**: 负责任务的注册、管理、监控,提供Web界面供用户操作。 - **执行器(Executor)**: ...
本文将深入解析如何利用Oracle JOB定时器来操作存储过程,包括创建、执行、查询、停止、启动以及删除JOB的全过程,并详细阐述定时器执行时间间隔的设置方法。 ### 创建表和存储过程 首先,为了演示JOB定时器的操作...
11. **源码分析**:解压“xxl-job-2.3.0”后,我们可以看到源码结构,包括Admin和Executor的模块,以及相关的配置文件和文档。通过阅读源码,可以深入了解其内部设计原理,例如任务调度算法、通信协议、数据库模型等...
总之,XXL-JOB-2.2.0源码的分析可以帮助我们理解分布式任务调度的实现细节,了解任务的分发、执行、监控等核心流程,为自定义任务处理和扩展提供指导。深入学习这些源码,不仅可以提升开发技能,也能提高解决实际...
PLSQL Developer中存储过程、存储函数和触发器的编写和管理 PLSQL Developer是一种功能强大的Oracle数据库开发工具,提供了许多功能来帮助开发者快速编写和管理存储过程、存储函数和触发器等高级数据库对象。本文将...
plsql创建存储过程并创建job定时任务执行详细笔记文档总结 在 Oracle 中,plsql 是一种强大的编程语言,可以用来创建存储过程和定时任务执行。在本文中,我们将详细介绍如何使用 plsql 创建存储过程并创建 job 定时...