`

【转】java调度器(重试机制)实现

 
阅读更多

【转】http://blog.csdn.net/prime7/article/details/49837517

调度器分为正常调度,异常调度,异常调度根据不同的队列进行时间间隔的区分,采用ScheduledExecutorService进行时间间隔调度,调度时根据当前队列中addData进队列里的数据分配线程进行处理。先看正常调度,这个类在构造函数时就会被调度,可以添加set方法,配置好单独再进行scheduler:

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.BlockingQueue;  
  6. import java.util.concurrent.Callable;  
  7. import java.util.concurrent.ExecutionException;  
  8. import java.util.concurrent.ExecutorService;  
  9. import java.util.concurrent.Executors;  
  10. import java.util.concurrent.Future;  
  11. import java.util.concurrent.LinkedBlockingQueue;  
  12. import java.util.concurrent.ScheduledExecutorService;  
  13. import java.util.concurrent.TimeUnit;  
  14.   
  15. /** 
  16.  * 异步任务定时执行管理 
  17.  */  
  18. public abstract class AbstractScheduleTask<T> implements ScheduleTask<T> {  
  19.   
  20.     /** 任务队列 */  
  21.     protected final BlockingQueue<T> dataQueue;  
  22.     /** 线程池 */  
  23.     protected final ExecutorService executorService;  
  24.     /** 调度任务 */  
  25.     protected final ScheduledExecutorService scheduledExecutorService;  
  26.     /** 定时任务数 */  
  27.     private int scheduleSize = 1;  
  28.     /** 定时任务开始执行时间 */  
  29.     private int scheduleInitialDelay = 0;  
  30.     /** 定时任务间隔时间,正常单条数据的插入时间<3ms,队列的长度为1000,1000m秒的时间足够,避免队列的数据堆积 */  
  31.     private int scheduleDelay = 1000;  
  32.     /** 线程池大小 */  
  33.     private int threadPoolSize = 8;  
  34.     /** 队列大小 */  
  35.     private int queueSize = 2000;  
  36.     /** 线程批处理大小; */  
  37.     private int dataSize = 100;  
  38.   
  39.     /** 默认构造方法,加载定时任务 */  
  40.     public AbstractScheduleTask() {  
  41.         dataQueue = new LinkedBlockingQueue<T>(queueSize);  
  42.         executorService = Executors.newFixedThreadPool(threadPoolSize);  
  43.         scheduledExecutorService = Executors.newScheduledThreadPool(scheduleSize);  
  44.         schedule();  
  45.     }  
  46.   
  47.     /** 
  48.      * 具体业务数据处理 
  49.      *  
  50.      * @param data 
  51.      * @return 
  52.      */  
  53.     protected abstract Integer doData(final List<T> data);  
  54.   
  55.     /** 
  56.      * 添加数据到队列 
  57.      */  
  58.     @Override  
  59.     public void addData(T parameterObject) {  
  60.         if (parameterObject != null) {  
  61.             if (dataQueue.size() >= this.getQueueSize()) {  
  62.                 // 消费队列数据过大  
  63.             }  
  64.             try {  
  65.                 dataQueue.put(parameterObject);  
  66.             } catch (InterruptedException e) {  
  67.                 // 添加队列数据异常  
  68.             }  
  69.         }  
  70.     }  
  71.   
  72.     /** 
  73.      * 设置定时任务 设定执行线程计划,初始10s延迟,每次任务完成后延迟10s再执行一次任务 
  74.      */  
  75.     private void schedule() {  
  76.         for (int i = 0; i < scheduleSize; i++) {  
  77.             scheduledExecutorService.scheduleWithFixedDelay(new ScheduleHandler(), scheduleInitialDelay, scheduleDelay,  
  78.                     TimeUnit.MILLISECONDS);  
  79.         }  
  80.   
  81.     }  
  82.   
  83.     /** 
  84.      * 创建任务 
  85.      *  
  86.      * @param data 
  87.      * @return 
  88.      */  
  89.     private Callable<Integer> getTask(final List<T> data) {  
  90.         Callable<Integer> task = new Callable<Integer>() {  
  91.   
  92.             @Override  
  93.             public Integer call() throws Exception {  
  94.                 if (data == null) {  
  95.                     return 0;  
  96.                 }  
  97.                 try {  
  98.                     int result = doData(data);  
  99.                     // 任务执行完成  
  100.                     return result;  
  101.                 } catch (Throwable e) {  
  102.                     // 执行任务出现异常  
  103.                 }  
  104.                 return 0;  
  105.             }  
  106.         };  
  107.   
  108.         return task;  
  109.     }  
  110.   
  111.     /** 
  112.      * 定时任务 
  113.      */  
  114.     private final class ScheduleHandler implements Runnable {  
  115.         /** 
  116.          * 定时任务实现 
  117.          *  
  118.          * @see java.lang.Runnable#run() 
  119.          */  
  120.         public void run() {  
  121.             List<T> dataList = new ArrayList<T>(dataSize);  
  122.             while (!dataQueue.isEmpty()) {  
  123.                 if (dataList.size() == dataSize) {  
  124.                     break;  
  125.                 }  
  126.                 try {  
  127.                     T data = dataQueue.take();  
  128.                     dataList.add(data);  
  129.                 } catch (InterruptedException e) {  
  130.                     // 出队列执行异常  
  131.                 }  
  132.             }  
  133.             Callable<Integer> task = getTask(dataList);  
  134.             if (task == null) {  
  135.                 return;  
  136.             }  
  137.             Future<Integer> future = executorService.submit(task);  
  138.             try {  
  139.                 int result = future.get();  
  140.                 // 任务执行结果: result  
  141.             } catch (InterruptedException e) {  
  142.                 // 任务执行异常  
  143.             } catch (ExecutionException e) {  
  144.                 // 任务执行异常  
  145.             }  
  146.         }  
  147.   
  148.     }  
  149.   
  150.     /** 
  151.      * Getter method for property <tt>scheduleSize</tt>. 
  152.      *  
  153.      * @return property value of scheduleSize 
  154.      */  
  155.     public int getScheduleSize() {  
  156.         return scheduleSize;  
  157.     }  
  158.   
  159.     /** 
  160.      * Getter method for property <tt>scheduleInitialDelay</tt>. 
  161.      *  
  162.      * @return property value of scheduleInitialDelay 
  163.      */  
  164.     public int getScheduleInitialDelay() {  
  165.         return scheduleInitialDelay;  
  166.     }  
  167.   
  168.     /** 
  169.      * Getter method for property <tt>scheduleDelay</tt>. 
  170.      *  
  171.      * @return property value of scheduleDelay 
  172.      */  
  173.     public int getScheduleDelay() {  
  174.         return scheduleDelay;  
  175.     }  
  176.   
  177.     /** 
  178.      * Getter method for property <tt>threadPoolSize</tt>. 
  179.      *  
  180.      * @return property value of threadPoolSize 
  181.      */  
  182.     public int getThreadPoolSize() {  
  183.         return threadPoolSize;  
  184.     }  
  185.   
  186.     /** 
  187.      * Getter method for property <tt>queueSize</tt>. 
  188.      *  
  189.      * @return property value of queueSize 
  190.      */  
  191.     public int getQueueSize() {  
  192.         return queueSize;  
  193.     }  
  194.   
  195.     /** 
  196.      * Getter method for property <tt>dataSize</tt>. 
  197.      *  
  198.      * @return property value of dataSize 
  199.      */  
  200.     public int getDataSize() {  
  201.         return dataSize;  
  202.     }  
  203. }  


实现的添加数据接口的定义:

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. /** 
  4.  * 任务定时执行管理器 
  5.  */  
  6. public interface ScheduleTask<T> {  
  7.   
  8.     /** 
  9.      * 添加数据 
  10.      *  
  11.      * @param parameterObject 
  12.      */  
  13.     public void addData(final T parameterObject);  
  14.   
  15. }  


再来看一下 实现带有重试机制的调度器,不同的重试机制使用不同的调度队列

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.BlockingQueue;  
  6. import java.util.concurrent.Callable;  
  7. import java.util.concurrent.ExecutionException;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11. import java.util.concurrent.ScheduledExecutorService;  
  12. import java.util.concurrent.TimeUnit;  
  13.   
  14. /** 
  15.  * 重试调度器 
  16.  *  
  17.  * @author prime7 
  18.  */  
  19. public abstract class AbstractRetryScheduleTask<T> extends AbstractScheduleTask<T> implements RetryScheduleTask<T> {  
  20.   
  21.     /** 重试调度模式,供外部添加调用 */  
  22.     public static final int RETRY_MODE_30S = 1;  
  23.     public static final int RETRY_MODE_5MIN = 2;  
  24.     public static final int RETRY_MODE_30MIN = 3;  
  25.   
  26.     /** 重试任务队列 */  
  27.     protected final BlockingQueue<T> retryDataQueue30S;  
  28.     protected final BlockingQueue<T> retryDataQueue5Min;  
  29.     protected final BlockingQueue<T> retryDataQueue30Min;  
  30.   
  31.     /** 重试调度任务 */  
  32.     protected final ScheduledExecutorService retryScheduledExecutorService;  
  33.     /** 重试定时任务数 */  
  34.     private int retryScheduleSize = 3;  
  35.     /** 重试定时任务开始执行时间 */  
  36.     private int retryScheduleInitialDelay = 0;  
  37.     /** 重试任务间隔时间 */  
  38.     private int retryScheduleDelay30S = 1000// 30 * 1000;  
  39.     private int retryScheduleDelay5Min = 5000// 5 * 60 * 1000;  
  40.     private int retryScheduleDelay30Min = 10000// 30 * 60 * 1000;  
  41.     /** 队列大小 */  
  42.     private int retryQueueSize = 2000;  
  43.     /** 线程批处理大小; */  
  44.     private int retryDataSize = 100;  
  45.   
  46.     public AbstractRetryScheduleTask() {  
  47.         super();  
  48.         // LinkedBlockingQueue is Thread safety.  
  49.         retryDataQueue30S = new LinkedBlockingQueue<T>(retryQueueSize);  
  50.         retryDataQueue5Min = new LinkedBlockingQueue<T>(retryQueueSize);  
  51.         retryDataQueue30Min = new LinkedBlockingQueue<T>(retryQueueSize);  
  52.         retryScheduledExecutorService = Executors.newScheduledThreadPool(retryScheduleSize);  
  53.         schedule();  
  54.     }  
  55.   
  56.     private void schedule() {  
  57.         // init 30s retry scheduler.  
  58.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30S),  
  59.                 retryScheduleInitialDelay, retryScheduleDelay30S, TimeUnit.MILLISECONDS);  
  60.         // init 5min retry scheduler.  
  61.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue5Min),  
  62.                 retryScheduleInitialDelay, retryScheduleDelay5Min, TimeUnit.MILLISECONDS);  
  63.         // init 30min retry scheduler.  
  64.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30Min),  
  65.                 retryScheduleInitialDelay, retryScheduleDelay30Min, TimeUnit.MILLISECONDS);  
  66.     }  
  67.   
  68.     /** 
  69.      * 具体业务重试数据处理 
  70.      *  
  71.      * @param data 
  72.      * @return 
  73.      */  
  74.     protected abstract Integer doRetryData(final List<T> data);  
  75.   
  76.     private final class RetryScheduleHandler implements Runnable {  
  77.         /** 
  78.          * 定时任务实现 
  79.          *  
  80.          * @see java.lang.Runnable#run() 
  81.          */  
  82.         private BlockingQueue<T> dataQueue;  
  83.   
  84.         public RetryScheduleHandler(BlockingQueue<T> retryQueue) {  
  85.             dataQueue = retryQueue;  
  86.         }  
  87.   
  88.         public void run() {  
  89.             List<T> dataList = new ArrayList<T>(retryDataSize);  
  90.             while (!dataQueue.isEmpty()) {  
  91.                 if (dataList.size() >= retryDataSize)  
  92.                     break;  
  93.                 try {  
  94.                     T data = dataQueue.take();  
  95.                     dataList.add(data);  
  96.                 } catch (InterruptedException e) {  
  97.                     // LogUtil.error(logger, e, "出队列执行异常!");  
  98.                 }  
  99.             }  
  100.   
  101.             Callable<Integer> task = getRetryTask(dataList);  
  102.             if (task == null)  
  103.                 return;  
  104.             Future<Integer> future = executorService.submit(task);  
  105.             try {  
  106.                 int result = future.get();  
  107.                 // LogUtil.info(logger, "任务执行结果:" + result);  
  108.             } catch (InterruptedException e) {  
  109.                 // LogUtil.error(logger, e, "任务执行异常!");  
  110.             } catch (ExecutionException e) {  
  111.                 // LogUtil.error(logger, e, "任务执行异常!");  
  112.             }  
  113.         }  
  114.     }  
  115.   
  116.     /** 
  117.      * 创建任务 
  118.      *  
  119.      * @param data 
  120.      * @return 
  121.      */  
  122.     private Callable<Integer> getRetryTask(final List<T> data) {  
  123.         Callable<Integer> task = new Callable<Integer>() {  
  124.             @Override  
  125.             public Integer call() throws Exception {  
  126.                 if (data == null) {  
  127.                     return 0;  
  128.                 }  
  129.                 try {  
  130.                     int result = doRetryData(data);  
  131.                     return result;  
  132.                 } catch (Throwable e) {  
  133.                     // LogUtil.error(logger, e, "执行任务异常");  
  134.                 }  
  135.                 return 0;  
  136.             }  
  137.         };  
  138.         return task;  
  139.     }  
  140.   
  141.     /** 
  142.      * 添加数据到队列 
  143.      *  
  144.      * @see com.alipay.csCheck.biz.bench.schedule.ScheduleTask#addData(java.lang.Object) 
  145.      */  
  146.     @Override  
  147.     public void addRetryData(T parameterObject, int retryMode) {  
  148.         switch (retryMode) {  
  149.         case RETRY_MODE_30S:  
  150.             addRetryDateForQueue(parameterObject, retryDataQueue30S);  
  151.             break;  
  152.         case RETRY_MODE_5MIN:  
  153.             addRetryDateForQueue(parameterObject, retryDataQueue5Min);  
  154.             break;  
  155.         case RETRY_MODE_30MIN:  
  156.             addRetryDateForQueue(parameterObject, retryDataQueue30Min);  
  157.             break;  
  158.         default// never reach here.  
  159.             break;  
  160.         }  
  161.     }  
  162.   
  163.     /** 
  164.      * 按照对应模式添加重试数据到对应队列 
  165.      *  
  166.      * @param parameterObject 
  167.      * @param retryDataQueue 
  168.      */  
  169.     private void addRetryDateForQueue(T parameterObject, final BlockingQueue<T> retryDataQueue) {  
  170.         if (parameterObject != null) {  
  171.             if (retryDataQueue.size() >= this.getQueueSize()) {  
  172.                 // LogUtil.warn(logger, "消费队列数据过大!!!!");  
  173.   
  174.             }  
  175.             try {  
  176.                 retryDataQueue.put(parameterObject);  
  177.             } catch (InterruptedException e) {  
  178.                 // LogUtil.warn(logger, "添加队列数据异常!!!!", e.getMessage());  
  179.             }  
  180.         }  
  181.     }  
  182. }  

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. /** 
  4.  * 任务定时执行管理器 包含重试任务 
  5.  *  
  6.  * @author prime7 
  7.  */  
  8. public interface RetryScheduleTask<T> extends ScheduleTask<T> {  
  9.   
  10.     /** 
  11.      * 添加重试数据 
  12.      *  
  13.      * @param parameterObject 
  14.      */  
  15.     public void addRetryData(T parameterObject, int retryMode);  
  16.   
  17. }  


最后 写个测试类进行一下测试:

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.Date;  
  4. import java.util.List;  
  5.   
  6. public class TestScheduler extends AbstractRetryScheduleTask<Integer> {  
  7.   
  8.     public static void main(String[] args) {  
  9.         TestScheduler test = new TestScheduler();  
  10.         test.addData(0);  
  11.         test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  12.         test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  13.         test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  14.         test.addData(0);  
  15.         test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  16.         test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  17.         test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  18.         try {  
  19.             System.out.println("now sleep");  
  20.             Thread.sleep(10 * 1000);  
  21.             System.out.println("now weakup");  
  22.             test.addData(0);  
  23.             test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  24.             test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  25.             test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  26.         } catch (InterruptedException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.     }  
  30.   
  31.     @Override  
  32.     protected Integer doRetryData(List<Integer> data) {  
  33.         return doData(data);  
  34.     }  
  35.   
  36.     @Override  
  37.     protected Integer doData(List<Integer> data) {  
  38.         System.out.println(data + " " + new Date().getTime());  
  39.         return 1;  
  40.     }  
  41. }  
分享到:
评论

相关推荐

    java实现任务调度

    对于错误处理,需要在作业的代码中妥善处理异常,并可选地使用Quartz提供的重试机制。在性能优化方面,可以根据实际情况调整调度器线程池的大小,以及优化作业本身的性能。 总结来说,Java实现任务调度是一个复杂但...

    后端开发-基于Java的分布式调度后端实现.zip

    ElasticJob-Lite提供了简单易用的API,同时支持故障转移和任务重试,适合处理大数据量的场景。 五、设计分布式调度系统 设计分布式调度系统时,需要考虑以下几个关键点: 1. 任务分配:根据任务特性和系统资源,将...

    基于Java调度系统的源代码.zip

    在Java调度系统中,可能包含异常捕获、重试机制、故障切换等策略,确保即使在出现问题时也能保证任务的正常运行。 7. **配置管理** 调度系统通常需要配置如任务调度频率、任务执行顺序等信息,这些配置可能被存储...

    java源码:Java任务调度 jconch.rar

    5. **容错与恢复**:如果任务执行失败,Jconch可能具备重试机制或故障转移功能,保证任务的可靠性。 6. **API友好**:Jconch的API设计简洁,使得开发者能够快速地将任务调度功能集成到项目中。 在压缩包中,你可以...

    消息总线java实现

    7. **错误处理和重试机制**:考虑到网络延迟或其他异常情况,应该有一个健壮的错误处理机制,例如消息的重试、死信队列等。 8. **性能优化**:可以通过批量发送、异步处理等方式提高消息传递的效率。 在实现过程中...

    海豚调度器3.1.8-dolphinscheduler-3.1.8-bin.tar

    4. **故障恢复**:支持断点续传和任务重试,当任务失败时能够自动恢复,确保任务的可靠性。 5. **权限管理**:具备完整的用户、角色和权限管理机制,可以控制不同用户对任务的操作权限。 6. **API接口**:提供了...

    【Java毕业设计】挖坑,毕业后做了几年的任务调度,想聊聊分布式任务调度系统的设计与实现,打算先用 Java 写一版.zip

    - **容错与恢复**:通过备份、心跳检测和自动重试机制确保系统高可用。 - **扩展性**:设计应允许动态添加或移除节点,以适应负载变化。 - **数据一致性**:保证任务的状态在整个系统中的一致性,例如使用分布式锁或...

    xxl-job的管理在代码中实现,新增,编辑,执行等操作

    - 重试机制:任务执行失败时,XXL-JOB可设置重试次数,避免因临时错误导致任务无法完成。 - 失败通知:支持配置失败通知,当任务执行失败时,通过邮件、短信等方式通知相关人员。 7. **扩展性**: - 分布式集群...

    quartz作业调度测试

    Quartz提供了异常处理机制,可以设置重新尝试的策略,例如在失败后立即重试,或者等待一段时间后再试。 9. **持久化**: 为了在应用重启后仍能保持作业计划,Quartz支持将Job和Trigger存储在数据库中。这样,即使...

    java-gearman-service-0.6.6.jar gearman java调度工具包

    Java Gearman Service 0.6.6 是一个用于Java开发的 Gearman 调度工具包,它提供了在Java环境中与Gearman服务器交互的能力。Gearman是一个通用的工作队列系统,设计用来解耦那些需要异步处理的任务,使得任务的发起者...

    时间调度器Demo

    9. **错误处理与恢复**:在实现时间调度器时,错误处理和异常恢复机制是必不可少的,例如,如果Job执行失败,系统应有重试策略或者通知机制。 10. **性能优化**:在设计时间调度器时,需要考虑性能和效率,避免过度...

    java下载管理器

    7. **错误处理与重试机制**:对于网络错误,如超时、服务器错误等,Java下载器应有相应的错误处理机制,可以自动重试,提高下载成功率。 8. **文件完整性校验**:下载完成后,可能会使用MD5或SHA等哈希算法对比文件...

    分布式任务调度

    7. **容错机制**:任务失败时,XXL-JOB提供重试和告警机制。可以根据配置决定是否自动重试,以及在多次失败后发送报警通知。 8. **日志追踪**:每个任务的执行都有详细的日志记录,方便开发者调试和排查问题。 9. ...

    基于Java的任务调度 jconch.zip

    3. **容错机制**:jconch可能包含了错误处理和重试策略,当任务执行失败时,它会尝试重新执行以确保任务的完成。此外,它可能会有备份任务执行和故障转移的机制,以保证服务的连续性。 4. **监控与管理**:为了便于...

    TCC实现分布式事物,java 源码

    - **事务的幂等性**:保证每个操作的幂等性是TCC模式成功的关键,避免因为网络重试导致的数据不一致。 通过以上介绍,我们可以看出TCC在解决分布式事务问题上的优势,但也需要注意其复杂性和实施难度。正确理解和...

    Java项目实战-基于SSH的任务调度系统的设计与实现(附源码,部署说明).zip

    5. **异常处理**:任务执行过程中可能会遇到各种异常,系统需具备完善的异常处理机制,保证任务失败时能够记录错误信息并尝试重试或通知管理员。 6. **并发与锁**:当多个任务可能同时执行相同的操作时,需要考虑...

    PowerJob 分布式调度与计算框架 v4.3.6.zip

    当任务执行失败时,可以设置重试策略,如立即重试、延时重试或按照指定次数重试。此外,还有任务失败通知机制,通过邮件、短信等方式及时通知开发者,以便快速定位和解决问题。 **4. 弹性扩缩容** PowerJob能够根据...

    DAG任务调度算法&优化现实_Java_下载.zip

    5. **异常处理和重试机制**:任务执行可能会失败,因此需要有异常处理和重试机制。这通常通过捕获异常、记录日志和决定是否重新调度任务来实现。 在实际应用中,DAG任务调度的优化主要聚焦于以下几个方面: - **...

    一种改进的分布式ETL作业调度方法实现.pdf

    此外,一旦单节点系统发生故障,正在进行的作业将会中断,且没有自动故障恢复机制,需要人工介入进行重试。 在大数据场景中,企业需要一种能够支持大规模数据集成并能提高作业处理效率的方法。因此,本文提出了一种...

    DolphinScheduler任务调度系统 v3.2.0.zip

    DolphinScheduler具备重试、失败转移、补偿等容错机制,确保任务在异常情况下能够自动恢复或执行备份方案。 11. **性能优化**: 在v3.2.0版本中,可能针对任务调度性能进行了优化,提升了任务处理速度,减少了...

Global site tag (gtag.js) - Google Analytics