`

【转】java调度器(重试机制)实现

 
阅读更多

【转】http://blog.csdn.net/prime7/article/details/49837517

调度器分为正常调度,异常调度,异常调度根据不同的队列进行时间间隔的区分,采用ScheduledExecutorService进行时间间隔调度,调度时根据当前队列中addData进队列里的数据分配线程进行处理。先看正常调度,这个类在构造函数时就会被调度,可以添加set方法,配置好单独再进行scheduler:

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.BlockingQueue;  
  6. import java.util.concurrent.Callable;  
  7. import java.util.concurrent.ExecutionException;  
  8. import java.util.concurrent.ExecutorService;  
  9. import java.util.concurrent.Executors;  
  10. import java.util.concurrent.Future;  
  11. import java.util.concurrent.LinkedBlockingQueue;  
  12. import java.util.concurrent.ScheduledExecutorService;  
  13. import java.util.concurrent.TimeUnit;  
  14.   
  15. /** 
  16.  * 异步任务定时执行管理 
  17.  */  
  18. public abstract class AbstractScheduleTask<T> implements ScheduleTask<T> {  
  19.   
  20.     /** 任务队列 */  
  21.     protected final BlockingQueue<T> dataQueue;  
  22.     /** 线程池 */  
  23.     protected final ExecutorService executorService;  
  24.     /** 调度任务 */  
  25.     protected final ScheduledExecutorService scheduledExecutorService;  
  26.     /** 定时任务数 */  
  27.     private int scheduleSize = 1;  
  28.     /** 定时任务开始执行时间 */  
  29.     private int scheduleInitialDelay = 0;  
  30.     /** 定时任务间隔时间,正常单条数据的插入时间<3ms,队列的长度为1000,1000m秒的时间足够,避免队列的数据堆积 */  
  31.     private int scheduleDelay = 1000;  
  32.     /** 线程池大小 */  
  33.     private int threadPoolSize = 8;  
  34.     /** 队列大小 */  
  35.     private int queueSize = 2000;  
  36.     /** 线程批处理大小; */  
  37.     private int dataSize = 100;  
  38.   
  39.     /** 默认构造方法,加载定时任务 */  
  40.     public AbstractScheduleTask() {  
  41.         dataQueue = new LinkedBlockingQueue<T>(queueSize);  
  42.         executorService = Executors.newFixedThreadPool(threadPoolSize);  
  43.         scheduledExecutorService = Executors.newScheduledThreadPool(scheduleSize);  
  44.         schedule();  
  45.     }  
  46.   
  47.     /** 
  48.      * 具体业务数据处理 
  49.      *  
  50.      * @param data 
  51.      * @return 
  52.      */  
  53.     protected abstract Integer doData(final List<T> data);  
  54.   
  55.     /** 
  56.      * 添加数据到队列 
  57.      */  
  58.     @Override  
  59.     public void addData(T parameterObject) {  
  60.         if (parameterObject != null) {  
  61.             if (dataQueue.size() >= this.getQueueSize()) {  
  62.                 // 消费队列数据过大  
  63.             }  
  64.             try {  
  65.                 dataQueue.put(parameterObject);  
  66.             } catch (InterruptedException e) {  
  67.                 // 添加队列数据异常  
  68.             }  
  69.         }  
  70.     }  
  71.   
  72.     /** 
  73.      * 设置定时任务 设定执行线程计划,初始10s延迟,每次任务完成后延迟10s再执行一次任务 
  74.      */  
  75.     private void schedule() {  
  76.         for (int i = 0; i < scheduleSize; i++) {  
  77.             scheduledExecutorService.scheduleWithFixedDelay(new ScheduleHandler(), scheduleInitialDelay, scheduleDelay,  
  78.                     TimeUnit.MILLISECONDS);  
  79.         }  
  80.   
  81.     }  
  82.   
  83.     /** 
  84.      * 创建任务 
  85.      *  
  86.      * @param data 
  87.      * @return 
  88.      */  
  89.     private Callable<Integer> getTask(final List<T> data) {  
  90.         Callable<Integer> task = new Callable<Integer>() {  
  91.   
  92.             @Override  
  93.             public Integer call() throws Exception {  
  94.                 if (data == null) {  
  95.                     return 0;  
  96.                 }  
  97.                 try {  
  98.                     int result = doData(data);  
  99.                     // 任务执行完成  
  100.                     return result;  
  101.                 } catch (Throwable e) {  
  102.                     // 执行任务出现异常  
  103.                 }  
  104.                 return 0;  
  105.             }  
  106.         };  
  107.   
  108.         return task;  
  109.     }  
  110.   
  111.     /** 
  112.      * 定时任务 
  113.      */  
  114.     private final class ScheduleHandler implements Runnable {  
  115.         /** 
  116.          * 定时任务实现 
  117.          *  
  118.          * @see java.lang.Runnable#run() 
  119.          */  
  120.         public void run() {  
  121.             List<T> dataList = new ArrayList<T>(dataSize);  
  122.             while (!dataQueue.isEmpty()) {  
  123.                 if (dataList.size() == dataSize) {  
  124.                     break;  
  125.                 }  
  126.                 try {  
  127.                     T data = dataQueue.take();  
  128.                     dataList.add(data);  
  129.                 } catch (InterruptedException e) {  
  130.                     // 出队列执行异常  
  131.                 }  
  132.             }  
  133.             Callable<Integer> task = getTask(dataList);  
  134.             if (task == null) {  
  135.                 return;  
  136.             }  
  137.             Future<Integer> future = executorService.submit(task);  
  138.             try {  
  139.                 int result = future.get();  
  140.                 // 任务执行结果: result  
  141.             } catch (InterruptedException e) {  
  142.                 // 任务执行异常  
  143.             } catch (ExecutionException e) {  
  144.                 // 任务执行异常  
  145.             }  
  146.         }  
  147.   
  148.     }  
  149.   
  150.     /** 
  151.      * Getter method for property <tt>scheduleSize</tt>. 
  152.      *  
  153.      * @return property value of scheduleSize 
  154.      */  
  155.     public int getScheduleSize() {  
  156.         return scheduleSize;  
  157.     }  
  158.   
  159.     /** 
  160.      * Getter method for property <tt>scheduleInitialDelay</tt>. 
  161.      *  
  162.      * @return property value of scheduleInitialDelay 
  163.      */  
  164.     public int getScheduleInitialDelay() {  
  165.         return scheduleInitialDelay;  
  166.     }  
  167.   
  168.     /** 
  169.      * Getter method for property <tt>scheduleDelay</tt>. 
  170.      *  
  171.      * @return property value of scheduleDelay 
  172.      */  
  173.     public int getScheduleDelay() {  
  174.         return scheduleDelay;  
  175.     }  
  176.   
  177.     /** 
  178.      * Getter method for property <tt>threadPoolSize</tt>. 
  179.      *  
  180.      * @return property value of threadPoolSize 
  181.      */  
  182.     public int getThreadPoolSize() {  
  183.         return threadPoolSize;  
  184.     }  
  185.   
  186.     /** 
  187.      * Getter method for property <tt>queueSize</tt>. 
  188.      *  
  189.      * @return property value of queueSize 
  190.      */  
  191.     public int getQueueSize() {  
  192.         return queueSize;  
  193.     }  
  194.   
  195.     /** 
  196.      * Getter method for property <tt>dataSize</tt>. 
  197.      *  
  198.      * @return property value of dataSize 
  199.      */  
  200.     public int getDataSize() {  
  201.         return dataSize;  
  202.     }  
  203. }  


实现的添加数据接口的定义:

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. /** 
  4.  * 任务定时执行管理器 
  5.  */  
  6. public interface ScheduleTask<T> {  
  7.   
  8.     /** 
  9.      * 添加数据 
  10.      *  
  11.      * @param parameterObject 
  12.      */  
  13.     public void addData(final T parameterObject);  
  14.   
  15. }  


再来看一下 实现带有重试机制的调度器,不同的重试机制使用不同的调度队列

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.concurrent.BlockingQueue;  
  6. import java.util.concurrent.Callable;  
  7. import java.util.concurrent.ExecutionException;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.LinkedBlockingQueue;  
  11. import java.util.concurrent.ScheduledExecutorService;  
  12. import java.util.concurrent.TimeUnit;  
  13.   
  14. /** 
  15.  * 重试调度器 
  16.  *  
  17.  * @author prime7 
  18.  */  
  19. public abstract class AbstractRetryScheduleTask<T> extends AbstractScheduleTask<T> implements RetryScheduleTask<T> {  
  20.   
  21.     /** 重试调度模式,供外部添加调用 */  
  22.     public static final int RETRY_MODE_30S = 1;  
  23.     public static final int RETRY_MODE_5MIN = 2;  
  24.     public static final int RETRY_MODE_30MIN = 3;  
  25.   
  26.     /** 重试任务队列 */  
  27.     protected final BlockingQueue<T> retryDataQueue30S;  
  28.     protected final BlockingQueue<T> retryDataQueue5Min;  
  29.     protected final BlockingQueue<T> retryDataQueue30Min;  
  30.   
  31.     /** 重试调度任务 */  
  32.     protected final ScheduledExecutorService retryScheduledExecutorService;  
  33.     /** 重试定时任务数 */  
  34.     private int retryScheduleSize = 3;  
  35.     /** 重试定时任务开始执行时间 */  
  36.     private int retryScheduleInitialDelay = 0;  
  37.     /** 重试任务间隔时间 */  
  38.     private int retryScheduleDelay30S = 1000// 30 * 1000;  
  39.     private int retryScheduleDelay5Min = 5000// 5 * 60 * 1000;  
  40.     private int retryScheduleDelay30Min = 10000// 30 * 60 * 1000;  
  41.     /** 队列大小 */  
  42.     private int retryQueueSize = 2000;  
  43.     /** 线程批处理大小; */  
  44.     private int retryDataSize = 100;  
  45.   
  46.     public AbstractRetryScheduleTask() {  
  47.         super();  
  48.         // LinkedBlockingQueue is Thread safety.  
  49.         retryDataQueue30S = new LinkedBlockingQueue<T>(retryQueueSize);  
  50.         retryDataQueue5Min = new LinkedBlockingQueue<T>(retryQueueSize);  
  51.         retryDataQueue30Min = new LinkedBlockingQueue<T>(retryQueueSize);  
  52.         retryScheduledExecutorService = Executors.newScheduledThreadPool(retryScheduleSize);  
  53.         schedule();  
  54.     }  
  55.   
  56.     private void schedule() {  
  57.         // init 30s retry scheduler.  
  58.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30S),  
  59.                 retryScheduleInitialDelay, retryScheduleDelay30S, TimeUnit.MILLISECONDS);  
  60.         // init 5min retry scheduler.  
  61.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue5Min),  
  62.                 retryScheduleInitialDelay, retryScheduleDelay5Min, TimeUnit.MILLISECONDS);  
  63.         // init 30min retry scheduler.  
  64.         retryScheduledExecutorService.scheduleWithFixedDelay(new RetryScheduleHandler(retryDataQueue30Min),  
  65.                 retryScheduleInitialDelay, retryScheduleDelay30Min, TimeUnit.MILLISECONDS);  
  66.     }  
  67.   
  68.     /** 
  69.      * 具体业务重试数据处理 
  70.      *  
  71.      * @param data 
  72.      * @return 
  73.      */  
  74.     protected abstract Integer doRetryData(final List<T> data);  
  75.   
  76.     private final class RetryScheduleHandler implements Runnable {  
  77.         /** 
  78.          * 定时任务实现 
  79.          *  
  80.          * @see java.lang.Runnable#run() 
  81.          */  
  82.         private BlockingQueue<T> dataQueue;  
  83.   
  84.         public RetryScheduleHandler(BlockingQueue<T> retryQueue) {  
  85.             dataQueue = retryQueue;  
  86.         }  
  87.   
  88.         public void run() {  
  89.             List<T> dataList = new ArrayList<T>(retryDataSize);  
  90.             while (!dataQueue.isEmpty()) {  
  91.                 if (dataList.size() >= retryDataSize)  
  92.                     break;  
  93.                 try {  
  94.                     T data = dataQueue.take();  
  95.                     dataList.add(data);  
  96.                 } catch (InterruptedException e) {  
  97.                     // LogUtil.error(logger, e, "出队列执行异常!");  
  98.                 }  
  99.             }  
  100.   
  101.             Callable<Integer> task = getRetryTask(dataList);  
  102.             if (task == null)  
  103.                 return;  
  104.             Future<Integer> future = executorService.submit(task);  
  105.             try {  
  106.                 int result = future.get();  
  107.                 // LogUtil.info(logger, "任务执行结果:" + result);  
  108.             } catch (InterruptedException e) {  
  109.                 // LogUtil.error(logger, e, "任务执行异常!");  
  110.             } catch (ExecutionException e) {  
  111.                 // LogUtil.error(logger, e, "任务执行异常!");  
  112.             }  
  113.         }  
  114.     }  
  115.   
  116.     /** 
  117.      * 创建任务 
  118.      *  
  119.      * @param data 
  120.      * @return 
  121.      */  
  122.     private Callable<Integer> getRetryTask(final List<T> data) {  
  123.         Callable<Integer> task = new Callable<Integer>() {  
  124.             @Override  
  125.             public Integer call() throws Exception {  
  126.                 if (data == null) {  
  127.                     return 0;  
  128.                 }  
  129.                 try {  
  130.                     int result = doRetryData(data);  
  131.                     return result;  
  132.                 } catch (Throwable e) {  
  133.                     // LogUtil.error(logger, e, "执行任务异常");  
  134.                 }  
  135.                 return 0;  
  136.             }  
  137.         };  
  138.         return task;  
  139.     }  
  140.   
  141.     /** 
  142.      * 添加数据到队列 
  143.      *  
  144.      * @see com.alipay.csCheck.biz.bench.schedule.ScheduleTask#addData(java.lang.Object) 
  145.      */  
  146.     @Override  
  147.     public void addRetryData(T parameterObject, int retryMode) {  
  148.         switch (retryMode) {  
  149.         case RETRY_MODE_30S:  
  150.             addRetryDateForQueue(parameterObject, retryDataQueue30S);  
  151.             break;  
  152.         case RETRY_MODE_5MIN:  
  153.             addRetryDateForQueue(parameterObject, retryDataQueue5Min);  
  154.             break;  
  155.         case RETRY_MODE_30MIN:  
  156.             addRetryDateForQueue(parameterObject, retryDataQueue30Min);  
  157.             break;  
  158.         default// never reach here.  
  159.             break;  
  160.         }  
  161.     }  
  162.   
  163.     /** 
  164.      * 按照对应模式添加重试数据到对应队列 
  165.      *  
  166.      * @param parameterObject 
  167.      * @param retryDataQueue 
  168.      */  
  169.     private void addRetryDateForQueue(T parameterObject, final BlockingQueue<T> retryDataQueue) {  
  170.         if (parameterObject != null) {  
  171.             if (retryDataQueue.size() >= this.getQueueSize()) {  
  172.                 // LogUtil.warn(logger, "消费队列数据过大!!!!");  
  173.   
  174.             }  
  175.             try {  
  176.                 retryDataQueue.put(parameterObject);  
  177.             } catch (InterruptedException e) {  
  178.                 // LogUtil.warn(logger, "添加队列数据异常!!!!", e.getMessage());  
  179.             }  
  180.         }  
  181.     }  
  182. }  

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. /** 
  4.  * 任务定时执行管理器 包含重试任务 
  5.  *  
  6.  * @author prime7 
  7.  */  
  8. public interface RetryScheduleTask<T> extends ScheduleTask<T> {  
  9.   
  10.     /** 
  11.      * 添加重试数据 
  12.      *  
  13.      * @param parameterObject 
  14.      */  
  15.     public void addRetryData(T parameterObject, int retryMode);  
  16.   
  17. }  


最后 写个测试类进行一下测试:

 

 

[java] view plain copy
 
  1. package test;  
  2.   
  3. import java.util.Date;  
  4. import java.util.List;  
  5.   
  6. public class TestScheduler extends AbstractRetryScheduleTask<Integer> {  
  7.   
  8.     public static void main(String[] args) {  
  9.         TestScheduler test = new TestScheduler();  
  10.         test.addData(0);  
  11.         test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  12.         test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  13.         test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  14.         test.addData(0);  
  15.         test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  16.         test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  17.         test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  18.         try {  
  19.             System.out.println("now sleep");  
  20.             Thread.sleep(10 * 1000);  
  21.             System.out.println("now weakup");  
  22.             test.addData(0);  
  23.             test.addRetryData(1, TestScheduler.RETRY_MODE_30S);  
  24.             test.addRetryData(2, TestScheduler.RETRY_MODE_5MIN);  
  25.             test.addRetryData(3, TestScheduler.RETRY_MODE_30MIN);  
  26.         } catch (InterruptedException e) {  
  27.             e.printStackTrace();  
  28.         }  
  29.     }  
  30.   
  31.     @Override  
  32.     protected Integer doRetryData(List<Integer> data) {  
  33.         return doData(data);  
  34.     }  
  35.   
  36.     @Override  
  37.     protected Integer doData(List<Integer> data) {  
  38.         System.out.println(data + " " + new Date().getTime());  
  39.         return 1;  
  40.     }  
  41. }  
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics