`

ThreadPool定时重试

阅读更多
项目需要当某事件触发时,执行http请求任务,失败时需要有重试机制,并根据失败次数的增加,重试间隔也相应增加,任务可能并发。
由于是耗时任务,首先考虑的就是用线程来实现,并且为了节约资源,因而选择线程池。
为了解决不定间隔的重试,选择Timer和TimerTask来完成

package threadpool;

public class ThreadPoolTest {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("start");
		
		ThreadPoolManager poolManager = new ThreadPoolManager(3);
		poolManager.start();

		MyTaskList list = new MyTaskList(poolManager);
		
		new MyTask(list, "A").start();
		new MyTask(list, "B").start();
		new MyTask(list, "C").start();
		new MyTask(list, "D").start();
		new MyTask(list, "E").start();
		new MyTask(list, "F").start();
		new MyTask(list, "G").start();

		try {
			Thread.sleep(30000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		poolManager.stop();
		
		System.out.println("stop");
		
	}

}

package threadpool;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolManager {

	/** 线程池的大小 */
	private int poolSize;
	private static final int MIN_POOL_SIZE = 1;
	private static final int MAX_POOL_SIZE = 10;
	/** 线程池 */
	private ExecutorService threadPool;
	/** 请求队列 */
	private LinkedList<ThreadPoolTask> asyncTasks;
	/** 轮询线程 */
	private Thread poolThread;
	/** 轮询时间 */
	private static final int SLEEP_TIME = 200;

	public ThreadPoolManager(int poolSize) {
		if (poolSize < MIN_POOL_SIZE)
			poolSize = MIN_POOL_SIZE;
		if (poolSize > MAX_POOL_SIZE)
			poolSize = MAX_POOL_SIZE;
		this.poolSize = poolSize;
		threadPool = Executors.newFixedThreadPool(this.poolSize);
		asyncTasks = new LinkedList<ThreadPoolTask>();
	}

	/**
	 * 向任务队列中添加任务
	 * 
	 * @param task
	 */
	public void addAsyncTask(ThreadPoolTask task) {
		synchronized (asyncTasks) {
			// Log.i(TAG, "add task: " + task.getURL());
			asyncTasks.addLast(task);
		}
	}

	/**
	 * 从任务队列中提取任务
	 * 
	 * @return
	 */
	private ThreadPoolTask getAsyncTask() {
		synchronized (asyncTasks) {
			if (asyncTasks.size() > 0) {
				ThreadPoolTask task = asyncTasks.removeFirst();
				// Log.i(TAG, "remove task: " + task.getURL());
				return task;
			}
		}
		return null;
	}

	/**
	 * 开启线程池轮询
	 * 
	 * @return
	 */
	public void start() {
		if (poolThread == null) {
			poolThread = new Thread(new PoolRunnable());
			poolThread.start();
		}
	}

	/**
	 * 结束轮询,关闭线程池
	 */
	public void stop() {
		poolThread.interrupt();
		poolThread = null;
	}

	/**
	 * 实现轮询的Runnable
	 * 
	 * @author carrey
	 * 
	 */
	private class PoolRunnable implements Runnable {
		@Override
		public void run() {
			// Log.i(TAG, "开始轮询");
			try {
				while (!Thread.currentThread().isInterrupted()) {
					ThreadPoolTask task = getAsyncTask();
					if (task == null) {
						try {
							Thread.sleep(SLEEP_TIME);
						} catch (InterruptedException e) {
							Thread.currentThread().interrupt();
						}
						continue;
					}
					threadPool.execute(task);
				}
			} finally {
				threadPool.shutdown();
			}
			// Log.i(TAG, "结束轮询");
		}
	}
}

package threadpool;


public class ThreadPoolTask implements Runnable {

	private String tag;
	
	private Callback callback;
	
	public ThreadPoolTask(String tag, Callback callback) {
		this.tag = tag;
		this.callback = callback;
	}

	@Override
	public void run() {
		System.out.println(tag + " is running on " + Thread.currentThread());
		
		try {
			// 模拟耗时任务
			Thread.sleep(700);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		if (callback != null)
			callback.onRetry();
	}

	public interface Callback {
		public void onRetry();
	}

}

package threadpool;

import java.lang.reflect.Field;
import java.util.Timer;
import java.util.TimerTask;

public class MyTaskList {

	private ThreadPoolManager poolManager;

	private Timer timer;

	public MyTaskList(ThreadPoolManager poolManager) {
		this.poolManager = poolManager;

		timer = new Timer();
	}

	public void addTask(ThreadPoolTask task) {
		if (task != null)
			poolManager.addAsyncTask(task);
	}

	public void addTask(TimerTask task, long delay) {
		// 重置TimerTask,不然会发生Exception
		try {
			Class<?> clazz = TimerTask.class;
			Field field = clazz.getDeclaredField("state");
			field.setAccessible(true);
			field.set(task, 0);
		} catch (Exception e) {
		}

		timer.schedule(task, delay);
	}

}

package threadpool;

import java.util.TimerTask;

import threadpool.ThreadPoolTask.Callback;

public class MyTask implements Callback {

	private MyTaskList list;
	private ThreadPoolTask task;

	private String tag;

	private int retry = 0;

	public MyTask(MyTaskList list, String tag) {
		this.list = list;
		this.tag = tag;
	}

	public void start() {
		task = new ThreadPoolTask(tag, this);
		start(0);
	}

	private void start(int retry) {
		// 最多重试3次
		if (retry >= 3) {
			System.out.println(tag + " finished " + Thread.currentThread());
			return;
		}

		doSomething();

		this.retry = retry;

		list.addTask(task);
	}

	@Override
	public void onRetry() {
		// 重试间隔
		list.addTask(timertask, 1000);
	}

	private TimerTask timertask = new TimerTask() {

		@Override
		public void run() {
			start(retry + 1);
		}

	};

	private void doSomething() {
		System.out.println("Retry[" + retry + "] " + tag + " on "
				+ Thread.currentThread());
	}
}
分享到:
评论

相关推荐

    定时任务quartz实现分组串行并行动态配置

    8. **错误处理与重试**:在编写Job时,应处理可能的异常情况,例如通过捕获异常并决定是否重试。Quartz也提供了`JobListener`和`TriggerListener`,可以对Job的执行结果进行监控,实现定制化的错误处理逻辑。 9. **...

    quartz 调用两次任务

    4. **恢复策略**:Quartz 提供了在失败时重试的策略,如果配置不当,可能会导致任务多次尝试执行。 5. **JobDetail 和 Trigger 的名称与 Group 不唯一**:Quartz 需要 JobDetail 和 Trigger 的名称和组名是唯一的,...

    springquartz源码

    - **JobExecutionException**:Job执行时抛出的异常会被捕获并处理,可以控制任务的中断或重试。 - **Job持久化**:当应用重启时,通过JobStore可以恢复之前的Job和Trigger状态,继续执行未完成的任务。 7. **...

    Quartz-2.2.3

    - 当Job执行异常时,Quartz可以配置重试策略或者将异常信息记录。 - 可以设置Job的Durability属性,即使没有关联的Trigger,Job也会被保留,等待重新被调度。 总之,Quartz-2.2.3作为一款成熟的定时任务框架,为...

    quartz定时器

    Quartz 提供了多种机制来处理 Job 执行过程中可能出现的异常,如重试、标记为失败等。 #### 七、总结 Quartz 是一个强大且灵活的 Java 定时任务框架,适用于各种应用场景,包括但不限于定时任务调度、批量数据处理...

    C#实现杀京东价格监控软件源码

    此外,考虑到网络状况的不稳定性和电商平台的反爬策略,错误处理和重试机制也是关键。我们可以使用try-catch语句捕获异常,并根据具体情况决定是否重新尝试获取数据。 最后,如果软件需要提供用户界面,可以使用WPF...

    quartz线程管理共7页.pdf.zip

    同时,Quartz提供了一些机制来处理异常情况,比如任务执行失败后的重试策略、错误处理回调等。 Quartz还支持集群部署,这意味着多个Quartz实例可以共享同一个调度任务,增强了系统的可用性和容错性。在分布式环境中...

    C# 百度谷歌关键字排名查询

    对于网络请求部分,还应该有处理网络异常和重试机制。 总的来说,"C# 百度谷歌关键字排名查询"是一个涉及网络请求、HTML解析、数据处理和分析的项目,涵盖了C#编程语言的核心特性以及Web开发的相关技术。通过学习和...

    C#同步考勤机数据到数据库

    同时,合理的错误处理和重试机制也是必不可少的,以应对可能出现的网络中断或数据库连接问题。 最后,为了提高效率和避免数据冲突,可以采用定时任务或者消息队列的方式来定期或实时获取考勤机数据。例如,可以使用...

    Quartz作业调度框架

    - Job的并发执行策略可以通过实现`org.quartz.JobExecutionContext`中的`isRecoverable()`方法来控制,决定当Job执行失败时是否需要重试。 7. **插件与扩展** - Quartz提供了一些内置插件,如JobListener(监听...

    Quartz.NET 配置文件详解1

    3. **durable** 和 **recover**: 分别表示 Job 是否持久化和是否在失败后重试。`durable=true` 表示即使调度器重启,Job 信息也会被恢复;`recover=false` 表示不尝试重新执行失败的 Job。 4. **trigger**: 触发器...

    Springboot整合quartz产生错误及解决方案

    quartz.threadPool.threadCount=10 ``` 3. 创建Job类:定义一个实现了`org.quartz.Job`接口的类,实现`execute(JobExecutionContext context)`方法,此方法包含了定时任务的具体逻辑。 ```java import org.quartz....

    spring的quartz使用实例

    8. **异常处理和重试策略** 可以通过实现`org.quartz.JobListener`接口,为Job添加错误处理逻辑。如果Job执行失败,可以通过调整Trigger重新安排执行。 9. **集群支持** Quartz支持集群部署,当在一个集群中配置...

    Java爬虫框架.pdf

    因此,框架需要具备重试机制和异常处理机制,以确保爬虫在遇到错误时能够继续运行。 综上所述,Java爬虫框架设计的复杂性和涉及的技术广度都非常高。一个良好的爬虫框架需要考虑到多线程的并发控制、任务调度和数据...

    用VB写“多线程”程序.zip_vb 多线程_vb多线程_visual basic_多线程

    - **活锁**:线程不断重试资源获取,但没有进展。 - **饥饿**:低优先级线程可能长时间无法获取资源。 8. **线程池** - **线程池**:系统维护的一个线程集合,用于执行短生命周期的任务,减少线程创建和销毁的...

    SyncForFast

    7. **性能优化**:为了实现“同步快”的目标,SyncForFast可能采用了各种性能优化策略,如批量处理文件、缓存策略、错误重试机制等,以提高同步速度和鲁棒性。 8. **日志和错误处理**:为了便于调试和监控,...

    confectionery

    6. 多线程:如果项目包含后台任务,如定时更新库存或发送电子邮件通知,可能会用到C#的并发和多线程特性,如`Task`类或`ThreadPool`。 7. 测试:为了保证代码质量,开发者可能会编写单元测试或集成测试,利用如...

Global site tag (gtag.js) - Google Analytics