- 浏览: 497488 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (250)
- concurrent (11)
- io (1)
- CI (10)
- linux (57)
- windows (2)
- java (38)
- mac (4)
- eclipse (9)
- db (13)
- python (5)
- groovy (5)
- flex (7)
- hibernate (5)
- odb (8)
- netbeans (1)
- web (31)
- book (14)
- erlang (2)
- communication (2)
- virtualization (5)
- jUnit (0)
- jsf (1)
- perl (1)
- java jax-rs (5)
- Jenkins (2)
- Jenkins Plugin (3)
- android (2)
- git (1)
- big data (0)
- 试读 (1)
最新评论
-
yzzy4793:
讲的很清楚,明白
同步synchronized方法和代码块 -
aa51513:
中文乱码式硬伤
Jersey2.x对REST请求处理流程的分析 -
feiwomoshu1991:
...
同步synchronized方法和代码块 -
marshan:
启动失败的原因是加载的类版本冲突,因此你首先要保证依赖的版本和 ...
richfaces中facelet版本升级到2时的典型错误和解决办法 -
zhaohang6688:
请问我按照你的方式修改还是报错 错误信息还是这个 是为什么啊 ...
richfaces中facelet版本升级到2时的典型错误和解决办法
自己实现了一个简单的线程池。希望回复者讨论技术问题,不要说都已经有了,你还重复造轮子。
我的目的是讨论线程池的实现,而不是提供使用。
测试代码:
package threadpool; import ojadb.core.thread.OjadbThreadPool; import ojadb.core.thread.task.OjadbTask; import ojadb.exception.OjadbException; public class BasicTest { public static void main(String[] ojadbs) throws OjadbException { OjadbThreadPool pool = new OjadbThreadPool(5, 10); OjadbTask thread = new OjadbTask("test thread"); pool.excute(thread); System.exit(0); } }
线程池:
package ojadb.core.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import ojadb.core.thread.task.OjadbTask; import ojadb.core.thread.thread.OjadbThread; import ojadb.exception.OjadbException; public class OjadbThreadPool { private int minimum; private int maximum; private LinkedBlockingQueue<OjadbThread> ojadbThreads; private BlockingQueue<OjadbTask> ojadbTasks; public OjadbThreadPool(int min, int max) throws OjadbException { if (0 < min && min < max) { minimum = min; maximum = max; ojadbThreads = new LinkedBlockingQueue<OjadbThread>(); ojadbTasks = new LinkedBlockingQueue<OjadbTask>(); for (int i = 0; i < minimum; i++) { OjadbThread worker = new OjadbThread(this, "worker" + i); worker.start(); } } else { throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值"); } } public void addWorker(OjadbThread ojadbThreadWorker) { ojadbThreads.offer(ojadbThreadWorker); } public void excute(OjadbTask thread) { // 生产任务 ojadbTasks.offer(thread); OjadbThread worker = ojadbThreads.remove(); if (worker != null) { worker.work(); } else if (ojadbThreads.size() < maximum) { OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size()); newWorker.start(); } else { } releaseThreads(); } private void releaseThreads() { while (ojadbThreads.size() > minimum) { OjadbThread worker = ojadbThreads.remove(); System.out.println(worker + "is killed"); worker.kill(); } } public OjadbTask getTask() { if (ojadbTasks.size() == 0) return null; // 消费任务 return ojadbTasks.remove(); } }
内部线程:
package ojadb.core.thread.thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import ojadb.core.thread.OjadbThreadPool; import ojadb.core.thread.task.OjadbTask; public class OjadbThread extends Thread { private boolean isActived; private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private OjadbThreadPool pool; public OjadbThread(OjadbThreadPool pool, String threadName) { super(threadName); this.pool = pool; isActived = true; } @Override public synchronized void start() { super.start(); } @Override public void run() { try { lock.lock(); while (isActived) { OjadbTask task = pool.getTask(); if (task != null) { String result = (String) task.excute(); if (result.length() > 0) { pool.addWorker(this); // 懒工人策略 -- 没人招呼工作,绝不睡醒 condition.await(); } } else { pool.addWorker(this); condition.await(); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void work() { try { lock.lock(); condition.signal(); } finally { lock.unlock(); } } public void kill() { isActived = false; work(); } @Override public String toString() { return getName(); } }
用户线程:
package ojadb.core.thread.task; public class OjadbTask { private String threadName; public OjadbTask(String threadName) { this.threadName = threadName; } public Object excute() { System.out.println("excuting " + threadName); return "success"; } public String getName() { return threadName; } public String toString() { return threadName; } }
评论
14 楼
youngJiang
2010-01-07
<div class="quote_div">
<p> </p>
<p> </p>
<div class="quote_div">7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下) </div>
<p> </p>
</div>
<p><br>这句话什么意思,偶程序没有看明白,能否解释的清楚一点儿?</p>
<p> </p>
<p> </p>
<div class="quote_div">7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下) </div>
<p> </p>
</div>
<p><br>这句话什么意思,偶程序没有看明白,能否解释的清楚一点儿?</p>
13 楼
凤舞凰扬
2009-12-26
引用
我在修改方案中对两个方法加上了同步。我有一个疑义,
private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks
这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?
private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks
这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?
对象本身同步并不代表对象与外部多个操作是同步的。其实啊,这是很多刚开始接触多线程编程的人的误解的,如果想了解更多,推荐你看一本书,电子工业出版社的《java并发编程实践》
引用
这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢
倒不是固定数量的线程池是好的。作为一个池来说,伸缩性是非常重要的,但是另外一个关键就是伸缩性的效率的效果更加是应该考虑的,否则会适得其反。其实你看看我写的那个关于task/worker的帖子吧,其中worker就是一个工作线程池,它是如何根据request的情况来进行伸缩性的调整的。
引用
result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理
所以我用的是危险这个词。你如果做的是一个中间组件供大家使用。你就不会清楚知道调用者任务的具体实现是怎样。
引用
这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。
其实很容易,引入管理线程。这个管理线程可以管理一个控制变量,从而实现对其他子线程的控制。同时管理线程在所有子线程结束后再进行一些相关处理(比如将文件移入到指定文件夹,确保流关闭等等)。
12 楼
marshan
2009-12-25
<pre name="code" class="java">package ojadb.core.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class OjadbThreadPool {
private int minimum;
private int maximum;
private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks;
public OjadbThreadPool(int min, int max) throws OjadbException {
if (0 < min && min < max) {
minimum = min;
maximum = max;
ojadbThreads = new LinkedBlockingQueue<OjadbThread>();
ojadbTasks = new LinkedBlockingQueue<OjadbTask>();
for (int i = 0; i < minimum; i++) {
OjadbThread worker = new OjadbThread(this, "worker" + i);
worker.start();
}
} else {
throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值");
}
}
void addWorker(OjadbThread ojadbThreadWorker) {
ojadbThreads.offer(ojadbThreadWorker);
}
public void excute(OjadbTask thread) {
// 生产任务
ojadbTasks.offer(thread);
OjadbThread worker = ojadbThreads.remove();
if (worker != null) {
worker.work();
} else if (ojadbThreads.size() < maximum) {
OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size());
newWorker.start();
} else {
}
releaseThreads();
}
private synchronized void releaseThreads() {
while (ojadbThreads.size() > minimum) {
OjadbThread worker = ojadbThreads.remove();
if (worker != null) {
System.out.println(worker + "is killed");
worker.kill();
}
}
}
public synchronized OjadbTask getTask() {
if (ojadbTasks.size() == 0) {
return null;
}
// 消费任务
return ojadbTasks.remove();
}
}</pre>
<p> </p>
<pre name="code" class="java">package ojadb.core.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class OjadbThread extends Thread {
private AtomicBoolean isActived;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private OjadbThreadPool pool;
public OjadbThread(OjadbThreadPool pool, String threadName) {
super(threadName);
this.pool = pool;
isActived = new AtomicBoolean(true);
}
@Override
public synchronized void start() {
super.start();
}
@Override
public void run() {
try {
lock.lock();
while (isActived.get()) {
OjadbTask task = pool.getTask();
if (task != null) {
try {
String result = (String) task.excute();
if (result.length() > 0) {
pool.addWorker(this);
// 懒工人策略 -- 没人招呼工作,绝不睡醒
condition.await();
}
} catch (Exception e) {
}
} else {
pool.addWorker(this);
condition.await();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void work() {
condition.signal();
}
public void kill() {
isActived.set(false);
work();
}
@Override
public String toString() {
return getName();
}
}
</pre>
<p> </p>
<p> </p>
<pre name="code" class="java">package ojadb.core.thread;
public class OjadbTask {
private String threadName;
public OjadbTask(String threadName) {
this.threadName = threadName;
}
public Object excute() {
System.out.println("excuting " + threadName);
return "success";
}
public String getName() {
return threadName;
}
public String toString() {
return threadName;
}
}
</pre>
<p> </p>
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class OjadbThreadPool {
private int minimum;
private int maximum;
private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks;
public OjadbThreadPool(int min, int max) throws OjadbException {
if (0 < min && min < max) {
minimum = min;
maximum = max;
ojadbThreads = new LinkedBlockingQueue<OjadbThread>();
ojadbTasks = new LinkedBlockingQueue<OjadbTask>();
for (int i = 0; i < minimum; i++) {
OjadbThread worker = new OjadbThread(this, "worker" + i);
worker.start();
}
} else {
throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值");
}
}
void addWorker(OjadbThread ojadbThreadWorker) {
ojadbThreads.offer(ojadbThreadWorker);
}
public void excute(OjadbTask thread) {
// 生产任务
ojadbTasks.offer(thread);
OjadbThread worker = ojadbThreads.remove();
if (worker != null) {
worker.work();
} else if (ojadbThreads.size() < maximum) {
OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size());
newWorker.start();
} else {
}
releaseThreads();
}
private synchronized void releaseThreads() {
while (ojadbThreads.size() > minimum) {
OjadbThread worker = ojadbThreads.remove();
if (worker != null) {
System.out.println(worker + "is killed");
worker.kill();
}
}
}
public synchronized OjadbTask getTask() {
if (ojadbTasks.size() == 0) {
return null;
}
// 消费任务
return ojadbTasks.remove();
}
}</pre>
<p> </p>
<pre name="code" class="java">package ojadb.core.thread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class OjadbThread extends Thread {
private AtomicBoolean isActived;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private OjadbThreadPool pool;
public OjadbThread(OjadbThreadPool pool, String threadName) {
super(threadName);
this.pool = pool;
isActived = new AtomicBoolean(true);
}
@Override
public synchronized void start() {
super.start();
}
@Override
public void run() {
try {
lock.lock();
while (isActived.get()) {
OjadbTask task = pool.getTask();
if (task != null) {
try {
String result = (String) task.excute();
if (result.length() > 0) {
pool.addWorker(this);
// 懒工人策略 -- 没人招呼工作,绝不睡醒
condition.await();
}
} catch (Exception e) {
}
} else {
pool.addWorker(this);
condition.await();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void work() {
condition.signal();
}
public void kill() {
isActived.set(false);
work();
}
@Override
public String toString() {
return getName();
}
}
</pre>
<p> </p>
<p> </p>
<pre name="code" class="java">package ojadb.core.thread;
public class OjadbTask {
private String threadName;
public OjadbTask(String threadName) {
this.threadName = threadName;
}
public Object excute() {
System.out.println("excuting " + threadName);
return "success";
}
public String getName() {
return threadName;
}
public String toString() {
return threadName;
}
}
</pre>
<p> </p>
11 楼
marshan
2009-12-25
<div class="quote_title">感谢各位的关注,我在下一楼贴出修改后的代码。针对凤舞凰扬的回复,我希望与你探讨下。</div>
<div class="quote_title">引用</div>
<div class="quote_div">2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。 <br>3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。 </div>
<p> </p>
<p>我在修改方案中对两个方法加上了同步。我有一个疑义, </p>
<p>private LinkedBlockingQueue<OjadbThread> ojadbThreads;<br>private BlockingQueue<OjadbTask> ojadbTasks</p>
<p>这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。 </div>
<p>这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢。</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。 </div>
<p> result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理。</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下) </div>
<p>这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。</p>
<div class="quote_title">引用</div>
<div class="quote_div">2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。 <br>3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。 </div>
<p> </p>
<p>我在修改方案中对两个方法加上了同步。我有一个疑义, </p>
<p>private LinkedBlockingQueue<OjadbThread> ojadbThreads;<br>private BlockingQueue<OjadbTask> ojadbTasks</p>
<p>这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。 </div>
<p>这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢。</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。 </div>
<p> result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理。</p>
<p> </p>
<div class="quote_title">引用</div>
<div class="quote_div">7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下) </div>
<p>这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。</p>
10 楼
zzw0309452
2009-12-23
楼上的很厉害
9 楼
凤舞凰扬
2009-12-22
回答三楼,应该是volatile, 不是transient,呵呵,敲错了,谢谢提醒,真是不好意思,差点误导了。
8 楼
changyuxin
2009-12-22
看了楼主的代码,也看了几位的回复,知道了更多线程池的知识!
7 楼
ztk5912
2009-12-21
我也正想学习一下线程池原理和实现,仔细看了下楼主的实现
OjadbThread 内部是不需要lock的,每个线程内部都new了一个lock,而这个lock只有该线程使用
各个线程自己的线程栈是安全的,一个线程是访问不到另一个线程的局部变量的,需要同步的都是多个线程都能同时访问到的外部资源,OjadbThread 中run()方法和work()是我觉得都不需要进行 lock,楼主主要是用了condition的await()和signal()方法来使全线程进入等待状态和唤醒该线程,不存在竞争
OjadbThread 内部是不需要lock的,每个线程内部都new了一个lock,而这个lock只有该线程使用
各个线程自己的线程栈是安全的,一个线程是访问不到另一个线程的局部变量的,需要同步的都是多个线程都能同时访问到的外部资源,OjadbThread 中run()方法和work()是我觉得都不需要进行 lock,楼主主要是用了condition的await()和signal()方法来使全线程进入等待状态和唤醒该线程,不存在竞争
6 楼
whaosoft
2009-12-21
lz学习态度挺好的 连接池正好能用上 Queue 也不错
5 楼
jhaij
2009-12-21
因为专所以专(专业)
4 楼
凤舞凰扬
2009-12-21
另外,顺带回答一下楼上(3楼)的几个问题。
这个没关系的,不是问题。单例与否应该由客户控制,而不是池本身。客观是允许多个池存在的。
2.这段逻辑是不是不太对啊,看了半天没明白。
如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal
这段逻辑倒是没太大问题,主要是起一个通知的作用。
这段怀疑得对,楼主应该加上volatile关键字或者使用AtomicBoolean对象。
引用
1.OjadbThreadPool首先应该单例。
这个没关系的,不是问题。单例与否应该由客户控制,而不是池本身。客观是允许多个池存在的。
引用
2.这段逻辑是不是不太对啊,看了半天没明白。
如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal
这段逻辑倒是没太大问题,主要是起一个通知的作用。
引用
3.OjadbThreadPool里kill掉的线程,isActive=false,然后lock,就完了?
这段怀疑得对,楼主应该加上volatile关键字或者使用AtomicBoolean对象。
3 楼
凤舞凰扬
2009-12-21
楼主学着写,精神层面值得鼓励,不过技术层面,还是有相当多比较严重的问题的,其中最主要的是对多线程并发中同步控制以及池本身的概念认识不够。
1. 线程池OjadbThreadPool 本身以public方式暴露addWorker的方法,那么外部调用是否可以执行这个方法呢?这样的话,提供进来的线程对象就有问题了。(其实楼主的这个方法只是想让线程本身进行回调的,以通知池自己可用)
2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。
3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。
4. 说到上述问题,就发现楼主对remove方法理解不够,remove如果失败,是会抛出异常的。如果来的任务多个你的线程池,结果就是线程池的38行,53行代码出现异常。因为38行异常,下面的代码在这种情况下,其实都是废的。
5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。
6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)
8. 在OjadbThread 中的work方法中,是不需要lock和unlock。简单说明下,如果condition.signal()唤醒了当前OjadbThread 的主线程,那么它必然就会获得lock(它是在lock状态等待),这个时候调用work方法的其他线程是没法unlock的。其实啊,condition.signal()是线程安全方法,不用担心同步问题的。而另外这种处理带来的第二个问题就是通知泄漏。
9. 楼主对于线程的控制器了解还不够,比如线程中isActive变量的使用,应该使用transient关键字或者使用AtomicBoolean对象。
楼主这些问题非常容易验证,同时跑几十个线程,然后往里面任意添加任务,运行一小段时间就会出现的。
如果楼主有兴趣,可以看看我写的任务、工作组件,其中有自己实现线程池的例子。
http://phenix.iteye.com/admin/blogs/457200
1. 线程池OjadbThreadPool 本身以public方式暴露addWorker的方法,那么外部调用是否可以执行这个方法呢?这样的话,提供进来的线程对象就有问题了。(其实楼主的这个方法只是想让线程本身进行回调的,以通知池自己可用)
2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。
3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。
4. 说到上述问题,就发现楼主对remove方法理解不够,remove如果失败,是会抛出异常的。如果来的任务多个你的线程池,结果就是线程池的38行,53行代码出现异常。因为38行异常,下面的代码在这种情况下,其实都是废的。
5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。
6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)
8. 在OjadbThread 中的work方法中,是不需要lock和unlock。简单说明下,如果condition.signal()唤醒了当前OjadbThread 的主线程,那么它必然就会获得lock(它是在lock状态等待),这个时候调用work方法的其他线程是没法unlock的。其实啊,condition.signal()是线程安全方法,不用担心同步问题的。而另外这种处理带来的第二个问题就是通知泄漏。
9. 楼主对于线程的控制器了解还不够,比如线程中isActive变量的使用,应该使用transient关键字或者使用AtomicBoolean对象。
楼主这些问题非常容易验证,同时跑几十个线程,然后往里面任意添加任务,运行一小段时间就会出现的。
如果楼主有兴趣,可以看看我写的任务、工作组件,其中有自己实现线程池的例子。
http://phenix.iteye.com/admin/blogs/457200
2 楼
prowl
2009-12-21
感觉楼主的代码更像是一个任务队列,还不太是线程池。另外漏洞有点多,举几个例子。
1.OjadbThreadPool首先应该单例。
2.这段逻辑是不是不太对啊,看了半天没明白。
如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal
还有这段代码结束之后为什么要release?
3.OjadbThreadPool里kill掉的线程,isActive=false,然后lock,就完了?
1.OjadbThreadPool首先应该单例。
2.这段逻辑是不是不太对啊,看了半天没明白。
如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal
还有这段代码结束之后为什么要release?
OjadbThread worker = ojadbThreads.remove(); if (worker != null) { worker.work(); } else if (ojadbThreads.size() < maximum) { OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size()); newWorker.start(); } else { } releaseThreads();
3.OjadbThreadPool里kill掉的线程,isActive=false,然后lock,就完了?
1 楼
nishizhutoua
2009-12-21
OjadbTask 干嘛要叫成"用户线程"啊... 明明是任务的...
发表评论
-
[童虎退壳系列]死锁演示
2011-10-13 01:53 946package creative.fire.multit ... -
[童虎退壳系列]方法加锁测试
2011-10-13 01:34 1161package creative.fire.multit ... -
线程池 浅析
2010-12-03 15:26 1053本文是对java线程池的粗浅分析,视野局限于线程池的基本实现, ... -
并发集合 CopyOnWrite
2010-11-28 01:59 1082CopyOnWriteArrayList 内部结构比较简 ... -
并发集合 Queue
2010-11-28 01:54 1257ArrayBlockingQueue 内部结构 ... -
并发集合 ConcurrentHash
2010-11-21 21:09 1259Synchronized Collections -- ... -
同步器
2010-11-17 02:08 1052Latch 门闩 CountDownLatch 的一个有用特 ... -
LRU缓存的实现之性能测试
2009-12-08 15:19 1407针对上一篇文章,这里给出性能测试:10000条随机数据,50个 ... -
LRU缓存的实现
2009-12-08 11:42 2429LinkedHashMap是一个现成的LRU实现。但其缺乏并发 ... -
同步synchronized方法和代码块
2009-03-15 22:40 7714同步synchronized方法和代码块 打个比方:一个ob ...
相关推荐
Windows下一个比较完美的线程池实现和示例 本线程池提供了如下功能: 1.能根据任务个数和当前线程的多少在最小/最大线程个数之间自动调整(Vista后的系统有 SetThreadpoolThreadMaximum 等函数有类似功能); 2.能方便...
Django异步任务线程池实现原理主要涉及以下几个核心知识点: 1. 异步任务执行原理: 当Django应用在处理耗时的任务时,通常会阻塞主线程,导致用户在等待处理结果时无法进行其他操作。为了解决这个问题,Django采用...
在处理大量数据导入数据库的场景中,使用...通过以上步骤,可以利用EasyExcel和线程池实现百万级数据从Excel导入到数据库的功能。这种方式可以提高数据处理的效率,减少内存占用,并且能够更好地利用多核CPU的优势。
总结来说,这个简单的C++线程池实现是一个学习多线程和并发编程的好起点。它通过封装线程管理和任务调度,为开发者提供了一种更高效、更可控的方式来处理并发任务。在实际应用中,线程池可以被扩展以适应更复杂的...
总结起来,Linux C系统编程中使用线程池实现类似`cp`命令的功能,是一个涉及多线程编程、任务调度和同步控制的综合实践。通过这样的实现,我们可以提高文件复制操作的并发性和效率,同时降低系统资源的消耗。在深入...
### C++线程池实现原理分析 #### 一、引言 线程池是一种软件设计模式,用于管理和控制大量线程的创建与销毁,尤其是在处理大量短期任务时,它可以显著提高程序性能。线程池的核心思想是预先创建一组线程,并让它们...
在"java 线程池实现多并发队列后进先出"这个主题中,我们关注的是线程池如何利用特定类型的队列来实现后进先出(LIFO,Last-In-First-Out)的行为。通常,线程池默认使用先进先出(FIFO,First-In-First-Out)的队列...
5. **异步下载**:通过线程池实现的下载是异步的,这意味着主线程不会被阻塞,用户界面仍然可以保持流畅。这是在Android中进行网络操作时必须遵循的原则,因为网络操作在主线程上执行会导致ANR(应用无响应)错误。 ...
在本篇文章中,我们将深入探讨Python中的线程池实现,并参考提供的`ThreadPool.py`源码进行分析。 首先,Python标准库提供了一个名为`concurrent.futures`的模块,其中包含`ThreadPoolExecutor`类,它是实现线程池...
以下将详细讲解基于Win32的C++线程池实现的关键概念和技术。 首先,我们需要理解Win32 API中的线程池接口。Windows提供了CreateThreadpool、SetThreadpoolCallbackPool、QueueUserWorkItem等函数来创建和管理线程池...
前段时间发布了《Windows下一个比较完美的线程池实现和示例》(http://download.csdn.net/detail/fishjam/5106672),根据下载量和评论来看,对大家还比较有用。 现在发布一个利用该线程池实现的Http上传下载实现,...
不过,上述代码展示的是一个自定义的线程池实现,它可能没有使用Java标准库中的`ExecutorService`。 这个自定义线程池的实现包括以下几个关键组件: 1. **线程池参数**: - `reserve`:保留线程数,这些线程不...
"基于UNIX C语言的一种线程池实现" 一、标题解释 "基于UNIX C语言的一种线程池实现",这篇文章介绍了在UNIX操作系统下使用C语言实现的一个线程池方案。线程池是一种常用的设计模式,能够提高系统的性能和可扩展性。...
下面是一个简化的线程池实现的伪代码: ```cpp class ThreadPool { public: void enqueue(std::function()> task); // 提交任务 ~ThreadPool(); // 关闭线程池 private: std::vector<std::thread> workers; // ...
在"基于win32的C++线程池实现(改进版)"中,开发者已经针对上一版本的问题进行了修复,如崩溃和内存泄漏。这些问题是多线程编程中常见的挑战,崩溃可能是由于线程间的同步问题或者资源管理不当,而内存泄漏则可能导致...
本项目涉及的核心知识点是“Linux下的socket线程池实现”,这涉及到多个技术层面,包括socket编程、多线程技术和线程池的管理。 首先,让我们了解什么是Socket。Socket是网络通信的基本接口,它允许应用程序通过...
Tomcat提供了两种线程池实现,一种是基于Apache Portable Runtime (APR)的Pool技术,另一种是纯Java实现的ThreadPool。本文主要探讨后者,即Java实现的线程池。 Java实现的线程池位于`tomcat-util.jar`中,初始化时...
基于现代C++的高效线程池实现源码+项目文档说明(兼容Linux、macOS 和 Windows系统).zip 【说明】 【1】项目代码完整且功能都验证ok,确保稳定可靠运行后才上传。欢迎下载使用!在使用过程中,如有问题或建议,请及时...
线程池实现
本篇文章将深入讲解如何在Socket服务端实现线程池,以及其在实际项目中的应用。 首先,我们来看`SocketServer.java`。这个文件通常包含了服务器端的主逻辑,其中包括了服务器的启动、监听客户端连接和创建线程池来...