- 浏览: 985060 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
前面我们研究了一下RAMJobStore,就是将任务存在内存,但这样当程序崩溃时,触发的任务容易丢失,而Quartz 任务存储JobStoreTX 持久化之RDB可以解决这个问题,今天我们就来通过源码分析一下,探个究竟。
//标准调度器工场
//数据库连接管理器
//数据库连接池 PoolingConnectionProvider
//RDB持久化存储器
//jdbc标准代理
//Cron表达式触发器持久化类
//操作相关表的语句常量
//数据库表明,及字段名常量
总结:
RDB持久化实际上就是将触发器,任务存储在数据中的触发器表和任务表中;
暂停,恢复任务是通过暂定和恢复与jobKey相关的触发器,即修改触发器表中Trriger的状态;
调度任务时,从触发任务表中获取触发任务出发时间小于当前时间的触发任务,加入到触发任务就绪表中,
当触发任务执行完成时,从触发任务就绪表中删除完成的触发任务,然后,查看触发任务的触发器下一触发时间,
如果触发器下一刻触发时间为null,则从触发任务表中删除触发任务,否则,更新触发任务触发时间,从触发任务表,
删除旧触发任务,添加新的触发任务到触发任务表;一个时间窗口过后,重新执行以上调度步骤。
前面我们研究了一下RAMJobStore,就是将任务存在内存,但这样当程序崩溃时,触发的任务容易丢失,而Quartz 任务存储JobStoreTX 持久化之RDB可以解决这个问题,今天我们就来通过源码分析一下,探个究竟。
//标准调度器工场
public class StdSchedulerFactory implements SchedulerFactory { //初始化Quartz属性,线程池,线程数,线程优先级,job存储容器 private Scheduler instantiate() throws SchedulerException { DBConnectionManager dbMgr;//数据库连接管理器 //初始化存储器 String jsClass = cfg.getStringProperty("org.quartz.jobStore.class", org/quartz/simpl/RAMJobStore.getName()); try { js = (JobStore)loadHelper.loadClass(jsClass).newInstance(); } //加载lockHandler tProps = cfg.getPropertyGroup("org.quartz.jobStore", true, new String[] { "org.quartz.jobStore.lockHandler" }); try { setBeanProps(js, tProps); } if(js instanceof JobStoreSupport) { //加载lockHandlerClass String lockHandlerClass = cfg.getStringProperty("org.quartz.jobStore.lockHandler.class"); if(lockHandlerClass != null) try { Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance(); tProps = cfg.getPropertyGroup("org.quartz.jobStore.lockHandler", true); if(lockHandler instanceof TablePrefixAware) { tProps.setProperty("tablePrefix", ((JobStoreSupport)js).getTablePrefix()); tProps.setProperty("schedName", schedName); } try { setBeanProps(lockHandler, tProps); } ((JobStoreSupport)js).setLockHandler(lockHandler); getLog().info((new StringBuilder()).append("Using custom data access locking (synchronization): ").append(lockHandlerClass).toString()); } } //通过PropertiesParser-cfg获取db数据源配置相关属性 String dsNames[] = cfg.getPropertyGroups("org.quartz.dataSource"); for(int i = 0; i < dsNames.length; i++) { PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup((new StringBuilder()).append("org.quartz.dataSource.").append(dsNames[i]).toString(), true)); //获取数据连接管理器 String cpClass = pp.getStringProperty("connectionProvider.class", null); if(cpClass != null) { ConnectionProvider cp = null; try { //加载ConnectionProvider cp = (ConnectionProvider)loadHelper.loadClass(cpClass).newInstance(); } try { pp.getUnderlyingProperties().remove("connectionProvider.class"); //设置数据源属性 setBeanProps(cp, pp.getUnderlyingProperties()); //初始化数据源 cp.initialize(); } dbMgr = DBConnectionManager.getInstance(); dbMgr.addConnectionProvider(dsNames[i], cp); continue; } //配置驱动,url String dsDriver = pp.getStringProperty("driver"); String dsURL = pp.getStringProperty("URL"); try { PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties()); dbMgr = DBConnectionManager.getInstance(); //添加数据源到数据库连接管理器 dbMgr.addConnectionProvider(dsNames[i], cp); continue; } } //获取前缀为prefix的属性 public String[] getPropertyGroups(String prefix) { Enumeration keys = props.propertyNames(); HashSet groups = new HashSet(10); if(!prefix.endsWith(".")) prefix = (new StringBuilder()).append(prefix).append(".").toString(); do { if(!keys.hasMoreElements()) break; String key = (String)keys.nextElement(); if(key.startsWith(prefix)) { String groupName = key.substring(prefix.length(), key.indexOf('.', prefix.length())); groups.add(groupName); } } while(true); return (String[])(String[])groups.toArray(new String[groups.size()]); } private void setBeanProps(Object obj, Properties props) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, IntrospectionException, SchedulerConfigException { props.remove("class"); BeanInfo bi = Introspector.getBeanInfo(obj.getClass()); //获取bean的属性描述 PropertyDescriptor propDescs[] = bi.getPropertyDescriptors(); PropertiesParser pp = new PropertiesParser(props); for(Enumeration keys = props.keys(); keys.hasMoreElements();) { String name = (String)keys.nextElement(); String c = name.substring(0, 1).toUpperCase(Locale.US); String methName = (new StringBuilder()).append("set").append(c).append(name.substring(1)).toString(); //获取方法名为methName的Method Method setMeth = getSetMethod(methName, propDescs); try { if(setMeth == null) throw new NoSuchMethodException((new StringBuilder()).append("No setter for property '").append(name).append("'").toString()); Class params[] = setMeth.getParameterTypes(); if(params.length != 1) throw new NoSuchMethodException((new StringBuilder()).append("No 1-argument setter for property '").append(name).append("'").toString()); PropertiesParser refProps = pp; String refName = pp.getStringProperty(name); if(refName != null && refName.startsWith("$@")) { refName = refName.substring(2); refProps = cfg; } else { refName = name; } //根据不同类型设值 if(params[0].equals(Integer.TYPE)) //Method.invoke(Object) setMeth.invoke(obj, new Object[] { Integer.valueOf(refProps.getIntProperty(refName)) }); if(params[0].equals(Long.TYPE)) setMeth.invoke(obj, new Object[] { Long.valueOf(refProps.getLongProperty(refName)) }); if(params[0].equals(Float.TYPE)) setMeth.invoke(obj, new Object[] { Float.valueOf(refProps.getFloatProperty(refName)) }); if(params[0].equals(Double.TYPE)) setMeth.invoke(obj, new Object[] { Double.valueOf(refProps.getDoubleProperty(refName)) }); if(params[0].equals(Boolean.TYPE)) setMeth.invoke(obj, new Object[] { Boolean.valueOf(refProps.getBooleanProperty(refName)) }); if(params[0].equals(java/lang/String)) setMeth.invoke(obj, new Object[] { refProps.getStringProperty(refName) }); } } } //获取方法名为name的Method private Method getSetMethod(String name, PropertyDescriptor props[]) { for(int i = 0; i < props.length; i++) { Method wMeth = props[i].getWriteMethod(); if(wMeth != null && wMeth.getName().equals(name)) return wMeth; } return null; } }
//数据库连接管理器
public class DBConnectionManager{ //获取DBConnectionManager单例instance public static DBConnectionManager getInstance() { return instance; } //添加数据源到数据库连接管理器 public void addConnectionProvider(String dataSourceName, ConnectionProvider provider) { providers.put(dataSourceName, provider); } //获取连接 public Connection getConnection(String dsName) throws SQLException { ConnectionProvider provider = (ConnectionProvider)providers.get(dsName); if(provider == null) throw new SQLException((new StringBuilder()).append("There is no DataSource named '").append(dsName).append("'").toString()); else return provider.getConnection(); } private static DBConnectionManager instance = new DBConnectionManager(); private HashMap providers;//HashMap<String,ConnectionProvider>,key为dataSourceName }
//数据库连接池 PoolingConnectionProvider
public class PoolingConnectionProvider implements ConnectionProvider { //根据config构造数据源 public PoolingConnectionProvider(Properties config) throws SchedulerException, SQLException { PropertiesParser cfg = new PropertiesParser(config); initialize(cfg.getStringProperty("driver"), cfg.getStringProperty("URL"), cfg.getStringProperty("user", ""), cfg.getStringProperty("password", ""), cfg.getIntProperty("maxConnections", 10), cfg.getIntProperty("maxCachedStatementsPerConnection", 120), cfg.getStringProperty("validationQuery"), cfg.getBooleanProperty("validateOnCheckout", false), cfg.getIntProperty("idleConnectionValidationSeconds", 50), cfg.getIntProperty("discardIdleConnectionsSeconds", 0)); } //初始化数据源 private void initialize(String dbDriver, String dbURL, String dbUser, String dbPassword, int maxConnections, int maxStatementsPerConnection, String dbValidationQuery, boolean validateOnCheckout, int idleValidationSeconds, int maxIdleSeconds) throws SQLException, SchedulerException { if(dbURL == null) throw new SQLException("DBPool could not be created: DB URL cannot be null"); if(dbDriver == null) throw new SQLException((new StringBuilder()).append("DBPool '").append(dbURL).append("' could not be created: ").append("DB driver class name cannot be null!").toString()); if(maxConnections < 0) throw new SQLException((new StringBuilder()).append("DBPool '").append(dbURL).append("' could not be created: ").append("Max connections must be greater than zero!").toString()); datasource = new ComboPooledDataSource(); try { datasource.setDriverClass(dbDriver); } catch(PropertyVetoException e) { throw new SchedulerException((new StringBuilder()).append("Problem setting driver class name on datasource: ").append(e.getMessage()).toString(), e); } datasource.setJdbcUrl(dbURL); datasource.setUser(dbUser); datasource.setPassword(dbPassword); datasource.setMaxPoolSize(maxConnections); datasource.setMinPoolSize(1); datasource.setMaxIdleTime(maxIdleSeconds); datasource.setMaxStatementsPerConnection(maxStatementsPerConnection); if(dbValidationQuery != null) { datasource.setPreferredTestQuery(dbValidationQuery); if(!validateOnCheckout) datasource.setTestConnectionOnCheckin(true); else datasource.setTestConnectionOnCheckout(true); datasource.setIdleConnectionTestPeriod(idleValidationSeconds); } } public void initialize() throws SQLException { } public static final String DB_DRIVER = "driver"; public static final String DB_URL = "URL"; public static final String DB_USER = "user"; public static final String DB_PASSWORD = "password"; public static final String DB_MAX_CONNECTIONS = "maxConnections"; public static final String DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = "maxCachedStatementsPerConnection"; public static final String DB_VALIDATION_QUERY = "validationQuery"; public static final String DB_IDLE_VALIDATION_SECONDS = "idleConnectionValidationSeconds"; public static final String DB_VALIDATE_ON_CHECKOUT = "validateOnCheckout"; private static final String DB_DISCARD_IDLE_CONNECTIONS_SECONDS = "discardIdleConnectionsSeconds"; public static final int DEFAULT_DB_MAX_CONNECTIONS = 10; public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120; private ComboPooledDataSource datasource;//Quartz数据库连接池默认为c3p0 }
//RDB持久化存储器
public class JobStoreTX extends JobStoreSupport { public void initialize(ClassLoadHelper classLoadHelper, SchedulerSignaler schedSignaler) throws SchedulerConfigException { super.initialize(classLoadHelper, schedSignaler); getLog().info("JobStoreTX initialized."); } protected Connection getNonManagedTXConnection() throws JobPersistenceException { return getConnection(); } //加锁执行txCallback,锁名lockName protected Object executeInLock(String lockName, JobStoreSupport.TransactionCallback txCallback) throws JobPersistenceException { return executeInNonManagedTXLock(lockName, txCallback, null); } } public abstract class JobStoreSupport implements JobStore, Constants { //默认构造 public JobStoreSupport() { tablePrefix = "QRTZ_"; useProperties = false; delegateClass = org/quartz/impl/jdbcjobstore/StdJDBCDelegate; calendarCache = new HashMap(); misfireThreshold = 60000L; dontSetAutoCommitFalse = false; isClustered = false; useDBLocks = false; lockOnInsert = true; lockHandler = null; selectWithLockSQL = null; clusterCheckinInterval = 7500L; clusterManagementThread = null; misfireHandler = null; maxToRecoverAtATime = 20; setTxIsolationLevelSequential = false; acquireTriggersWithinLock = false; dbRetryInterval = 15000L; makeThreadsDaemons = false; threadsInheritInitializersClassLoadContext = false; initializersLoader = null; doubleCheckLockMisfireHandler = true; threadExecutor = new DefaultThreadExecutor(); schedulerRunning = false; shutdown = false; sigChangeForTxCompletion = new ThreadLocal(); firstCheckIn = true; lastCheckin = System.currentTimeMillis(); } //QuaztSheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) //resources.getJobStore().storeJobAndTrigger(jobDetail, trig); //存储jobDetail,trriger public void storeJobAndTrigger(final JobDetail newJob, final OperableTrigger newTrigger) throws JobPersistenceException { //调用JobStoreTX的executeInLock executeInLock(isLockOnInsert() ? "TRIGGER_ACCESS" : null, new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { //存储jobDetail storeJob(conn, newJob, false); //存储,trriger storeTrigger(conn, newTrigger, newJob, false, "WAITING", false, false); } final JobDetail val$newJob; final OperableTrigger val$newTrigger; final JobStoreSupport this$0; { this$0 = JobStoreSupport.this; newJob = jobdetail; newTrigger = operabletrigger; super(); } }); } //是否要加插入锁,独占锁 public boolean isLockOnInsert() { return lockOnInsert; } protected void storeJob(Connection conn, JobDetail newJob, boolean replaceExisting) throws JobPersistenceException { //检查job是否存在 boolean existingJob = jobExists(conn, newJob.getKey()); try { if(existingJob) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newJob); //存在,则更新job getDelegate().updateJobDetail(conn, newJob); } else { //不存在,则插入job getDelegate().insertJobDetail(conn, newJob); } } } protected void storeTrigger(Connection conn, OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state, boolean forceState, boolean recovering) throws JobPersistenceException { //查询触发器trrgerKey是否存在 boolean existingTrigger = triggerExists(conn, newTrigger.getKey()); try { if(!forceState) { //获取触发器组暂停状态 boolean shouldBepaused = getDelegate().isTriggerGroupPaused(conn, newTrigger.getKey().getGroup()); if(!shouldBepaused) { shouldBepaused = getDelegate().isTriggerGroupPaused(conn, "_$_ALL_GROUPS_PAUSED_$_"); if(shouldBepaused) //如果暂定,则加入到触发器暂停表中 getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup()); } if(shouldBepaused && (state.equals("WAITING") || state.equals("ACQUIRED"))) state = "PAUSED"; } if(job == null) //获取job信息 job = getDelegate().selectJobDetail(conn, newTrigger.getJobKey(), getClassLoadHelper()); if(job == null) throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") referenced by the trigger does not exist.").toString()); if(job.isConcurrentExectionDisallowed() && !recovering) //检查job阻塞状态 state = checkBlockedState(conn, job.getKey(), state); //触发器存在则更新,否插入 if(existingTrigger) getDelegate().updateTrigger(conn, newTrigger, state, job); else getDelegate().insertTrigger(conn, newTrigger, state, job); } } //获取驱动代理 protected DriverDelegate getDelegate() throws NoSuchDelegateException { JobStoreSupport jobstoresupport = this; JVM INSTR monitorenter ; if(null == _flddelegate) try { if(delegateClassName != null) delegateClass = getClassLoadHelper().loadClass(delegateClassName, org/quartz/impl/jdbcjobstore/DriverDelegate); _flddelegate = (DriverDelegate)delegateClass.newInstance(); //初始化代理 _flddelegate.initialize(getLog(), tablePrefix, instanceName, instanceId, getClassLoadHelper(), canUseProperties(), getDriverDelegateInitString()); } return _flddelegate; } //移除job public boolean removeJob(final JobKey jobKey) throws JobPersistenceException { return ((Boolean)executeInLock("TRIGGER_ACCESS", new TransactionCallback() { public Object execute(Connection conn) throws JobPersistenceException { return removeJob(conn, jobKey) ? Boolean.TRUE : Boolean.FALSE; } final JobKey val$jobKey; final JobStoreSupport this$0; { this$0 = JobStoreSupport.this; jobKey = jobkey; super(); } })).booleanValue(); } //移除jobKey protected boolean removeJob(Connection conn, JobKey jobKey) throws JobPersistenceException { //获取jobKey所有触发器 List jobTriggers = getDelegate().selectTriggerKeysForJob(conn, jobKey); TriggerKey jobTrigger; //删除所有触发器 for(Iterator i$ = jobTriggers.iterator(); i$.hasNext(); deleteTriggerAndChildren(conn, jobTrigger)) jobTrigger = (TriggerKey)i$.next(); //删除所有job return deleteJobAndChildren(conn, jobKey); } //暂定job public void pauseJob(final JobKey jobKey) throws JobPersistenceException { executeInLock("TRIGGER_ACCESS", new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { List triggers = getTriggersForJob(conn, jobKey); OperableTrigger trigger; //暂定触发器任务 for(Iterator i$ = triggers.iterator(); i$.hasNext(); pauseTrigger(conn, trigger.getKey())) trigger = (OperableTrigger)i$.next(); } }); } //暂定触发器 public void pauseTrigger(Connection conn, TriggerKey triggerKey) throws JobPersistenceException { try { String oldState = getDelegate().selectTriggerState(conn, triggerKey); if(oldState.equals("WAITING") || oldState.equals("ACQUIRED")) //如果是等待状态,则更新为暂定 getDelegate().updateTriggerState(conn, triggerKey, "PAUSED"); else if(oldState.equals("BLOCKED")) //若果触发器为阻塞,则更新状态为暂定阻塞 getDelegate().updateTriggerState(conn, triggerKey, "PAUSED_BLOCKED"); } } //恢复job public void resumeJob(final JobKey jobKey) throws JobPersistenceException { executeInLock("TRIGGER_ACCESS", new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { //获取jobKey的触发器 List triggers = getTriggersForJob(conn, jobKey); OperableTrigger trigger; //恢复所有触发器 for(Iterator i$ = triggers.iterator(); i$.hasNext(); resumeTrigger(conn, trigger.getKey())) trigger = (OperableTrigger)i$.next(); } }); } public void resumeTrigger(Connection conn, TriggerKey key) throws JobPersistenceException { try { //获取触发器状态 status = getDelegate().selectTriggerStatus(conn, key); if(status == null || status.getNextFireTime() == null) return; } blocked = false; if("PAUSED_BLOCKED".equals(status.getStatus())) blocked = true; //检测任务阻塞状态 newState = checkBlockedState(conn, status.getJobKey(), "WAITING"); misfired = false; if(schedulerRunning && status.getNextFireTime().before(new Date())) //获取触发器,下一次触发的时间,并更新触发任务 misfired = updateMisfiredTrigger(conn, key, newState, true); if(!misfired) if(blocked) //如果阻塞,则更新为PAUSED_BLOCKED getDelegate().updateTriggerStateFromOtherState(conn, key, newState, "PAUSED_BLOCKED"); else //如果非阻塞,则更新为PAUSED getDelegate().updateTriggerStateFromOtherState(conn, key, newState, "PAUSED"); } //获取就绪触发任务列表 protected List acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { List acquiredTriggers; Set acquiredJobKeysForNoConcurrentExec; int currentLoopCount; long firstAcquiredTriggerFireTime; if(timeWindow < 0L) throw new IllegalArgumentException(); acquiredTriggers = new ArrayList(); acquiredJobKeysForNoConcurrentExec = new HashSet(); int MAX_DO_LOOP_RETRY = 3; currentLoopCount = 0; firstAcquiredTriggerFireTime = 0L; _L2: currentLoopCount++; List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount); label0: { if(keys == null || keys.size() == 0) return acquiredTriggers; try { Iterator i$ = keys.iterator(); do { if(!i$.hasNext()) break; TriggerKey triggerKey = (TriggerKey)i$.next(); OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey); JobKey jobKey = nextTrigger.getJobKey(); JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper()); int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, "ACQUIRED", "WAITING"); if(rowsUpdated > 0) { //插入触发任务到任务绪表 getDelegate().insertFiredTrigger(conn, nextTrigger, "ACQUIRED", null); //将nextTrigger添加到就绪集合 acquiredTriggers.add(nextTrigger); if(firstAcquiredTriggerFireTime == 0L) firstAcquiredTriggerFireTime = nextTrigger.getNextFireTime().getTime(); } } while(true); if(acquiredTriggers.size() != 0 || currentLoopCount >= 3) break label0; } } return acquiredTriggers; } //处理触发任务完成后的工作 protected void triggeredJobComplete(Connection conn, OperableTrigger trigger, JobDetail jobDetail, org.quartz.Trigger.CompletedExecutionInstruction triggerInstCode) throws JobPersistenceException { try { if(triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.DELETE_TRIGGER) { if(trigger.getNextFireTime() == null) { TriggerStatus stat = getDelegate().selectTriggerStatus(conn, trigger.getKey()); if(stat != null && stat.getNextFireTime() == null) //触发器 removeTrigger(conn, trigger.getKey()); } else { //移除触发任务 removeTrigger(conn, trigger.getKey()); //通知调度器,调度任务完成 signalSchedulingChangeOnTxCompletion(0L); } } else if(triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_TRIGGER_COMPLETE) { //更新触发器为完成状态,并产生触发器完成通知事件 getDelegate().updateTriggerState(conn, trigger.getKey(), "COMPLETE"); signalSchedulingChangeOnTxCompletion(0L); } else if(triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_TRIGGER_ERROR) { getLog().info((new StringBuilder()).append("Trigger ").append(trigger.getKey()).append(" set to ERROR state.").toString()); //更新触发器为错误状态,并产生触发器执行错误通知事件 getDelegate().updateTriggerState(conn, trigger.getKey(), "ERROR"); signalSchedulingChangeOnTxCompletion(0L); } else if(triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE) { //更新job为完成状态,并产生job完成通知事件 getDelegate().updateTriggerStatesForJob(conn, trigger.getJobKey(), "COMPLETE"); signalSchedulingChangeOnTxCompletion(0L); } else if(triggerInstCode == org.quartz.Trigger.CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR) { getLog().info((new StringBuilder()).append("All triggers of Job ").append(trigger.getKey()).append(" set to ERROR state.").toString()); //更新job为错误状态,并产生job执行错误通知事件 getDelegate().updateTriggerStatesForJob(conn, trigger.getJobKey(), "ERROR"); signalSchedulingChangeOnTxCompletion(0L); } if(jobDetail.isConcurrentExectionDisallowed()) { //如果job不允许并发执行,则暂定job getDelegate().updateTriggerStatesForJobFromOtherState(conn, jobDetail.getKey(), "WAITING", "BLOCKED"); getDelegate().updateTriggerStatesForJobFromOtherState(conn, jobDetail.getKey(), "PAUSED", "PAUSED_BLOCKED"); signalSchedulingChangeOnTxCompletion(0L); } //job执行完,更新job数据,持久化 if(jobDetail.isPersistJobDataAfterExecution()) try { if(jobDetail.getJobDataMap().isDirty()) getDelegate().updateJobData(conn, jobDetail); } } try { //从触发任务就绪表删除任务 getDelegate().deleteFiredTrigger(conn, trigger.getFireInstanceId()); } } //事务执行回调接口 protected abstract class VoidTransactionCallback implements TransactionCallback { public final Void execute(Connection conn) throws JobPersistenceException { executeVoid(conn); return null; } abstract void executeVoid(Connection connection) throws JobPersistenceException; public volatile Object execute(Connection x0) throws JobPersistenceException { return execute(x0); } final JobStoreSupport this$0; protected VoidTransactionCallback() { this$0 = JobStoreSupport.this; super(); } } protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS"; protected static final String LOCK_STATE_ACCESS = "STATE_ACCESS"; protected String dsName; protected String tablePrefix; protected boolean useProperties; protected String instanceId; protected String instanceName; protected String delegateClassName; protected String delegateInitString; protected Class delegateClass; protected HashMap calendarCache; private DriverDelegate _flddelegate; private long misfireThreshold; private boolean dontSetAutoCommitFalse; private boolean isClustered; private boolean useDBLocks; private boolean lockOnInsert;// private Semaphore lockHandler; private String selectWithLockSQL; private long clusterCheckinInterval; private ClusterManager clusterManagementThread; private MisfireHandler misfireHandler; private ClassLoadHelper classLoadHelper; private SchedulerSignaler schedSignaler; protected int maxToRecoverAtATime; private boolean setTxIsolationLevelSequential; private boolean acquireTriggersWithinLock; private long dbRetryInterval; private boolean makeThreadsDaemons; private boolean threadsInheritInitializersClassLoadContext; private ClassLoader initializersLoader; private boolean doubleCheckLockMisfireHandler; private final Logger log = LoggerFactory.getLogger(getClass()); private ThreadExecutor threadExecutor; private volatile boolean schedulerRunning; private volatile boolean shutdown; private static long ftrCtr = System.currentTimeMillis(); protected ThreadLocal sigChangeForTxCompletion; protected boolean firstCheckIn; protected long lastCheckin; }
//jdbc标准代理
public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants { //初始化 public void initialize(Logger logger, String tablePrefix, String schedName, String instanceId, ClassLoadHelper classLoadHelper, boolean useProperties, String initString) throws NoSuchDelegateException { this.logger = logger; this.tablePrefix = tablePrefix; this.schedName = schedName; this.instanceId = instanceId; this.useProperties = useProperties; this.classLoadHelper = classLoadHelper; //添加默认触发器代理 addDefaultTriggerPersistenceDelegates(); if(initString == null) return; String settings[] = initString.split("\\|"); String arr$[] = settings; int len$ = arr$.length; label0: for(int i$ = 0; i$ < len$; i$++) { String setting = arr$[i$]; String parts[] = setting.split("="); String name = parts[0]; if(parts.length == 1 || parts[1] == null || parts[1].equals("")) continue; if(name.equals("triggerPersistenceDelegateClasses")) { String trigDelegates[] = parts[1].split(","); String arr$[] = trigDelegates; int len$ = arr$.length; int i$ = 0; do { if(i$ >= len$) continue label0; String trigDelClassName = arr$[i$]; try { Class trigDelClass = classLoadHelper.loadClass(trigDelClassName); addTriggerPersistenceDelegate((TriggerPersistenceDelegate)trigDelClass.newInstance()); } i$++; } while(true); } } } //添加默认触发器代理 protected void addDefaultTriggerPersistenceDelegates() { //简单触发器代理 addTriggerPersistenceDelegate(new SimpleTriggerPersistenceDelegate()); //Cron表达式触发器代理 addTriggerPersistenceDelegate(new CronTriggerPersistenceDelegate()); addTriggerPersistenceDelegate(new CalendarIntervalTriggerPersistenceDelegate()); addTriggerPersistenceDelegate(new DailyTimeIntervalTriggerPersistenceDelegate()); } //插入JobDetail public int insertJobDetail(Connection conn, JobDetail job) throws IOException, SQLException { ByteArrayOutputStream baos; PreparedStatement ps; int insertResult; baos = serializeJobData(job.getJobDataMap()); ps = null; insertResult = 0; //插入job语句 ps = conn.prepareStatement(rtp("INSERT INTO {0}JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)")); ps.setString(1, job.getKey().getName()); ps.setString(2, job.getKey().getGroup()); ps.setString(3, job.getDescription()); ps.setString(4, job.getJobClass().getName()); setBoolean(ps, 5, job.isDurable()); setBoolean(ps, 6, job.isConcurrentExectionDisallowed()); setBoolean(ps, 7, job.isPersistJobDataAfterExecution()); setBoolean(ps, 8, job.requestsRecovery()); setBytes(ps, 9, baos); insertResult = ps.executeUpdate(); closeStatement(ps); break MISSING_BLOCK_LABEL_187; closeStatement(ps); return insertResult; } //更新job public int updateJobDetail(Connection conn, JobDetail job) throws IOException, SQLException { ps = conn.prepareStatement(rtp("UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?")); } //删除job public int deleteJobDetail(Connection conn, JobKey jobKey) throws SQLException { ps = conn.prepareStatement(rtp("DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?")); } protected String tablePrefix;//表前缀 protected String instanceId; protected String schedName; protected boolean useProperties; protected ClassLoadHelper classLoadHelper; protected List triggerPersistenceDelegates; private String schedNameLiteral; }
//Cron表达式触发器持久化类
public class CronTriggerPersistenceDelegate implements TriggerPersistenceDelegate, StdJDBCConstants { //初始化表前缀和数据库schedName public void initialize(String theTablePrefix, String schedName) { tablePrefix = theTablePrefix; schedNameLiteral = (new StringBuilder()).append("'").append(schedName).append("'").toString(); } //删除触发器 public int deleteExtendedTriggerProperties(Connection conn, TriggerKey triggerKey) throws SQLException { PreparedStatement ps = null; int i; //删除语句 ps = conn.prepareStatement(Util.rtp("DELETE FROM {0}CRON_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?", tablePrefix, schedNameLiteral)); ps.setString(1, triggerKey.getName()); ps.setString(2, triggerKey.getGroup()); i = ps.executeUpdate(); Util.closeStatement(ps); return i; Util.closeStatement(ps); } //插入触发器 public int insertExtendedTriggerProperties(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException { CronTrigger cronTrigger; PreparedStatement ps; cronTrigger = (CronTrigger)trigger; ps = null; int i; ps = conn.prepareStatement(Util.rtp("INSERT INTO {0}CRON_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, CRON_EXPRESSION, TIME_ZONE_ID) VALUES({1}, ?, ?, ?, ?)", tablePrefix, schedNameLiteral)); ps.setString(1, trigger.getKey().getName()); ps.setString(2, trigger.getKey().getGroup()); ps.setString(3, cronTrigger.getCronExpression()); ps.setString(4, cronTrigger.getTimeZone().getID()); i = ps.executeUpdate(); Util.closeStatement(ps); return i; Util.closeStatement(ps); } //更新触发器 public int updateExtendedTriggerProperties(Connection conn, OperableTrigger trigger, String state, JobDetail jobDetail) throws SQLException, IOException { CronTrigger cronTrigger; PreparedStatement ps; cronTrigger = (CronTrigger)trigger; ps = null; int i; //更新语句 ps = conn.prepareStatement(Util.rtp("UPDATE {0}CRON_TRIGGERS SET CRON_EXPRESSION = ?, TIME_ZONE_ID = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?", tablePrefix, schedNameLiteral)); ps.setString(1, cronTrigger.getCronExpression()); ps.setString(2, cronTrigger.getTimeZone().getID()); ps.setString(3, trigger.getKey().getName()); ps.setString(4, trigger.getKey().getGroup()); i = ps.executeUpdate(); Util.closeStatement(ps); return i; } protected String tablePrefix; protected String schedNameLiteral; }
//操作相关表的语句常量
public interface StdJDBCConstants extends Constants { public static final String TABLE_PREFIX_SUBST = "{0}"; public static final String SCHED_NAME_SUBST = "{1}"; public static final String UPDATE_TRIGGER_STATES_FROM_OTHER_STATES = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"; public static final String SELECT_MISFIRED_TRIGGERS = "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"; public static final String SELECT_TRIGGERS_IN_STATE = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"; public static final String SELECT_MISFIRED_TRIGGERS_IN_STATE = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"; public static final String COUNT_MISFIRED_TRIGGERS_IN_STATE = "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ?"; public static final String SELECT_HAS_MISFIRED_TRIGGERS_IN_STATE = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"; public static final String SELECT_MISFIRED_TRIGGERS_IN_GROUP_IN_STATE = "SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"; public static final String DELETE_FIRED_TRIGGERS = "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String INSERT_JOB_DETAIL = "INSERT INTO {0}JOB_DETAILS (SCHED_NAME, JOB_NAME, JOB_GROUP, DESCRIPTION, JOB_CLASS_NAME, IS_DURABLE, IS_NONCONCURRENT, IS_UPDATE_DATA, REQUESTS_RECOVERY, JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPDATE_JOB_DETAIL = "UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_TRIGGERS_FOR_JOB = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_TRIGGERS_FOR_CALENDAR = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String DELETE_JOB_DETAIL = "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_JOB_NONCONCURRENT = "SELECT IS_NONCONCURRENT FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_JOB_EXISTENCE = "SELECT JOB_NAME FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String UPDATE_JOB_DATA = "UPDATE {0}JOB_DETAILS SET JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_JOB_DETAIL = "SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_NUM_JOBS = "SELECT COUNT(JOB_NAME) FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1}"; public static final String SELECT_JOB_GROUPS = "SELECT DISTINCT(JOB_GROUP) FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1}"; public static final String SELECT_JOBS_IN_GROUP_LIKE = "SELECT JOB_NAME, JOB_GROUP FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_GROUP LIKE ?"; public static final String SELECT_JOBS_IN_GROUP = "SELECT JOB_NAME, JOB_GROUP FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_GROUP = ?"; public static final String INSERT_TRIGGER = "INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String INSERT_SIMPLE_TRIGGER = "INSERT INTO {0}SIMPLE_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, REPEAT_COUNT, REPEAT_INTERVAL, TIMES_TRIGGERED) VALUES({1}, ?, ?, ?, ?, ?)"; public static final String INSERT_CRON_TRIGGER = "INSERT INTO {0}CRON_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, CRON_EXPRESSION, TIME_ZONE_ID) VALUES({1}, ?, ?, ?, ?)"; public static final String INSERT_BLOB_TRIGGER = "INSERT INTO {0}BLOB_TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, BLOB_DATA) VALUES({1}, ?, ?, ?)"; public static final String UPDATE_TRIGGER_SKIP_DATA = "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_TRIGGER = "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_SIMPLE_TRIGGER = "UPDATE {0}SIMPLE_TRIGGERS SET REPEAT_COUNT = ?, REPEAT_INTERVAL = ?, TIMES_TRIGGERED = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_CRON_TRIGGER = "UPDATE {0}CRON_TRIGGERS SET CRON_EXPRESSION = ?, TIME_ZONE_ID = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_BLOB_TRIGGER = "UPDATE {0}BLOB_TRIGGERS SET BLOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_TRIGGER_EXISTENCE = "SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_TRIGGER_STATE = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String UPDATE_TRIGGER_STATE_FROM_STATE = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?"; public static final String UPDATE_TRIGGER_GROUP_STATE_FROM_STATE = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_GROUP LIKE ? AND TRIGGER_STATE = ?"; public static final String UPDATE_TRIGGER_STATE_FROM_STATES = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"; public static final String UPDATE_TRIGGER_GROUP_STATE_FROM_STATES = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_GROUP LIKE ? AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"; public static final String UPDATE_JOB_TRIGGER_STATES = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String UPDATE_JOB_TRIGGER_STATES_FROM_OTHER_STATE = "UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ? AND TRIGGER_STATE = ?"; public static final String DELETE_SIMPLE_TRIGGER = "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String DELETE_CRON_TRIGGER = "DELETE FROM {0}CRON_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String DELETE_BLOB_TRIGGER = "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String DELETE_TRIGGER = "DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_NUM_TRIGGERS_FOR_JOB = "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_JOB_FOR_TRIGGER = "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"; public static final String SELECT_TRIGGER = "SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_TRIGGER_DATA = "SELECT JOB_DATA FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_TRIGGER_STATE = "SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_TRIGGER_STATUS = "SELECT TRIGGER_STATE, NEXT_FIRE_TIME, JOB_NAME, JOB_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_SIMPLE_TRIGGER = "SELECT * FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_CRON_TRIGGER = "SELECT * FROM {0}CRON_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_BLOB_TRIGGER = "SELECT * FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_NUM_TRIGGERS = "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1}"; public static final String SELECT_NUM_TRIGGERS_IN_GROUP = "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP = ?"; public static final String SELECT_TRIGGER_GROUPS = "SELECT DISTINCT(TRIGGER_GROUP) FROM {0}TRIGGERS WHERE SCHED_NAME = {1}"; public static final String SELECT_TRIGGER_GROUPS_FILTERED = "SELECT DISTINCT(TRIGGER_GROUP) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP LIKE ?"; public static final String SELECT_TRIGGERS_IN_GROUP_LIKE = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP LIKE ?"; public static final String SELECT_TRIGGERS_IN_GROUP = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP = ?"; public static final String INSERT_CALENDAR = "INSERT INTO {0}CALENDARS (SCHED_NAME, CALENDAR_NAME, CALENDAR) VALUES({1}, ?, ?)"; public static final String UPDATE_CALENDAR = "UPDATE {0}CALENDARS SET CALENDAR = ? WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String SELECT_CALENDAR_EXISTENCE = "SELECT CALENDAR_NAME FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String SELECT_CALENDAR = "SELECT * FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String SELECT_REFERENCED_CALENDAR = "SELECT CALENDAR_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String DELETE_CALENDAR = "DELETE FROM {0}CALENDARS WHERE SCHED_NAME = {1} AND CALENDAR_NAME = ?"; public static final String SELECT_NUM_CALENDARS = "SELECT COUNT(CALENDAR_NAME) FROM {0}CALENDARS WHERE SCHED_NAME = {1}"; public static final String SELECT_CALENDARS = "SELECT CALENDAR_NAME FROM {0}CALENDARS WHERE SCHED_NAME = {1}"; public static final String SELECT_NEXT_FIRE_TIME = "SELECT MIN(NEXT_FIRE_TIME) AS ALIAS_NXT_FR_TM FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME >= 0"; public static final String SELECT_TRIGGER_FOR_FIRE_TIME = "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME = ?"; public static final String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"; public static final String INSERT_FIRED_TRIGGER = "INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPDATE_FIRED_TRIGGER = "UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?"; public static final String SELECT_INSTANCES_FIRED_TRIGGERS = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"; public static final String SELECT_INSTANCES_RECOVERABLE_FIRED_TRIGGERS = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"; public static final String SELECT_JOB_EXECUTION_COUNT = "SELECT COUNT(TRIGGER_NAME) FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_FIRED_TRIGGERS = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String SELECT_FIRED_TRIGGER = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"; public static final String SELECT_FIRED_TRIGGER_GROUP = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP = ?"; public static final String SELECT_FIRED_TRIGGERS_OF_JOB = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"; public static final String SELECT_FIRED_TRIGGERS_OF_JOB_GROUP = "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND JOB_GROUP = ?"; public static final String DELETE_FIRED_TRIGGER = "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND ENTRY_ID = ?"; public static final String DELETE_INSTANCES_FIRED_TRIGGERS = "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"; public static final String DELETE_NO_RECOVERY_FIRED_TRIGGERS = "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?REQUESTS_RECOVERY = ?"; public static final String DELETE_ALL_SIMPLE_TRIGGERS = "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_SIMPROP_TRIGGERS = "DELETE FROM {0}SIMPROP_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_CRON_TRIGGERS = "DELETE FROM {0}CRON_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_BLOB_TRIGGERS = "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_TRIGGERS = "DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_JOB_DETAILS = "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_CALENDARS = "DELETE FROM {0}CALENDARS WHERE SCHED_NAME = {1}"; public static final String DELETE_ALL_PAUSED_TRIGGER_GRPS = "DELETE FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME = {1}"; public static final String SELECT_FIRED_TRIGGER_INSTANCE_NAMES = "SELECT DISTINCT INSTANCE_NAME FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"; public static final String INSERT_SCHEDULER_STATE = "INSERT INTO {0}SCHEDULER_STATE (SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME, CHECKIN_INTERVAL) VALUES({1}, ?, ?, ?)"; public static final String SELECT_SCHEDULER_STATE = "SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"; public static final String SELECT_SCHEDULER_STATES = "SELECT * FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1}"; public static final String DELETE_SCHEDULER_STATE = "DELETE FROM {0}SCHEDULER_STATE WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"; public static final String UPDATE_SCHEDULER_STATE = "UPDATE {0}SCHEDULER_STATE SET LAST_CHECKIN_TIME = ? WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ?"; public static final String INSERT_PAUSED_TRIGGER_GROUP = "INSERT INTO {0}PAUSED_TRIGGER_GRPS (SCHED_NAME, TRIGGER_GROUP) VALUES({1}, ?)"; public static final String SELECT_PAUSED_TRIGGER_GROUP = "SELECT TRIGGER_GROUP FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP = ?"; public static final String SELECT_PAUSED_TRIGGER_GROUPS = "SELECT TRIGGER_GROUP FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME = {1}"; public static final String DELETE_PAUSED_TRIGGER_GROUP = "DELETE FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME = {1} AND TRIGGER_GROUP LIKE ?"; public static final String DELETE_PAUSED_TRIGGER_GROUPS = "DELETE FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME = {1}"; }
//数据库表明,及字段名常量
public interface Constants { public static final String TABLE_JOB_DETAILS = "JOB_DETAILS"; public static final String TABLE_TRIGGERS = "TRIGGERS"; public static final String TABLE_SIMPLE_TRIGGERS = "SIMPLE_TRIGGERS"; public static final String TABLE_CRON_TRIGGERS = "CRON_TRIGGERS"; public static final String TABLE_BLOB_TRIGGERS = "BLOB_TRIGGERS"; public static final String TABLE_FIRED_TRIGGERS = "FIRED_TRIGGERS"; public static final String TABLE_CALENDARS = "CALENDARS"; public static final String TABLE_PAUSED_TRIGGERS = "PAUSED_TRIGGER_GRPS"; public static final String TABLE_LOCKS = "LOCKS"; public static final String TABLE_SCHEDULER_STATE = "SCHEDULER_STATE"; public static final String COL_SCHEDULER_NAME = "SCHED_NAME"; public static final String COL_JOB_NAME = "JOB_NAME"; public static final String COL_JOB_GROUP = "JOB_GROUP"; public static final String COL_IS_DURABLE = "IS_DURABLE"; public static final String COL_IS_VOLATILE = "IS_VOLATILE"; public static final String COL_IS_NONCONCURRENT = "IS_NONCONCURRENT"; public static final String COL_IS_UPDATE_DATA = "IS_UPDATE_DATA"; public static final String COL_REQUESTS_RECOVERY = "REQUESTS_RECOVERY"; public static final String COL_JOB_DATAMAP = "JOB_DATA"; public static final String COL_JOB_CLASS = "JOB_CLASS_NAME"; public static final String COL_DESCRIPTION = "DESCRIPTION"; public static final String COL_TRIGGER_NAME = "TRIGGER_NAME"; public static final String COL_TRIGGER_GROUP = "TRIGGER_GROUP"; public static final String COL_NEXT_FIRE_TIME = "NEXT_FIRE_TIME"; public static final String COL_PREV_FIRE_TIME = "PREV_FIRE_TIME"; public static final String COL_TRIGGER_STATE = "TRIGGER_STATE"; public static final String COL_TRIGGER_TYPE = "TRIGGER_TYPE"; public static final String COL_START_TIME = "START_TIME"; public static final String COL_END_TIME = "END_TIME"; public static final String COL_PRIORITY = "PRIORITY"; public static final String COL_MISFIRE_INSTRUCTION = "MISFIRE_INSTR"; public static final String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM"; public static final String COL_REPEAT_COUNT = "REPEAT_COUNT"; public static final String COL_REPEAT_INTERVAL = "REPEAT_INTERVAL"; public static final String COL_TIMES_TRIGGERED = "TIMES_TRIGGERED"; public static final String COL_CRON_EXPRESSION = "CRON_EXPRESSION"; public static final String COL_BLOB = "BLOB_DATA"; public static final String COL_TIME_ZONE_ID = "TIME_ZONE_ID"; public static final String COL_INSTANCE_NAME = "INSTANCE_NAME"; public static final String COL_FIRED_TIME = "FIRED_TIME"; public static final String COL_SCHED_TIME = "SCHED_TIME"; public static final String COL_ENTRY_ID = "ENTRY_ID"; public static final String COL_ENTRY_STATE = "STATE"; public static final String COL_CALENDAR_NAME = "CALENDAR_NAME"; public static final String COL_CALENDAR = "CALENDAR"; public static final String COL_LOCK_NAME = "LOCK_NAME"; public static final String COL_LAST_CHECKIN_TIME = "LAST_CHECKIN_TIME"; public static final String COL_CHECKIN_INTERVAL = "CHECKIN_INTERVAL"; public static final String DEFAULT_TABLE_PREFIX = "QRTZ_"; public static final String STATE_WAITING = "WAITING"; public static final String STATE_ACQUIRED = "ACQUIRED"; public static final String STATE_EXECUTING = "EXECUTING"; public static final String STATE_COMPLETE = "COMPLETE"; public static final String STATE_BLOCKED = "BLOCKED"; public static final String STATE_ERROR = "ERROR"; public static final String STATE_PAUSED = "PAUSED"; public static final String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED"; public static final String STATE_DELETED = "DELETED"; /** * @deprecated Field STATE_MISFIRED is deprecated */ public static final String STATE_MISFIRED = "MISFIRED"; public static final String ALL_GROUPS_PAUSED = "_$_ALL_GROUPS_PAUSED_$_"; public static final String TTYPE_SIMPLE = "SIMPLE"; public static final String TTYPE_CRON = "CRON"; public static final String TTYPE_CAL_INT = "CAL_INT"; public static final String TTYPE_DAILY_TIME_INT = "DAILY_I"; public static final String TTYPE_BLOB = "BLOB"; }
总结:
RDB持久化实际上就是将触发器,任务存储在数据中的触发器表和任务表中;
暂停,恢复任务是通过暂定和恢复与jobKey相关的触发器,即修改触发器表中Trriger的状态;
调度任务时,从触发任务表中获取触发任务出发时间小于当前时间的触发任务,加入到触发任务就绪表中,
当触发任务执行完成时,从触发任务就绪表中删除完成的触发任务,然后,查看触发任务的触发器下一触发时间,
如果触发器下一刻触发时间为null,则从触发任务表中删除触发任务,否则,更新触发任务触发时间,从触发任务表,
删除旧触发任务,添加新的触发任务到触发任务表;一个时间窗口过后,重新执行以上调度步骤。
发表评论
-
TreeSet在Quartz任务调度过程中的作用
2017-08-24 23:43 736红黑树详解:http://www.cnblogs.com/sk ... -
Quartz使用与Spring集成系列教程
2016-10-26 09:48 503Quartz的使用:http://donald-draper. ... -
Spring与Quartz集成-源码分析
2016-09-13 11:50 2723在阅读以下文章之前,如果对Quartz任务调度不是很熟悉,请看 ... -
Spring与Quartz集成详解
2016-09-09 17:52 2824首先这个所有的依赖包就不需要多讲了,首先下载Quazrt发布包 ... -
Quartz 任务存储JobStoreTX 持久化之RDB
2016-09-08 11:52 2839Quartz储存方式之JDBC JobStoreTX:http ... -
Quartz任务调度源码分析
2016-09-07 13:12 3576Quartz的使用:http://donald-draper. ... -
Quartz的job调度源码分析
2016-09-06 13:03 8Quartz的使用:http://donald ... -
Quartzs的job,trriger监听器源码分析
2016-09-06 11:15 1489Quartz的使用:http://donald-draper. ... -
Quartz的job、触发器的暂停与恢复源码分析
2016-09-06 09:01 5274Quartz的使用:http://donald-draper. ... -
Quartz的Scheduler初始化源码分析
2016-09-05 16:14 3601Quartz的使用:http://donald ... -
Quartzs的job存储,触发器、job删除源码分析
2016-09-05 15:39 4575前言:unscheduleJob针对TriggerKey,而d ... -
Quartz的使用
2016-09-05 11:39 1793Quartz使用总结:http://www.cnblogs.c ...
相关推荐
在这个基于Spring的配置中,我们将深入探讨如何在Spring应用中集成Quartz,实现任务的存储和持久化。 首先,我们需要在Spring配置文件中引入Quartz的相关bean。这通常包括`SchedulerFactoryBean`,它是Spring管理...
任务调度的持久化功能,即新增修改删除之类的功能,这必须得要有的,不然都不知道后台都有什么作业在跑
在本文中,我们将深入探讨如何在Spring Boot 2.3版本中集成Quartz定时任务,并实现其持久化到数据库,以便支持集群环境。这个过程的关键在于配置Quartz Scheduler,设置数据库连接,以及确保任务在多节点环境中能够...
这个压缩包“Quartz.net 3.0.7.0数据库持久化.rar”包含了关于如何使用Quartz.NET实现任务调度的数据库持久化和集群配置的资料。Quartz.NET 3.0.7.0是该库的一个稳定版本,它提供了许多改进和新特性。 **数据库持久...
通过分析这个目录,可以更深入地理解Quartz持久化任务的具体实现。 了解并掌握以上知识点,你就能创建和管理自己的Quartz持久化任务,实现高效稳定的定时任务执行。在实际应用中,Quartz的强大灵活性和扩展性使其...
它的持久化功能是其重要特性之一,使得在系统重启后能够恢复先前的状态,继续执行预定的任务,保证了服务的连续性和可靠性。 Quartz 提供了多种持久化策略,包括 JDBC、JPA、Hibernate 和文本文件等。这些策略允许...
### Spring中的Quartz定时任务与持久化管理 #### 一、Spring与Quartz简介 Spring框架作为Java领域中广泛使用的轻量级应用框架之一,它提供了丰富的功能支持,包括依赖注入、面向切面编程以及MVC框架等。而Quartz是...
Quartz的持久化机制依赖于一组特定的数据库表,这些表存储了Job(任务)和Trigger(触发器)的信息。以下是Quartz默认使用的数据库表: 1. QRTZ_JOB_DETAILS:存储Job的详细信息,如Job类名、是否持久化、组名等。 ...
总结,Quartz 2.2.0的JobStore持久化到数据库功能提供了可靠的任务调度解决方案,通过配置数据源和JobStore类型,可以将作业和触发器的元数据存储在数据库中,实现跨应用和跨服务器的集群调度。开发者只需关注作业和...
Quartz2.2.1版本在存储和持久化方面提供了多种机制,确保任务可以在系统重启后继续执行,增强了系统的可靠性和可扩展性。这篇博客文章详细介绍了Quartz在存储和持久化方面的实现方式和具体步骤。 首先,Quartz支持...
【标题】:Quartz界面化持久化管理 【描述】:Quartz是一个开源的作业调度框架,用于在Java应用程序中创建和执行计划任务。通过结合文章《quartz界面化持久化管理》(链接:...
在“Quartz2.0持久化到数据库”这个主题中,我们将深入探讨如何将Quartz2.0的任务调度信息存储到数据库中,以便在系统重启或故障后能够恢复任务,并提供更可靠的调度服务。 首先,理解Quartz2.0的持久化机制至关...
在实际应用中,Quartz的持久化经常与Spring框架结合,通过Spring的数据源(dataSource)来实现任务的存储和恢复。 一、Quartz持久化的意义 1. 任务恢复:当系统发生故障或重启时,能够恢复之前设置的任务,保证任务...
Quartz的定时任务持久化是指将任务的相关信息(如触发器、作业详情)存储到数据库中,而不是仅仅保留在内存里。这样做的好处是,即使应用服务器重启,之前设定的任务也不会丢失,因为它们可以从数据库中重新加载。...
在企业级应用中,为了保证服务的高可用性和任务的连续性,往往需要将调度信息持久化到数据库中,这就是所谓的Quartz持久化。Quartz2.2版本在前一版本的基础上进行了优化和增强,提供了更加稳定和灵活的持久化机制。 ...
在1.8.6版本中,Quartz提供了数据库持久化的功能,这使得任务调度的状态能够在系统重启后得以恢复,增强了系统的可靠性和稳定性。数据库持久化是通过将作业和触发器的信息存储在关系型数据库中实现的。 Spring框架...
在使用Quartz时,为了实现任务的持久化,即在系统重启后还能保留之前的任务设置,我们需要将任务的相关信息存储到数据库中。这涉及到一系列特定的数据库表,这些表构成了Quartz的持久化数据结构。 Quartz的持久化...
实现数据库持久化,需要配置 Quartz 的 `JobStore` 实现,比如 `org.quartz.impl.jdbcjobstore.JobStoreTX` 或 `org.quartz.impl.jdbcjobstore.StdJDBCDelegate`,这些类会与数据库交互,存储和检索 Job 和 Trigger ...
- **持久化**:任务和触发器的状态可以持久化到数据库,即使服务器重启也不会丢失已安排的任务。 - **监控与管理**:提供了Web管理界面(如`org.quartz.plugins.management.ShutdownHookPlugin`),可以远程监控和...
Quartz.NET支持多种持久化策略,如SQL Server存储、SQLite等。 6. **集群与故障转移**: Quartz.NET还支持集群配置,可以在多台服务器上运行,当一台服务器宕机时,其他服务器可以接管任务执行,提供高可用性。 7. ...