`
iunknown
  • 浏览: 409519 次
社区版块
存档分类
最新评论

Build a thread pool in C

阅读更多
想找个轻便的 thread pool 实现,结果发现网上能找到的都是一些很重量级的,如 boost,ACE 里面的。唯有自己照着下面的需求实现了一个
http://paul.rutgers.edu/~csgates/CS416/proj2/index.html

源代码下载:
http://code.google.com/p/spserver/downloads/list
http://spserver.googlecode.com/files/threadpool-0.1.src.tar.gz

从 0.2 版开始,移植到了 windows 平台,直接使用 windows 的 thread api
http://spserver.googlecode.com/files/threadpool-0.2.src.tar.gz

这个 threadpool 是基于 pthread api 写的。除了 linux/unix 上有 pthread 之外,windows 下也有 pthread 的库。

尝试把设计的过程写一下。由于设计过程中,思路是很跳跃的,有很多的思考细节可能都漏写了。

线程池也是一种对象池,和其他的对象池(如连接池)有相同的地方,
目的是在使用相关的对象时能够避免创建和销毁对象带来的资源消耗,使得程序响应更快。
但线程和其他普通的对象也有不同,线程包含一个指令指针,还包括其他资源,如运行时的函数激活记录的堆栈、一组寄存器和线程特有的数据。所以线程池的管理和其他对象池的管理就有很多的不同。

1.在开始设计线程池的时候,首先可以先考虑一下连接池的使用场景。
  一般是由主线程主动地从池里面获取一个连接,主线程用完再把连接返回给池。
  主线程对连接对象的使用,就是调用连接对象的方法(按 OO 的说法,就是给对象发送消息 )。

2.按连接池的使用场景,要实现线程池,首先要有一个线程对象。
  当主线程从池里面获得一个线程之后,主线程要能够向这个线程发送消息,以达到主线程使用线程的目的。
  那么这个线程对象需要能够支持主线程向线程池的线程发送消息。
  通过阅读 pthread 的规范,可以了解到 pthread_cond_wait 是 POSIX 线程信号发送系统的核心。
  主线程要发送给线程池线程的信息还包括这次任务的参数:一个函数指针和提供给这个函数的参数。

关于 pthread_cond_wait 的有关资料
http://www-128.ibm.com/developerworks/cn/linux/thread/posix_thread3/index.html

3.在上面的第二点中,只考虑了如何从线程池获得线程,并使用线程,而没有考虑如何在使用完线程之后如何把线程归还给池。
  连接池的使用场景中,通常是有主线程来完成这个操作。但是在线程池的使用场景中,主线程在发送消息之后,
  通常就不再等待这个线程完成任务。即主线程通常只负责取,但不负责还。
  考虑到这一点,就需要在每个线程对象中记录自己所属的池,当完成任务之后,线程主动把自己归还到池里面。

  至此我们可以设计出线程对象的数据结构
typedef struct _thread_st {
        pthread_t id;
        pthread_mutex_t mutex;
        pthread_cond_t cond;
        dispatch_fn fn;
        void *arg;
        threadpool parent;
} _thread;


4.有了这个数据结构,对于 dispatch_threadpool 和 wrapper_fn 的设计也就比较容易想到了。

  在 dispatch_threadpool 中,从池里面获得一个线程,然后设置要发送给线程的信息。
  包括:thread->fn,thread->arg,thread->parent;接着正式发送
  pthread_cond_signal( &thread->cond ) ;

# int dispatch_threadpool(threadpool from_me, dispatch_fn dispatch_to_here, void *arg)  
# { 
......
#        } else {  
#                 pool->tp_index--;  
#                 thread = pool->tp_list[ pool->tp_index ];  
#                 pool->tp_list[ pool->tp_index ] = NULL;  
#   
#                 thread->fn = dispatch_to_here;  
#                 thread->arg = arg;  
#                 thread->parent = pool;  
#   
#                 pthread_mutex_lock( &thread->mutex );  
#                 pthread_cond_signal( &thread->cond ) ;  
#                 pthread_mutex_unlock ( &thread->mutex );  
#        } 
......
}


  在 wrapper_fn 中,每次执行 thread->fn( thread->arg ) 之后,主动把自己归还到池里面,
  然后等待主线程发送下一次消息。

# void * wrapper_fn( void * arg )  
# {  
......
#        for( ; 0 == ((_threadpool*)thread->parent)->tp_stop; ) { 
#                         thread->fn( thread->arg ); 
......
#                         save_thread( thread->parent, thread );
#                         pthread_cond_wait( &thread->cond, &thread->mutex );  
......
#        }
......
# }  


更多细节请参考完整的 实现代码
分享到:
评论
7 楼 iunknown 2007-03-01  
SteveGY 写道
使用pthread是一种portable的方法,不过,是不是可以考虑使用apr呢?
http://apr.apache.org/


apr 也是一个不错的选择,但是为了尽量避免为项目带入更多的依赖所以想找个轻便的 thread pool 实现。
目前的这个 threadpool 足够轻便,一个 h ,一个 c 文件,直接和可执行程序编译到一起就行了。

dreamhead 写道
如果请求很多,大于线程总数,后面的请求必须要等待前面的请求处理完毕,方能获取到可以运行的线程,代码才能从dispatch_threadpool中退出,


的确是这样实现的。按 [url]http://paul.rutgers.edu/~csgates/CS416/proj2/index.html [/url]的原始需求就是要这样实现的。

dreamhead 写道
从这个角度来说,它又不适合服务器端的应用,因为当请求量增大的时候,它会让主线程失去响应。


这个问题要看如何来使用线程池。如果在服务器的主线程中直接调用,就会出现这样的情况。

目前实际使用的情况是在另外一个线程(称为Executor线程)中来调用线程池。主线程把收到的情况放到一个队列,然后由Executor线程进行分派,即Executor负责调用dispatch_threadpool函数。

当初把threadpool设计成这样,就是想获得 lazy 创建的好处。

在 java 的 util.concurrent 库中有多种不同的 threadpool 策略,以适应不同的应用场景。这个帖子中实现的 threadpool 只是其中一种,所以对于很多场景也就不适合了。
6 楼 dreamhead 2007-03-01  
从楼主的考虑上来看,这段代码比较适合服务器的应用,尤其是其中涉及到线程的lazy创建。但对于高性能计算中用到的某些情况,这种lazy创建线程的方式就失去了其原有的优势,因为这种程序很可能是一上来就会把所有的线程都生成出来。

楼主的写法中,主线程采用同步的方式,也就是说,如果请求很多,大于线程总数,后面的请求必须要等待前面的请求处理完毕,方能获取到可以运行的线程,代码才能从dispatch_threadpool中退出,从这个角度来说,它又不适合服务器端的应用,因为当请求量增大的时候,它会让主线程失去响应。

所以,楼主的代码可用,但存在一些不甚理想的地方。

还有一种线程池的写法是,加入一个队列,请求是放入队列中。当线程完成一次处理之后,会从队列中取出下一个请求来处理。这种写法是一种异步的处理方式,不会让主线程失去响应。
具体实现可以参考,Java 理论与实践: 线程池与工作队列
5 楼 SteveGY 2007-02-23  
使用pthread是一种portable的方法,不过,是不是可以考虑使用apr呢?
http://apr.apache.org/
4 楼 Arath 2006-12-12  
恩恩,这样就好了,这和写代码多写注释一个道理。
编程中有很多东西都有现成的lib或则sample,但是如果自己能写一个那么至少会认识的更加深刻.
3 楼 iunknown 2006-12-12  
对于 dispatch_threadpool 再做一些说明。
1.上面只说明了 else 的那部分代码,else 前面的代码是创建线程的代码。
  即这个线程池的实现使用了 lazy 的方式,不是在 create_threadpool 的时候立即创建所有线程,
  而是在第一次使用的时候来创建线程。这样的做法,对于常见的应用来说,还是比较合适的。

2.当线程池里面没有可用的线程时(创建的线程数目已经达到指定的最大值,并且所有线程都在工作),
  如果这个时候主线程分配任务,那么这个时候调用 dispatch_threadpool 将会被阻塞,直到有一个线程可用。

  if( pool->tp_index <= 0 && pool->tp_total >= pool->tp_max_index ) {
    pthread_cond_wait( &pool->tp_idle, &pool->tp_mutex );
  }

2 楼 iunknown 2006-12-12  
引用

可能增加点自己的思路会更好,看代码是比较累的.


因为这句,所以把原来回复在这里的思路说明移动到了最前面。谢谢Arath的提示。
1 楼 Arath 2006-12-12  
看代码似乎是linux/unix/bsd上的吧?
可能增加点自己的思路会更好,看代码是比较累的.
对linux/unix/bsd的mutex signal之类的不是很熟悉,所以代码上虽看了个大概,但不是很确认,thread pool关键是要保证调度的效率和数据的安全.

相关推荐

    thread_pool:C ++线程池库

    git clone https://github.com/rvaser/thread_pool && cd thread_pool && mkdir build && cd build cmake -DCMAKE_BUILD_TYPE=Release .. && make 这将创建安装目标和单元测试。 运行make install将在您的系统上...

    Java concurrency in practice

    Java 5.0 is a huge step forward for the development of concurrent applications in Java, providing new higherͲlevel components and additional lowͲlevel mechanisms that make it easier for novices and ...

    微软内部资料-SQL性能优化2

    and page faults are not allowed at this level or above, most device drivers use non-paged pool to assure that they do not incur a page fault. Represented by Memory: Pool Nonpaged Bytes, typically ...

    mysql.data.dll

    We now use a timer thread to clean up dead connections from the connection pool. - Command timeout has been refactored to use network and stream timeouts instead of a timer. The result of this is ...

    Mastering the C++17 .pdf

    thread pool using std::future . I'll teach concepts beyond what you'd find in a reference manual. You'll learn the difference between monomorphic, polymorphic, and generic algorithms (Chapter 1 , ...

    RealThinClientSDKv628 full source

    By using non-blocking event-driven communication with a built-in thread pool, applications built using the RealThinClient SDK can handle thousands of active connections on all supported platforms by ...

    AkkaScala.pdf

    - **Agents:** Agents are used to manage state in a thread-safe manner. They are particularly useful when working with Java code in Akka. #### Networking Networking is a critical aspect of Akka, ...

    微软内部资料-SQL性能优化3

    SIX locks imply that we have shared access to a resource and we have also placed X locks at a lower level in the hierarchy. SQL Server never asks for SIX locks directly, they are always the result of ...

    Android-Thread-Pool:一个有用的线程执行器,它管理池中所有可运行的线程

    Android线程池 一个简单的Android线程池,该池基于Handler和Runable #联络我 电子邮件: 博客: : 如何使用? 创建实例 CustomExecutor executor =...CustomExecutor DEFAULT_THREAD_POOL_EXECUTOR = new Custo

    用PYTHON多线程处理Sphinx遇到的问题

    for t in thread_pool: if t is not None: t.join() # 假设documents是一个包含所有文档路径的列表 documents = ['path/to/doc1.rst', 'path/to/doc2.rst', ...] parallel_build(documents) ``` 以上代码展示了...

    python3.6.5参考手册 chm

    Build and C API Changes Other Improvements Deprecated New Keywords Deprecated Python behavior Deprecated Python modules, functions and methods asynchat asyncore dbm distutils grp importlib ...

    Java - The Well-Grounded Java Developer

    - **Overview**: Explores modern concurrency patterns and APIs available in Java, focusing on efficient parallel processing and thread safety. - **Key Concepts**: - **Java Memory Model (JMM)**: ...

    jthread-1.3.1.tar 源码和编译方法

    pool.execute(new ThreadTask() { public void run() { // 实现任务逻辑 } }); ``` 4.3 关闭线程池 完成所有任务后,关闭线程池: ```java pool.shutdown(); ``` 通过以上分析,我们可以看到jthread-1.3.1是一...

    springboot集成amazon aws s3对象存储sdk(javav2)

    // use a thread pool for concurrent uploads CompletionService&lt;PartETag&gt; completionService = new ExecutorCompletionService(executor); for (int i = 1; i ; i++) { UploadPartResponse response = s3...

    JAVA 创建线程池的注意事项

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ...

    ACE中文技术开发文档

    - **Thread Pool**:ACE提供了一种线程池实现,可以有效地管理和复用线程资源,避免频繁创建和销毁线程带来的开销。 2. **ACE主要组件** - **Acceptor和Connector**:它们分别用于处理服务器端的连接请求和客户端...

    twh.rar_Boost_two boost

    7. **Boost.Pool**:内存池管理库,提高内存分配和释放的效率。 8. **Boost.Spirit**:是一个高级的解析库,可以用来构建解析器和词法分析器。 【压缩包子文件的文件名称列表】: - **Rmsvc-tools.jam**:这可能是 ...

    Springboot 整合Quartz

    quartz.thread-pool.size=10 quartz.auto-start-up=true ``` 或者在YAML格式中: ```yaml quartz: job-store-type: jdbc jdbc-job-store: table-prefix: QRTZ_ thread-pool: size: 10 auto-start-up: true `...

    Android-AsyncTask-Executor:已弃用 您想在任何版本的 Android 操作系统上同时执行您的 AsyncTask 吗? 你需要 AsyncTaskExecutor!

    #Android-AsyncTask-Executor #非常过时,请永远不要使用它看看 、 和其他用于并发数据处理的库 ##为什么要使用它?... task.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR); } else { task.ex

Global site tag (gtag.js) - Google Analytics