`
muscle-liu
  • 浏览: 229663 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

对 ThreadPool 的理解

阅读更多
虽然从 Java5 开始 JDK 里的 java.util.concurrent 包内建了线程池,你不必自己实现线程池,但理解线程的实现原理对 Java 编程很有用。
当你想把运行在你的程序中的线程控制在一定的数量之内,线程池就显得非常有用。

引用

原理:

用一个阻塞队列里(Blocking Queue)来存储线程池的所有空闲线程。不用为每个任务都创建一个新的线程,可以把任务当参数传到线程池里。只要当线程池有空闲的线程时,这个任务就会被执行。


BlockQueue.java
public class BlockingQueue {

    private List queue = new LinkedList();
    private int limit = 10;

    public BlockingQueue(int limit) {
        this.limit = limit;
    }

    public synchronized Object enqueue(Object item)
            throws InterruptedException {
        while (this.queue.size() == this.limit) {
            System.out.println("Arrive to the pool's max size:"+this.limit);
            wait();
        }
        if (this.queue.size() == 0) {
            notifyAll();
        }

        System.out.println("Add a new thread to the pool.Pool's size before 

adding:"+this.queue.size());
        this.queue.add(item);
        
        return this.queue.get(this.queue.size()-1);
    }

    public synchronized Object dequeue()
            throws InterruptedException {
        while (this.queue.size() == 0) {
            System.out.println("Pool's size is 0.");
            wait();
        }
        if (this.queue.size() == this.limit) {
            notifyAll();
        }

        System.out.println("Remove a thread from the pool.Pool's size before 

removing:"+(this.queue.size()-1));
        return this.queue.remove(0);
    }
    
    public synchronized int size(){
        return this.queue.size();
    }
}

当阻塞队列没达到界限值(最大值与最小值)时,插入与出列正常,没限制。阻塞队列达到最大值时,再想插入一个线程,队列就会停止操作,让插入在等待,同时唤醒出列操作。相反,当全部出列后,再想出列时,出列操作就会停止,在等待,同时唤醒插入操作。


线程池通常用在多线程服务端的,下边是实现线程池的其他类,其中:
/**
 * Implements the thread pool
 * @author winxp
 */
public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<TaskThread> threads = new ArrayList<TaskThread>();
    private boolean isStopped = false;

    /**
     * Create a new ThreadPool
     * @param noOfThreads Initiate num of threads in the thread pool线程池的初始化线程数
     * @param maxNoOfTasks Max num of threads in the thread pool线程池的最大线程数
     */
    public ThreadPool(int noOfThreads, int maxNoOfTasks) {
        taskQueue = new BlockingQueue(maxNoOfTasks);

        for (int i = 0; i < noOfThreads; i++) {
            threads.add(new TaskThread(taskQueue));
        }
        for (TaskThread thread : threads) {
            thread.start();
        }
    }
    
    /**
     *
     * @param task
     */
    public synchronized void execute(Runnable task){
        if(this.isStopped)throw 
                new IllegalStateException("ThreadPool is stopped");
        try {
            this.taskQueue.enqueue(task);
        } catch (InterruptedException ex) {
           ex.printStackTrace();
        }
    }

    /**
     *
     */
    public synchronized void stop() {
        this.isStopped = true;
        for (TaskThread thread : threads) {
            thread.interrupt();
        }
    }
}

/**
 * Simulate that dequeue from the BlockingQueue, and execute the dequeuing task
 */
public class TaskThread extends Thread {

    private BlockingQueue taskQueue = null;
    private boolean isStopped = false;

    public TaskThread(BlockingQueue queue) {
        taskQueue = queue;
    }

    @Override
    public void run() {
        while (!isStopped()) {
            try {
                Runnable runnable = (Runnable) taskQueue.dequeue();
                runnable.run();
            } catch (Exception e) {
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }
    
    @Override
    public synchronized void interrupt() {
        isStopped = true;
        super.interrupt();//break pool thread out of dequeue() call.
    }

    public synchronized boolean isStopped() {
        return isStopped;
    }
}



测试类:
/**
 * Testing class
 */
class TestThread extends Thread{
    private int num;
    public TestThread(int num){
        this.num = num;
    }
    
    @Override
    public void run(){
        System.out.println("TestThread index: "+num);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
        }
    }

    /**
     *
     * @param args
     */
    public static void main(String[] args){
        ThreadPool tp = new ThreadPool(3,10);
        for(int i = 0;i<15;i++){
            TestThread test = new TestThread(i);
            tp.execute(test);
        }
    }
}
4
0
分享到:
评论
4 楼 天下无贼 2012-04-03  
Sorry,看错了。是run方法()
3 楼 天下无贼 2012-04-03  
ThreadPool 这个类的68,69行似乎有问题。Ruunable接口应该没有start()方法吧?
2 楼 fhyfufangyu 2011-05-23  
truth315 写道

 while (this.queue.size() == this.limit) {   
           System.out.println("Arrive to the pool's max size:"+this.limit);   
            wait();   
       }


为什么不能把WHILE改为IF,然后把唤醒的notifyAll()改为notify();


为什么用while可以参考jdk 文档中“虚假唤醒”的描述:

在没有被通知、中断或超时的情况下,线程还可以唤醒一个所谓的虚假唤醒 (spurious wakeup)。虽然这种情况在实践中很少发生,但是应用程序必须通过以下方式防止其发生,即对应该导致该线程被提醒的条件进行测试,如果不满足该条件,则继续等待。换句话说,等待应总是发生在循环中,如下面的示例:

synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
... // Perform action appropriate to condition
     }

1 楼 truth315 2010-05-12  

 while (this.queue.size() == this.limit) {   
           System.out.println("Arrive to the pool's max size:"+this.limit);   
            wait();   
       }


为什么不能把WHILE改为IF,然后把唤醒的notifyAll()改为notify();

相关推荐

    threadpool

    4. **线程同步与通信**:为了保证线程安全,线程池的实现通常会用到互斥锁、条件变量等同步原语,确保对线程池数据结构的访问是线程安全的。 5. **线程池的扩展与收缩**:线程池可以根据系统负载动态调整其大小。...

    ThreadPool

    总的来说,"ThreadPool"是一个关于多线程编程和任务调度的重要主题,尤其在Windows环境下,理解和掌握线程池的使用能帮助开发者编写出更加高效、稳定的并发程序。通过深入学习和实践,可以利用C++或者其他语言实现...

    Boost threadpool优先级实例

    通过阅读和分析这些代码,开发者可以更好地理解如何在实际项目中应用Boost.Threadpool来处理任务,特别是优先级任务。 总结而言,Boost.Threadpool为C++开发者提供了一种高效、灵活的线程管理工具,允许他们在多...

    threadpool.tar.gz

    在C++编程中,线程池是一种用于管理并发任务的技术,它通过预先创建一组线程来处理任务,而不是为每个任务创建一个新的线程。在"threadpool.tar.gz...通过理解和应用这些概念,开发者可以创建高效、可扩展的并发程序。

    VC++ThreadPool程序

    本篇文章将围绕“VC++ ThreadPool程序”进行详细讲解,旨在帮助读者理解和掌握如何在VC++环境中构建和使用线程池。 线程池是一种线程使用模式,它预先创建一组线程,等待任务到来时分配给这些线程执行,而不是每次...

    Tomat研究之ThreadPool

    总之,Tomcat的线程池机制是其高性能和高并发能力的关键所在,通过对`ThreadPool`、`ControlRunnable`、`MonitorRunnable`等类的深入解析,我们不仅能够理解其内部的工作原理,还能学习到如何在自己的项目中设计和...

    ThreadPool-master.zip

    线程池是一种多线程处理形式,用于管理并发任务的...通过对ThreadPool-master项目的学习,你可以深入理解线程池的工作机制,以及如何在C++中高效地实现和使用线程池。这将有助于提升你在并发编程和系统优化方面的技能。

    python threadpool

    `concurrent.futures`支持异步执行、取消任务、超时控制等功能,而且它的API设计更加面向对象,易于理解和使用。 ### 五、实际应用示例 在实际应用中,`threadpool`常用于处理大量的I/O密集型任务,如网络请求、...

    threadpool_informationmcb_threadpool_c++threadpool_

    在C++编程中,线程池是一种用于管理并发任务的机制,它允许高效地调度和执行多个线程,而不是为每个任务创建一个新的线程。...通过理解和掌握这些概念,开发者可以构建出高效且可靠的线程池解决方案。

    ThreadPool.zip

    《深入理解多线程编程:ThreadPool.zip实例解析》 在计算机科学中,多线程编程是一种重要的技术,它允许多个任务在同一时间执行,从而提高了程序的效率和响应性。在Java等编程语言中,ThreadPool是实现多线程处理的...

    C# ThreadPool 多线程 代码示例

    总的来说,理解和熟练运用`ThreadPool`是C#多线程编程的关键,它可以提高程序性能,合理利用系统资源。通过`QueueUserWorkItem`方法,我们可以轻松地将任务提交给线程池,让系统自动进行调度和执行。在实际开发中,...

    C++11 线程池 ThreadPool

    C++11是C++语言的一个重要版本更新,它引入了大量的新特性,其中包括对多线程的支持。线程池(ThreadPool)是一种管理线程资源的有效方式,它在现代并发编程中扮演着至关重要的角色。线程池允许程序预先创建一组线程...

    QT_ThreadPool.rar

    QT_ThreadPool是一个基于QT5框架实现的线程池项目,旨在提供一种高效、灵活的多线程处理方式。线程池是一种线程管理机制,它预先创建一组线程,待有任务需要执行时,从线程池中分配线程来执行任务,而不是每次任务...

    C# Thread、ThreadPool、Task测试

    在C#编程中,线程(Thread)、线程池(ThreadPool)和任务(Task)是并行处理和异步操作的重要组成部分。理解它们的工作原理和使用方法对于优化应用程序的性能至关重要。下面将详细阐述这三个概念及其相关知识点。 ...

    Tomat组件研究之ThreadPool

    在深入理解Tomcat ThreadPool之前,需要对一些基础概念有所了解。 首先,ThreadPool组件的类图和整体结构涉及到了几个核心类和接口,包括ThreadPool、ThreadPoolListener、ThreadWithAttributes、ControlRunnable、...

    threadpool_src.zip

    标题 "threadpool_src.zip" 暗示了这是一个关于线程池实现的源代码压缩包。线程池是一种多线程编程中的管理机制,它允许高效地管理和调度多个并发...同时,对于深入理解操作系统内核的线程管理和调度机制也会有所助益。

    线程池threadpool_src

    通过分析和理解“线程池threadpool_src”的源代码,开发者可以学习如何自定义线程池,如何优化任务调度策略,以及如何在多线程环境下保证程序的稳定性和效率。同时,了解线程池的工作原理对于提升软件的并发处理能力...

Global site tag (gtag.js) - Google Analytics