`
Donald_Draper
  • 浏览: 981080 次
社区版块
存档分类
最新评论

Quartzs的job,trriger监听器源码分析

阅读更多
Quartz的使用:http://donald-draper.iteye.com/blog/2321886
Quartz的Scheduler初始化源码分析:http://donald-draper.iteye.com/blog/2322730
Quartz的job、触发器的暂停与恢复源码分析:http://donald-draper.iteye.com/blog/2322823
Quartz的Job存储,触发器、任务删除,源码分析:http://donald-draper.iteye.com/blog/2322725
Quartzs的job,trriger监听器源码分析:http://donald-draper.iteye.com/blog/2322863
Quartz 任务存储JobStoreTX 持久化之RDB:http://donald-draper.iteye.com/blog/2323297
Quartz 任务存储JobStoreTX 持久化之RDB-源码分析:http://donald-draper.iteye.com/blog/2323409
Quartz任务调度源码分析:http://donald-draper.iteye.com/blog/2323118
Spring与Quartz集成详解:http://donald-draper.iteye.com/blog/2323591
Spring与Quartz集成-源码分析:http://donald-draper.iteye.com/blog/2324132
这一节,我们来探索一下scheduleJob(JobDetail jobDetail, Trigger trigger),存储job之后所做的事情,以及怎么做?
public class QuartzScheduler
    implements RemotableQuartzScheduler
{
 public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
        throws SchedulerException
    {
        validateState();
        if(jobDetail == null)
            throw new SchedulerException("JobDetail cannot be null");
        if(trigger == null)
            throw new SchedulerException("Trigger cannot be null");
        if(jobDetail.getKey() == null)
            throw new SchedulerException("Job's key cannot be null");
        if(jobDetail.getJobClass() == null)
            throw new SchedulerException("Job's class cannot be null");
	//包装触发器
        OperableTrigger trig = (OperableTrigger)trigger;
        if(trigger.getJobKey() == null)
	    //设置触发器jobKey
            trig.setJobKey(jobDetail.getKey());
        else
        if(!trigger.getJobKey().equals(jobDetail.getKey()))
            throw new SchedulerException("Trigger does not reference given job!");
        trig.validate();
        Calendar cal = null;
        if(trigger.getCalendarName() != null)
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        Date ft = trig.computeFirstFireTime(cal);
        if(ft == null)
        {
            throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString());
        } else
        {
	    //存储job and trriger到jobStrore
            resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
	    //添加jobDetail到调度监听器
            notifySchedulerListenersJobAdded(jobDetail);
            //通知调度器下一刻调度时间
            notifySchedulerThread(trigger.getNextFireTime().getTime());
             //添加trigger到调度监听器
            notifySchedulerListenersSchduled(trigger);
            return ft;
        }
     //添加jobDetail
     public void notifySchedulerListenersJobAdded(JobDetail jobDetail)
    {
        List schedListeners = buildSchedulerListenerList();
        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
        {
            SchedulerListener sl = (SchedulerListener)i$.next();
            try
            {
                sl.jobAdded(jobDetail);
            }
            catch(Exception e)
            {
                getLog().error("Error while notifying SchedulerListener of JobAdded.", e);
            }
        }

    }
    //获取调度监听器
     private List buildSchedulerListenerList()
    {
        List allListeners = new LinkedList();
        allListeners.addAll(getListenerManager().getSchedulerListeners());
        allListeners.addAll(getInternalSchedulerListeners());
        return allListeners;
    }
    //获取内部调度器
      public List getInternalSchedulerListeners()
    {
        ArrayList arraylist = internalSchedulerListeners;
        JVM INSTR monitorenter ;
        return Collections.unmodifiableList(new ArrayList(internalSchedulerListeners));
        Exception exception;
        exception;
        throw exception;
    }
     //通知调度器下一刻调度时间
    protected void notifySchedulerThread(long candidateNewNextFireTime)
    {
        if(isSignalOnSchedulingChange())
            signaler.signalSchedulingChange(candidateNewNextFireTime);
    }
    //添加trigger到调度监听器
    public void notifySchedulerListenersSchduled(Trigger trigger)
    {
        List schedListeners = buildSchedulerListenerList();
        for(Iterator i$ = schedListeners.iterator(); i$.hasNext();)
        {
            SchedulerListener sl = (SchedulerListener)i$.next();
            try
            {
                sl.jobScheduled(trigger);
            }
            catch(Exception e)
            {
                getLog().error((new StringBuilder()).append("Error while notifying SchedulerListener of scheduled job.  Triger=").append(trigger.getKey()).toString(), e);
            }
        }

    }
    private static String VERSION_MAJOR;
    private static String VERSION_MINOR;
    private static String VERSION_ITERATION;
    private QuartzSchedulerResources resources;
    private QuartzSchedulerThread schedThread;
    private ThreadGroup threadGroup;
    private SchedulerContext context;
    private ListenerManager listenerManager;//监听管理器
    private HashMap internalJobListeners;
    private HashMap internalTriggerListeners;
    private ArrayList internalSchedulerListeners;
    private JobFactory jobFactory;
    ExecutingJobsManager jobMgr;
    ErrorLogger errLogger;
    private SchedulerSignaler signaler;
    private Random random;
    private ArrayList holdToPreventGC;
    private boolean signalOnSchedulingChange;
    private volatile boolean closed;
    private volatile boolean shuttingDown;
    private boolean boundRemotely;
    private QuartzSchedulerMBean jmxBean;
    private Date initialStart;
    private final Timer updateTimer;
}

//调度监听器
 public class ListenerManagerImpl
    implements ListenerManager
{
public ListenerManagerImpl()
    {
        globalJobListeners = new LinkedHashMap(10);
        globalTriggerListeners = new LinkedHashMap(10);
        globalJobListenersMatchers = new LinkedHashMap(10);
        globalTriggerListenersMatchers = new LinkedHashMap(10);
        schedulerListeners = new ArrayList(10);
    }
    //添加调度监听器
     public void addSchedulerListener(SchedulerListener schedulerListener)
    {
        synchronized(schedulerListeners)
        {
            schedulerListeners.add(schedulerListener);
        }
    }
    //添加job监听器
    public void addJobListener(JobListener jobListener, List matchers)
    {
        if(jobListener.getName() == null || jobListener.getName().length() == 0)
            throw new IllegalArgumentException("JobListener name cannot be empty.");
        synchronized(globalJobListeners)
        {
            globalJobListeners.put(jobListener.getName(), jobListener);
            LinkedList matchersL = new LinkedList();
            if(matchers != null && matchers.size() > 0)
                matchersL.addAll(matchers);
            else
                matchersL.add(EverythingMatcher.allJobs());
            globalJobListenersMatchers.put(jobListener.getName(), matchersL);
        }
    }
     //添加Trigger监听器
      public void addTriggerListener(TriggerListener triggerListener, List matchers)
    {
        if(triggerListener.getName() == null || triggerListener.getName().length() == 0)
            throw new IllegalArgumentException("TriggerListener name cannot be empty.");
        synchronized(globalTriggerListeners)
        {
            globalTriggerListeners.put(triggerListener.getName(), triggerListener);
            LinkedList matchersL = new LinkedList();
            if(matchers != null && matchers.size() > 0)
                matchersL.addAll(matchers);
            else
                matchersL.add(EverythingMatcher.allTriggers());
            globalTriggerListenersMatchers.put(triggerListener.getName(), matchersL);
        }
    }
     public List getSchedulerListeners()
    {
        ArrayList arraylist = schedulerListeners;
        JVM INSTR monitorenter ;
        return Collections.unmodifiableList(new ArrayList(schedulerListeners));
        Exception exception;
        exception;
        throw exception;
    }

    private Map globalJobListeners;//LinkedHashMap<String,JobListener>,key为JobListener.getName(),全局job监听器
    private Map globalTriggerListeners;//LinkedHashMap<String,TriggerListener>,key为TriggerListener.getName(),全局Trriger监听器
    private Map globalJobListenersMatchers;
    private Map globalTriggerListenersMatchers;
    private ArrayList schedulerListeners;//List<SchedulerListener>调度监听器
}

//QuartzSchedulerMBean
public class QuartzSchedulerMBeanImpl extends StandardMBean
    implements NotificationEmitter, QuartzSchedulerMBean, JobListener, SchedulerListener
{
  //添加jobDetail通知到JMX
  public void jobAdded(JobDetail jobDetail)
    {
        sendNotification("jobAdded", JobDetailSupport.toCompositeData(jobDetail));
    }
 //添加trigger通知到JMX
  public void jobScheduled(Trigger trigger)
    {
        sendNotification("jobScheduled", TriggerSupport.toCompositeData(trigger));
    }
  public void sendNotification(String eventType, Object data)
    {
        sendNotification(eventType, data, null);
    }

 public void sendNotification(String eventType, Object data, String msg)
    {
        Notification notif = new Notification(eventType, this, sequenceNumber.incrementAndGet(), System.currentTimeMillis(), msg);
        if(data != null)
            notif.setUserData(data);
        emitter.sendNotification(notif);
    }
}

//调度通知器
public class SchedulerSignalerImpl
    implements SchedulerSignaler
{

    public SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread)
    {
        log = LoggerFactory.getLogger(org/quartz/core/SchedulerSignalerImpl);
        this.sched = sched;
        this.schedThread = schedThread;
        log.info((new StringBuilder()).append("Initialized Scheduler Signaller of type: ").append(getClass()).toString());
    }

    public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        schedThread.signalSchedulingChange(candidateNewNextFireTime);
    }

    Logger log;
    protected QuartzScheduler sched;
    protected QuartzSchedulerThread schedThread;
}

//调度器线程
public class QuartzSchedulerThread extends Thread
{
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs)
    {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), 5);
    }
    //唤醒所有等待sigLock的调度器
     public void signalSchedulingChange(long candidateNewNextFireTime)
    {
        synchronized(sigLock)
        {
            signaled = true;
            signaledNextFireTime = candidateNewNextFireTime;
            sigLock.notifyAll();
        }
    }
 private QuartzScheduler qs;
    private QuartzSchedulerResources qsRsrcs;
    private final Object sigLock;
    private boolean signaled;
    private long signaledNextFireTime;
    private boolean paused;
    private AtomicBoolean halted;
    private Random random;
    private static long DEFAULT_IDLE_WAIT_TIME = 30000L;
    private long idleWaitTime;
    private int idleWaitVariablness;
    private final Logger log;

}

//SchedulerListener
public interface SchedulerListener
{
    public abstract void jobScheduled(Trigger trigger);
    public abstract void jobUnscheduled(TriggerKey triggerkey);
    public abstract void triggerFinalized(Trigger trigger);
    public abstract void triggerPaused(TriggerKey triggerkey);
    public abstract void triggersPaused(String s);
    public abstract void triggerResumed(TriggerKey triggerkey);
    public abstract void triggersResumed(String s);
    public abstract void jobAdded(JobDetail jobdetail);
    public abstract void jobDeleted(JobKey jobkey);
    public abstract void jobPaused(JobKey jobkey);
    public abstract void jobsPaused(String s);
    public abstract void jobResumed(JobKey jobkey);
    public abstract void jobsResumed(String s);
    public abstract void schedulerError(String s, SchedulerException schedulerexception);
    public abstract void schedulerInStandbyMode();
    public abstract void schedulerStarted();
    public abstract void schedulerStarting();
    public abstract void schedulerShutdown();
    public abstract void schedulerShuttingdown();
    public abstract void schedulingDataCleared();
}

//job监听器
public interface JobListener
{
    public abstract String getName();
    public abstract void jobToBeExecuted(JobExecutionContext jobexecutioncontext);
    public abstract void jobExecutionVetoed(JobExecutionContext jobexecutioncontext);
    public abstract void jobWasExecuted(JobExecutionContext jobexecutioncontext, JobExecutionException jobexecutionexception);
}

//Trigger监听器
public interface TriggerListener
{
    public abstract String getName();
    public abstract void triggerFired(Trigger trigger, JobExecutionContext jobexecutioncontext);
    public abstract boolean vetoJobExecution(Trigger trigger, JobExecutionContext jobexecutioncontext);
    public abstract void triggerMisfired(Trigger trigger);
    public abstract void triggerComplete(Trigger trigger, JobExecutionContext jobexecutioncontext, Trigger.CompletedExecutionInstruction completedexecutioninstruction);
}

总结:
从以上的分析可以看出,在job存储之后,之后做了job添加通知,通知调度器下一刻执行时间,并唤醒正在等在执行的job,然后添加trriger通知。
0
0
分享到:
评论

相关推荐

    xxl job源码分析

    xxl-job是一个轻量级的任务调度平台,具备开源特性,其源码分析对于工程开发人员具有一定的参考价值。接下来,我们将详细介绍xxl-job的核心概念、架构特点以及源码分析过程中的关键知识点。 首先,xxl-job项目的...

    java监听器+quartz实现每天动态时间执行任务的功能

    这个项目“java监听器+quartz实现每天动态时间执行任务的功能”是结合了Java的监听器机制和Quartz定时任务框架来完成这样的需求。下面将详细解释这两个关键知识点。 **Java监听器** Java监听器是Java AWT和Swing库...

    MapReduce Job本地提交过程源码跟踪及分析

    本文将深入源码层面,分析MapReduce Job在本地提交的详细步骤。 首先,我们来了解一下MapReduce Job的基本流程。当用户通过Java API调用`Job`类的`submit()`方法时,本地提交过程就开始了。这个过程主要分为以下几...

    深入理解Spark 核心思想与源码分析

    本文将深入探讨Spark的核心思想,并通过源码分析来深化理解。 一、Spark核心思想 1. **弹性分布式数据集(Resilient Distributed Datasets, RDD)**:RDD是Spark的核心数据抽象,它是一种不可变、分区的记录集合,...

    elastic-job-1.0.5相关源码

    - **Job Execution**:节点上的Job执行器接收到触发器的触发信号后,执行对应的Job分片。 - **Failure Transferring**:如果某个节点执行失败,会自动转移到其他节点进行重试,保证任务的高可用性。 - **...

    将 xxl-job-admin、xxl-job-core、xxl-job-executor 的源码引入项目中.docx

    ### 将xxl-job-admin、xxl-job-core、xxl-job-executor的源码引入项目中的实践 在探讨如何将这三个组件(xxl-job-admin、xxl-job-core、xxl-job-executor)有效地引入到项目中之前,我们需要先了解它们各自的作用及...

    基于Scala语言的spark-jobserver项目设计与源码分析

    该项目是一款基于Scala语言的Spark-JobServer项目,致力于设计与源码分析。该系统包含378个文件,主要由Scala源代码构成,辅以Shell、Python、JavaScript、Java、CSS和HTML等编程语言。文件类型涵盖Scala、Markdown...

    MapReduce2.0源码分析与实战编程

    《MapReduce2.0源码分析与实战编程》是一本深度探讨Hadoop生态系统中的核心组件MapReduce 2.0(也称为YARN)的专著。MapReduce是大数据处理领域的重要框架,它提供了并行计算的能力,使得海量数据的处理变得高效可行...

    xxl-job源码下载

    - **任务调度器(XXL-JOB Executor)**:执行任务的组件,负责接收调度中心的指令并执行相应的作业。 - **调度中心(XXL-JOB Admin)**:管理任务调度的组件,存储任务信息,执行策略等,并负责触发任务执行。 - **...

    基于scrapy爬取51job爬虫系统源码.zip

    基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job爬虫系统源码.zip 基于scrapy爬取51job...

    python基于51job数据可视化图表展示源码.zip

    python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts可视化。python基于51job数据可视化图表展示源码,基于51job工作招聘数据可视化图表展示 web 数据挖掘,ECharts...

    Hadoop源码分析(client端提交job到rm端)

    学习Hadoop源码过程中做的源码分析,共享一下,PPT中有我的邮箱,可以互相探讨。Hadoop源码分析(client端提交job到rm端)

    Flink源码分析-Job调度部署运行流程.pdf

    Flink源码分析-Job调度部署运行流程.pdf

    java调用kettle中的job与转换-源码

    6. **监控与错误处理**:通过监听器(如`TransListener`或`JobListener`)跟踪Job和转换的进度,以便在出错时进行异常处理。 7. **资源清理**:当转换或作业执行完毕后,记得释放占用的资源,例如关闭打开的数据库...

    Kettle源码分析

    【Kettle源码分析】 Kettle,也称为Pentaho Data Integration (PDI),是一个开源的提取、转换、加载(ETL)工具,用于在各种数据源之间移动数据。Kettle4.2的源码分析主要涉及其核心概念、源代码结构以及关键模块。...

    xxl-job-2.2.0.zip

    总之,XXL-JOB-2.2.0源码的分析可以帮助我们理解分布式任务调度的实现细节,了解任务的分发、执行、监控等核心流程,为自定义任务处理和扩展提供指导。深入学习这些源码,不仅可以提升开发技能,也能提高解决实际...

    xxljob接口文档及源码运行操作.docx

    XXL-JOB 接口文档及源码运行操作 XXL-JOB 是一个轻量级分布式任务调度框架,核心设计目标是开发迅速、学习简单、轻量级、易扩展。该框架支持通过 Web 页面对任务进行 CRUD 操作,操作简单,一分钟上手。同时,XXL-...

    xxl-job实战源码

    本篇将深入探讨XXL-JOB的核心概念、架构设计以及源码分析。 **一、XXL-JOB简介** 1. **核心概念** - **调度中心(Admin)**: 负责任务的注册、管理、监控,提供Web界面供用户操作。 - **执行器(Executor)**: ...

Global site tag (gtag.js) - Google Analytics