- 浏览: 981080 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartzs的job,trriger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132
这一节,我们来探索一下scheduleJob(JobDetail jobDetail, Trigger trigger),存储job之后所做的事情,以及怎么做?
//调度监听器
//QuartzSchedulerMBean
//调度通知器
//调度器线程
//SchedulerListener
//job监听器
//Trigger监听器
总结:
从以上的分析可以看出,在job存储之后,之后做了job添加通知,通知调度器下一刻执行时间,并唤醒正在等在执行的job,然后添加trriger通知。
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartzs的job,trriger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132
这一节,我们来探索一下scheduleJob(JobDetail jobDetail, Trigger trigger),存储job之后所做的事情,以及怎么做?
public class QuartzScheduler implements RemotableQuartzScheduler { public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState(); if(jobDetail == null) throw new SchedulerException("JobDetail cannot be null"); if(trigger == null) throw new SchedulerException("Trigger cannot be null"); if(jobDetail.getKey() == null) throw new SchedulerException("Job's key cannot be null"); if(jobDetail.getJobClass() == null) throw new SchedulerException("Job's class cannot be null"); //包装触发器 OperableTrigger trig = (OperableTrigger)trigger; if(trigger.getJobKey() == null) //设置触发器jobKey trig.setJobKey(jobDetail.getKey()); else if(!trigger.getJobKey().equals(jobDetail.getKey())) throw new SchedulerException("Trigger does not reference given job!"); trig.validate(); Calendar cal = null; if(trigger.getCalendarName() != null) cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName()); Date ft = trig.computeFirstFireTime(cal); if(ft == null) { throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString()); } else { //存储job and trriger到jobStrore resources.getJobStore().storeJobAndTrigger(jobDetail, trig); //添加jobDetail到调度监听器 notifySchedulerListenersJobAdded(jobDetail); //通知调度器下一刻调度时间 notifySchedulerThread(trigger.getNextFireTime().getTime()); //添加trigger到调度监听器 notifySchedulerListenersSchduled(trigger); return ft; } //添加jobDetail public void notifySchedulerListenersJobAdded(JobDetail jobDetail) { List schedListeners = buildSchedulerListenerList(); for(Iterator i$ = schedListeners.iterator(); i$.hasNext();) { SchedulerListener sl = (SchedulerListener)i$.next(); try { sl.jobAdded(jobDetail); } catch(Exception e) { getLog().error("Error while notifying SchedulerListener of JobAdded.", e); } } } //获取调度监听器 private List buildSchedulerListenerList() { List allListeners = new LinkedList(); allListeners.addAll(getListenerManager().getSchedulerListeners()); allListeners.addAll(getInternalSchedulerListeners()); return allListeners; } //获取内部调度器 public List getInternalSchedulerListeners() { ArrayList arraylist = internalSchedulerListeners; JVM INSTR monitorenter ; return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners)); Exception exception; exception; throw exception; } //通知调度器下一刻调度时间 protected void notifySchedulerThread(long candidateNewNextFireTime) { if(isSignalOnSchedulingChange()) signaler.signalSchedulingChange(candidateNewNextFireTime); } //添加trigger到调度监听器 public void notifySchedulerListenersSchduled(Trigger trigger) { List schedListeners = buildSchedulerListenerList(); for(Iterator i$ = schedListeners.iterator(); i$.hasNext();) { SchedulerListener sl = (SchedulerListener)i$.next(); try { sl.jobScheduled(trigger); } catch(Exception e) { getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job. Triger=").append(trigger.getKey()).toString(), e); } } } private static String VERSION_MAJOR; private static String VERSION_MINOR; private static String VERSION_ITERATION; private QuartzSchedulerResources resources; private QuartzSchedulerThread schedThread; private ThreadGroup threadGroup; private SchedulerContext context; private ListenerManager listenerManager;//监听管理器 private HashMap internalJobListeners; private HashMap internalTriggerListeners; private ArrayList internalSchedulerListeners; private JobFactory jobFactory; ExecutingJobsManager jobMgr; ErrorLogger errLogger; private SchedulerSignaler signaler; private Random random; private ArrayList holdToPreventGC; private boolean signalOnSchedulingChange; private volatile boolean closed; private volatile boolean shuttingDown; private boolean boundRemotely; private QuartzSchedulerMBean jmxBean; private Date initialStart; private final Timer updateTimer; }
//调度监听器
public class ListenerManagerImpl implements ListenerManager { public ListenerManagerImpl() { globalJobListeners = new LinkedHashMap(10); globalTriggerListeners = new LinkedHashMap(10); globalJobListenersMatchers = new LinkedHashMap(10); globalTriggerListenersMatchers = new LinkedHashMap(10); schedulerListeners = new ArrayList(10); } //添加调度监听器 public void addSchedulerListener(SchedulerListener schedulerListener) { synchronized(schedulerListeners) { schedulerListeners.add(schedulerListener); } } //添加job监听器 public void addJobListener(JobListener jobListener, List matchers) { if(jobListener.getName() == null || jobListener.getName().length() == 0) throw new IllegalArgumentException("JobListener name cannot be empty."); synchronized(globalJobListeners) { globalJobListeners.put(jobListener.getName(), jobListener); LinkedList matchersL = new LinkedList(); if(matchers != null && matchers.size() > 0) matchersL.addAll(matchers); else matchersL.add(EverythingMatcher.allJobs()); globalJobListenersMatchers.put(jobListener.getName(), matchersL); } } //添加Trigger监听器 public void addTriggerListener(TriggerListener triggerListener, List matchers) { if(triggerListener.getName() == null || triggerListener.getName().length() == 0) throw new IllegalArgumentException("TriggerListener name cannot be empty."); synchronized(globalTriggerListeners) { globalTriggerListeners.put(triggerListener.getName(), triggerListener); LinkedList matchersL = new LinkedList(); if(matchers != null && matchers.size() > 0) matchersL.addAll(matchers); else matchersL.add(EverythingMatcher.allTriggers()); globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL); } } public List getSchedulerListeners() { ArrayList arraylist = schedulerListeners; JVM INSTR monitorenter ; return Collections.unmodifiableList(new ArrayList(schedulerListeners)); Exception exception; exception; throw exception; } private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器 private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器 private Map globalJobListenersMatchers; private Map globalTriggerListenersMatchers; private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器 }
//QuartzSchedulerMBean
public class QuartzSchedulerMBeanImpl extends StandardMBean implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener { //添加jobDetail通知到JMX public void jobAdded(JobDetail jobDetail) { sendNotification("jobAdded", JobDetailSupport.toCompositeData(jobDetail)); } //添加trigger通知到JMX public void jobScheduled(Trigger trigger) { sendNotification("jobScheduled", TriggerSupport.toCompositeData(trigger)); } public void sendNotification(String eventType, Object data) { sendNotification(eventType, data, null); } public void sendNotification(String eventType, Object data, String msg) { Notification notif = new Notification(eventType, this, sequenceNumber.incrementAndGet(), System.currentTimeMillis(), msg); if(data != null) notif.setUserData(data); emitter.sendNotification(notif); } }
//调度通知器
public class SchedulerSignalerImpl implements SchedulerSignaler { public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread) { log = LoggerFactory.getLogger(org/quartz/core/SchedulerSignalerImpl); this.sched = sched; this.schedThread = schedThread; log.info((new StringBuilder()).append("Initialized Scheduler Signaller of type: ").append(getClass()).toString()); } public void signalSchedulingChange(long candidateNewNextFireTime) { schedThread.signalSchedulingChange(candidateNewNextFireTime); } Logger log; protected QuartzScheduler sched; protected QuartzSchedulerThread schedThread; }
//调度器线程
public class QuartzSchedulerThread extends Thread { QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) { this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5); } //唤醒所有等待sigLock的调度器 public void signalSchedulingChange(long candidateNewNextFireTime) { synchronized(sigLock) { signaled = true; signaledNextFireTime = candidateNewNextFireTime; sigLock.notifyAll(); } } private QuartzScheduler qs; private QuartzSchedulerResources qsRsrcs; private final Object sigLock; private boolean signaled; private long signaledNextFireTime; private boolean paused; private AtomicBoolean halted; private Random random; private static long DEFAULT_IDLE_WAIT_TIME = 30000L; private long idleWaitTime; private int idleWaitVariablness; private final Logger log; }
//SchedulerListener
public interface SchedulerListener { public abstract void jobScheduled(Trigger trigger); public abstract void jobUnscheduled(TriggerKey triggerkey); public abstract void triggerFinalized(Trigger trigger); public abstract void triggerPaused(TriggerKey triggerkey); public abstract void triggersPaused(String s); public abstract void triggerResumed(TriggerKey triggerkey); public abstract void triggersResumed(String s); public abstract void jobAdded(JobDetail jobdetail); public abstract void jobDeleted(JobKey jobkey); public abstract void jobPaused(JobKey jobkey); public abstract void jobsPaused(String s); public abstract void jobResumed(JobKey jobkey); public abstract void jobsResumed(String s); public abstract void schedulerError(String s, SchedulerException schedulerexception); public abstract void schedulerInStandbyMode(); public abstract void schedulerStarted(); public abstract void schedulerStarting(); public abstract void schedulerShutdown(); public abstract void schedulerShuttingdown(); public abstract void schedulingDataCleared(); }
//job监听器
public interface JobListener { public abstract String getName(); public abstract void jobToBeExecuted(JobExecutionContext jobexecutioncontext); public abstract void jobExecutionVetoed(JobExecutionContext jobexecutioncontext); public abstract void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception); }
//Trigger监听器
public interface TriggerListener { public abstract String getName(); public abstract void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext); public abstract boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext); public abstract void triggerMisfired(Trigger trigger); public abstract void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction); }
总结:
从以上的分析可以看出,在job存储之后,之后做了job添加通知,通知调度器下一刻执行时间,并唤醒正在等在执行的job,然后添加trriger通知。
发表评论
-
TreeSet在Quartz任务调度过程中的作用
2017-08-24 23:43 726红黑树详解:http://www.cnblogs.com/sk ... -
Quartz使用与Spring集成系列教程
2016-10-26 09:48 496Quartz的使用:http://donald-draper. ... -
Spring与Quartz集成-源码分析
2016-09-13 11:50 2715在阅读以下文章之前,如果对Quartz任务调度不是很熟悉,请看 ... -
Spring与Quartz集成详解
2016-09-09 17:52 2818首先这个所有的依赖包就不需要多讲了,首先下载Quazrt发布包 ... -
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析
2016-09-08 18:24 2322Quartz 任务存储JobStoreTX 持久化之RDB:h ... -
Quartz 任务存储JobStoreTX 持久化之RDB
2016-09-08 11:52 2822Quartz储存方式之JDBC JobStoreTX:http ... -
Quartz任务调度源码分析
2016-09-07 13:12 3570Quartz的使用:http://donald-draper. ... -
Quartz的job调度源码分析
2016-09-06 13:03 8Quartz的使用:http://donald ... -
Quartz的job、触发器的暂停与恢复源码分析
2016-09-06 09:01 5266Quartz的使用:http://donald-draper. ... -
Quartz的Scheduler初始化源码分析
2016-09-05 16:14 3591Quartz的使用:http://donald ... -
Quartzs的job存储,触发器、job删除源码分析
2016-09-05 15:39 4549前言:unscheduleJob针对TriggerKey,而d ... -
Quartz的使用
2016-09-05 11:39 1787Quartz使用总结:http://www.cnblogs.c ...
相关推荐
xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...
这个项目“java监听器+quartz实现每天动态时间执行任务的功能”是结合了Java的监听器机制和Quartz定时任务框架来完成这样的需求。下面将详细解释这两个关键知识点。 **Java监听器** Java监听器是Java AWT和Swing库...
本文将深入源码层面,分析MapReduce Job在本地提交的详细步骤。 首先,我们来了解一下MapReduce Job的基本流程。当用户通过Java API调用`Job`类的`submit()`方法时,本地提交过程就开始了。这个过程主要分为以下几...
本文将深入探讨Spark的核心思想,并通过源码分析来深化理解。 一、Spark核心思想 1. **弹性分布式数据集(Resilient Distributed Datasets, RDD)**:RDD是Spark的核心数据抽象,它是一种不可变、分区的记录集合,...
- **Job Execution**:节点上的Job执行器接收到触发器的触发信号后,执行对应的Job分片。 - **Failure Transferring**:如果某个节点执行失败,会自动转移到其他节点进行重试,保证任务的高可用性。 - **...
### 将xxl-job-admin、xxl-job-core、xxl-job-executor的源码引入项目中的实践 在探讨如何将这三个组件(xxl-job-admin、xxl-job-core、xxl-job-executor)有效地引入到项目中之前,我们需要先了解它们各自的作用及...
该项目是一款基于Scala语言的Spark-JobServer项目,致力于设计与源码分析。该系统包含378个文件,主要由Scala源代码构成,辅以Shell、Python、JavaScript、Java、CSS和HTML等编程语言。文件类型涵盖Scala、Markdown...
《MapReduce2.0源码分析与实战编程》是一本深度探讨Hadoop生态系统中的核心组件MapReduce 2.0(也称为YARN)的专著。MapReduce是大数据处理领域的重要框架,它提供了并行计算的能力,使得海量数据的处理变得高效可行...
- **任务调度器(XXL-JOB Executor)**:执行任务的组件,负责接收调度中心的指令并执行相应的作业。 - **调度中心(XXL-JOB Admin)**:管理任务调度的组件,存储任务信息,执行策略等,并负责触发任务执行。 - **...
基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job...
python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...
学习Hadoop源码过程中做的源码分析,共享一下,PPT中有我的邮箱,可以互相探讨。Hadoop源码分析(client端提交job到rm端)
Flink源码分析-Job调度部署运行流程.pdf
6. **监控与错误处理**:通过监听器(如`TransListener`或`JobListener`)跟踪Job和转换的进度,以便在出错时进行异常处理。 7. **资源清理**:当转换或作业执行完毕后,记得释放占用的资源,例如关闭打开的数据库...
【Kettle源码分析】 Kettle,也称为Pentaho Data Integration (PDI),是一个开源的提取、转换、加载(ETL)工具,用于在各种数据源之间移动数据。Kettle4.2的源码分析主要涉及其核心概念、源代码结构以及关键模块。...
总之,XXL-JOB-2.2.0源码的分析可以帮助我们理解分布式任务调度的实现细节,了解任务的分发、执行、监控等核心流程,为自定义任务处理和扩展提供指导。深入学习这些源码,不仅可以提升开发技能,也能提高解决实际...
XXL-JOB 接口文档及源码运行操作 XXL-JOB 是一个轻量级分布式任务调度框架,核心设计目标是开发迅速、学习简单、轻量级、易扩展。该框架支持通过 Web 页面对任务进行 CRUD 操作,操作简单,一分钟上手。同时,XXL-...
本篇将深入探讨XXL-JOB的核心概念、架构设计以及源码分析。 **一、XXL-JOB简介** 1. **核心概念** - **调度中心(Admin)**: 负责任务的注册、管理、监控,提供Web界面供用户操作。 - **执行器(Executor)**: ...