`
2277259257
  • 浏览: 515426 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

连接池/线程池

 
阅读更多

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

 

一个请求程序创建一个线程,如果突发情况下大量的请求出现会创建大量的线程从而造成内存溢出情况,为限制此情况,引入线程池限制程序创建线程的数量,使其不会超出线程池的最大容量

 

 

 

什么是线程池?

诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了的概念,的概念使得人们可以定制一定量的资源,然后对这些资源进行复用,而不是频繁的创建和销毁。

线程池是预先创建线程的一种技术。线程池在还没有任务到来之前,创建一定数量的线程,放入空闲队列中。这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。

线程池的注意事项

虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。

1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。

2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。

3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。

简单线程池的设计

一个典型的线程池,应该包括如下几个部分:
1
、线程池管理器(ThreadPool),用于启动、停用,管理线程池
2
、工作线程(WorkThread),线程池中的线程
3
、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
4
、请求队列(RequestQueue,用于存放和提取请求
5
、结果队列(ResultQueue,用于存储请求执行后返回的结果

线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。在本文中,将会用python语言实现,pythonQueue,就是很好的实现了对线程同步机制。

 

使用Python实现:

 

#-*-encoding:utf-8-*-
'''
Created on 2012-3-9
@summary:
线程池
@contact: mailto:zhanglixinseu@gmail.com
@author: zhanglixin
'''

import sys
import threading
import Queue
import traceback

# 定义一些Exception,用于自定义异常处理

class NoResultsPending(Exception):
   
"""All works requests have been processed"""
   
pass

class NoWorkersAvailable(Exception):
   
"""No worket threads available to process remaining requests."""
   
pass

def _handle_thread_exception(request, exc_info):
   
"""默认的异常处理函数,只是简单的打印"""
    traceback.print_exception(*exc_info)

#classes

class WorkerThread(threading.Thread):
   
"""后台线程,真正的工作线程,从请求队列(requestQueue)中获取work
   
并将执行后的结果添加到结果队列(resultQueue)"""
   
def__init__(self,requestQueue,resultQueue,poll_timeout=5,**kwds):
        threading.Thread.
__init__(self,**kwds)
       
'''设置为守护进行'''
        self.setDaemon(True)
        self._requestQueue = requestQueue
        self._resultQueue = resultQueue
        self._poll_timeout = poll_timeout
       
'''设置一个flag信号,用来表示该线程是否还被dismiss,默认为false'''
        self._dismissed = threading.Event()
        self.start()
       
   
def run(self):
       
'''每个线程尽可能多的执行work,所以采用loop
       
只要线程可用,并且requestQueuework未完成,则一直loop'''
       
while True:
           
if self._dismissed.is_set():
               
break
           
try:
               
'''
                Queue.Queue
队列设置了线程同步策略,并且可以设置timeout
               
一直block,直到requestQueue有值,或者超时
                '''

                request = self._requestQueue.get(True,self._poll_timeout)
           
except Queue.Empty:
               
continue
           
else:
               
'''之所以在这里再次判断dimissed,是因为之前的timeout时间里,很有可能,该线程被dismiss掉了'''
               
if self._dismissed.is_set():
                    self._requestQueue.put(request)
                   
break
               
try:
                   
'''执行callable,讲请求和结果以tuple的方式放入requestQueue'''
                    result = request.callable(*request.args,**request.kwds)
                   
print self.getName()
                    self._resultQueue.put((request,result))
               
except:
                    
'''异常处理'''
                    request.exception = True
                    self._resultQueue.put((request,sys.exc_info()))
   
   
def dismiss(self):
       
'''设置一个标志,表示完成当前work之后,退出'''
        self._dismissed.set()


class WorkRequest:
   
'''
    @param callable_:
,可定制的,执行work的函数
    @param args:
列表参数
    @param kwds:
字典参数
    @param requestID: id
    @param callback:
可定制的,处理resultQueue队列元素的函数
    @param exc_callback:
可定制的,处理异常的函数
    '''

   
def__init__(self,callable_,args=None,kwds=None,requestID=None,
                 callback=None,exc_callback=_handle_thread_exception):
       
if requestID == None:
            self.requestID = id(self)
       
else:
           
try:
                self.requestID = hash(requestID)
           
except TypeError:
               
raise TypeError("requestId must be hashable")   
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args
or []
        self.kwds = kwds
or {}
       
   
def__str__(self):
       
return"WorkRequest id=%s args=%r kwargs=%r exception=%s" % \
            (self.requestID,self.args,self.kwds,self.exception)
           
class ThreadPool:
   
'''
    @param num_workers:
初始化的线程数量
    @param q_size,resq_size: requestQueue
result队列的初始大小
    @param poll_timeout:
设置工作线程WorkerThreadtimeout,也就是等待requestQueuetimeout
    '''

   
def__init__(self,num_workers,q_size=0,resq_size=0,poll_timeout=5):
        self._requestQueue = Queue.Queue(q_size)
        self._resultQueue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
#设置个字典,方便使用
        self.createWorkers(num_workers,poll_timeout)

   
def createWorkers(self,num_workers,poll_timeout=5):
       
'''创建num_workersWorkThread,默认timeout5'''
       
for i in range(num_workers):
            self.workers.append(WorkerThread(self._requestQueue,self._resultQueue,poll_timeout=poll_timeout))                          
    
   
def dismissWorkers(self,num_workers,do_join=False):
       
'''停用num_workers数量的线程,并加入dismiss_list'''
        dismiss_list = []
       
for i in range(min(num_workers,len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
       
if do_join :
           
for worker in dismiss_list:
                worker.join()
       
else:
            self.dismissedWorkers.extend(dismiss_list)
   
   
def joinAllDismissedWorkers(self):
        
'''join 所有停用的thread'''
       
#print len(self.dismissedWorkers)
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []
   
   
def putRequest(self,request ,block=True,timeout=None):
       
assert isinstance(request,WorkRequest)
       
assertnot getattr(request,'exception',None)
       
'''queue满了,也就是容量达到了前面设定的q_size,它将一直阻塞,直到有空余位置,或是timeout'''
        self._requestQueue.put(request, block, timeout)
        self.workRequests[request.requestID] = request
       
   
def poll(self,block = False):
       
while True:
           
ifnot self.workRequests:
               
raise NoResultsPending
           
elif block andnot self.workers:
               
raise NoWorkersAvailable
           
try:
                
'''默认只要resultQueue有值,则取出,否则一直block'''
                request , result = self._resultQueue.get(block=block)
               
if request.exception and request.exc_callback:
                    request.exc_callback(request,result)
               
if request.callback andnot (request.exception and request.exc_callback):
                    request.callback(request,result)
               
del self.workRequests[request.requestID]
           
except Queue.Empty:
               
break
   
   
def wait(self):
       
while True:
           
try:
                self.poll(True)
           
except NoResultsPending:
               
break
   
   
def workersize(self):
       
return len(self.workers)
   
   
def stop(self):
       
'''join 所有的thread,确保所有的线程都执行完毕'''
        self.dismissWorkers(self.workersize(),True)
        self.joinAllDismissedWorkers()

 

测试代码:

 

#Test a demo

if__name__=='__main__':
   
import random
   
import time
   
import datetime
   
def do_work(data):
        time.sleep(random.randint(1,3))
        res = str(datetime.datetime.now()) +
"" +str(data)
       
return res
   
   
def print_result(request,result):
       
print"---Result from request %s : %r" % (request.requestID,result)
   
    main = ThreadPool(3)
   
for i in range(40):
        req = WorkRequest(do_work,args=[i],kwds={},callback=print_result)
        main.putRequest(req)
       
print"work request #%s added." % req.requestID
   
   
print'-'*20, main.workersize(),'-'*20
   
    counter = 0
   
while True:
       
try:
            time.sleep(0.5)
            main.poll()
           
if(counter==5):
               
print"Add 3 more workers threads"
                main.createWorkers(3)
               
print'-'*20, main.workersize(),'-'*20
           
if(counter==10):
               
print"dismiss 2 workers threads"
                main.dismissWorkers(2)
               
print'-'*20, main.workersize(),'-'*20
            counter+=1
       
except NoResultsPending:
           
print"no pending results"
           
break
   
    main.stop()
   
print"Stop"

 

 

分享到:
评论

相关推荐

    对象池&线程池&数据库连接池

    对象池、线程池和数据库连接池都是资源复用机制的实例,它们有效地解决了频繁创建和销毁对象带来的开销,提高了系统的效率。以下是对这些概念的详细解释: 1. **对象池**: 对象池是一种设计模式,它的核心思想是...

    C-epoll-连接池-线程池.zip

    本项目“C-epoll-连接池-线程池.zip”正是针对这一需求,采用C语言编写,实现了基于Epoll的事件驱动模型,结合线程池和数据库连接池技术,旨在优化服务器性能,提升系统的可扩展性和资源利用率。 首先,Epoll是...

    基于C++实现的连接池、线程池、内存池、对象池项目源码含项目说明.zip

    作为五大池之一(内存池、连接池、线程池、进程池、协程池),线程池的应用非常广泛,不管是客户 端程序,还是后台服务程序,都是提高业务处理能力的必备模块。有很多开源的线程池实现,虽然各自 接口使用上稍有区别,...

    Linux + C + Epoll实现高并发服务器(线程池 + 数据库连接池)

    在Linux 系统下面用C 语言实现的高并发服务器的代码,具体用到了Epoll,线程池,数据库连接池。 具体可以看下连接http://blog.csdn.net/wuyuxing24/article/details/48758927

    okhttp中连接池实现

    它的核心特性之一就是连接池(Connection Pool),它在提高网络性能和减少延迟方面起到了关键作用。本文将深入探讨OkHttp中的连接池实现,包括连接对象的添加、移除机制以及其工作原理。 首先,我们需要了解什么是...

    Linux 线程池+连接池

    在IT领域,线程池和连接池是两个关键的概念,特别是在服务器端的系统设计和优化中。它们在提高系统性能、资源管理和响应速度方面发挥着重要作用。本文将深入探讨Linux环境下的线程池和连接池,并结合C语言的epoll...

    线程池和mysql连接池的实现

    线程池和MySQL连接池是两种重要的资源管理技术,在多线程编程和数据库操作中扮演着关键角色。本文将详细探讨这两个概念,并结合在Ubuntu 12.04环境下使用C++实现线程池和MySQL连接池的方法。 线程池是一种优化并发...

    JAVA服务器端Socket线程池

    此方法用于启动线程池,首先调整线程池的最大最小限制,然后打开指定数量的初始线程,并创建一个监控线程来监控线程池的状态。 ##### 4.5 方法`runIt` ```java public void runIt(Socket cs) { // r为task if ...

    基于tomcat的连接数与线程池详解

    在Tomcat的配置和性能优化中,了解如何设置连接数和线程池是至关重要的。 首先,Connector按照处理连接的方式可以分为不同的协议类型,包括BIO(阻塞IO)、NIO(非阻塞IO)、APR(Apache Portable Runtime)。BIO...

    C# Socket连接池

    本文将深入探讨“C# Socket连接池”的实现原理、优势以及如何利用线程池技术来构建和优化它。同时,我们也会提及Windows计数器在监控和分析线程池性能中的作用。 首先,理解Socket连接池的概念。Socket是网络通信的...

    Qt 多线程连接数据库——数据库连接池

    * 数据库连接池特点: * 获取连接时不需要了解连接的名字,连接池内部维护连接的名字 * 支持多线程,保证获取到的连接一定是没有被其他线程正在使用 * 按需创建连接,可以创建多个连接,可以控制连接的数量 * 连接...

    六、进程池与线程池学习ppt

    【进程池与线程池学习】是计算机编程中并发处理的重要概念,主要涉及多线程和多进程的高效管理。在分布式系统、网络编程以及大数据处理等领域,进程池和线程池的应用尤为广泛。 首先,我们需要理解进程和线程的基本...

    使用Java编写的RabbitMQ连接池方法

    RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...

    drm_server1.1.rar_DRMPOOL_libevent_libevent linux_内存池_线程池

    libevent负责高效地监听和调度大量的网络连接,而内存池和线程池则确保了资源的合理分配和快速响应,使得服务在面对大规模用户并发时仍能保持稳定和高效。 总结,drm_server1.1服务是Linux环境下利用libevent库构建...

    java手写连接池

    Java手写连接池是一个编程实践,它涉及到数据库管理和优化,主要目标是提高数据库访问的效率和资源利用率。在Java中,连接池是一个管理数据库连接的系统,它预先创建一定数量的数据库连接,当应用程序需要时可以立即...

    xianchengchi.zip_Socket 线程池_socket池_线程池_线程池socket

    Socket线程池是一种优化策略,用于管理大量的并发Socket连接。本篇将详细探讨Socket线程池的概念、工作原理以及它如何解决多个线程对同一个套接字进行写操作的问题。 首先,我们来理解什么是Socket。Socket是网络上...

    DBConnectionPool.rar_Java 线程池_True_线程 连接池

    private String dbDiv = "" // 数据库驱动 private String dbUrl = "" // 数据 URL ... pooledConnectionVector = null // 存放连接池中数据库连接的向量 , 初始时为 null 存放的对象为 PooledConnection 型

    一个好用的连接池类,用于数据库连接等方面

    连接池的思想并不仅限于数据库连接,也可以应用于其他资源的管理,比如线程池、网络连接池等。通过类似的方式,我们可以预先创建一组资源,然后根据需求进行分配和回收,从而提高资源利用率,减少创建和销毁的开销。...

    Delphi数据库三层连接池

    "Delphi数据库三层连接池"是一个专门针对Delphi开发环境设计的数据库访问解决方案,它着重于提高数据库连接的复用性和效率,以减少系统资源的消耗。在本文中,我们将深入探讨连接池的概念、Delphi数据库操作的封装...

    连接池 连接池连接池 连接池

    在IT行业中,数据库连接池是优化数据库访问性能和资源管理的关键技术。连接池,顾名思义,就是一池可以复用的数据库连接。在Java和JSP开发中,它扮演着至关重要的角色,帮助提高应用程序的效率和响应速度。 连接池...

Global site tag (gtag.js) - Google Analytics