elastic-job部署以及简单例子:elastic-job是当当开发的基于qutarz以及zookeeper封装的作业调度工具,主要有两个大框架,一个是elastic-job lite另外一个是elastic-job cloud,其中qutarz是一个开源的作业调度工具,zookeeper是分布式调度工具,这两者结合搭建了elastic-job-lite,这是一个无中心节点的调度,而elastic-job-cloud是一个有中心节点的分布式调度开源工具,只需要设置好机器以及分片,就可以自动的调度到对应的机器上运行。
与lite的不同时cloud采用了mesos来进行分布式资源管理,简单的来说两者的不同是:同一个作业在两台机器上跑,lite需要手动在两台机器上跑,但是cloud只需要上传作业包,就可以自动的在两台机器上跑,因为lite不支持作业的调度,为无中心的。
二、环境的搭建
由于elastic-job-cloud的环境暂时未搭建出来,因此在此简单介绍lite的搭建
(1)jdk的安装
jdk需要1.7以上,因为里面有spring相关的代码,具体的安装请自行百度,或参考链接https://blog.csdn.net/molong1208/article/details/50537898
(2)zookeeper的安装
具体的安装过程见链接https://blog.csdn.net/molong1208/article/details/53675063
(3)maven的安装
官网maven要求3.0.4以及以上,具体的安装过程与jdk类似,请自行百度
三、elastic-job-lite的优势及特点
(1)简单的概念及适用场景
1. 分片概念
任务的分布式执行,需要将一个任务拆分为n个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
2. 分片项与业务处理解耦
Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
3. 个性化参数的适用场景
个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
(2)elastic-job-lite优势及特点
1. 分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
弹性分布式实现
-
第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
-
某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
-
主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
-
定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
-
通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
-
每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
-
实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
2. 作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
3. 最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
三、简单的例子
elastic-job的作业类型分为三种,一种是简单的simple的形式,一种是基于流式数据的处理,一种是基于脚本的调度,因为本人所使用的情况是基于流式的处理,那么就简单搭了一个基于流式的demo,其他类型的类似
流式作业的方式适合于不间断的数据处理的类型,例如需要拉取订单数据,因为订单是连续不间断的,因此需要一直拉取。
按照elastic-job官网上介绍,搭建一个基于dataflow(流式处理)的demo,这个demo的功能就是,从一个数据中心里面取数据,按照数据中心的数据id%分片个数==分片参数进行拉取数据,拉取完成后将对应的数据id置为完成的状态,具体代码如下所示:
(1)入口函数main函数以及作业的配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package ElasticJobExample.ElasticJobExample;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/** * Hello world!
*
*/
public class App
{ public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter( new ZookeeperConfiguration( "ip:2181" , "elastic-job-demo" ));
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
// 创建作业配置
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder( "myDataFlowTest" , "0/10 * * * * ?" , 3 ).shardingItemParameters( "0=0,1=1,2=2" ).build();
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, JavaDataflowJob. class .getCanonicalName(), true );
LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
return result;
}
} |
(2)作业的逻辑处理部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package ElasticJobExample.ElasticJobExample;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import dataflowjob.entity.Foo;
import dataflowjob.process.DataProcess;
import dataflowjob.process.DataProcessFactory;
public class JavaDataflowJob implements DataflowJob<foo> {
private DataProcess dataProcess = DataProcessFactory.getDataProcess();
@Override
public List<foo> fetchData(ShardingContext context) {
List<foo> result = new ArrayList<foo>();
result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());
System.out.println(String.format( "------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s" , Thread.currentThread().getId(), new Date(), context, "fetch data" ,result));
return result;
}
@Override
public void processData(ShardingContext shardingContext, List<foo> data) {
System.out.println(String.format( "------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s" , Thread.currentThread().getId(), new Date(), shardingContext, "finish data" ,data));
for (Foo foo:data){
dataProcess.setData(foo.getId());
}
}
}</foo></foo></foo></foo></foo> |
(3)具体的处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package dataflowjob.process;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import dataflowjob.entity.Foo;
public class DataProcess {
private Map<integer, foo= "" > data = new ConcurrentHashMap<>( 30 , 1 );
public DataProcess()
{
for ( int i= 0 ;i< 30 ;i++){
data.put(i, new Foo(i,Foo.Status.TODO));
}
}
public List<foo> getData(String tailId, int shardNum)
{
int intId = Integer.parseInt(tailId);
List<foo> result = new ArrayList<foo>();
for (Map.Entry<integer, foo= "" > each : data.entrySet()) {
Foo foo = each.getValue();
int key = each.getKey();
if (key % shardNum == intId && foo.getStatus() == Foo.Status.TODO) {
result.add(foo);
}
}
return result;
}
public void setData( int i){
data.get(i).setStatus(Foo.Status.DONE);
}
} </integer,></foo></foo></foo></integer,> |
(4)entity类Foo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package dataflowjob.entity;
public class Foo {
private int id;
private Status status;
public Foo( final int id, final Status status) {
this .id = id;
this .status = status;
}
public int getId() {
return id;
}
public void setId( int id) {
this .id = id;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this .status = status;
}
public enum Status{
TODO,
DONE
}
} |
(5)具体处理工厂类
1
2
3
4
5
6
7
8
9
10
11
|
package dataflowjob.process;
public class DataProcessFactory {
private static DataProcess dataProcess = new DataProcess();
public static DataProcess getDataProcess() {
return dataProcess;
}
} |
相关推荐
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这里的"elastic-job-lite-console-2.1.5.zip"是一个压缩包,其中包含了Elastic-Job-Lite的控制台版本,支持在...
Elastic-Job分为Elastic-Job-Lite和Elastic-Job-Cloud两个版本,其中Elastic-Job-Lite是轻量级的离线分布式作业调度框架,它不依赖任何云平台,适合在各种环境部署。Elastic-Job-Lite提供了一套完整的作业解决方案...
Elastic-Job Lite Console是...综上所述,Elastic-Job Lite Console 2.1.5是分布式定时任务管理的一个优秀工具,它结合了Elastic-Job Lite的强大功能和轻量级部署的特性,为开发和运维人员提供了便捷的管理和监控手段。
在部署Elastic-Job-Lite Console 2.1.5时,首先需要解压"elastic-job-lite-console-2.1.5"压缩包,然后根据提供的文档配置服务器环境和数据库连接,接着启动服务。在浏览器中输入服务器地址即可访问控制台。在实际...
《Elastic-Job Lite Console 2.1.6-SNAPSHOT 源码解析与应用探索》 Elastic-Job Lite Console 是一个基于Elastic-Job框架开发的定时任务控制台,由当当网推出,旨在为企业级分布式系统提供简单易用、功能强大的定时...
用户可以编译源码并部署到自己的服务器上,自定义配置并接入自己的Elastic-Job集群。 总之,Elastic-Job-Lite Console为Elastic-Job-Lite的使用者提供了一个直观且强大的管理工具,通过这个控制台,运维人员可以...
当当elastic-job控制台jar包,elastic-job-lite-console-3.0.0.M1-SNAPSHOT,本人从git下载源码后编译生成的jar。 $ 解压 elastic-job-lite-console-3.0.0.M1-SNAPSHOT.rar $ cd elastic-job-lite-console-3.0.0.M1-...
Elastic-Job是一个分布式任务调度框架,由两个子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite是轻量级的解决方案,适合在私有云或物理服务器集群上使用;而Elastic-Job-Cloud则基于Mesos框架,更...
在"Elastic-job-lite-console-3.0.0.M1-SNAPSHOT"的压缩包中,包含的文件可能是源码、编译后的类库、配置文件以及可能的部署脚本。用户可以下载此压缩包,解压后进行编译、打包和部署。为了运行控制台,通常需要一个...
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这个"elastic-job-quickstart.zip"压缩包应该是为初学者提供的一份快速入门示例代码,帮助理解并掌握Elastic-...
elastic-job-console,elastic-job页面管理job控制台,希望可以帮到朋友们
- **安装部署**:介绍如何配置和启动Elastic-Job-Lite Console,以及如何将项目集成到应用中。 - **任务注册**:开发者定义好作业类后,需要在Console中注册作业,设置执行策略和调度参数。 - **监控与管理**:...
而Elastic-Job-Cloud则是在Elastic-Job-Lite基础上增加了YARN的资源调度管理,适合大规模集群部署。 二、Elastic-Job Lite核心特性 1. **分布式任务拆分**:Elastic-Job Lite将一个大任务拆分成多个子任务,并均匀...
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,分别对应轻量级和云原生的解决方案。本资料包将详细介绍Elastic-Job的核心功能、设计理念以及如何在实际项目中...
《Elastic-Job Lite Console 2.1.4:分布式任务调度管理的得力助手》 Elastic-Job Lite Console 2.1.4是一款基于Elastic-Job Lite的轻量级分布式任务调度管理工具,它为开发者提供了一个可视化的控制台,用于方便地...
elastic-job-lite-console-2.1.4.tar.gz,可以在windows或者linux环境部署。控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心数据修改全局配置。