`
234390216
  • 浏览: 10232955 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
A5ee55b9-a463-3d09-9c78-0c0cf33198cd
Oracle基础
浏览量:462622
Ad26f909-6440-35a9-b4e9-9aea825bd38e
springMVC介绍
浏览量:1775515
Ce363057-ae4d-3ee1-bb46-e7b51a722a4b
Mybatis简介
浏览量:1398356
Bdeb91ad-cf8a-3fe9-942a-3710073b4000
Spring整合JMS
浏览量:395022
5cbbde67-7cd5-313c-95c2-4185389601e7
Ehcache简介
浏览量:679983
Cc1c0708-ccc2-3d20-ba47-d40e04440682
Cas简介
浏览量:530892
51592fc3-854c-34f4-9eff-cb82d993ab3a
Spring Securi...
浏览量:1183946
23e1c30e-ef8c-3702-aa3c-e83277ffca91
Spring基础知识
浏览量:467918
4af1c81c-eb9d-365f-b759-07685a32156e
Spring Aop介绍
浏览量:151393
2f926891-9e7a-3ce2-a074-3acb2aaf2584
JAXB简介
浏览量:68153
社区版块
存档分类
最新评论

elastic-job之流式作业

阅读更多

流式作业对应于DataflowJob接口,其定义如下:

/**
 * 数据流分布式作业接口.
 * 
 * @author zhangliang
 * 
 * @param <T> 数据类型
 */
public interface DataflowJob<T> extends ElasticJob {
    
    /**
     * 获取待处理数据.
     *
     * @param shardingContext 分片上下文
     * @return 待处理的数据集合
     */
    List<T> fetchData(ShardingContext shardingContext);
    
    /**
     * 处理数据.
     *
     * @param shardingContext 分片上下文
     * @param data 待处理数据集合
     */
    void processData(ShardingContext shardingContext, List<T> data);
}

流式作业,每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。代码实现部分可参考数据流执行器 com.dangdang.ddframe.job.executor.type.DataflowJobExecutor。以下是DataflowJob的一个简单实现,该实现中每次调度触发时都会连续调度processData十次。

public class MyDataflowJob implements DataflowJob<String> {
	
	private static final ThreadLocal<Integer> LOOP_COUNTER
 = new ThreadLocal<>();
	private static final int LOOP_TIMES = 10;//每次获取流处理循环次数
	private static final AtomicInteger COUNTER = new AtomicInteger(1);//计数器

	@Override
	public List<String> fetchData(ShardingContext shardingContext) {
		Integer current = LOOP_COUNTER.get();
		if (current == null) {
			current = 1;
		} else {
			current += 1;
		}
		LOOP_COUNTER.set(current);
		System.out.println(Thread.currentThread() 
+ "------------current--------" + current);
		if (current > LOOP_TIMES) {
			System.out.println("\n\n\n\n");
			return null;
		} else {
			int shardingItem = shardingContext.getShardingItem();
			List<String> datas = Arrays.asList(getData(shardingItem),
 getData(shardingItem), getData(shardingItem));
			return datas;
		}
	}
	
	private String getData(int shardingItem) {
		return shardingItem + "-" + COUNTER.getAndIncrement();
	}

	@Override
	public void processData(ShardingContext shardingContext, 
List<String> data) {
		System.out.println(Thread.currentThread() + "--------" +data);
	}

}

流式作业的配置使用<job:dataflow/>配置,上面的流式作业对应的配置如下:

<job:dataflow id="myDataflowJob"
	class="com.elim.learn.elastic.job.MyDataflowJob" 
registry-center-ref="regCenter"
	cron="0 0/2 * * * ?" sharding-total-count="2"
	sharding-item-parameters="0=广州,1=深圳" failover="true" overwrite="true"
	streaming-process="true">

上述配置参数的含义跟上一篇介绍的简单作业的配置是一样的,新增的streaming-process表示是否启用流式作业。

(本文由Elim写于2017年10月1日)

0
0
分享到:
评论

相关推荐

    elastic-job-lite-console-2.1.5.zip[支持Windows/Os/Linux]

    Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这里的"elastic-job-lite-console-2.1.5.zip"是一个压缩包,其中包含了Elastic-Job-Lite的控制台版本,支持在...

    elastic-job-lite-console-2.1.5.tar

    Elastic-Job分为Elastic-Job-Lite和Elastic-Job-Cloud两个版本,其中Elastic-Job-Lite是轻量级的离线分布式作业调度框架,它不依赖任何云平台,适合在各种环境部署。Elastic-Job-Lite提供了一套完整的作业解决方案...

    elastic-job-lite-console-2.1.5

    Elastic-Job Lite Console是当当网开源的一款用于分布式定时任务管理的控制台,版本为2.1.5。这个项目无需借助Tomcat等Web服务器,可以直接通过在解压后的bin目录下执行命令来启动服务,之后只需在浏览器中访问`...

    elastic-job-lite-console-2.1.5.tar.gz

    Elastic-Job Lite Console是基于Elastic-Job Lite的一个监控平台,主要用来管理和监控分布式作业的执行情况。Elastic-Job是由当当网开源的、轻量级的分布式任务调度框架,它分为Elastic-Job-Lite和Elastic-Job-Cloud...

    elastic-job-lite-console-master.zip

    Elastic-Job-Lite是一款轻量级的分布式任务调度框架,由当当网开源,它提供了简单易用的API和可扩展的作业生态。在Elastic-Job-Lite中,"elastic-job-lite-console-master.zip"是一个包含Elastic-Job-Lite的可视化...

    elastic-job-lite-console-2.1.6-SNAPSHOT.tar

    Elastic-Job 是一款分布式作业调度框架,分为Elastic-Job-Lite和Elastic-Job-Cloud两个子项目。Elastic-Job-Lite 主要关注的是分布式执行和故障转移,而Elastic-Job-Cloud 则是在Mesos之上实现的分布式调度解决方案...

    Elastic-Job控制台2.1.5

    Elastic-Job控制台是一个可视化的管理平台,用于管理和监控Elastic-Job-Lite的作业执行情况。它提供了作业的注册、配置、启动、停止以及查看作业状态等功能,使得分布式任务的管理变得更加便捷。 首先,了解Elastic...

    elastic-job-lite-console-3.0.0.M1-SNAPSHOT.rar

    当当elastic-job控制台jar包,elastic-job-lite-console-3.0.0.M1-SNAPSHOT,本人从git下载源码后编译生成的jar。 $ 解压 elastic-job-lite-console-3.0.0.M1-SNAPSHOT.rar $ cd elastic-job-lite-console-3.0.0.M1-...

    elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz

    在Elastic-Job-Lite中,作业是调度的基本单元,它可以是一个Java方法或者一个完整的业务逻辑。作业的配置包括作业类型(如简单作业、数据流作业等)、执行策略(如简单、优先级、全量替换等)、执行器(决定哪个...

    elastic-job-lite-console-2.1.5压缩包.rar

    elastic-job-lite-console-2.1.5.rar 文件是 Elastic-Job-Lite Console 的一个压缩包,包含了 Elastic-Job-Lite 控制台的所有必要文件和资源。这个控制台用于管理和监控 Elastic-Job-Lite 作业的执行。

    elastic-job,elastic-job-console

    elastic-job-console,elastic-job页面管理job控制台,希望可以帮到朋友们

    elastic-job-quickstart.zip

    Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。这个"elastic-job-quickstart.zip"压缩包应该是为初学者提供的一份快速入门示例代码,帮助理解并掌握Elastic-...

    elastic-job文档资料

    Elastic-Job是一个分布式任务调度框架,由两个独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成,分别对应轻量级和云原生的解决方案。本资料包将详细介绍Elastic-Job的核心功能、设计理念以及如何在实际项目中...

    elastic-job-lite-master.zip

    四、Elastic-Job Lite作业开发 1. **定义作业接口**:创建一个实现了`com.dangdang.ddframe.job.api.Job`接口的类,这个接口规定了作业的执行逻辑。 2. **注册作业**:在SpringBoot的配置类中,使用`@...

    elastic-job-dangdang

    Elastic-Job是基于 Quartz 和 ZooKeeper 的分布式作业调度框架,由两个子项目组成:Elastic-Job-Lite 和 Elastic-Job-Cloud。Elastic-Job-Lite 适用于微服务架构,而 Elastic-Job-Cloud 更适合大规模云计算环境。...

    elastic-job-lite-console-2.1.5.zip

    "预习资料"中的《任务调度之Elastic-Job.doc》很可能是对Elastic-Job的详细介绍,包括其设计理念、核心功能、使用方法等内容,对于初学者来说是很好的学习材料。文档可能会涵盖以下几点: 1. **设计理念**:Elastic...

Global site tag (gtag.js) - Google Analytics