浏览 9980 次
锁定老帖子 主题:Python实现线程池
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-03-02
最后修改:2011-05-03
最近在做一些文本处理方面的事情,考虑到程序利用并发性可以提高执行效率(不纠结特殊反例),于是入围的Idea如使用多进程或多线程达到期望的目标,对于进程或线程的创建是有代价的,那么我们是否可以实现一个线程池来达到已创建的线程反复使用从而使代价降低到最小呢? # !/usr/bin/env python # -*- coding:utf-8 -*- import Queue import threading import time class WorkManager(object): def __init__(self, work_num=1000,thread_num=2): self.work_queue = Queue.Queue() self.threads = [] self.__init_work_queue(work_num) self.__init_thread_pool(thread_num) """ 初始化线程 """ def __init_thread_pool(self,thread_num): for i in range(thread_num): self.threads.append(Work(self.work_queue)) """ 初始化工作队列 """ def __init_work_queue(self, jobs_num): for i in range(jobs_num): self.add_job(do_job, i) """ 添加一项工作入队 """ def add_job(self, func, *args): self.work_queue.put((func, list(args)))#任务入队,Queue内部实现了同步机制 """ 等待所有线程运行完毕 """ def wait_allcomplete(self): for item in self.threads: if item.isAlive():item.join() class Work(threading.Thread): def __init__(self, work_queue): threading.Thread.__init__(self) self.work_queue = work_queue self.start() def run(self): #死循环,从而让创建的线程在一定条件下关闭退出 while True: try: do, args = self.work_queue.get(block=False)#任务异步出队,Queue内部实现了同步机制 do(args) self.work_queue.task_done()#通知系统任务完成 except: break #具体要做的任务 def do_job(args): time.sleep(0.1)#模拟处理时间 print threading.current_thread(), list(args) if __name__ == '__main__': start = time.time() work_manager = WorkManager(10000, 10)#或者work_manager = WorkManager(10000, 20) work_manager.wait_allcomplete() end = time.time() print "cost all time: %s" % (end-start) 2次开启不同的线程数运行结果如下: #work_manager = WorkManager(10000, 10) cost all time: 100.641790867(单位:秒) #work_manager = WorkManager(10000, 20) cost all time:50.5233478546(单位:秒) 上面实现了线程池的雏形,展现了基本原理,当然要想成为通用的API需要做很多的工作,希望本文能够起到抛砖引玉的效果。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2011-03-10
这个 太长了吧~
其实我还是怀念flex框架下的“多线程”式写法 就是开num事件,然后在事件结束后判断是否满足运行完毕的条件 如果是就跳出 如果不是 再来一轮 可惜在线程中,似乎没有线程析构的说法 也就是当线程完毕的时候 我们是拿不到信号的 (while 轮询? 不好) 看到过一个简易版本的线程池 哪个简单干净 利于理解 |
|
返回顶楼 | |
发表时间:2011-03-11
最近做一个项目,也需要用到一个线程池,所以找了一个老外的开源项目中用的线程池代码,总共有 task, workthread, threadpool三大部分。不太清楚的是你说的queue是什么lib? import threading class WorkerTask(object): """A task to be performed by the ThreadPool.""" def __init__(self, function, args=(), kwargs={}): self.function = function self.args = args self.kwargs = kwargs def __call__(self): self.function(*self.args, **self.kwargs) class WorkerThread(threading.Thread): """A thread managed by a thread pool.""" def __init__(self, pool): threading.Thread.__init__(self) self.setDaemon(True) self.pool = pool self.busy = False self._started = False self._event = None def work(self): if self._started is True: if self._event is not None and not self._event.isSet(): self._event.set() else: self._started = True self.start() def run(self): while True: self.busy = True while len(self.pool._tasks) > 0: try: task = self.pool._tasks.pop() task() except IndexError: # Just in case another thread grabbed the task 1st. pass # Sleep until needed again self.busy = False if self._event is None: self._event = threading.Event() else: self._event.clear() self._event.wait() class ThreadPool(object): """Executes queued tasks in the background.""" def __init__(self, max_pool_size=10): self.max_pool_size = max_pool_size self._threads = [] self._tasks = [] def _addTask(self, task): self._tasks.append(task) worker_thread = None for thread in self._threads: if thread.busy is False: worker_thread = thread break if worker_thread is None and len(self._threads) <= self.max_pool_size: worker_thread = WorkerThread(self) self._threads.append(worker_thread) if worker_thread is not None: worker_thread.work() def addTask(self, function, args=(), kwargs={}): self._addTask(WorkerTask(function, args, kwargs)) class GlobalThreadPool(object): """ThreadPool Singleton class.""" _instance = None def __init__(self): """Create singleton instance """ if GlobalThreadPool._instance is None: # Create and remember instance GlobalThreadPool._instance = ThreadPool() def __getattr__(self, attr): """ Delegate get access to implementation """ return getattr(self._instance, attr) def __setattr__(self, attr, val): """ Delegate set access to implementation """ return setattr(self._instance, attr, val)
|
|
返回顶楼 | |
发表时间:2011-03-11
simomo 写道
最近做一个项目,也需要用到一个线程池,所以找了一个老外的开源项目中用的线程池代码,总共有 task, workthread, threadpool三大部分。不太清楚的是你说的queue是什么lib?
应该是2.5之后新增的 标准库的那个Queue...2.6后 multipleprocessing里面也有一个。不过那个是给多进程用的。。。 |
|
返回顶楼 | |