论坛首页 Java企业应用论坛

线程池的实现

浏览 14017 次
精华帖 (1) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
作者 正文
   发表时间:2009-12-19   最后修改:2010-11-16
OO

自己实现了一个简单的线程池。希望回复者讨论技术问题,不要说都已经有了,你还重复造轮子。

我的目的是讨论线程池的实现,而不是提供使用。

 

测试代码:

package threadpool;

import ojadb.core.thread.OjadbThreadPool;
import ojadb.core.thread.task.OjadbTask;
import ojadb.exception.OjadbException;

public class BasicTest {
	public static void main(String[] ojadbs) throws OjadbException {
		OjadbThreadPool pool = new OjadbThreadPool(5, 10);
		OjadbTask thread = new OjadbTask("test thread");
		pool.excute(thread);
		System.exit(0);
	}
}

 

线程池:

package ojadb.core.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import ojadb.core.thread.task.OjadbTask;
import ojadb.core.thread.thread.OjadbThread;
import ojadb.exception.OjadbException;

public class OjadbThreadPool {
	private int minimum;
	private int maximum;
	private LinkedBlockingQueue<OjadbThread> ojadbThreads;
	private BlockingQueue<OjadbTask> ojadbTasks;

	public OjadbThreadPool(int min, int max) throws OjadbException {
		if (0 < min && min < max) {
			minimum = min;
			maximum = max;
			ojadbThreads = new LinkedBlockingQueue<OjadbThread>();
			ojadbTasks = new LinkedBlockingQueue<OjadbTask>();
			for (int i = 0; i < minimum; i++) {
				OjadbThread worker = new OjadbThread(this, "worker" + i);
				worker.start();
			}
		} else {
			throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值");
		}
	}

	public void addWorker(OjadbThread ojadbThreadWorker) {
		ojadbThreads.offer(ojadbThreadWorker);
	}

	public void excute(OjadbTask thread) {
		// 生产任务
		ojadbTasks.offer(thread);

		OjadbThread worker = ojadbThreads.remove();
		if (worker != null) {
			worker.work();
		} else if (ojadbThreads.size() < maximum) {
			OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size());
			newWorker.start();
		} else {

		}
		releaseThreads();
	}

	private void releaseThreads() {
		while (ojadbThreads.size() > minimum) {
			OjadbThread worker = ojadbThreads.remove();
			System.out.println(worker + "is killed");
			worker.kill();
		}
	}

	public OjadbTask getTask() {
		if (ojadbTasks.size() == 0)
			return null;
		// 消费任务
		return ojadbTasks.remove();
	}
}

 

内部线程:

package ojadb.core.thread.thread;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import ojadb.core.thread.OjadbThreadPool;
import ojadb.core.thread.task.OjadbTask;

public class OjadbThread extends Thread {
	private boolean isActived;
	private ReentrantLock lock = new ReentrantLock();
	private Condition condition = lock.newCondition();
	private OjadbThreadPool pool;

	public OjadbThread(OjadbThreadPool pool, String threadName) {
		super(threadName);
		this.pool = pool;
		isActived = true;
	}

	@Override
	public synchronized void start() {
		super.start();
	}

