`

elastic-job的原理简介和使用

 
阅读更多

elastic-job是当当开源的一款非常好用的作业框架,在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:
1.不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
2.quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。

本篇博文将会自顶向下地介绍elastic-job,让大家认识了解并且快速搭建起环境。

elastic-job产品线说明

elastic-job在2.x之后,出了两个产品线:Elastic-Job-Lite和Elastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求,本文也是以Elastic-Job-Lite为主。1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。写此博文,最新release版本为2.0.5。

elastic-job-lite原理

举个典型的job场景,比如余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户数量不多,我们可以轻易使用quartz来完成,我们让计息job在某个时间点开始执行,循环遍历所有用户计算利息,这没问题。可是,如果用户体量特别大,我们可能会面临着在第二天之前处理不完这么多用户。另外,我们部署job的时候也得注意,我们可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。
elastic-job就可以帮助我们解决上面的问题,elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。
我们来看:
很大体量的用户需要在特定的时间段内计息完成
我们肯定是希望我们的任务可以通过集群达到水平扩展,集群里的每个节点都处理部分用户,不管用户数量有多庞大,我们只要增加机器就可以了,比如单台机器特定时间能处理n个用户,2台机器处理2n个用户,3台3n,4台4n...,再多的用户也不怕了。
使用elastic-job开发的作业都是zookeeper的客户端,比如我希望3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配到0,1,2的任务片,比如server0-->0,server1-->1,server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。
任务部署多节点引发重复执行
在上面的基础上,我们再增加server3,此时,server3分不到任务分片,因为只有3片,已经分完了。没有分到任务分片的作业程序将不执行。
如果此时server2挂了,那么server2的分片项会分配给server3,server3有了分片,就会替代server2执行。
如果此时server3也挂了,只剩下server0和server1了,框架也会自动把server3的分片随机分配给server0或者server1,可能会这样,server0-->0,server1-->1,2。
这种特性称之为弹性扩容,即elastic-job名称的由来。

代码演示

我们搭建环境通过示例代码来演示上面的例子,elastic-job是不支持单机多实例的,通过zk的协调分片是以ip为单元的。很多同学上来可能就是通过单机多实例来学习,结果导致分片和预期不一致。这里没办法,只能通过多机器或者虚拟机,我们这里使用虚拟机,另外,由于资源有限,我们这里仅仅只模拟两台机器。
 
节点说明:
本地宿主机器
zookeeper、job
192.168.241.1

虚拟机
job
192.168.241.128

环境说明:
Java
请使用JDK1.7及其以上版本。
Zookeeper
请使用Zookeeper3.4.6及其以上版本
Elastic-Job-Lite
2.0.5(2.x系列即可,最好是2.0.4及其以上,因为2.0.4版本有本人提交的少许代码,(*^__^*) 嘻嘻……)

需求说明:
通过两台机器演示动态分片
 
step1. 引入框架的jar包
<!-- 引入elastic-job-lite核心模块 -->  
<dependency>  
    <groupId>com.dangdang</groupId>  
    <artifactId>elastic-job-lite-core</artifactId>  
    <version>2.0.5</version>  
</dependency>  
<!-- 使用springframework自定义命名空间时引入 -->  
<dependency>  
    <groupId>com.dangdang</groupId>  
    <artifactId>elastic-job-lite-spring</artifactId>  
    <version>2.0.5</version>  
</dependency>  
 
step2. 编写job
package com.fanfan.sample001;  
  
import com.dangdang.ddframe.job.api.ShardingContext;  
import com.dangdang.ddframe.job.api.simple.SimpleJob;  
  
import java.util.Date;  
  
/** 
 * Created by fanfan on 2016/12/20. 
 */  
public class MySimpleJob implements SimpleJob {  
    @Override  
    public void execute(ShardingContext shardingContext) {  
        System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s",  
                Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));  
        /** 
         * 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了 
         * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem 
         */  
    }  
} 
 
Step3. Spring配置
<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"  
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"  
       xsi:schemaLocation="http://www.springframework.org/schema/beans  
                        http://www.springframework.org/schema/beans/spring-beans.xsd  
                        http://www.dangdang.com/schema/ddframe/reg  
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd  
                        http://www.dangdang.com/schema/ddframe/job  
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">  
    <!--配置作业注册中心 -->  
    <reg:zookeeper id="regCenter" server-lists="192.168.241.1:2181" namespace="dd-job"  
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />  
  
    <!-- 配置作业-->  
    <job:simple id="mySimpleJob" class="com.fanfan.sample001.MySimpleJob" registry-center-ref="regCenter"  
                sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true" />  
  
</beans> 
  
 
Case1. 单节点


 
 
 
 
Case2. 增加一个节点

 
 
 
 
 
 
 
Case3. 断开一个节点
 
 
 

作业类型

elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。

SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
实际开发中,Dataflow类型的job还是很有好用的。
 
比如拿余额宝计息来说:
 
package com.fanfan.sample001;  
  
import com.dangdang.ddframe.job.api.ShardingContext;  
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;  
  
import java.util.ArrayList;  
import java.util.List;  
  
/** 
 * Created by fanfan on 2016/12/23. 
 */  
public class MyDataFlowJob implements DataflowJob<User> {  
  
    /* 
        status 
        0:待处理 
        1:已处理 
     */  
  
    @Override  
    public List<User> fetchData(ShardingContext shardingContext) {  
        List<User> users = null;  
        /** 
         * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30 
         */  
        return users;  
    }  
  
    @Override  
    public void processData(ShardingContext shardingContext, List<User> data) {  
        for (User user: data) {  
            System.out.println(String.format("用户 %s 开始计息", user.getUserId()));  
            user.setStatus(1);  
            /** 
             * update user 
             */  
        }  
    }  
}
  
 
<job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"  
              sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />  
 

其它功能

上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。
这些增强的功能读者可以在github/elastic-job上自行学习,相信有了本篇博文的基础,再阅读那些文档就特别简单了。

转自:https://blog.csdn.net/fanfan_v5/article/details/61310045

分享到:
评论

相关推荐

    elastic-job-quickstart.zip

    快速入门示例代码可能包括了创建任务、配置分片策略、注册任务、启动和停止任务的完整流程,通过运行和调试这些代码,你可以深入理解Elastic-Job的工作原理和使用方法。同时,建议参考Elastic-Job的官方文档和示例...

    elastic-job spring 源码和控制台

    3. 创建作业:通过Spring的@Bean注解创建ElasticJob实例,指定作业类、分片策略、执行逻辑等。 4. 注册作业:使用ElasticJobScheduler将作业注册到注册中心,实现任务的分布式调度。 三、EQL(Elastic Job Lite)...

    elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz

    7. **标签解析**:“elastic-job-lite”、“jobConsole”和“elasticJob”都是与Elastic-Job相关的关键词,它们分别对应Elastic-Job的轻量级组件、Elastic-Job Lite Console(即本压缩包的主要内容)和整个Elastic-...

    elastic-job文档资料

    通过深入理解Elastic-Job的原理和使用方式,开发者可以构建出稳定、高效、可扩展的分布式任务调度系统,提升系统的整体性能和可靠性。在阅读和研究提供的"elastic-job资料"后,你将能够更好地理解和应用这个强大的...

    elastic-job-lite-master.zip

    在本文中,我们将深入探讨Elastic-Job Lite的核心特性、工作原理以及如何在SpringBoot项目中进行集成和应用。 一、Elastic-Job Lite简介 Elastic-Job分为两个版本,分别是Elastic-Job-Lite和Elastic-Job-Cloud。...

    elastic-job可以案例

    Elastic-Job是一个分布式任务调度框架,由当当网开源,它主要分为两个部分:Elastic-Job-Lite和Elastic-Job-Cloud。本文将深入探讨Elastic-Job的核心特性、工作原理以及如何在实际项目中应用。 一、Elastic-Job概述...

    elastic-job-lite-console-2.1.5.tar.gz

    6. **示例**:Elastic-Job-Lite可能会提供一些示例代码,帮助开发者快速理解和上手使用。 Elastic-Job-Lite Console是Elastic-Job-Lite的Web管理界面,它允许用户通过Web方式查看作业状态、管理作业、设置作业属性...

    elastic-job资料

    Elastic-Job是一个分布式任务调度框架,源自淘宝的TBSchedule,由两个相互独立的子项目Elastic-Job-Lite和...这个资料包将帮助你深入了解Elastic-Job的原理、使用方法以及最佳实践,提升你的分布式系统开发能力。

    elastic-job-1.0.5源码

    4. 作业调度:Elastic-Job使用Quartz作为内嵌的定时任务框架,通过`org.elasticjob.lite.trigger.quartz.QuartzJobTrigger`类触发作业的执行。Quartz的灵活性使得Elastic-Job能够实现复杂的定时任务调度。 5. 异常...

    SpringBoot2整合ElasticJob框架过程详解

    "SpringBoot2整合ElasticJob框架过程详解" 本文主要介绍了SpringBoot2整合ElasticJob框架的过程...SpringBoot2整合ElasticJob框架的过程详解可以帮助大家更好地理解ElasticJob的原理和应用,提高开发效率和系统性能。

    Elastic-Job分布式任务调度视频教程

    3、课程优势本课程不仅讲解了Elastic-job分布式任务调度的使用方法及原理,而且通过案例讲解了如何构建分布式系统任务调度具体方案,通过从理论到实战的学习可以快速将Elastic-job分布式任务调度技术应用到项目中。

    elastic-job-lite-console-2.1.4.tar.gz

    - **Spring整合**:通过`@ElasticJob`注解实现Spring Bean的自动装配,简化开发流程。 - **任务定义**:自定义`com.dangdang.ddframe.job.api.JobType`,实现具体的作业逻辑。 - **监控与调试**:使用控制台监控...

    当当网开源的分布式作业调度组件 Elastic-Job.zip

    目录结构说明使用步骤开发指南使用限制运维平台阅读源码编译问题说明实现原理作业分片策略监控快速上手(感谢第三方志愿者 泽伟@心探索科技 提供文档)InfoQ新闻Elastic-Job Wiki (由社区志愿者自由编辑的) ...

    elas-job.zip

    3. **任务调度**:Elastic-Job使用基于时间的调度方式,支持cron表达式,可以灵活设置任务执行频率。同时,它还支持错峰启动,避免多台服务器在同一时刻启动同一任务导致的压力集中。 4. **任务变更**:在运行过程...

    scheduler_20170720

    1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。写此博文,最新release版本为2.0.5。 scheduler 20170720是基于elastic-job 早期版本开发的。

    elasticjob完成例子.zip

    ElasticJob是一个强大的分布式...通过这个实例,你可以深入理解ElasticJob的工作原理,学习如何创建、配置、调度和监控分布式任务。在实际项目中,你可以根据业务需求调整分片策略,实现高效、稳定的分布式任务处理。

    elasticjobmaster2.03源码

    本文将围绕Elastic-Job-Master 2.0.3的源码进行深入解析,旨在帮助开发者理解其内部工作原理,提升在分布式任务调度领域的专业技能。 一、Elastic-Job简介 Elastic-Job主要解决的是分布式环境下的任务调度问题,它...

    elasticsearch-bulk-insert-plugin.zip

    Elasticsearch-Bulk-Insert-Plugin 就是利用这一原理,通过Kettle的Job或Transformation实现数据的批量导入。 要使用这个插件,首先需要在Kettle环境中安装elasticsearch-bulk-insert-plugin。通常,这可以通过将...

    ElasticJob 中文文档.pdf

    在维护和使用 ElasticJob 的过程中,还需要关注作业的监控和告警,特别是在高并发和大数据量的环境下,需要对作业的执行效率和状态有充分的了解,并且及时做出调整。这样既可以保证作业的稳定运行,也可以提高系统的...

    elastic文档1

    在深入学习Elasticsearch和Elastic-Job的过程中,理解它们的基本概念和工作原理至关重要。同时,掌握如何配置和使用这些工具,以及如何根据业务需求进行优化,是提高系统性能和稳定性的重要环节。通过不断实践和探索...

Global site tag (gtag.js) - Google Analytics