流式作业对应于DataflowJob接口,其定义如下:
/**
* 数据流分布式作业接口.
*
* @author zhangliang
*
* @param <T> 数据类型
*/
public interface DataflowJob<T> extends ElasticJob {
/**
* 获取待处理数据.
*
* @param shardingContext 分片上下文
* @return 待处理的数据集合
*/
List<T> fetchData(ShardingContext shardingContext);
/**
* 处理数据.
*
* @param shardingContext 分片上下文
* @param data 待处理数据集合
*/
void processData(ShardingContext shardingContext, List<T> data);
}
流式作业,每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。代码实现部分可参考数据流执行器 com.dangdang.ddframe.job.executor.type.DataflowJobExecutor。以下是DataflowJob的一个简单实现,该实现中每次调度触发时都会连续调度processData十次。
public class MyDataflowJob implements DataflowJob<String> {
private static final ThreadLocal<Integer> LOOP_COUNTER
= new ThreadLocal<>();
private static final int LOOP_TIMES = 10;//每次获取流处理循环次数
private static final AtomicInteger COUNTER = new AtomicInteger(1);//计数器
@Override
public List<String> fetchData(ShardingContext shardingContext) {
Integer current = LOOP_COUNTER.get();
if (current == null) {
current = 1;
} else {
current += 1;
}
LOOP_COUNTER.set(current);
System.out.println(Thread.currentThread()
+ "------------current--------" + current);
if (current > LOOP_TIMES) {
System.out.println("\n\n\n\n");
return null;
} else {
int shardingItem = shardingContext.getShardingItem();
List<String> datas = Arrays.asList(getData(shardingItem),
getData(shardingItem), getData(shardingItem));
return datas;
}
}
private String getData(int shardingItem) {
return shardingItem + "-" + COUNTER.getAndIncrement();
}
@Override
public void processData(ShardingContext shardingContext,
List<String> data) {
System.out.println(Thread.currentThread() + "--------" +data);
}
}
流式作业的配置使用<job:dataflow/>
配置,上面的流式作业对应的配置如下:
<job:dataflow id="myDataflowJob"
class="com.elim.learn.elastic.job.MyDataflowJob"
registry-center-ref="regCenter"
cron="0 0/2 * * * ?" sharding-total-count="2"
sharding-item-parameters="0=广州,1=深圳" failover="true" overwrite="true"
streaming-process="true">
上述配置参数的含义跟上一篇介绍的简单作业的配置是一样的,新增的streaming-process表示是否启用流式作业。
(本文由Elim写于2017年10月1日)
相关推荐
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这里的"elastic-job-lite-console-2.1.5.zip"是一个压缩包,其中包含了Elastic-Job-Lite的控制台版本,支持在...
Elastic-Job分为Elastic-Job-Lite和Elastic-Job-Cloud两个版本,其中Elastic-Job-Lite是轻量级的离线分布式作业调度框架,它不依赖任何云平台,适合在各种环境部署。Elastic-Job-Lite提供了一套完整的作业解决方案...
Elastic-Job Lite Console是当当网开源的一款用于分布式定时任务管理的控制台,版本为2.1.5。这个项目无需借助Tomcat等Web服务器,可以直接通过在解压后的bin目录下执行命令来启动服务,之后只需在浏览器中访问`...
Elastic-Job Lite Console是基于Elastic-Job Lite的一个监控平台,主要用来管理和监控分布式作业的执行情况。Elastic-Job是由当当网开源的、轻量级的分布式任务调度框架,它分为Elastic-Job-Lite和Elastic-Job-Cloud...
Elastic-Job-Lite是一款轻量级的分布式任务调度框架,由当当网开源,它提供了简单易用的API和可扩展的作业生态。在Elastic-Job-Lite中,"elastic-job-lite-console-master.zip"是一个包含Elastic-Job-Lite的可视化...
Elastic-Job 是一款分布式作业调度框架,分为Elastic-Job-Lite和Elastic-Job-Cloud两个子项目。Elastic-Job-Lite 主要关注的是分布式执行和故障转移,而Elastic-Job-Cloud 则是在Mesos之上实现的分布式调度解决方案...
Elastic-Job控制台是一个可视化的管理平台,用于管理和监控Elastic-Job-Lite的作业执行情况。它提供了作业的注册、配置、启动、停止以及查看作业状态等功能,使得分布式任务的管理变得更加便捷。 首先,了解Elastic...
当当elastic-job控制台jar包,elastic-job-lite-console-3.0.0.M1-SNAPSHOT,本人从git下载源码后编译生成的jar。 $ 解压 elastic-job-lite-console-3.0.0.M1-SNAPSHOT.rar $ cd elastic-job-lite-console-3.0.0.M1-...
在Elastic-Job-Lite中,作业是调度的基本单元,它可以是一个Java方法或者一个完整的业务逻辑。作业的配置包括作业类型(如简单作业、数据流作业等)、执行策略(如简单、优先级、全量替换等)、执行器(决定哪个...
《Elastic-Job Lite Console 2.1.4:分布式任务调度管理的得力助手》 Elastic-Job Lite Console 2.1.4是一款基于Elastic-Job Lite的轻量级分布式任务调度管理工具,它为开发者提供了一个可视化的控制台,用于方便地...
elastic-job-lite-console-2.1.5.rar 文件是 Elastic-Job-Lite Console 的一个压缩包,包含了 Elastic-Job-Lite 控制台的所有必要文件和资源。这个控制台用于管理和监控 Elastic-Job-Lite 作业的执行。
elastic-job-console,elastic-job页面管理job控制台,希望可以帮到朋友们
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这个"elastic-job-quickstart.zip"压缩包应该是为初学者提供的一份快速入门示例代码,帮助理解并掌握Elastic-...
Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,分别对应轻量级和云原生的解决方案。本资料包将详细介绍Elastic-Job的核心功能、设计理念以及如何在实际项目中...
四、Elastic-Job Lite作业开发 1. **定义作业接口**:创建一个实现了`com.dangdang.ddframe.job.api.Job`接口的类,这个接口规定了作业的执行逻辑。 2. **注册作业**:在SpringBoot的配置类中,使用`@...
Elastic-Job是基于 Quartz 和 ZooKeeper 的分布式作业调度框架,由两个子项目组成:Elastic-Job-Lite 和 Elastic-Job-Cloud。Elastic-Job-Lite 适用于微服务架构,而 Elastic-Job-Cloud 更适合大规模云计算环境。...