【转】http://blog.csdn.net/prime7/article/details/49837517
调度器分为正常调度,异常调度,异常调度根据不同的队列进行时间间隔的区分,采用ScheduledExecutorService进行时间间隔调度,调度时根据当前队列中addData进队列里的数据分配线程进行处理。先看正常调度,这个类在构造函数时就会被调度,可以添加set方法,配置好单独再进行scheduler:
- package test;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * 异步任务定时执行管理
- */
- public abstract class AbstractScheduleTask<T> implements ScheduleTask<T> {
- /** 任务队列 */
- protected final BlockingQueue<T> dataQueue;
- /** 线程池 */
- protected final ExecutorService executorService;
- /** 调度任务 */
- protected final ScheduledExecutorService scheduledExecutorService;
- /** 定时任务数 */
- private int scheduleSize = 1;
- /** 定时任务开始执行时间 */
- private int scheduleInitialDelay = 0;
- /** 定时任务间隔时间,正常单条数据的插入时间<3ms,队列的长度为1000,1000m秒的时间足够,避免队列的数据堆积 */
- private int scheduleDelay = 1000;
- /** 线程池大小 */
- private int threadPoolSize = 8;
- /** 队列大小 */
- private int queueSize = 2000;
- /** 线程批处理大小; */
- private int dataSize = 100;
- /** 默认构造方法,加载定时任务 */
- public AbstractScheduleTask() {
- dataQueue = new LinkedBlockingQueue<T>(queueSize);
- executorService = Executors.newFixedThreadPool(threadPoolSize);
- scheduledExecutorService = Executors.newScheduledThreadPool(scheduleSize);
- schedule();
- }
- /**
- * 具体业务数据处理
- *
- * @param data
- * @return
- */
- protected abstract Integer doData(final List<T> data);
- /**
- * 添加数据到队列
- */
- @Override
- public void addData(T parameterObject) {
- if (parameterObject != null) {
- if (dataQueue.size() >= this.getQueueSize()) {
- // 消费队列数据过大
- }
- try {
- dataQueue.put(parameterObject);
- } catch (InterruptedException e) {
- // 添加队列数据异常
- }
- }
- }
- /**
- * 设置定时任务 设定执行线程计划,初始10s延迟,每次任务完成后延迟10s再执行一次任务
- */
- private void schedule() {
- for (int i = 0; i < scheduleSize; i++) {
- scheduledExecutorService.scheduleWithFixedDelay(new ScheduleHandler(), scheduleInitialDelay, scheduleDelay,
- TimeUnit.MILLISECONDS);
- }
- }
- /**
- * 创建任务
- *
- * @param data
- * @return
- */
- private Callable<Integer> getTask(final List<T> data) {
- Callable<Integer> task = new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- if (data == null) {
- return 0;
- }
- try {
- int result = doData(data);
- // 任务执行完成
- return result;
- } catch (Throwable e) {
- // 执行任务出现异常
- }
- return 0;
- }
- };
- return task;
- }
- /**
- * 定时任务
- */
- private final class ScheduleHandler implements Runnable {
- /**
- * 定时任务实现
- *
- * @see java.lang.Runnable#run()
- */
- public void run() {
- List<T> dataList = new ArrayList<T>(dataSize);
- while (!dataQueue.isEmpty()) {
- if (dataList.size() == dataSize) {
- break;
- }
- try {
- T data = dataQueue.take();
- dataList.add(data);
- } catch (InterruptedException e) {
- // 出队列执行异常
- }
- }
- Callable<Integer> task = getTask(dataList);
- if (task == null) {
- return;
- }
- Future<Integer> future = executorService.submit(task);
- try {
- int result = future.get();
- // 任务执行结果: result
- } catch (InterruptedException e) {
- // 任务执行异常
- } catch (ExecutionException e) {
- // 任务执行异常
- }
- }
- }
- /**
- * Getter method for property <tt>scheduleSize</tt>.
- *
- * @return property value of scheduleSize
- */
- public int getScheduleSize() {
- return scheduleSize;
- }
- /**
- * Getter method for property <tt>scheduleInitialDelay</tt>.
- *
- * @return property value of scheduleInitialDelay
- */
- public int getScheduleInitialDelay() {
- return scheduleInitialDelay;
- }
- /**
- * Getter method for property <tt>scheduleDelay</tt>.
- *
- * @return property value of scheduleDelay
- */
- public int getScheduleDelay() {
- return scheduleDelay;
- }
- /**
- * Getter method for property <tt>threadPoolSize</tt>.
- *
- * @return property value of threadPoolSize
- */
- public int getThreadPoolSize() {
- return threadPoolSize;
- }
- /**
- * Getter method for property <tt>queueSize</tt>.
- *
- * @return property value of queueSize
- */
- public int getQueueSize() {
- return queueSize;
- }
- /**
- * Getter method for property <tt>dataSize</tt>.
- *
- * @return property value of dataSize
- */
- public int getDataSize() {
- return dataSize;
- }
- }
实现的添加数据接口的定义:
- package test;
- /**
- * 任务定时执行管理器
- */
- public interface ScheduleTask<T> {
- /**
- * 添加数据
- *
- * @param parameterObject
- */
- public void addData(final T parameterObject);
- }
再来看一下 实现带有重试机制的调度器,不同的重试机制使用不同的调度队列
- package test;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- /**
- * 重试调度器
- *
- * @author prime7
- */
- public abstract class AbstractRetryScheduleTask<T> extends AbstractScheduleTask<T> implements RetryScheduleTask<T> {
- /** 重试调度模式,供外部添加调用 */
- public static final int RETRY_MODE_30S = 1;
- public static final int RETRY_MODE_5MIN = 2;
- public static final int RETRY_MODE_30MIN = 3;
- /** 重试任务队列 */
- protected final BlockingQueue<T> retryDataQueue30S;
- protected final BlockingQueue<T> retryDataQueue5Min;
- protected final BlockingQueue<T> retryDataQueue30Min;
- /** 重试调度任务 */
- protected final ScheduledExecutorService retryScheduledExecutorService;
- /** 重试定时任务数 */
- private int retryScheduleSize = 3;
- /** 重试定时任务开始执行时间 */
- private int retryScheduleInitialDelay = 0;
- /** 重试任务间隔时间 */
- private int retryScheduleDelay30S = 1000; // 30 * 1000;
- private int retryScheduleDelay5Min = 5000; // 5 * 60 * 1000;
- private int retryScheduleDelay30Min = 10000; // 30 * 60 * 1000;
- /** 队列大小 */
- private int retryQueueSize = 2000;
- /** 线程批处理大小; */
- private int retryDataSize = 100;
- public AbstractRetryScheduleTask() {
- super();
- // LinkedBlockingQueue is Thread safety.
- retryDataQueue30S = new LinkedBlockingQueue<T>(retryQueueSize);
- retryDataQueue5Min = new LinkedBlockingQueue<T>(retryQueueSize);
- retryDataQueue30Min = new LinkedBlockingQueue<T>(retryQueueSize);
- retryScheduledExecutorService = Executors.newScheduledThreadPool(retryScheduleSize);
- schedule();
- }
- private void schedule() {
- // init 30s retry scheduler.
- retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30S),
- retryScheduleInitialDelay, retryScheduleDelay30S, TimeUnit.MILLISECONDS);
- // init 5min retry scheduler.
- retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue5Min),
- retryScheduleInitialDelay, retryScheduleDelay5Min, TimeUnit.MILLISECONDS);
- // init 30min retry scheduler.
- retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30Min),
- retryScheduleInitialDelay, retryScheduleDelay30Min, TimeUnit.MILLISECONDS);
- }
- /**
- * 具体业务重试数据处理
- *
- * @param data
- * @return
- */
- protected abstract Integer doRetryData(final List<T> data);
- private final class RetryScheduleHandler implements Runnable {
- /**
- * 定时任务实现
- *
- * @see java.lang.Runnable#run()
- */
- private BlockingQueue<T> dataQueue;
- public RetryScheduleHandler(BlockingQueue<T> retryQueue) {
- dataQueue = retryQueue;
- }
- public void run() {
- List<T> dataList = new ArrayList<T>(retryDataSize);
- while (!dataQueue.isEmpty()) {
- if (dataList.size() >= retryDataSize)
- break;
- try {
- T data = dataQueue.take();
- dataList.add(data);
- } catch (InterruptedException e) {
- // LogUtil.error(logger, e, "出队列执行异常!");
- }
- }
- Callable<Integer> task = getRetryTask(dataList);
- if (task == null)
- return;
- Future<Integer> future = executorService.submit(task);
- try {
- int result = future.get();
- // LogUtil.info(logger, "任务执行结果:" + result);
- } catch (InterruptedException e) {
- // LogUtil.error(logger, e, "任务执行异常!");
- } catch (ExecutionException e) {
- // LogUtil.error(logger, e, "任务执行异常!");
- }
- }
- }
- /**
- * 创建任务
- *
- * @param data
- * @return
- */
- private Callable<Integer> getRetryTask(final List<T> data) {
- Callable<Integer> task = new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- if (data == null) {
- return 0;
- }
- try {
- int result = doRetryData(data);
- return result;
- } catch (Throwable e) {
- // LogUtil.error(logger, e, "执行任务异常");
- }
- return 0;
- }
- };
- return task;
- }
- /**
- * 添加数据到队列
- *
- * @see com.alipay.csCheck.biz.bench.schedule.ScheduleTask#addData(java.lang.Object)
- */
- @Override
- public void addRetryData(T parameterObject, int retryMode) {
- switch (retryMode) {
- case RETRY_MODE_30S:
- addRetryDateForQueue(parameterObject, retryDataQueue30S);
- break;
- case RETRY_MODE_5MIN:
- addRetryDateForQueue(parameterObject, retryDataQueue5Min);
- break;
- case RETRY_MODE_30MIN:
- addRetryDateForQueue(parameterObject, retryDataQueue30Min);
- break;
- default: // never reach here.
- break;
- }
- }
- /**
- * 按照对应模式添加重试数据到对应队列
- *
- * @param parameterObject
- * @param retryDataQueue
- */
- private void addRetryDateForQueue(T parameterObject, final BlockingQueue<T> retryDataQueue) {
- if (parameterObject != null) {
- if (retryDataQueue.size() >= this.getQueueSize()) {
- // LogUtil.warn(logger, "消费队列数据过大!!!!");
- }
- try {
- retryDataQueue.put(parameterObject);
- } catch (InterruptedException e) {
- // LogUtil.warn(logger, "添加队列数据异常!!!!", e.getMessage());
- }
- }
- }
- }
- package test;
- /**
- * 任务定时执行管理器 包含重试任务
- *
- * @author prime7
- */
- public interface RetryScheduleTask<T> extends ScheduleTask<T> {
- /**
- * 添加重试数据
- *
- * @param parameterObject
- */
- public void addRetryData(T parameterObject, int retryMode);
- }
最后 写个测试类进行一下测试:
- package test;
- import java.util.Date;
- import java.util.List;
- public class TestScheduler extends AbstractRetryScheduleTask<Integer> {
- public static void main(String[] args) {
- TestScheduler test = new TestScheduler();
- test.addData(0);
- test.addRetryData(1, TestScheduler.RETRY_MODE_30S);
- test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);
- test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);
- test.addData(0);
- test.addRetryData(1, TestScheduler.RETRY_MODE_30S);
- test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);
- test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);
- try {
- System.out.println("now sleep");
- Thread.sleep(10 * 1000);
- System.out.println("now weakup");
- test.addData(0);
- test.addRetryData(1, TestScheduler.RETRY_MODE_30S);
- test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);
- test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- @Override
- protected Integer doRetryData(List<Integer> data) {
- return doData(data);
- }
- @Override
- protected Integer doData(List<Integer> data) {
- System.out.println(data + " " + new Date().getTime());
- return 1;
- }
- }
相关推荐
对于错误处理,需要在作业的代码中妥善处理异常,并可选地使用Quartz提供的重试机制。在性能优化方面,可以根据实际情况调整调度器线程池的大小,以及优化作业本身的性能。 总结来说,Java实现任务调度是一个复杂但...
该项目是一款灵活、可靠且快速的分布式任务重试与调度平台,采用Java作为主要开发语言,并集成了JavaScript、CSS、Shell、HTML、Python等多种语言。该平台源码共1178个文件,其中Java源文件902个,XML配置文件107个...
ElasticJob-Lite提供了简单易用的API,同时支持故障转移和任务重试,适合处理大数据量的场景。 五、设计分布式调度系统 设计分布式调度系统时,需要考虑以下几个关键点: 1. 任务分配:根据任务特性和系统资源,将...
在Java调度系统中,可能包含异常捕获、重试机制、故障切换等策略,确保即使在出现问题时也能保证任务的正常运行。 7. **配置管理** 调度系统通常需要配置如任务调度频率、任务执行顺序等信息,这些配置可能被存储...
5. **容错与恢复**:如果任务执行失败,Jconch可能具备重试机制或故障转移功能,保证任务的可靠性。 6. **API友好**:Jconch的API设计简洁,使得开发者能够快速地将任务调度功能集成到项目中。 在压缩包中,你可以...
7. **错误处理和重试机制**:考虑到网络延迟或其他异常情况,应该有一个健壮的错误处理机制,例如消息的重试、死信队列等。 8. **性能优化**:可以通过批量发送、异步处理等方式提高消息传递的效率。 在实现过程中...
4. **故障恢复**:支持断点续传和任务重试,当任务失败时能够自动恢复,确保任务的可靠性。 5. **权限管理**:具备完整的用户、角色和权限管理机制,可以控制不同用户对任务的操作权限。 6. **API接口**:提供了...
- **容错与恢复**:通过备份、心跳检测和自动重试机制确保系统高可用。 - **扩展性**:设计应允许动态添加或移除节点,以适应负载变化。 - **数据一致性**:保证任务的状态在整个系统中的一致性,例如使用分布式锁或...
Quartz提供了异常处理机制,可以设置重新尝试的策略,例如在失败后立即重试,或者等待一段时间后再试。 9. **持久化**: 为了在应用重启后仍能保持作业计划,Quartz支持将Job和Trigger存储在数据库中。这样,即使...
Java Gearman Service 0.6.6 是一个用于Java开发的 Gearman 调度工具包,它提供了在Java环境中与Gearman服务器交互的能力。Gearman是一个通用的工作队列系统,设计用来解耦那些需要异步处理的任务,使得任务的发起者...
9. **错误处理与恢复**:在实现时间调度器时,错误处理和异常恢复机制是必不可少的,例如,如果Job执行失败,系统应有重试策略或者通知机制。 10. **性能优化**:在设计时间调度器时,需要考虑性能和效率,避免过度...
7. **错误处理与重试机制**:对于网络错误,如超时、服务器错误等,Java下载器应有相应的错误处理机制,可以自动重试,提高下载成功率。 8. **文件完整性校验**:下载完成后,可能会使用MD5或SHA等哈希算法对比文件...
7. **容错机制**:任务失败时,XXL-JOB提供重试和告警机制。可以根据配置决定是否自动重试,以及在多次失败后发送报警通知。 8. **日志追踪**:每个任务的执行都有详细的日志记录,方便开发者调试和排查问题。 9. ...
3. **容错机制**:jconch可能包含了错误处理和重试策略,当任务执行失败时,它会尝试重新执行以确保任务的完成。此外,它可能会有备份任务执行和故障转移的机制,以保证服务的连续性。 4. **监控与管理**:为了便于...
- **事务的幂等性**:保证每个操作的幂等性是TCC模式成功的关键,避免因为网络重试导致的数据不一致。 通过以上介绍,我们可以看出TCC在解决分布式事务问题上的优势,但也需要注意其复杂性和实施难度。正确理解和...
5. **异常处理**:任务执行过程中可能会遇到各种异常,系统需具备完善的异常处理机制,保证任务失败时能够记录错误信息并尝试重试或通知管理员。 6. **并发与锁**:当多个任务可能同时执行相同的操作时,需要考虑...
5. **异常处理和重试机制**:任务执行可能会失败,因此需要有异常处理和重试机制。这通常通过捕获异常、记录日志和决定是否重新调度任务来实现。 在实际应用中,DAG任务调度的优化主要聚焦于以下几个方面: - **...
此外,一旦单节点系统发生故障,正在进行的作业将会中断,且没有自动故障恢复机制,需要人工介入进行重试。 在大数据场景中,企业需要一种能够支持大规模数据集成并能提高作业处理效率的方法。因此,本文提出了一种...
3. **异常处理**:当任务出现故障时,EasyScheduler支持任务重试和失败恢复功能,允许用户选择特定节点进行重新执行,提高了任务的健壮性。 4. **任务控制**:用户可以暂停、继续或终止任务执行,以应对突发情况或...
- 错误处理和重试策略:为了确保数据的可靠上报,应考虑异常处理和重试机制,例如使用try-catch捕获异常,当上报失败时按照预设策略进行重试。 5. 示例代码: ```java import java.util.concurrent.Executors; ...