论坛首页 Java企业应用论坛

线程池的实现

浏览 14016 次
精华帖 (1) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (1)
作者 正文
   发表时间:2009-12-25  
感谢各位的关注,我在下一楼贴出修改后的代码。针对凤舞凰扬的回复,我希望与你探讨下。
引用
2. 虽然你有一个releaseThreads方法作为线程池大小控制,但是因为它不是线程安全,所以得到的size是不准确控制的。比如讲,如果多个线程同时进入到releaseThreads方法,如果刚好是临界值+1,那么一个线程做完大小的判断,还来不及kill掉多余线程时,那么其余线程都会依次通过大小的判断,从而导致线程池会kill掉多个线程,导致最后线程池大小小于临界值,就有可能出现问题。
3. 同样的问题也出现在getTask方法中,如果此时只有一个任务了,一个线程通过size=1的判断,还没来得及执行remove方法时,另外一个线程也通过了该判断,从而导致下个线程在执行remove方法时会抛出异常。而这种情况没有被有效处理,那么线程调用这个方法时,也就出现了线程泄漏,线程死掉。 

 

我在修改方案中对两个方法加上了同步。我有一个疑义, 

private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks

这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?

 

引用
5. 在execute方法中每次都将多余的线程kill掉,只保持最小临界值,其实是低效率的。其实理解楼主的minum这个临界值其实是最小可用线程数,但如果并发量大于这个数时,其实这个线程池会有频繁地创建线程,关闭线程,有违线程池的初衷。

这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢。

 

引用
6. 在线程中这句if (result.length() > 0)判断危险啊,你怎么知道task返回什么,容易NullpointerException,又会有异常泄漏啊。

 result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理。

 

引用
7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)  

这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。

0 请登录后投票
   发表时间:2009-12-25  
package ojadb.core.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OjadbThreadPool {

    private int minimum;
    private int maximum;
    private LinkedBlockingQueue<OjadbThread> ojadbThreads;
    private BlockingQueue<OjadbTask> ojadbTasks;

    public OjadbThreadPool(int min, int max) throws OjadbException {
        if (0 < min && min < max) {
            minimum = min;
            maximum = max;
            ojadbThreads = new LinkedBlockingQueue<OjadbThread>();
            ojadbTasks = new LinkedBlockingQueue<OjadbTask>();
            for (int i = 0; i < minimum; i++) {
                OjadbThread worker = new OjadbThread(this, "worker" + i);
                worker.start();
            }
        } else {
            throw new OjadbException("无法建立线程池: 最小值必须大于零,小于最大值");
        }
    }

    void addWorker(OjadbThread ojadbThreadWorker) {
        ojadbThreads.offer(ojadbThreadWorker);
    }

    public void excute(OjadbTask thread) {
        // 生产任务
        ojadbTasks.offer(thread);

        OjadbThread worker = ojadbThreads.remove();
        if (worker != null) {
            worker.work();
        } else if (ojadbThreads.size() < maximum) {
            OjadbThread newWorker = new OjadbThread(this, "worker" + ojadbThreads.size());
            newWorker.start();
        } else {
        }
        releaseThreads();
    }

    private synchronized void releaseThreads() {
        while (ojadbThreads.size() > minimum) {
            OjadbThread worker = ojadbThreads.remove();

            if (worker != null) {
                System.out.println(worker + "is killed");
                worker.kill();
            }
        }
    }

    public synchronized OjadbTask getTask() {
        if (ojadbTasks.size() == 0) {
            return null;
        }
        // 消费任务
        return ojadbTasks.remove();
    }
}

 

package ojadb.core.thread;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class OjadbThread extends Thread {

    private AtomicBoolean isActived;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private OjadbThreadPool pool;

    public OjadbThread(OjadbThreadPool pool, String threadName) {
        super(threadName);
        this.pool = pool;
        isActived = new AtomicBoolean(true);
    }

    @Override
    public synchronized void start() {
        super.start();
    }

    @Override
    public void run() {
        try {
            lock.lock();

            while (isActived.get()) {
                OjadbTask task = pool.getTask();
                if (task != null) {
                    try {
                        String result = (String) task.excute();
                        if (result.length() > 0) {
                            pool.addWorker(this);
                            // 懒工人策略 -- 没人招呼工作,绝不睡醒
                            condition.await();
                        }
                    } catch (Exception e) {
                    }
                } else {
                    pool.addWorker(this);
                    condition.await();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void work() {
        condition.signal();
    }

    public void kill() {
        isActived.set(false);
        work();
    }

    @Override
    public String toString() {
        return getName();
    }
}

 

 

package ojadb.core.thread;

public class OjadbTask {
    private String threadName;

    public OjadbTask(String threadName) {
        this.threadName = threadName;
    }

    public Object excute() {
        System.out.println("excuting " + threadName);
        return "success";
    }

    public String getName() {
        return threadName;
    }

    public String toString() {
        return threadName;
    }
}

 

0 请登录后投票
   发表时间:2009-12-26  
引用
我在修改方案中对两个方法加上了同步。我有一个疑义,

private LinkedBlockingQueue<OjadbThread> ojadbThreads;
private BlockingQueue<OjadbTask> ojadbTasks

这两个队列本身是线程同步的,一定要再同步方法才能保证size取值是安全的么?

   对象本身同步并不代表对象与外部多个操作是同步的。其实啊,这是很多刚开始接触多线程编程的人的误解的,如果想了解更多,推荐你看一本书,电子工业出版社的《java并发编程实践》
引用
这么处理是一种线程实现策略,策略可以定义多种。我将在有空时研究下JDK的Fixed线程的实现策略。谢谢
倒不是固定数量的线程池是好的。作为一个池来说,伸缩性是非常重要的,但是另外一个关键就是伸缩性的效率的效果更加是应该考虑的,否则会适得其反。其实你看看我写的那个关于task/worker的帖子吧,其中worker就是一个工作线程池,它是如何根据request的情况来进行伸缩性的调整的。
引用
result不会返回null,这里我默认返回值是String了,应该是不严谨的。我增加了try的处理

   所以我用的是危险这个词。你如果做的是一个中间组件供大家使用。你就不会清楚知道调用者任务的具体实现是怎样。
引用
这点我在设计时就想到了,但我很无奈。如果你有好的办法,希望不吝赐教!谢谢。

   其实很容易,引入管理线程。这个管理线程可以管理一个控制变量,从而实现对其他子线程的控制。同时管理线程在所有子线程结束后再进行一些相关处理(比如将文件移入到指定文件夹,确保流关闭等等)。
0 请登录后投票
   发表时间:2010-01-07  

 

 

7. 因为线程执行是异步的,所以在线程还没来得及完成任务时,线程池已经做完对于线程池大小的判断,这个时候,线程在执行完会被返回池中,而且得不到有效释放(只能依赖于下一次任务的执行),这样的话,在大并发的情况下,线程池会出现有一段时间容量过大的危险。(尤其是任务提交的频率在非均匀的情况下)  

 


这句话什么意思,偶程序没有看明白,能否解释的清楚一点儿?

0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics