`
javasogo
  • 浏览: 1815765 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

boost之ThreadPool

阅读更多

threadpool是基于boost库实现的一个线程池子库,但线程池实现起来不是很复杂。我们从threadpool中又能学到什么东西呢?

它是基于boost库实现的,如果大家对boost库有兴趣,看看一个简单的实现还是可以学到点东西的。

threadpool基本功能

1、任务封装,包括普通任务(task_func)和优先级任务(prio_task_func)。

2、调度策略,包括fifo_scheduler(先进先出)、lifo_scheduler(后进先出)、prio_scheduler(优先级)。

3、结束策略,包括wait_for_all_tasks(全部任务等待)、wait_for_active_tasks(激活任务等待)、immediately(立即结束)。

4、动态修改线程池个数功能。

5、基于future封装的异步返回值获取功能。

在sorceforge上有一个用boost编写的线程池。该线程池和boost结合的比较好,并且提供了多种任务执行策略,使用也非常简单。 下载地址: http://threadpool.sourceforge.net/ 这个线程池不需要编译,只要在项目中包含其头文件就可以了。

一、源代码分析

quickstart分析(\threadpool\libs\threadpool\quickstart

这个例子的代码很简单,但已经全部展示了线程池的核心内容,包括建立、调度、同步等操作。

view plaincopy to clipboardprint?

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

// Create fifo thread pool container with two threads.

pool tp(2);

// Add some tasks to the pool.

tp.schedule(&first_task);

tp.schedule(&second_task);

// Wait until all tasks are finished.

tp.wait();

pool的定义具体见pool.hpp,但使用了pimpl模式,核心代码见pool_core.hpp文件。

下面是pool的定义

typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;

typedef fifo_pool pool;

从上面可以知道,pool实际就是fifo_pool,从模板参数可以看到,使用了fifo_schedulerwait_for_all_tasks

对于线程池有点理解的都知道,一般都是那几样东西,线程的封装,条件变量,队列数据结构。

所以简单的能做的很简单,复杂的的就看你的策略需求了。

对基于boost库的threadpool子库来说,上面的三样东西都是现成的,线程封装和条件变量直接使用thread子库就行,队列使用stl的标准容器。

task_adaptors.hpp

对线程任务的封装,所谓task,我们可以理解成需要运行的函数。

threadpool最大限度的使用了functionbind功能来封装函数,这点和thread子库类似。

文件中涉及的内容主要有三个:task_funcprio_task_funclooped_task_func

对普通task的封装

typedef function0<void> task_func;

如果对bindfunction熟悉的应该很好理解。

对优先级任务的封装

class prio_task_func

这个类很简单,重载了两个方法,

operator()是仿函数的用法,

operator<是用于优先级比较使用的,用于stl容器的元素比较。

size_policies.hpp

size的封装,包括empty_controllerresize_controllerstatic_size

shutdown_policies.hpp

对线程池结束的策略封装,包括wait_for_all_taskswait_for_active_tasksimmediately

这几个类很简单,具体操作封装在pool中。

线程池运行过程中,包括队列中等待的task,线程正在运行的task

所以结束的时候,对这些task的策略操作是有选择的。

scheduling_policies.hpp

对任务调度测试的封装,包括fifo_schedulerlifo_schedulerprio_scheduler

实际上,这三个类的相似程度很高,大家可能更喜欢用继承和虚函数实现。

前面说到保存task的队列数据结构,在这里就看的很清楚了。

fifolifo使用的是std::dequeprio使用的是std::priority_queue,其他部分代码没什么好说的了。

pool_adaptors.hpp

对全局schedule函数的几种封装。

future.hpp

好像thread子库也有future,但不清楚是否是一样的内容。

threadpoolfuture是为了封装异步函数调用返回值实现的。

简单点理解,就是schedule任务的时候,把一个指针在两者间绑定起来,后面就可以通过future来获取返回值了。

当然,获取返回值的过程应该是阻塞的,任务未完成时只能wait

locking_ptr.hpp

LockingPtr的简单封装,具体可googlevolatile - Multithreaded Programmer's Best Friend》。

threadpool大量使用了volatile关键字,所以需要LockingPtr保护。

scope_guard.hpp

对函数对象的封装,利用C++析构函数时调用一个在构造函数时绑定的函数对象。

worker_thread.hpp

对工作线程的封装,这个封装不是指底层线程api封装,因为这部分是由boostthread子库提供的。

封装针对的是循环执行task的逻辑函数(线程跑起来就loop run某个函数,从队列中获取task执行,空闲时等待。)

我们重点看的是runcreate_and_attach

这两个函数连起来看,就很清楚了,create_and_attach通过bind方式生成一个thread执行run方法。

run方法中的这条语句就是一个简单的loop操作,

while(m_pool->execute_task()) {}

所以,当execute_task返回值为false时,run函数就结束了,bind该函数的thread也就结束了。

ok,来到这里,有必要简单的把整个调用过程说明一下。

// Create fifo thread pool container with two threads.

pool tp(2);

该操作会调用pool的构造函数

view plaincopy to clipboardprint?

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

thread_pool(size_t initial_threads = 0)

: m_core(new pool_core_type)

, m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))

{

size_policy_type::init(*m_core, initial_threads);

}

由于pimpl模式,所以所有代码都封装在m_core内实现的。

pool默认的线程个数为0,通过size_policy_type::init来初始化。

size_policy_type是一个模板参数,pool对应的是fifo,所以也就是static_size类型了。

//static_size类的init函数

view plaincopy to clipboardprint?

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

static void init(Pool& pool, size_t const worker_count)

{

pool.resize(worker_count);

}

//pool_coreresize函数

这个函数有点长,主要是做动态配置线程个数的逻辑操作,create_and_attach也是在这里调用的。

view plaincopy to clipboardprint?

//worker_threadcreate_and_attach函数

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是线程的loop函数

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

//worker_threadcreate_and_attach函数

static void create_and_attach(shared_ptr<pool_type> const & pool)

{

shared_ptr<worker_thread> worker(new worker_thread(pool));

if(worker)

{

//run是线程的loop函数

worker->m_thread.reset(new boost::thread(bind(&worker_thread::run, worker)));

}

}

view plaincopy to clipboardprint?

//worker_threadrun函数

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值为false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//worker_threadrun函数

void run()

{

scope_guard notify_exception(bind(&worker_thread::died_unexpectedly, this));

while(m_pool->execute_task()) {} //loop直到返回值为false

notify_exception.disable();

m_pool->worker_destructed(this->shared_from_this());

}

//pool_coreexecute_task函数

这个函数有点长,简单点说,就是从队列中获取task然后执行,如果队列为空,则线程需要wait操作。

由于threadpool支持动态resize线程个数,从该函数我们也是可以看出来是如何做到的。

view plaincopy to clipboardprint?

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

// decrease number of threads if necessary

if(m_worker_count > m_target_worker_count)

{

return false; // terminate worker

}

pool内部使用了多个整数来记录现在个数,譬如m_worker_countm_target_worker_count

m_worker_count是当前激活运行中的线程个数。

m_target_worker_count是最新动态配置的线程个数。

当个数不匹配时,通过返回false方式结束线程。

// Add some tasks to the pool.

tp.schedule(&first_task);

view plaincopy to clipboardprint?

//thread_poolschedule函数

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函数(和execute_task函数强相关)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入队列后,notify_one一个线程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

{

return false;

}

}

//thread_poolschedule函数

bool schedule(task_type const & task)

{

return m_core->schedule(task);

}

//pool_coreschedule函数(和execute_task函数强相关)

bool schedule(task_type const & task) volatile

{

locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

if(lockedThis->m_scheduler.push(task))

{

//task成功入队列后,notify_one一个线程。

lockedThis->m_task_or_terminate_workers_event.notify_one();

return true;

}

else

MsoNo

分享到:
评论

相关推荐

    Boost threadpool优先级实例

    Boost库是C++编程语言中的一个流行开源库,提供了丰富的功能,其中包括线程池(Boost.Threadpool)模块。本文将深入探讨如何使用Boost库中的线程池来处理具有优先级的任务,以及普通任务的执行。 首先,我们需要...

    boost threadpool(修复内存泄露后的版本)

    《Boost.Threadpool:内存泄漏修复版详解》 Boost.Threadpool是一个功能强大的线程池库,由Boost库家族提供,用于高效地管理和调度并发任务。在实际应用中,内存泄漏问题可能会严重影响程序的性能和稳定性,因此,...

    基于boost库的threadpool的半官方实现

    boost库就不介绍了,网上很容易查到,这个threadpool 是基于boost库的半官方实现,使用方法请参照博客http://blog.csdn.net/yuguanquan1990/article/details/39899853

    threadpool

    线程池(threadpool)是计算机程序中一种有效的多线程处理形式,它预先创建一组线程,待有任务需要执行时,从线程池中取出一个线程来执行任务,任务完成后,线程并不销毁,而是返回线程池等待下一次的任务分配。...

    boost线程库源码,程序员自用

    Boost线程库是C++开发中的一个强大工具,它为C++标准库提供了额外的线程支持。Boost库本身是一个开源集合,包含了各种各样的高质量、跨平台的C++库,其中线程库(Boost.Thread)是提升C++多线程编程能力的重要组件。...

    深入实践Boost:Boost程序库开发的94个秘笈

    6. **并发和并行编程**:包括 Boost.ThreadPool,线程池管理;Boost.Chrono,时间点和时间段的操作;Boost.Coroutine,实现轻量级协程。 7. **算法和数据结构**:如 Boost.Graph,图算法库;Boost.Range,提供统一...

    boost线程池(thread pool)

    Boost线程池(ThreadPool)是这个库的一部分,它允许开发者有效地管理多个线程,避免频繁创建和销毁线程带来的开销。线程池的概念是预先创建一组线程,当有任务需要执行时,任务会被分配到空闲的线程,而不是每次都...

    ThreadPool:一个简单的C ++线程池实现

    C ++中的线程池 描述 这是C ++中的简单ThreadPool实现。 此实现提供以下功能: 每个池实例的可配置线程数。 动态线程数修改...Boost 1.48.0(用于boost :: functions) GNU Make 3.81或更高版本 G ++ 4.7.2

    Learning.Boost.Cplusplus.Libraries.1783551216

    Harness the power of Python to analyze data and create insightful predictive models About This Book Learn data mining in practical terms, using a wide variety of libraries and techniques ...

    ThreadPool

    在C++编程中,实现线程池可以通过标准库中的`std::thread`,或者使用第三方库如Boost.Thread。线程池的设计通常包括以下几个核心部分: 1. **线程池初始化**:创建一定数量的工作线程,并将它们设置为等待状态。 2....

    boost 线程池源代码文档

    - Boost线程池库提供了一个名为`boost::threadpool`的命名空间,其中包含`thread_pool`类,它是线程池的核心实现。 - `thread_pool`类包含了任务队列和工作线程的管理。用户可以通过向线程池提交任务(通常是一个...

    threadpool-0_2_5-src

    《深入理解Boost线程池:threadpool-0_2_5-src源码解析》 Boost库,作为C++的一个重要扩展库,为开发者提供了许多强大的工具和类库,其中包括线程池实现——threadpool。线程池是多线程编程中的一个关键概念,通过...

    boost测试代码

    1. **线程池(Threadpool)**:线程池是一种多线程编程的设计模式,它预先创建了一组线程,当有任务需要执行时,不再直接创建新线程,而是从线程池中取出一个闲置的线程来执行任务。这样可以减少线程创建和销毁的...

    boost 线程池源代码

    在`threadpool-0_2_5-src`这个压缩包中,包含了线程池的源代码,你可以通过阅读源码了解其内部实现细节,如线程池的初始化、任务的提交和执行、线程的管理和同步机制等。这对于深入理解线程池的工作原理以及学习如何...

    c++实现的线程池源代码threadpool

    在标题提到的"C++实现的线程池源代码threadpool"中,我们可以看到三个压缩包文件:`boost_1_32_0.rar`、`ThreadPoolDemo.rar`、`thread_pool.rar`,它们可能包含了不同实现线程池的C++源代码。 首先,`boost_1_32_0...

    simple-threadpool:基于Boost.Thread的简单线程池

    通过这个"simple-threadpool"项目,你可以学习到如何使用Boost.Thread库来构建一个基本的线程池,并了解线程池在并发编程中的作用和实现原理。同时,这也是理解和实践C++多线程编程的一个很好的起点。

    Windows 线程池的VS2010 C++ 实现

    Windows下貌似都没有一个比较好用的线程池,至少我没有找到,因此,我就利用函数模板,类模板实现了一个类似于boost的threadpool的一个类。具体,大家可以下载下来了解下,另外,这个事VS2010的工程,不过线程池的...

    threadpool.rar

    Boost库,一个流行的C++工具集,提供了对线程池的支持,使得开发者能够轻松地利用多核处理器的优势,同时保持程序的稳定性和高性能。 线程池的工作原理在于预先创建一组线程,而不是在需要时动态创建。当任务到来时...

    Boost_86799_VC2013_X86

    10. **Threadpool**:管理一组线程,实现高效的并行任务调度。 11. **TypeErasure**:允许创建类型擦除的对象,实现类似C++模板的功能,但不需知道实际类型。 标签 "c++" 和 "boost" 明确指出这是与C++语言和Boost...

Global site tag (gtag.js) - Google Analytics