san_yun

Implementing a Thread Pool in Python

 
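For the concept of a thread pool, see http://en.wikipedia.org/wiki/Thread_pool_pattern. Threads in Python have a real weakness: in Python (meaning CPython, the implementation written in C), every basic operation ends up as a corresponding C function call, so using threads carries a noticeable overhead. Stackless Python (a modified version of Python) can improve matters with its lightweight microthreads (tasklets).
On top of that, because of the GIL (global interpreter lock), only one thread executes Python bytecode at a time, so a multithreaded Python program cannot make full use of multiple CPUs. The psyco module (a JIT compiler for Python 2) can speed up CPU-bound code, but it does not remove the GIL.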
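
The GIL limits CPU-bound work, but threads can still overlap blocking waits, because CPython releases the GIL during blocking calls such as `time.sleep`. The following is a minimal sketch of that effect, assuming Python 3; `blocking_task` and `run_in_threads` are hypothetical names used only for illustration.

```python
import threading
import time

def blocking_task(seconds):
    # time.sleep releases the GIL, so other threads can run meanwhile
    time.sleep(seconds)

def run_in_threads(n, seconds):
    """Start n threads that each block for `seconds`; return elapsed time."""
    threads = [threading.Thread(target=blocking_task, args=(seconds,))
               for _ in range(n)]
    start = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - start

if __name__ == "__main__":
    elapsed = run_in_threads(4, 0.5)
    # Expect roughly 0.5 s rather than 4 * 0.5 s, since the waits overlap
    print("4 blocking tasks took %.2f s" % elapsed)
```

This is why a thread pool is still worthwhile for I/O-bound tasks, even though it will not speed up pure computation under the GIL.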

Implementing a thread pool in C means dealing with a pile of pointers, plus the rather headache-inducing functions of the pthread library, such as:
int pthread_create(pthread_t *restrict thread,
const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
Implementing a thread pool in Python is much nicer: the structure is clear and the code reads cleanly:

## {{{ http://code.activestate.com/recipes/203871/ (r3)
import threading
from time import sleep

# True and False are built in from Python 2.2.1 onward, so no
# compatibility shim is needed here.

class ThreadPool:

    """Flexible thread pool class.  Creates a pool of threads, then
    accepts tasks that will be dispatched to the next available
    thread."""
    
    def __init__(self, numThreads):

        """Initialize the thread pool with numThreads workers."""
        
        self.__threads = []
        self.__resizeLock = threading.Condition(threading.Lock())
        self.__taskLock = threading.Condition(threading.Lock())
        self.__tasks = []
        self.__isJoining = False
        self.setThreadCount(numThreads)

    def setThreadCount(self, newNumThreads):

        """ External method to set the current pool size.  Acquires
        the resizing lock, then calls the internal version to do real
        work."""
        
        # Can't change the thread count if we're shutting down the pool!
        if self.__isJoining:
            return False
        
        self.__resizeLock.acquire()
        try:
            self.__setThreadCountNolock(newNumThreads)
        finally:
            self.__resizeLock.release()
        return True

    def __setThreadCountNolock(self, newNumThreads):
        
        """Set the current pool size, spawning or terminating threads
        if necessary.  Internal use only; assumes the resizing lock is
        held."""
        
        # If we need to grow the pool, do so
        while newNumThreads > len(self.__threads):
            newThread = ThreadPoolThread(self)
            self.__threads.append(newThread)
            newThread.start()
        # If we need to shrink the pool, do so
        while newNumThreads < len(self.__threads):
            self.__threads[0].goAway()
            del self.__threads[0]

    def getThreadCount(self):

        """Return the number of threads in the pool."""
        
        self.__resizeLock.acquire()
        try:
            return len(self.__threads)
        finally:
            self.__resizeLock.release()

    def queueTask(self, task, args=None, taskCallback=None):

        """Insert a task into the queue.  task must be callable;
        args and taskCallback can be None."""
        
        if self.__isJoining:
            return False
        if not callable(task):
            return False
        
        self.__taskLock.acquire()
        try:
            self.__tasks.append((task, args, taskCallback))
            return True
        finally:
            self.__taskLock.release()

    def getNextTask(self):

        """ Retrieve the next task from the task queue.  For use
        only by ThreadPoolThread objects contained in the pool."""
        
        self.__taskLock.acquire()
        try:
            if not self.__tasks:
                return (None, None, None)
            else:
                return self.__tasks.pop(0)
        finally:
            self.__taskLock.release()
    
    def joinAll(self, waitForTasks = True, waitForThreads = True):

        """ Clear the task queue and terminate all pooled threads,
        optionally allowing the tasks and threads to finish."""
        
        # Mark the pool as joining to prevent any more task queueing
        self.__isJoining = True

        # Wait for the queue to drain (tasks already picked up by a
        # worker may still be running at this point)
        if waitForTasks:
            while self.__tasks:
                sleep(.1)

        # Tell all the threads to quit
        self.__resizeLock.acquire()
        try:
            # Keep references to the workers: shrinking the pool to zero
            # empties self.__threads, so joining that list afterwards
            # would wait for nothing
            threads = self.__threads[:]
            self.__setThreadCountNolock(0)

            # Wait until all threads have exited
            if waitForThreads:
                for t in threads:
                    t.join()

            # Reset the pool for potential reuse
            self.__isJoining = False
        finally:
            self.__resizeLock.release()

        
class ThreadPoolThread(threading.Thread):

    """ Pooled thread class. """
    
    threadSleepTime = 0.1

    def __init__(self, pool):

        """ Initialize the thread and remember the pool. """
        
        threading.Thread.__init__(self)
        self.__pool = pool
        self.__isDying = False
        
    def run(self):

        """ Until told to quit, retrieve the next task and execute
        it, calling the callback if any.  """
        
        while not self.__isDying:
            cmd, args, callback = self.__pool.getNextTask()
            # If there's nothing to do, just sleep a bit
            if cmd is None:
                sleep(ThreadPoolThread.threadSleepTime)
            elif callback is None:
                cmd(args)
            else:
                callback(cmd(args))
    
    def goAway(self):

        """ Exit the run loop next time through."""
        
        self.__isDying = True

# Usage example
if __name__ == "__main__":

    from random import randrange

    # Sample task 1: given a start and end value, shuffle integers,
    # then sort them
    
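    def sortTask(data):
        print("SortTask starting for %s" % (data,))
        # list() keeps this working where range() returns a lazy sequence
        numbers = list(range(data[0], data[1]))
        # Shuffle by swapping each position with a randomly chosen one
        for i in range(len(numbers)):
            rnd = randrange(0, len(numbers))
            numbers[i], numbers[rnd] = numbers[rnd], numbers[i]
        print("SortTask sorting for %s" % (data,))
        numbers.sort()
        print("SortTask done for %s" % (data,))
        return "Sorter ", data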

    # Sample task 2: just sleep for a number of seconds.

    def waitTask(data):
        print("WaitTask starting for %s" % (data,))
        print("WaitTask sleeping for %d seconds" % data)
        sleep(data)
        return "Waiter", data

    # Both tasks use the same callback

    def taskCallback(data):
        print("Callback called for %s" % (data,))

    # Create a pool with three worker threads

    pool = ThreadPool(3)

    # Insert tasks into the queue and let them run
    pool.queueTask(sortTask, (1000, 100000), taskCallback)
    pool.queueTask(waitTask, 5, taskCallback)
    pool.queueTask(sortTask, (200, 200000), taskCallback)
    pool.queueTask(waitTask, 2, taskCallback)
    pool.queueTask(sortTask, (3, 30000), taskCallback)
    pool.queueTask(waitTask, 7, taskCallback)

    # When all tasks are finished, allow the threads to terminate
    pool.joinAll()
#    print 'all complete'
## end of http://code.activestate.com/recipes/203871/ }}}
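
In the recipe above, idle workers poll the task list and sleep for 0.1 seconds between checks. As a rough sketch of an alternative, and not part of the original recipe, the same pattern of worker threads pulling from a shared task queue can be built on the standard library's `queue.Queue`, whose blocking `get()` removes the need for polling. This assumes Python 3; the class name `QueueThreadPool` and its method names are hypothetical.

```python
import queue
import threading

class QueueThreadPool:
    """Hypothetical variant of the recipe's pool built on queue.Queue."""

    _STOP = object()  # sentinel telling a worker to exit

    def __init__(self, num_threads):
        self._tasks = queue.Queue()
        self._threads = [threading.Thread(target=self._worker)
                         for _ in range(num_threads)]
        for t in self._threads:
            t.start()

    def _worker(self):
        while True:
            item = self._tasks.get()  # blocks until work arrives, no polling
            if item is self._STOP:
                break
            task, args, callback = item
            result = task(args)
            if callback is not None:
                callback(result)

    def queue_task(self, task, args=None, callback=None):
        self._tasks.put((task, args, callback))

    def join_all(self):
        # One sentinel per worker; tasks queued ahead of them run first
        for _ in self._threads:
            self._tasks.put(self._STOP)
        for t in self._threads:
            t.join()

if __name__ == "__main__":
    results = []
    pool = QueueThreadPool(3)
    for n in range(5):
        pool.queue_task(lambda x: x * x, n, results.append)
    pool.join_all()
    print(sorted(results))
```

Because the queue is first-in, first-out, every task queued before `join_all()` is handed to a worker before any worker sees a stop sentinel, and `join()` then waits for tasks that are still running. Like the recipe, this sketch does not catch exceptions raised by a task, so a failing task ends its worker thread.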
