`
234390216
  • 浏览: 10233082 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462627
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775522
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398369
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395023
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679990
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530894
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183954
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467938
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151399
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68154
社区版块
存档分类
最新评论

elastic-job之监听器

阅读更多

每个作业都可以配置一个任务监听器,确切的说是只能配置一个本地监听器和一个分布式监听器。Elastic-job有三种作业类型,但是它们的通用配置都是一样的,所以本文在介绍作业的监听器配置时将仅以简单作业的配置为例。

本地监听器

本地监听器只在节点执行自己分片的时候调度,每个分片任务调度的时候本地监听器都会执行。本地监听器由ElasticJobListener接口定义,其定义如下:

/**
 * 弹性化分布式作业监听器接口.
 * 
 * @author zhangliang
 */
public interface ElasticJobListener {
    
    /**
     * 作业执行前的执行的方法.
     * 
     * @param shardingContexts 分片上下文
     */
    void beforeJobExecuted(final ShardingContexts shardingContexts);
    
    /**
     * 作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    void afterJobExecuted(final ShardingContexts shardingContexts);
}

该接口的接口方法的注释上已经说明了对应的接口方法的调用时机,详情也可以参考com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute()方法。简单示例如下:

public class MyElasticJobListener implements ElasticJobListener {

	private static final Logger LOGGER = Logger.getLogger(MyElasticJobListener.class);
	
	@Override
	public void beforeJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info(String.format("开始调度任务[%s]", shardingContexts.getJobName()));
	}

	@Override
	public void afterJobExecuted(ShardingContexts shardingContexts) {
		LOGGER.info(String.format("任务[%s]调度完成", shardingContexts.getJobName()));
	}

}

本地监听器的配置由<job:listener/>节点配置,如下示例中就通过<job:listener/>给简单作业myElasticJob定义了一个本地监听器。

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
	registry-center-ref="regCenter" cron="0/30 * * * * ?"
	sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
	failover="true" overwrite="true" >
	<job:listener class="com.elim.learn.elastic.job.listener.MyElasticJobListener" />
</job:simple>

分布式监听器

本地监听器在作业执行本地的分片任务时会执行,如上面的示例,我们的作业被分成了6片,则监听器任务会执行6次。而分布式监听器会在总的任务开始执行时执行一次,在总的任务结束执行时执行一次。分布式监听器也是在普通监听器的基础上实现的,由AbstractDistributeOnceElasticJobListener抽象类封装的,其实现了ElasticJobListener接口。要实现自己的监听器只需要继承AbstractDistributeOnceElasticJobListener抽象类,实现其中的抽象方法即可。AbstractDistributeOnceElasticJobListener抽象类的定义如下:

/**
 * 在分布式作业中只执行一次的监听器.
 * 
 * @author zhangliang
 */
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {
    
    private final long startedTimeoutMilliseconds;
    
    private final Object startedWait = new Object();
    
    private final long completedTimeoutMilliseconds;
    
    private final Object completedWait = new Object();
    
    @Setter
    private GuaranteeService guaranteeService;
    
    private TimeService timeService = new TimeService();
    
    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        if (startedTimeoutMilliseconds <= 0L) {
            this.startedTimeoutMilliseconds = Long.MAX_VALUE;
        } else {
            this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
        }
        if (completedTimeoutMilliseconds <= 0L) {
            this.completedTimeoutMilliseconds = Long.MAX_VALUE; 
        } else {
            this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
        }
    }
    
    @Override
    public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
        if (guaranteeService.isAllStarted()) {
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (startedWait) {
                startedWait.wait(startedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
            guaranteeService.clearAllStartedInfo();
            handleTimeout(startedTimeoutMilliseconds);
        }
    }
    
    @Override
    public final void afterJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());
        if (guaranteeService.isAllCompleted()) {
            doAfterJobExecutedAtLastCompleted(shardingContexts);
            guaranteeService.clearAllCompletedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (completedWait) {
                completedWait.wait(completedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) {
            guaranteeService.clearAllCompletedInfo();
            handleTimeout(completedTimeoutMilliseconds);
        }
    }
    
    private void handleTimeout(final long timeoutMilliseconds) {
        throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds);
    }
    
    /**
     * 分布式环境中最后一个作业执行前的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);
    
    /**
     * 分布式环境中最后一个作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);
    
    /**
     * 通知任务开始.
     */
    public void notifyWaitingTaskStart() {
        synchronized (startedWait) {
            startedWait.notifyAll();
        }
    }
    
    /**
     * 通知任务结束.
     */
    public void notifyWaitingTaskComplete() {
        synchronized (completedWait) {
            completedWait.notifyAll();
        }
    }
}

以下是一个使用分布式监听器的示例:

public class MyDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {

	private static final Logger logger = Logger.getLogger(MyDistributeOnceElasticJobListener.class);
	
	/**
	 * @param startedTimeoutMilliseconds
	 * @param completedTimeoutMilliseconds
	 */
	public MyDistributeOnceElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
		super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
	}

	@Override
	public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
		logger.info("分布式监听器开始……");
	}

	@Override
	public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
		logger.info("分布式监听器结束……");
	}

}

分布式监听器用到了锁的等待和通知,startedTimeoutMilliseconds和completedTimeoutMilliseconds分别用来指定作业开始前和完成后的对应的锁等待最大超时时间。分布式监听器由<job:distributed-listener/>,以下是一个使用分布式监听器的示例:

<bean id="simpleJob" class="com.elim.learn.elastic.job.MyElasticJob"/>
<job:simple id="myElasticJob" job-ref="simpleJob"
	registry-center-ref="regCenter" cron="0/30 * * * * ?"
	sharding-total-count="6" sharding-item-parameters="0=A,1=B,2=C,3=D,4=E,5=F"
	failover="true" overwrite="true" >
	<job:distributed-listener class="com.elim.learn.elastic.job.listener.MyDistributeOnceElasticJobListener" 
			started-timeout-milliseconds="100" completed-timeout-milliseconds="100"/>
</job:simple>

(本文由Elim写于2017年10月2日)

0
0
分享到:
评论

相关推荐

    elastic-job-lite-console-2.1.5

    7. **扩展性**:Elastic-Job设计上具有很好的扩展性,如支持自定义ShardingStrategy(分片策略)和ExecutionListener(执行监听器),允许开发者根据业务需求定制任务的分片规则和执行过程。 综上所述,Elastic-Job...

    Elastic-Job控制台2.1.5

    Elastic-Job是一个分布式任务调度框架,由两个子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite是轻量级的解决方案,适合在私有云或物理服务器集群上使用;而Elastic-Job-Cloud则基于Mesos框架,更...

    elastic-job-quickstart.zip

    7. **JobListener(监听器)**:Elastic-Job提供了监听器接口,允许用户在任务执行前后进行额外的操作,如日志记录、状态通知等。 8. **API使用**:在代码中,我们需要通过`SimpleJob`或`DataflowJob`接口实现自己...

    elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz

    3. **ExecutionListener** 和 **JobListener**:监听器,提供任务执行前后的回调功能,可以用来记录日志、监控状态等。 Elastic-Job-Lite 3.0.0.M1-SNAPSHOT是该框架的一个里程碑版本,其中可能包含了以下改进和新...

    elastic-job-1.0.5源码

    在实际应用中,我们需要遵循Elastic-Job提供的API和设计模式,例如,通过`SimpleJob`接口实现自定义作业,配置任务执行的cron表达式,以及设置作业的监听器和错误处理器。同时,确保正确配置注册中心,并在作业...

    elastic-job-boot

    4. 事件通知:Elastic-Job支持事件通知机制,当任务状态发生改变时,可以通过监听器触发相应的回调函数,便于我们进行日志记录或者业务处理。 四、实战示例 假设我们有一个需要定时清理数据库过期数据的任务,可以...

    validation-elastic-job-lite-console-2.1.5.zip

    ElasticJob-Lite是一款轻量级分布式任务调度框架,由当当网开源并维护,它在Java社区中广受欢迎。这个"validation-elastic-job-lite-console-2.1.5.zip"压缩包包含了ElasticJob-Lite 2.1.5版本的控制台组件,这个...

    elastic-job 动态创建任务示例

    `ElasticJobListener.java`代表一个监听器,Elastic-Job提供了丰富的监听器接口,如`SimpleJobListener`、`ScriptJobListener`等,可以监听作业的生命周期事件,如开始执行、执行完成、异常等,进行相应的日志记录或...

    分布式任务调度框架elastic-job-lite

    - 定义作业类,实现`SimpleJob`接口或使用注解`@ElasticJob`。 - 在作业服务器上启动作业实例。 - 在调度服务器上设置调度规则。 - 监控作业执行情况,可以通过日志或注册中心查看。 5. **最佳实践** - 尽可能...

    elasticjob分布式调度动态添加任务

    4. **监听配置变更**:使用ElasticJob提供的监听器,如`ZookeeperConfigurationChangeListener`,监听配置中心的变化,当有新任务添加时,自动同步到各个工作节点。 5. **任务调度**:ElasticJob会根据注册的任务和...

    elasticjob完成例子.zip

    ElasticJob提供了作业监听器(JobListener)的概念,用于监听作业生命周期中的各种事件,比如作业启动、结束、异常等。开发者可以根据需求自定义监听器,实现特定功能。 4. **弹性调度策略** ElasticJob的核心...

    elasticjob-lite-example.rar

    4. **启动类**:应用的主入口,通常会初始化Spring Boot应用,并启动ElasticJob-Lite的监听器。 5. **测试类**:可能包含单元测试或集成测试,用于验证任务的正确性和功能。 在学习和使用这个示例时,你需要理解...

    eljob源码解读1

    二、Elastic-job监听器分析 2.1 ListenerManager 介绍 `ListenerManager`是Elastic-job中用于管理和处理Zookeeper节点变化的关键组件,位于`com.dangdang.ddframe.job.lite.internal.listener`包下。它包含四个主要...

    ElasticJob 中文文档.pdf

    ElasticJob 是一个分布式任务调度框架,它由当当网开源,目的是解决在分布式系统环境下定时任务的管理问题。ElasticJob 提供了分布式环境下任务的分片与高可用特性,支持任务的快速弹性扩展。它将任务分片到多个...

    shardingsphere-elasticjob.zip

    ElasticJob-Cloud则是在Hadoop YARN之上构建的分布式任务调度框架,它充分利用YARN的资源管理和调度能力,为大规模的数据处理提供更高效的任务调度解决方案。主要特性包括:基于YARN的资源调度、任务生命周期管理、...

    Java Elastic Job动态添加任务实现过程解析

    Java Elastic Job是一个强大的分布式任务调度框架,它提供了轻量级的Elastic-Job-Lite和基于Mesos的Elastic-Job-Cloud两个版本。本文主要关注Elastic-Job-Lite,探讨如何动态添加任务,以及如何处理在动态任务中遇到...

    SpingBoot集成ElaticJob定时器

    5. **自定义监听器**(可选): 可以通过实现`ElasticJobListener`接口来自定义任务执行前后的操作,例如记录日志或发送通知。 6. **启动应用**: 启动SpringBoot应用,ElasticJob会自动注册并开始执行定时任务。...

    jdbc.rar elasticsearch 与mysql 同步所需要jar包

    2. **Elasticsearch JDBC**:Elasticsearch提供了名为`elasticsearch-jdbc`的工具,它是一个可配置的JDBC导入器,可以从任何兼容JDBC的数据库(如MySQL)中拉取数据并导入到Elasticsearch中。 3. **配置文件**:...

    Laravel开发-aos-laravel-emq-queue

    为了使队列工作,我们需要启动监听器进程。Laravel提供了命令`php artisan queue:listen`来启动监听器,但长期运行的生产环境建议使用`php artisan queue:work --daemon`。你也可以在`config/queue.php`中配置监听器...

    SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)推荐

    - 当面临分布式集群环境时,如需解决任务多次执行和单点故障问题,可以采用Quartz、elastic-job、xxl-job或Saturn等分布式任务调度系统。它们能实现任务的高可用性和分片执行。 **Spring Task在Spring Boot中的...

Global site tag (gtag.js) - Google Analytics