我有一个这样的线程池的场景,相信很多人都遇到过:
1,每个用户都可以添加多个任务;
2,有很多的用户和很多的任务;
3,每个用户添加的任务必须有序串行执行,即在同一时刻不能有同时执行一个用户的两个任务;
4,实时性:只要线程池线程有空闲的,那么用户提交任务后必须立即执行;尽可能提高线程的利用率。
代码比较简洁,基本满足上述要求:
public class SerialThreadExecutor { private Executor executor; private ConcurrentMap<Object, SequentialJob> serialJobs = new ConcurrentHashMap<Object, SequentialJob>(); public SerialThreadExecutor(Executor executor) { super(); this.executor = executor; } public void executeSerially(Object key, Runnable r) { SequentialJob job = serialJobs.get(key); if (job == null) { job = new SequentialJob(key); SequentialJob oldJob = serialJobs.putIfAbsent(key, job); if (oldJob != null) { job = oldJob; } } job.addJob(r); } private class SequentialJob implements Runnable { private BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<Runnable>(); private Object key; private AtomicBoolean running = new AtomicBoolean(false); public SequentialJob(Object key) { this.key = key; } public void run() { Runnable r = null; while (true) { try { r = jobs.poll(50, TimeUnit.MILLISECONDS); if (r != null) { r.run(); } else {//[1] synchronized (this) { if (jobs.isEmpty() && running.compareAndSet(true, false)) { return; } else { continue; } } } } catch (InterruptedException e) { // TODO e.printStackTrace(); } } } public void addJob(Runnable r) { synchronized (this) { jobs.add(r); if (running.compareAndSet(false, true)) { executor.execute(this);//[2] } } } } }
这个实现有几个缺陷:
1,每次添加一个任务都要进入一次锁,有一点小小开销;
2,serialJobs会一直在内存中,当某个key的任务很久没有添加了,对应的SequentialJob对象一直存在,虽然不占用很多内存,但对于有洁癖的人来说或,还是不爽。
抛砖引玉,看看广大网友是否可以优化。
(异常处理等细节大家就不要理会了)
注意构造executor时executor.execute(Runnable)不能阻塞,否则会有死锁风险:
当线程池t1:executor线程不够用时,executor.execute()等待空闲线程(分支[2]),此时获取了SequentialJob锁L1,任务线程t2把这个用户的所有任务执行完后,进入分支[1],等待t1释放L1,从而造成死锁。