流式作业对应于DataflowJob接口,其定义如下:
/**
* 数据流分布式作业接口.
*
* @author zhangliang
*
* @param <T> 数据类型
*/
public interface DataflowJob<T> extends ElasticJob {
/**
* 获取待处理数据.
*
* @param shardingContext 分片上下文
* @return 待处理的数据集合
*/
List<T> fetchData(ShardingContext shardingContext);
/**
* 处理数据.
*
* @param shardingContext 分片上下文
* @param data 待处理数据集合
*/
void processData(ShardingContext shardingContext, List<T> data);
}
流式作业,每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。代码实现部分可参考数据流执行器 com.dangdang.ddframe.job.executor.type.DataflowJobExecutor。以下是DataflowJob的一个简单实现,该实现中每次调度触发时都会连续调度processData十次。
public class MyDataflowJob implements DataflowJob<String> {
private static final ThreadLocal<Integer> LOOP_COUNTER
= new ThreadLocal<>();
private static final int LOOP_TIMES = 10;//每次获取流处理循环次数
private static final AtomicInteger COUNTER = new AtomicInteger(1);//计数器
@Override
public List<String> fetchData(ShardingContext shardingContext) {
Integer current = LOOP_COUNTER.get();
if (current == null) {
current = 1;
} else {
current += 1;
}
LOOP_COUNTER.set(current);
System.out.println(Thread.currentThread()
+ "------------current--------" + current);
if (current > LOOP_TIMES) {
System.out.println("\n\n\n\n");
return null;
} else {
int shardingItem = shardingContext.getShardingItem();
List<String> datas = Arrays.asList(getData(shardingItem),
getData(shardingItem), getData(shardingItem));
return datas;
}
}
private String getData(int shardingItem) {
return shardingItem + "-" + COUNTER.getAndIncrement();
}
@Override
public void processData(ShardingContext shardingContext,
List<String> data) {
System.out.println(Thread.currentThread() + "--------" +data);
}
}
流式作业的配置使用<job:dataflow/>
配置,上面的流式作业对应的配置如下:
<job:dataflow id="myDataflowJob"
class="com.elim.learn.elastic.job.MyDataflowJob"
registry-center-ref="regCenter"
cron="0 0/2 * * * ?" sharding-total-count="2"
sharding-item-parameters="0=广州,1=深圳" failover="true" overwrite="true"
streaming-process="true">
上述配置参数的含义跟上一篇介绍的简单作业的配置是一样的,新增的streaming-process表示是否启用流式作业。
(本文由Elim写于2017年10月1日)
相关推荐
### ElasticJob与SpringBatch的结合使用 #### 一、引言 随着大数据和微服务架构的兴起,数据处理的需求越来越复杂。在很多场景下,我们需要处理海量数据,并且要保证数据处理的一致性和顺序性。为此,业界发展出了...
3. **流式处理**:Elasticsearch-JDBC采用流式处理的方式读取数据库记录并写入Elasticsearch,以提高导入效率。这种方式减少了内存占用,使得大体积数据导入成为可能。 4. **SQL查询支持**:用户可以通过编写SQL...
ElasticJob是阿里开源的一个强大的分布式任务调度解决方案,它提供了两种类型的定时任务:Simple作业和Dataflow流式作业。本资料旨在帮助学习者掌握如何有效地使用ElasticJob进行高效、稳定、可扩展的定时任务管理。...
TBSchedule & elastic-job | 分布式调度框架 | [https://github.com/dangdangdotcom/elastic-job](https://github.com/dangdangdotcom/elastic-job) Redis | 分布式缓存数据库 | [https://redis.io/]...
4. Job Manager 和 Task Manager:Job Manager 负责任务调度和资源管理,Task Manager 执行具体的计算任务。它们共同构成了 Flink 的分布式运行环境。 三、Flink 1.14.5 的新特性与改进 1. SQL 支持增强:Flink ...
springall spring技术选型集成,不同分支代表不同技术选型,集成了常用技术的spring分支,不再零散。 master 主版本就是apache log4j2的集成和spring...elasticjob 分布式定时任务调度与分片演示。 hadoop 基于原生M