	@Override
	public void run() {
		try {
			lock.lock();

			while (isActived) {
				OjadbTask task = pool.getTask();
				if (task != null) {
					String result = (String) task.excute();

					if (result.length() > 0) {
						pool.addWorker(this);
						// 懒工人策略 -- 没人招呼工作,绝不睡醒
						condition.await();
					}
				} else {
					pool.addWorker(this);
					condition.await();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	public void work() {
		try {
			lock.lock();
			condition.signal();
		} finally {
			lock.unlock();
		}
	}

	public void kill() {
		isActived = false;
		work();
	}

	@Override
	public String toString() {
		return getName();
	}
}

 

用户线程:

package ojadb.core.thread.task;

public class OjadbTask {
	private String threadName;

	public OjadbTask(String threadName) {
		this.threadName = threadName;
	}

	public Object excute() {
		System.out.println("excuting " + threadName);
		return "success";
	}

	public String getName() {
		return threadName;
	}

	public String toString() {
		return threadName;
	}
}

  

   发表时间:2009-12-21  
OjadbTask 干嘛要叫成"用户线程"啊... 明明是任务的...
0 请登录后投票
   发表时间:2009-12-21  
感觉楼主的代码更像是一个任务队列,还不太是线程池。另外漏洞有点多,举几个例子。

1.OjadbThreadPool首先应该单例。

2.这段逻辑是不是不太对啊,看了半天没明白。

如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal

还有这段代码结束之后为什么要release?

 OjadbThread worker = ojadbThreads.remove();  
         if (worker != null) {  
             worker.work();  
         } else if (ojadbThreads.size() < maximum) {  
             OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size());  
             newWorker.start();  
         }  else {  
   
         }  
         releaseThreads();


3.OjadbThreadPool里kill掉的线程,isActive=false,然后lock,就完了?

0 请登录后投票
   发表时间:2009-12-21   最后修改:2009-12-21
楼主学着写,精神层面值得鼓励,不过技术层面,还是有相当多比较严重的问题的,其中最主要的是对多线程并发中同步控制以及池本身的概念认识不够。
1. 线程池OjadbThreadPool 本身以public方式暴露addWorker的方法,那么外部调用是否可以执行这个方法呢?这样的话,提供进来的线程对象就有问题了。(其实楼主的这个方法只是想让线程本身进行回调的,以通知池自己可用)
2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。
3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。
4. 说到上述问题,就发现楼主对remove方法理解不够,remove如果失败,是会抛出异常的。如果来的任务多个你的线程池,结果就是线程池的38行,53行代码出现异常。因为38行异常,下面的代码在这种情况下,其实都是废的。
5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。
6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)
8. 在OjadbThread 中的work方法中,是不需要lock和unlock。简单说明下,如果condition.signal()唤醒了当前OjadbThread 的主线程,那么它必然就会获得lock(它是在lock状态等待),这个时候调用work方法的其他线程是没法unlock的。其实啊,condition.signal()是线程安全方法,不用担心同步问题的。而另外这种处理带来的第二个问题就是通知泄漏。
9. 楼主对于线程的控制器了解还不够,比如线程中isActive变量的使用,应该使用transient关键字或者使用AtomicBoolean对象。
    楼主这些问题非常容易验证,同时跑几十个线程,然后往里面任意添加任务,运行一小段时间就会出现的。
    如果楼主有兴趣,可以看看我写的任务、工作组件,其中有自己实现线程池的例子。
    http://phenix.iteye.com/admin/blogs/457200
1 请登录后投票
   发表时间:2009-12-21   最后修改:2009-12-22
   另外,顺带回答一下楼上(3楼)的几个问题。
引用
1.OjadbThreadPool首先应该单例。

   这个没关系的,不是问题。单例与否应该由客户控制,而不是池本身。客观是允许多个池存在的。
引用

2.这段逻辑是不是不太对啊,看了半天没明白。
如果能从ojadbThreads里取到一个线程(!=null),那就work?可是work的的代码是lock+signal

  这段逻辑倒是没太大问题,主要是起一个通知的作用。
引用
3.OjadbThreadPool里kill掉的线程,isActive=false,然后lock,就完了?

   这段怀疑得对,楼主应该加上volatile关键字或者使用AtomicBoolean对象。
0 请登录后投票
   发表时间:2009-12-21  
因为专所以专(专业)
0 请登录后投票
   发表时间:2009-12-21  
lz学习态度挺好的 连接池正好能用上 Queue 也不错
0 请登录后投票
   发表时间:2009-12-21  
我也正想学习一下线程池原理和实现,仔细看了下楼主的实现
OjadbThread 内部是不需要lock的,每个线程内部都new了一个lock,而这个lock只有该线程使用
各个线程自己的线程栈是安全的,一个线程是访问不到另一个线程的局部变量的,需要同步的都是多个线程都能同时访问到的外部资源,OjadbThread 中run()方法和work()是我觉得都不需要进行 lock,楼主主要是用了condition的await()和signal()方法来使全线程进入等待状态和唤醒该线程,不存在竞争
0 请登录后投票
   发表时间:2009-12-22  
看了楼主的代码,也看了几位的回复,知道了更多线程池的知识!
0 请登录后投票
   发表时间:2009-12-22  
  回答三楼,应该是volatile, 不是transient,呵呵,敲错了,谢谢提醒,真是不好意思,差点误导了。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics