1.简介
Quartz 是一个开源的作业调度框架,它完全由 Java 写成,并设计用于 J2SE 和 J2EE 应用中。它提供了巨大的灵活性而不牺牲简单性。你能够用它来为执行一个作业而创建简单的或复杂的调度。
2.原理
参考http://ssuupv.blog.163.com/blog/static/146156722013829111028966/
3.以内存运行信息方式分析源码
3.1 测试代码
public static void main(String[] args) { SimpleDateFormat DateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); Date d = new Date(); String returnstr = DateFormat.format(d); try { System.out.println(returnstr + "【系统启动】"); QuartzManager.addJob("11", TestJob.class, "0/2 * * * * ?"); //每2秒钟执行一次 } catch (Exception e) { e.printStackTrace(); } try { System.out.println("【系统启动】"); QuartzManager.start(); } catch (Exception e) { e.printStackTrace(); } }
QuartzManager:
/** */ /** * 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名 * @param jobName 任务名 * @param job 任务 * @param time 时间设置,参考quartz说明文档 * @throws SchedulerException * @throws ParseException */ @SuppressWarnings("deprecation") public static void addJob(String jobName, Class<? extends Job> jobClazz, String time) throws SchedulerException, ParseException { Scheduler sched = sf.getScheduler(); JobDetail jobDetail = new JobDetailImpl(jobName, JOB_GROUP_NAME, jobClazz);//任务名,任务组,任务执行类 //触发器 CronTriggerImpl trigger = new CronTriggerImpl(jobName, TRIGGER_GROUP_NAME);//触发器名,触发器组 trigger.setCronExpression(time);//触发器时间设定 sched.scheduleJob(jobDetail, trigger); //启动 if (!sched.isShutdown()) sched.start(); } public static void start() throws SchedulerException, ParseException { Scheduler sched = sf.getScheduler(); //启动 if (!sched.isShutdown()) sched.start(); }
TestJob:
public class TestJob implements Job { private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(DateFormat.format(new Date()) + "★★★★★★★★★★★"); } }
3.2代码分析
3.2.1首先通过SchedulerFactory得到StdScheduler
private Scheduler instantiate() throws SchedulerException { if (cfg == null) { initialize(); } if (initException != null) { throw initException; } JobStore js = null; ThreadPool tp = null; QuartzScheduler qs = null; DBConnectionManager dbMgr = null; String instanceIdGeneratorClass = null; Properties tProps = null; String userTXLocation = null; boolean wrapJobInTx = false; boolean autoId = false; long idleWaitTime = -1; long dbFailureRetry = -1; String classLoadHelperClass; String jobFactoryClass; ThreadExecutor threadExecutor; SchedulerRepository schedRep = SchedulerRepository.getInstance(); // Get Scheduler Properties // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME, "QuartzScheduler"); String threadName = cfg.getStringProperty(PROP_SCHED_THREAD_NAME, schedName + "_QuartzSchedulerThread"); String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID, DEFAULT_INSTANCE_ID); if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) { autoId = true; instanceIdGeneratorClass = cfg.getStringProperty( PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS, "org.quartz.simpl.SimpleInstanceIdGenerator"); } else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) { autoId = true; instanceIdGeneratorClass = "org.quartz.simpl.SystemPropertyInstanceIdGenerator"; } userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL, userTXLocation); if (userTXLocation != null && userTXLocation.trim().length() == 0) { userTXLocation = null; } classLoadHelperClass = cfg.getStringProperty( PROP_SCHED_CLASS_LOAD_HELPER_CLASS, "org.quartz.simpl.CascadingClassLoadHelper"); wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX, wrapJobInTx); jobFactoryClass = cfg.getStringProperty( PROP_SCHED_JOB_FACTORY_CLASS, null); idleWaitTime = cfg.getLongProperty(PROP_SCHED_IDLE_WAIT_TIME, idleWaitTime); dbFailureRetry = cfg.getLongProperty( PROP_SCHED_DB_FAILURE_RETRY_INTERVAL, dbFailureRetry); boolean makeSchedulerThreadDaemon = cfg.getBooleanProperty(PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON); boolean threadsInheritInitalizersClassLoader = cfg.getBooleanProperty(PROP_SCHED_SCHEDULER_THREADS_INHERIT_CONTEXT_CLASS_LOADER_OF_INITIALIZING_THREAD); boolean skipUpdateCheck = cfg.getBooleanProperty(PROP_SCHED_SKIP_UPDATE_CHECK, false); long batchTimeWindow = cfg.getLongProperty(PROP_SCHED_BATCH_TIME_WINDOW, 0L); int maxBatchSize = cfg.getIntProperty(PROP_SCHED_MAX_BATCH_SIZE, 1); boolean interruptJobsOnShutdown = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, false); boolean interruptJobsOnShutdownWithWait = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, false); boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT); String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME); boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY); String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS); boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false); boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false); String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost"); int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099); int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1); String rmiCreateRegistry = cfg.getStringProperty( PROP_SCHED_RMI_CREATE_REGISTRY, QuartzSchedulerResources.CREATE_REGISTRY_NEVER); String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME); if (jmxProxy && rmiProxy) { throw new SchedulerConfigException("Cannot proxy both RMI and JMX."); } Properties schedCtxtProps = cfg.getPropertyGroup(PROP_SCHED_CONTEXT_PREFIX, true); // If Proxying to remote scheduler, short-circuit here... // ~~~~~~~~~~~~~~~~~~ if (rmiProxy) { if (autoId) { schedInstId = DEFAULT_INSTANCE_ID; } String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier( schedName, schedInstId) : rmiBindName; RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort); schedRep.bind(remoteScheduler); return remoteScheduler; } // Create class load helper ClassLoadHelper loadHelper = null; try { loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass) .newInstance(); } catch (Exception e) { throw new SchedulerConfigException( "Unable to instantiate class load helper class: " + e.getMessage(), e); } loadHelper.initialize(); // If Proxying to remote JMX scheduler, short-circuit here... // ~~~~~~~~~~~~~~~~~~ if (jmxProxy) { if (autoId) { schedInstId = DEFAULT_INSTANCE_ID; } if (jmxProxyClass == null) { throw new SchedulerConfigException("No JMX Proxy Scheduler class provided"); } RemoteMBeanScheduler jmxScheduler = null; try { jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass) .newInstance(); } catch (Exception e) { throw new SchedulerConfigException( "Unable to instantiate RemoteMBeanScheduler class.", e); } if (jmxObjectName == null) { jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId); } jmxScheduler.setSchedulerObjectName(jmxObjectName); tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true); try { setBeanProps(jmxScheduler, tProps); } catch (Exception e) { initException = new SchedulerException("RemoteMBeanScheduler class '" + jmxProxyClass + "' props could not be configured.", e); throw initException; } jmxScheduler.initialize(); schedRep.bind(jmxScheduler); return jmxScheduler; } JobFactory jobFactory = null; if(jobFactoryClass != null) { try { jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass) .newInstance(); } catch (Exception e) { throw new SchedulerConfigException( "Unable to instantiate JobFactory class: " + e.getMessage(), e); } tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true); try { setBeanProps(jobFactory, tProps); } catch (Exception e) { initException = new SchedulerException("JobFactory class '" + jobFactoryClass + "' props could not be configured.", e); throw initException; } } InstanceIdGenerator instanceIdGenerator = null; if(instanceIdGeneratorClass != null) { try { instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass) .newInstance(); } catch (Exception e) { throw new SchedulerConfigException( "Unable to instantiate InstanceIdGenerator class: " + e.getMessage(), e); } tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true); try { setBeanProps(instanceIdGenerator, tProps); } catch (Exception e) { initException = new SchedulerException("InstanceIdGenerator class '" + instanceIdGeneratorClass + "' props could not be configured.", e); throw initException; } } // Get ThreadPool Properties // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName()); if (tpClass == null) { initException = new SchedulerException( "ThreadPool class not specified. "); throw initException; } try { //得到ThreadPool tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance(); } catch (Exception e) { initException = new SchedulerException("ThreadPool class '" + tpClass + "' could not be instantiated.", e); throw initException; } tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true); try { setBeanProps(tp, tProps); } catch (Exception e) { initException = new SchedulerException("ThreadPool class '" + tpClass + "' props could not be configured.", e); throw initException; } // Get JobStore Properties // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS, RAMJobStore.class.getName()); if (jsClass == null) { initException = new SchedulerException( "JobStore class not specified. "); throw initException; } try { js = (JobStore) loadHelper.loadClass(jsClass).newInstance(); } catch (Exception e) { initException = new SchedulerException("JobStore class '" + jsClass + "' could not be instantiated.", e); throw initException; } SchedulerDetailsSetter.setDetails(js, schedName, schedInstId); tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX}); try { setBeanProps(js, tProps); } catch (Exception e) { initException = new SchedulerException("JobStore class '" + jsClass + "' props could not be configured.", e); throw initException; } if (js instanceof JobStoreSupport) { // Install custom lock handler (Semaphore) String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS); if (lockHandlerClass != null) { try { Semaphore lockHandler = (Semaphore)loadHelper.loadClass(lockHandlerClass).newInstance(); tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true); // If this lock handler requires the table prefix, add it to its properties. if (lockHandler instanceof TablePrefixAware) { tProps.setProperty( PROP_TABLE_PREFIX, ((JobStoreSupport)js).getTablePrefix()); tProps.setProperty( PROP_SCHED_NAME, schedName); } try { setBeanProps(lockHandler, tProps); } catch (Exception e) { initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass + "' props could not be configured.", e); throw initException; } ((JobStoreSupport)js).setLockHandler(lockHandler); getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass); } catch (Exception e) { initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass + "' could not be instantiated.", e); throw initException; } } } // Set up any DataSources // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX); for (int i = 0; i < dsNames.length; i++) { PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup( PROP_DATASOURCE_PREFIX + "." + dsNames[i], true)); String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null); // custom connectionProvider... if(cpClass != null) { ConnectionProvider cp = null; try { cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance(); } catch (Exception e) { initException = new SchedulerException("ConnectionProvider class '" + cpClass + "' could not be instantiated.", e); throw initException; } try { // remove the class name, so it isn't attempted to be set pp.getUnderlyingProperties().remove( PROP_CONNECTION_PROVIDER_CLASS); setBeanProps(cp, pp.getUnderlyingProperties()); } catch (Exception e) { initException = new SchedulerException("ConnectionProvider class '" + cpClass + "' props could not be configured.", e); throw initException; } dbMgr = DBConnectionManager.getInstance(); dbMgr.addConnectionProvider(dsNames[i], cp); } else { String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null); if (dsJndi != null) { boolean dsAlwaysLookup = pp.getBooleanProperty( PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP); String dsJndiInitial = pp.getStringProperty( PROP_DATASOURCE_JNDI_INITIAL); String dsJndiProvider = pp.getStringProperty( PROP_DATASOURCE_JNDI_PROVDER); String dsJndiPrincipal = pp.getStringProperty( PROP_DATASOURCE_JNDI_PRINCIPAL); String dsJndiCredentials = pp.getStringProperty( PROP_DATASOURCE_JNDI_CREDENTIALS); Properties props = null; if (null != dsJndiInitial || null != dsJndiProvider || null != dsJndiPrincipal || null != dsJndiCredentials) { props = new Properties(); if (dsJndiInitial != null) { props.put(PROP_DATASOURCE_JNDI_INITIAL, dsJndiInitial); } if (dsJndiProvider != null) { props.put(PROP_DATASOURCE_JNDI_PROVDER, dsJndiProvider); } if (dsJndiPrincipal != null) { props.put(PROP_DATASOURCE_JNDI_PRINCIPAL, dsJndiPrincipal); } if (dsJndiCredentials != null) { props.put(PROP_DATASOURCE_JNDI_CREDENTIALS, dsJndiCredentials); } } JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi, props, dsAlwaysLookup); dbMgr = DBConnectionManager.getInstance(); dbMgr.addConnectionProvider(dsNames[i], cp); } else { String dsDriver = pp.getStringProperty(PROP_DATASOURCE_DRIVER); String dsURL = pp.getStringProperty(PROP_DATASOURCE_URL); String dsUser = pp.getStringProperty(PROP_DATASOURCE_USER, ""); String dsPass = pp.getStringProperty(PROP_DATASOURCE_PASSWORD, ""); int dsCnt = pp.getIntProperty(PROP_DATASOURCE_MAX_CONNECTIONS, 10); String dsValidation = pp.getStringProperty(PROP_DATASOURCE_VALIDATION_QUERY); if (dsDriver == null) { initException = new SchedulerException( "Driver not specified for DataSource: " + dsNames[i]); throw initException; } if (dsURL == null) { initException = new SchedulerException( "DB URL not specified for DataSource: " + dsNames[i]); throw initException; } try { PoolingConnectionProvider cp = new PoolingConnectionProvider( dsDriver, dsURL, dsUser, dsPass, dsCnt, dsValidation); dbMgr = DBConnectionManager.getInstance(); dbMgr.addConnectionProvider(dsNames[i], cp); } catch (SQLException sqle) { initException = new SchedulerException( "Could not initialize DataSource: " + dsNames[i], sqle); throw initException; } } } } // Set up any SchedulerPlugins // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX); SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length]; for (int i = 0; i < pluginNames.length; i++) { Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "." + pluginNames[i], true); String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null); if (plugInClass == null) { initException = new SchedulerException( "SchedulerPlugin class not specified for plugin '" + pluginNames[i] + "'"); throw initException; } SchedulerPlugin plugin = null; try { plugin = (SchedulerPlugin) loadHelper.loadClass(plugInClass).newInstance(); } catch (Exception e) { initException = new SchedulerException( "SchedulerPlugin class '" + plugInClass + "' could not be instantiated.", e); throw initException; } try { setBeanProps(plugin, pp); } catch (Exception e) { initException = new SchedulerException( "JobStore SchedulerPlugin '" + plugInClass + "' props could not be configured.", e); throw initException; } plugins[i] = plugin; } // Set up any JobListeners // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Class<?>[] strArg = new Class[] { String.class }; String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX); JobListener[] jobListeners = new JobListener[jobListenerNames.length]; for (int i = 0; i < jobListenerNames.length; i++) { Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "." + jobListenerNames[i], true); String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null); if (listenerClass == null) { initException = new SchedulerException( "JobListener class not specified for listener '" + jobListenerNames[i] + "'"); throw initException; } JobListener listener = null; try { listener = (JobListener) loadHelper.loadClass(listenerClass).newInstance(); } catch (Exception e) { initException = new SchedulerException( "JobListener class '" + listenerClass + "' could not be instantiated.", e); throw initException; } try { Method nameSetter = listener.getClass().getMethod("setName", strArg); if(nameSetter != null) { nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } ); } setBeanProps(listener, lp); } catch (Exception e) { initException = new SchedulerException( "JobListener '" + listenerClass + "' props could not be configured.", e); throw initException; } jobListeners[i] = listener; } // Set up any TriggerListeners // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX); TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length]; for (int i = 0; i < triggerListenerNames.length; i++) { Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "." + triggerListenerNames[i], true); String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null); if (listenerClass == null) { initException = new SchedulerException( "TriggerListener class not specified for listener '" + triggerListenerNames[i] + "'"); throw initException; } TriggerListener listener = null; try { listener = (TriggerListener) loadHelper.loadClass(listenerClass).newInstance(); } catch (Exception e) { initException = new SchedulerException( "TriggerListener class '" + listenerClass + "' could not be instantiated.", e); throw initException; } try { Method nameSetter = listener.getClass().getMethod("setName", strArg); if(nameSetter != null) { nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } ); } setBeanProps(listener, lp); } catch (Exception e) { initException = new SchedulerException( "TriggerListener '" + listenerClass + "' props could not be configured.", e); throw initException; } triggerListeners[i] = listener; } boolean tpInited = false; boolean qsInited = false; // Get ThreadExecutor Properties // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS); if (threadExecutorClass != null) { tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true); try { threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance(); log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass); setBeanProps(threadExecutor, tProps); } catch (Exception e) { initException = new SchedulerException( "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e); throw initException; } } else { log.info("Using default implementation for ThreadExecutor"); threadExecutor = new DefaultThreadExecutor(); } // Fire everything up // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ try { JobRunShellFactory jrsf = null; // Create correct run-shell factory... if (userTXLocation != null) { UserTransactionHelper.setUserTxLocation(userTXLocation); } if (wrapJobInTx) { jrsf = new JTAJobRunShellFactory(); } else { jrsf = new JTAAnnotationAwareJobRunShellFactory(); } if (autoId) { try { schedInstId = DEFAULT_INSTANCE_ID; if (js.isClustered()) { schedInstId = instanceIdGenerator.generateInstanceId(); } } catch (Exception e) { getLog().error("Couldn't generate instance Id!", e); throw new IllegalStateException("Cannot run without an instance id."); } } if (js.getClass().getName().startsWith("org.terracotta.quartz")) { try { String uuid = (String) js.getClass().getMethod("getUUID").invoke(js); if(schedInstId.equals(DEFAULT_INSTANCE_ID)) { schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid; if (jmxObjectName == null) { jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId); } } else if(jmxObjectName == null) { jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid); } } catch(Exception e) { throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e); } if(null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) { jmxExport = true; } } if (js instanceof JobStoreSupport) { JobStoreSupport jjs = (JobStoreSupport)js; jjs.setDbRetryInterval(dbFailureRetry); if(threadsInheritInitalizersClassLoader) jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader); } QuartzSchedulerResources rsrcs = new QuartzSchedulerResources(); rsrcs.setName(schedName); rsrcs.setThreadName(threadName); rsrcs.setInstanceId(schedInstId); rsrcs.setJobRunShellFactory(jrsf); rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon); rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader); rsrcs.setRunUpdateCheck(!skipUpdateCheck); rsrcs.setBatchTimeWindow(batchTimeWindow); rsrcs.setMaxBatchSize(maxBatchSize); rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown); rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait); rsrcs.setJMXExport(jmxExport); rsrcs.setJMXObjectName(jmxObjectName); if (rmiExport) { rsrcs.setRMIRegistryHost(rmiHost); rsrcs.setRMIRegistryPort(rmiPort); rsrcs.setRMIServerPort(rmiServerPort); rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry); rsrcs.setRMIBindName(rmiBindName); } SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId); rsrcs.setThreadExecutor(threadExecutor); threadExecutor.initialize(); rsrcs.setThreadPool(tp); if(tp instanceof SimpleThreadPool) { ((SimpleThreadPool)tp).setThreadNamePrefix(schedName + "_Worker"); if(threadsInheritInitalizersClassLoader) ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader); } //ThreadPool初始化 tp.initialize(); tpInited = true; rsrcs.setJobStore(js); // add plugins for (int i = 0; i < plugins.length; i++) { rsrcs.addSchedulerPlugin(plugins[i]); } //得到QuartzSchedulerThread 默认paused=true qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry); qsInited = true; // Create Scheduler ref... Scheduler scheduler = instantiate(rsrcs, qs); // set job factory if specified if(jobFactory != null) { qs.setJobFactory(jobFactory); } // Initialize plugins now that we have a Scheduler instance. for (int i = 0; i < plugins.length; i++) { plugins[i].initialize(pluginNames[i], scheduler); } // add listeners for (int i = 0; i < jobListeners.length; i++) { qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs()); } for (int i = 0; i < triggerListeners.length; i++) { qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers()); } // set scheduler context data... for(Object key: schedCtxtProps.keySet()) { String val = schedCtxtProps.getProperty((String) key); scheduler.getContext().put(key, val); } // fire up job store, and runshell factory js.setInstanceId(schedInstId); js.setInstanceName(schedName); js.initialize(loadHelper, qs.getSchedulerSignaler()); js.setThreadPoolSize(tp.getPoolSize()); jrsf.initialize(scheduler); qs.initialize(); getLog().info( "Quartz scheduler '" + scheduler.getSchedulerName() + "' initialized from " + propSrc); getLog().info("Quartz scheduler version: " + qs.getVersion()); // prevents the repository from being garbage collected qs.addNoGCObject(schedRep); // prevents the db manager from being garbage collected if (dbMgr != null) { qs.addNoGCObject(dbMgr); } schedRep.bind(scheduler); return scheduler; } catch(SchedulerException e) { if(qsInited) qs.shutdown(false); else if(tpInited) tp.shutdown(false); throw e; } catch(RuntimeException re) { if(qsInited) qs.shutdown(false); else if(tpInited) tp.shutdown(false); throw re; } catch(Error re) { if(qsInited) qs.shutdown(false); else if(tpInited) tp.shutdown(false); throw re; } }
3.2在QuartzSchedulerThread线程中循环通过JobStore取得List<OperableTrigger>后通过ThreadPool执行JobRunShell。
QuartzSchedulerThread:
public void run() { boolean lastAcquireFailed = false; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } } if (halted.get()) { break; } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers = null; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { //通过JobStore取得List<OperableTrigger> triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow()); lastAcquireFailed = false; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if(!lastAcquireFailed) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } lastAcquireFailed = true; } catch (RuntimeException e) { if(!lastAcquireFailed) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } lastAcquireFailed = true; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { //告知JobStore开始执行trigger,内存则移除,表则更新qrtz_triggers信息 List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error( "RuntimeException while firing trigger " + triggers.get(i), exception); // db connection must have failed... keep // retrying until it's up... releaseTriggerRetryLoop(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { try { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while releasing triggers '" + triggers.get(i).getKey() + "'", se); // db connection must have failed... keep retrying // until it's up... releaseTriggerRetryLoop(triggers.get(i)); } continue; } // TODO: improvements: // // 2- make sure we can get a job runshell before firing triggers, or // don't let that throw an exception (right now it never does, // but the signature says it can). // 3- acquire more triggers at a time (based on num threads available?) //得到JobRunShell JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); //初始化JobRunShell shell.initialize(qs); } catch (SchedulerException se) { try { qsRsrcs.getJobStore().triggeredJobComplete( triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occurred while placing job's triggers in error state '" + triggers.get(i).getKey() + "'", se2); // db connection must have failed... keep retrying // until it's up... errorTriggerRetryLoop(bndle); } continue; } //通过ThreadPool执行JobRunShell if (qsRsrcs.getThreadPool().runInThread(shell) == false) { try { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete( triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } catch (SchedulerException se2) { qs.notifySchedulerListenersError( "An error occurred while placing job's triggers in error state '" + triggers.get(i).getKey() + "'", se2); // db connection must have failed... keep retrying // until it's up... releaseTriggerRetryLoop(triggers.get(i)); } } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { sigLock.wait(timeUntilContinue); } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }
3.3JobStore得到List<OperableTrigger>
RAMJobStore取得List<OperableTrigger>方法使用hashTree
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) { synchronized (lock) { List<OperableTrigger> result = new ArrayList<OperableTrigger>(); while (true) { TriggerWrapper tw; try { tw = (TriggerWrapper) timeTriggers.first(); if (tw == null) return result; timeTriggers.remove(tw); } catch (java.util.NoSuchElementException nsee) { return result; } if (tw.trigger.getNextFireTime() == null) { continue; } if (applyMisfire(tw)) { if (tw.trigger.getNextFireTime() != null) { timeTriggers.add(tw); } continue; } if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) { timeTriggers.add(tw); return result; } tw.state = TriggerWrapper.STATE_ACQUIRED; tw.trigger.setFireInstanceId(getFiredTriggerRecordId()); OperableTrigger trig = (OperableTrigger) tw.trigger.clone(); result.add(trig); if (result.size() == maxCount) return result; } } }
3.4JobRunShell
初始化:
public void initialize(QuartzScheduler qs) throws SchedulerException { this.qs = qs; Job job = null; JobDetail jobDetail = firedTriggerBundle.getJobDetail(); try { job = qs.getJobFactory().newJob(firedTriggerBundle, scheduler); } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occured instantiating job to be executed. job= '" + jobDetail.getKey() + "'", se); throw se; } catch (Throwable ncdfe) { // such as NoClassDefFoundError SchedulerException se = new SchedulerException( "Problem instantiating class '" + jobDetail.getJobClass().getName() + "' - ", ncdfe); qs.notifySchedulerListenersError( "An error occured instantiating job to be executed. job= '" + jobDetail.getKey() + "'", se); throw se; } this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job); }
run方法
public void run() { qs.addInternalSchedulerListener(this); try { OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); JobDetail jobDetail = jec.getJobDetail(); do { JobExecutionException jobExEx = null; Job job = jec.getJobInstance(); try { begin(); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't begin execution.", se); break; } // notify job & trigger listeners... try { if (!notifyListenersBeginning(jec)) { break; } } catch(VetoedException ve) { try { CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null); try { qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode); } catch(JobPersistenceException jpe) { vetoedJobRetryLoop(trigger, jobDetail, instCode); } complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error during veto of Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } break; } long startTime = System.currentTimeMillis(); long endTime = startTime; // execute the job try { log.debug("Calling execute on job " + jobDetail.getKey()); job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { endTime = System.currentTimeMillis(); jobExEx = jee; getLog().info("Job " + jobDetail.getKey() + " threw a JobExecutionException: ", jobExEx); } catch (Throwable e) { endTime = System.currentTimeMillis(); getLog().error("Job " + jobDetail.getKey() + " threw an unhandled Exception: ", e); SchedulerException se = new SchedulerException( "Job threw an unhandled exception.", e); qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se); jobExEx = new JobExecutionException(se, false); } jec.setJobRunTime(endTime - startTime); // notify all job listeners if (!notifyJobListenersComplete(jec, jobExEx)) { break; } CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP; // update the trigger try { instCode = trigger.executionComplete(jec, jobExEx); } catch (Exception e) { // If this happens, there's a bug in the trigger... SchedulerException se = new SchedulerException( "Trigger threw an unhandled exception.", e); qs.notifySchedulerListenersError( "Please report this error to the Quartz developers.", se); } // notify all trigger listeners if (!notifyTriggerListenersComplete(jec, instCode)) { break; } // update job/trigger or re-execute job if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) { jec.incrementRefireCount(); try { complete(false); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } continue; } try { complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); continue; } try { qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); } catch (JobPersistenceException jpe) { qs.notifySchedulerListenersError( "An error occured while marking executed job complete. job= '" + jobDetail.getKey() + "'", jpe); if (!completeTriggerRetryLoop(trigger, jobDetail, instCode)) { return; } } break; } while (true); } finally { qs.removeInternalSchedulerListener(this); } }
3.5SimpleThreadPool
初始化方法:
public void initialize() throws SchedulerConfigException { if(workers != null && workers.size() > 0) // already initialized... return; if (count <= 0) { throw new SchedulerConfigException( "Thread count must be > 0"); } if (prio <= 0 || prio > 9) { throw new SchedulerConfigException( "Thread priority must be > 0 and <= 9"); } if(isThreadsInheritGroupOfInitializingThread()) { threadGroup = Thread.currentThread().getThreadGroup(); } else { // follow the threadGroup tree to the root thread group. threadGroup = Thread.currentThread().getThreadGroup(); ThreadGroup parent = threadGroup; while ( !parent.getName().equals("main") ) { threadGroup = parent; parent = threadGroup.getParent(); } threadGroup = new ThreadGroup(parent, schedulerInstanceName + "-SimpleThreadPool"); if (isMakeThreadsDaemons()) { threadGroup.setDaemon(true); } } if (isThreadsInheritContextClassLoaderOfInitializingThread()) { getLog().info( "Job execution threads will use class loader of thread: " + Thread.currentThread().getName()); } // create the worker threads and start them Iterator workerThreads = createWorkerThreads(count).iterator(); while(workerThreads.hasNext()) { WorkerThread wt = (WorkerThread) workerThreads.next(); wt.start(); availWorkers.add(wt); } }
runInThread方法
public boolean runInThread(Runnable runnable) { if (runnable == null) { return false; } synchronized (nextRunnableLock) { handoffPending = true; // Wait until a worker thread is available while ((availWorkers.size() < 1) && !isShutdown) { try { nextRunnableLock.wait(500); } catch (InterruptedException ignore) { } } if (!isShutdown) { WorkerThread wt = (WorkerThread)availWorkers.removeFirst(); busyWorkers.add(wt); wt.run(runnable); } else { // If the thread pool is going down, execute the Runnable // within a new additional worker thread (no thread from the pool). WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable); busyWorkers.add(wt); workers.add(wt); wt.start(); } nextRunnableLock.notifyAll(); handoffPending = false; } return true; }
4..以mysql存储信息方式分析源码
4.1测试配置文件
#============================================================== #Configure Main Scheduler Properties #============================================================== org.quartz.scheduler.instanceName = quartzScheduler org.quartz.scheduler.instanceId = AUTO #============================================================== #Configure JobStore #============================================================== org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix = QRTZ_ org.quartz.jobStore.isClustered = true org.quartz.jobStore.clusterCheckinInterval = 20000 org.quartz.jobStore.dataSource = myDS org.quartz.jobStore.lockHandler.class=org.quartz.impl.jdbcjobstore.UpdateLockRowSemaphore #============================================================== #Configure DataSource #============================================================== org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8 org.quartz.dataSource.myDS.user = root org.quartz.dataSource.myDS.password = password org.quartz.dataSource.myDS.maxConnections = 30 #============================================================== #Configure ThreadPool #============================================================== org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount = 10 org.quartz.threadPool.threadPriority = 5 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
4.2告知JobStore开始执行triggers,更新triggers时间,是在QuartzSchedulerThread的方法中的qsRsrcs.getJobStore().triggersFired(triggers)(该方法通过qrtz_locks表锁来控制并发)
相关推荐
Quartz 批量下载源码,Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码Quartz 批量下载源码
QuartZ源码包
这篇博客“quartz源码解析(一)”可能是博主对Quartz核心原理、设计模式以及其实现细节的一个初步探讨。 Quartz的源码分析可以从以下几个方面入手: 1. **设计模式**: - **工厂模式**:Quartz中Job和Trigger的...
在这个"quartz 项目学习源码"中,你可以深入理解 Quartz 的工作原理和实现方式。 1. **Quartz 基本概念**: - **Job**:Quartz 中的 Job 是执行任务的基本单元,代表一个待执行的工作。 - **Trigger**:触发器...
以下是对Quartz.NET及其官方源码和演示例子的详细解析。 **Quartz.NET核心概念** 1. **作业(Jobs)**:作业是实际执行的工作单元,它们包含了要运行的任务代码。开发者可以自定义作业类来实现所需的功能。 2. **...
quartz源码包 。。。。。。。。。。。。。。。。。。。。。。。。。。
Quartz 1.6.0源码包提供了一个宝贵的资源,帮助开发者探索其设计模式、线程管理以及任务调度的机制。 1. **Quartz核心概念** - **Scheduler**:调度器,是Quartz的核心组件,负责管理和执行所有的Job(任务)和...
quartz .net 源码,可以编译,对需要了解.net 版quartz的大有益处。
1. **源码**:Quartz 1.6的源码可以让我们洞察其内部实现。通过阅读源码,我们可以学习到如何设计一个可扩展、可靠的调度系统。关键组件如`Job`, `Trigger`, `Scheduler`的实现逻辑,以及它们之间的交互,都在源码中...
Spring Quartz 是一个强大的任务调度框架,它允许开发者在Spring应用中轻松地定义和执行定时任务。Quartz 是一个开源的作业调度框架,而Spring通过Spring Quartz模块提供了与Quartz的集成,使得在Java环境中管理定时...
Quartz是一个开源的作业调度框架,它完全由Java写成,并设计用于J2SE和J2EE应用中。它提供了巨大的灵 活性而不牺牲简单性。你能够用它来为执行一个作业而创建简单的或复杂的调度 What's New In Quartz Scheduler ...
【标题】"C# quartz.net 定时任务源码 可以远程控制"涉及的核心知识点主要集中在C#编程语言、Quartz.NET库以及系统服务的安装与管理。Quartz.NET是一个开源的作业调度框架,它允许开发人员在.NET环境中创建和执行...
本指南将深入探讨 Quartz 的核心概念、配置与使用方法,并提供源码分析,帮助开发者更高效地利用这一强大工具。 1. **Quartz 基本概念** - **Job**:Job 是实际要执行的任务,是一个实现了 `org.quartz.Job` 接口...
在这个"Quartz.NET官方源码"中,我们可以深入理解其内部机制,学习如何利用它来构建自己的任务调度系统。 首先,Quartz.NET的核心组件包括`IScheduler`接口,它是调度器的抽象,负责管理作业(Jobs)和触发器...
这个压缩包 "quartz-3.0.3.1_quartes_源码.zip" 包含了 Quartz 框架的源代码,版本为 3.0.3.1,对于学习和理解 Quartz 的工作原理以及进行定制化开发非常有帮助。 Quartz 主要功能包括: 1. **作业调度**:Quartz ...
Quartz 是一种功能丰富的,开放源码的作业调度库,可以在几乎任何Java应用程序集成 - 从最小的独立的应用程序到规模最大电子商务系统。Quartz可以用来创建简单或复杂的日程安排执行几十,几百,甚至是十万的作业数 -...
下面将详细阐述Quartz集群的配置以及示例源码的相关知识点。 **1. Quartz集群的核心概念** - **Job Store**: Quartz集群的关键在于共享任务存储,Job Store负责存储Job(任务)和Trigger(触发器)的信息。常见的...
通过这个文件,开发者可以检查 Quartz 源码是否符合预先设定的代码风格和质量标准。 `quartz-all-1.6.4.jar` 包含了 Quartz 的所有功能,包括所有依赖的库,适合快速引入项目进行开发。而 `quartz-1.6.4.jar` 是...
6. **源码分析**:解压后的文件包括解决方案文件(sln)和库文件,例如`Quartz.2003.sln`、`Quartz.2008.sln`,这些都是Visual Studio的项目文件,用于在开发环境中打开和编译源码。`bin`目录包含编译后的库和程序,...