`
san_yun
  • 浏览: 2639595 次
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

A generic programming thread pool

 
阅读更多
## {{{ http://code.activestate.com/recipes/203871/ (r3)
import threading
from time import sleep

# Ensure booleans exist (not needed for Python 2.2.1 or higher)
try:
    True
except NameError:
    False = 0
    True = not False

class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""
    
    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""
        
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""
        
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):
        
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):

        """Return the number of threads in the pool."""
        
        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
        
        if self.__isJoining == True:
            return False
        if not callable(task):
            return False
        
        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""
        
        self.__taskLock.acquire()
        try:
            if self.__tasks == []:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()
    
    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for tasks to finish
        if waitForTasks:
            while self.__tasks != []:
                sleep(.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(0)
            self.__isJoining = True

            # Wait until all threads have exited
            if waitForThreads:
                for t in self.__threads:
                    t.join()
                    del t

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()


        
class ThreadPoolThread(threading.Thread):

    """ Pooled thread class. """
    
    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """
        
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False
        
    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """
        
        while self.__isDying == False:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))
    
    def goAway(self):

        """ Exit the run loop next time through."""
        
        self.__isDying = True

# Usage example
if __name__ == "__main__":

    from random import randrange

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
    
    def sortTask(data):
        print "SortTask starting for ", data
        numbers = range(data[0], data[1])
        for a in numbers:
            rnd = randrange(0, len(numbers) - 1)
            a, numbers[rnd] = numbers[rnd], a
        print "SortTask sorting for ", data
        numbers.sort()
        print "SortTask done for ", data
        return "Sorter ", data

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print "WaitTask starting for ", data
        print "WaitTask sleeping for %d seconds" % data
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        print "Callback called for", data

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()
## end of http://code.activestate.com/recipes/203871/ }}}
 
分享到:
评论

相关推荐

    Generic Programming(泛型程序设计小手册)

    Generic Programming(泛型程序设计小手册)中文chm版

    Modern C++ Design: Generic Programming and Design Patterns Applied

    Modern C++ Design: Generic Programming and Design Patterns Applied By Andrei Alexandrescu Publisher : Addison Wesley Pub Date : February 01, 2001 ISBN : 0-201-70431-5 ...

    Modern C++ Design Generic Programming and Design Patterns Applied.pdf

    Displaying extraordinary creativity and programming virtuosity, Alexandrescu offers a cutting-edge approach to design that unites design patterns, generic programming, and C++, enabling programmers ...

    From Mathematics to Generic Programming

    这是一本内容丰富而又通俗易懂的书籍,由优秀的软件设计师 Alexander A. Stepanov 与其同事 Daniel E. Rose 所撰写。作者在书中解释泛型编程的原则及其所依据的抽象数学概念,以帮助你写出简洁而强大的代码。

    Modern C++ Design Generic Programming and Design Patterns Applied .pdf

    《Modern C++ Design: Generic Programming and Design Patterns Applied》是由 Andrei Alexandrescu 编写的一本关于 C++ 的高级技术书籍,该书由 Addison Wesley 出版社于 2001 年 2 月出版,ISBN 为 0-201-70431-5...

    Generic Programming and the STL

    关于STL编程比较好的一本书,中文的,个人认为还是比较清楚的^^

    推荐的C++书籍 4.3 Generic programming and the STL

    &lt;&lt;generic programming and the stl&gt;&gt;让你从oo向gp转变 光用不行,我们还有必要了解stl的工作原理,那么源码剖析&gt;&gt;会解决你所有的困惑 level 5 对于c++无非是oo和gp,想进一步提升oo,&lt;&lt;exeptional c++ style&gt;&gt;是一...

    泛型编程与STL Generic Programming and the STL: Using and Extending the C++ Standard Template Library

     许多程序员可能并不知道,C++不仅是一个面向对象程序语言, 它还适用于泛型编程(generic programming)。这项技术可以大大增强你的能力,协助你写出高效率并可重复运用的软件组件(software components)。  本书由...

    Generic Programming for Scientific Computing in C++, Java, and C#

    ### 泛型编程在科学计算中的应用:C++、Java与C#的比较 #### 摘要概览 本文是一篇关于泛型编程在科学计算领域应用的研究论文。研究对比了Java、C#和C++三种语言在实现泛型(参数化类型)方面的表现,并重点介绍了...

    Modern C++ Design Generic Programming and Desig

    《现代C++设计:泛型编程与设计模式应用》是一本由Andrei Alexandrescu编写的经典著作,它深入探讨了C++中的高级设计技术,尤其是泛型编程和设计模式的应用。这本书对于想要提升C++编程技能的专业人士来说,是一份不...

    Generic-programming-and-STL.zip_generic_generic programming

    通用编程(Generic Programming)是一种编程范式,它的核心思想是编写可重用的、不依赖特定数据类型的代码。这种编程方式允许程序员创建独立于具体数据类型的算法和数据结构,从而提高了代码的灵活性和可维护性。在...

    Modern C++ Design Generic Programming and Design Patterns Applied epub

    Modern C++ Design Generic Programming and Design Patterns Applied 英文epub 本资源转载自网络,如有侵权,请联系上传者或csdn删除 查看此书详细信息请在美国亚马逊官网搜索此书

    generic programming.pdf

    泛型编程(Generic Programming)作为一种编程范式,其核心在于设计出可重用且高效的算法集合。它不仅仅局限于特定的数据类型,而是通过参数化类型的方式使得同一段代码可以处理多种数据结构。这种能力在现代软件...

    Generic Programming with Haskell(带书签)

    《Generic Programming with Haskell》是一本由Roland Backhouse和Jeremy Gibbons编辑的书籍,属于Lecture Notes in Computer Science系列(第2793卷)。该书由Springer出版社出版,并在2003年正式发行。本书探讨了...

    Algorithm Specialization in Generic Programming

    通用编程(Generic Programming)是一种编程范式,它允许开发者创建高度可重用的软件库。其中,C++是实现通用编程的典型语言之一。本文探讨了算法专业化在通用编程中的应用,并提出了一种针对C++的受限泛型扩展。 *...

Global site tag (gtag.js) - Google Analytics