论坛首页 Java企业应用论坛

具有相同属性任务串行有序执行的线程池设计

浏览 3220 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2014-09-04   最后修改:2014-09-04

我有一个这样的线程池的场景,相信很多人都遇到过: 
1,每个用户都可以添加多个任务; 
2,有很多的用户和很多的任务; 
3,每个用户添加的任务必须有序串行执行,即在同一时刻不能有同时执行一个用户的两个任务; 
4,实时性:只要线程池线程有空闲的,那么用户提交任务后必须立即执行;尽可能提高线程的利用率。 

代码比较简洁,基本满足上述要求:

public class SerialThreadExecutor {
	private Executor executor;
	private ConcurrentMap<Object, SequentialJob> serialJobs = new ConcurrentHashMap<Object, SequentialJob>();

	public SerialThreadExecutor(Executor executor) {
		super();
		this.executor = executor;
	}

	public void executeSerially(Object key, Runnable r) {
		SequentialJob job = serialJobs.get(key);
		if (job == null) {
			job = new SequentialJob(key);
			SequentialJob oldJob = serialJobs.putIfAbsent(key, job);
			if (oldJob != null) {
				job = oldJob;
			}
		}
		job.addJob(r);
	}

	private class SequentialJob implements Runnable {
		private BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<Runnable>();
		private Object key;
		private AtomicBoolean running = new AtomicBoolean(false);

		public SequentialJob(Object key) {
			this.key = key;
		}

		public void run() {
			Runnable r = null;
			while (true) {
				try {
					r = jobs.poll(50, TimeUnit.MILLISECONDS);
					if (r != null) {
						r.run();
					} else {//[1]
						synchronized (this) {
							if (jobs.isEmpty()
									&& running.compareAndSet(true, false)) {
								return;
							} else {
								continue;
							}
						}
					}
				} catch (InterruptedException e) {
					// TODO
					e.printStackTrace();
				}
			}
		}

		public void addJob(Runnable r) {
			synchronized (this) {
				jobs.add(r);
				if (running.compareAndSet(false, true)) {
					executor.execute(this);//[2]
				}
			}
		}
	}
}

这个实现有几个缺陷:

1,每次添加一个任务都要进入一次锁,有一点小小开销;

2,serialJobs会一直在内存中,当某个key的任务很久没有添加了,对应的SequentialJob对象一直存在,虽然不占用很多内存,但对于有洁癖的人来说或,还是不爽。

 

抛砖引玉,看看广大网友是否可以优化。

(异常处理等细节大家就不要理会了)

 

  注意构造executor时executor.execute(Runnable)不能阻塞,否则会有死锁风险:

当线程池t1:executor线程不够用时,executor.execute()等待空闲线程(分支[2]),此时获取了SequentialJob锁L1,任务线程t2把这个用户的所有任务执行完后,进入分支[1],等待t1释放L1,从而造成死锁。

论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics