锁定老帖子 主题:线程池的实现
精华帖 (1) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
|
|
---|---|
作者 | 正文 |
发表时间:2009-12-25
感谢各位的关注,我在下一楼贴出修改后的代码。针对凤舞凰扬的回复,我希望与你探讨下。
引用
2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。
3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。
我在修改方案中对两个方法加上了同步。我有一个疑义, private LinkedBlockingQueue<OjadbThread> ojadbThreads; 这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?
引用
5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。
这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢。
引用
6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。
result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理。
引用
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)
这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。 |
|
返回顶楼 | |
发表时间:2009-12-25
package ojadb.core.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class OjadbThreadPool { private int minimum; private int maximum; private LinkedBlockingQueue<OjadbThread> ojadbThreads; private BlockingQueue<OjadbTask> ojadbTasks; public OjadbThreadPool(int min, int max) throws OjadbException { if (0 < min && min < max) { minimum = min; maximum = max; ojadbThreads = new LinkedBlockingQueue<OjadbThread>(); ojadbTasks = new LinkedBlockingQueue<OjadbTask>(); for (int i = 0; i < minimum; i++) { OjadbThread worker = new OjadbThread(this, "worker" + i); worker.start(); } } else { throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值"); } } void addWorker(OjadbThread ojadbThreadWorker) { ojadbThreads.offer(ojadbThreadWorker); } public void excute(OjadbTask thread) { // 生产任务 ojadbTasks.offer(thread); OjadbThread worker = ojadbThreads.remove(); if (worker != null) { worker.work(); } else if (ojadbThreads.size() < maximum) { OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size()); newWorker.start(); } else { } releaseThreads(); } private synchronized void releaseThreads() { while (ojadbThreads.size() > minimum) { OjadbThread worker = ojadbThreads.remove(); if (worker != null) { System.out.println(worker + "is killed"); worker.kill(); } } } public synchronized OjadbTask getTask() { if (ojadbTasks.size() == 0) { return null; } // 消费任务 return ojadbTasks.remove(); } }
package ojadb.core.thread; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class OjadbThread extends Thread { private AtomicBoolean isActived; private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private OjadbThreadPool pool; public OjadbThread(OjadbThreadPool pool, String threadName) { super(threadName); this.pool = pool; isActived = new AtomicBoolean(true); } @Override public synchronized void start() { super.start(); } @Override public void run() { try { lock.lock(); while (isActived.get()) { OjadbTask task = pool.getTask(); if (task != null) { try { String result = (String) task.excute(); if (result.length() > 0) { pool.addWorker(this); // 懒工人策略 -- 没人招呼工作,绝不睡醒 condition.await(); } } catch (Exception e) { } } else { pool.addWorker(this); condition.await(); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void work() { condition.signal(); } public void kill() { isActived.set(false); work(); } @Override public String toString() { return getName(); } }
package ojadb.core.thread; public class OjadbTask { private String threadName; public OjadbTask(String threadName) { this.threadName = threadName; } public Object excute() { System.out.println("excuting " + threadName); return "success"; } public String getName() { return threadName; } public String toString() { return threadName; } }
|
|
返回顶楼 | |
发表时间:2009-12-26
引用 我在修改方案中对两个方法加上了同步。我有一个疑义,
private LinkedBlockingQueue<OjadbThread> ojadbThreads; private BlockingQueue<OjadbTask> ojadbTasks 这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么? 对象本身同步并不代表对象与外部多个操作是同步的。其实啊,这是很多刚开始接触多线程编程的人的误解的,如果想了解更多,推荐你看一本书,电子工业出版社的《java并发编程实践》 引用 这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢 倒不是固定数量的线程池是好的。作为一个池来说,伸缩性是非常重要的,但是另外一个关键就是伸缩性的效率的效果更加是应该考虑的,否则会适得其反。其实你看看我写的那个关于task/worker的帖子吧,其中worker就是一个工作线程池,它是如何根据request的情况来进行伸缩性的调整的。
引用 result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理
所以我用的是危险这个词。你如果做的是一个中间组件供大家使用。你就不会清楚知道调用者任务的具体实现是怎样。 引用 这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。
其实很容易,引入管理线程。这个管理线程可以管理一个控制变量,从而实现对其他子线程的控制。同时管理线程在所有子线程结束后再进行一些相关处理(比如将文件移入到指定文件夹,确保流关闭等等)。 |
|
返回顶楼 | |
发表时间:2010-01-07
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)
|
|
返回顶楼 | |