注:本文主要内容摘自笔者所著的《多核计算与程序设计》一书,略有修改,后续还会继续发布系列文章,如有需要,可以考虑将一下地址加入到您的浏览器收藏夹中:http://software.intel.com/zh-cn/blogs/category/multicore/。
1、基本思想
动态任务调度可以将一系列分解好的任务进行并行运行,并取得一定程度的负载均衡。动态任务调度的最大作用就是用它来做并行计算。动态任务调度有多种方法,一般可以使用分布式队列【1】来实现,下面讲解一种最简单的嵌套型任务调度的实现方法。
对于嵌套型任务,通常都有一个或多个开始任务,其他任务的产生都源于这些开始任务。
调度的方法为,每个线程由一个本地队列,另外由一个所有线程共享的队列。当每个线程产生n个新任务后,先检查本地队列是否为空,如果为空,则放入一个任务到本地队列中。然后检查共享队列是否满,如果未满则将其他任务放入共享队列中,否则放入到本地队列中。
上面这个调度方法实际上和CDistributeQueue【1】中的进队操作方法是一样的,因此可以使用CDistributeQueue来实现嵌套型动态任务的调度。
一般来说,嵌套型动态任务调度会遇到以下一些问题:
- 1、 初始时可能只有一个任务运行,此种情况下只能有一个线程运行,其他线程必须挂起。当动态任务产生后,需要唤醒挂起的线程进行执行。
- 2、 由于每个任务中会产生新的任务,因此每个任务既是消费者,同时也是生产者。在操作本地队列时,比非嵌套型任务调度更加方便,如何将本地队列的操作最大化是首要考虑的问题。
根据上面的思想,下面设计一个CNestTaskScheduler类来实现对嵌套型动态任务的调度。
2、CNestTaskScheduler类的设计和实现
CNestTaskScheduler类的定义如下:
class CNestTaskScheduler {
private:
CThreadPool m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);
CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
THREADFUNC m_StartFunc; //为线程池使用的线程入口函数指针
LONG volatile m_lTaskId; //Task Id,用于判断是否唤醒对应的线程
public:
CNestTaskScheduler();
virtual ~CNestTaskScheduler();
//下面两个函数为调度器本身直接使用
void SetStartFunc(THREADFUNC StartFunc);
int GetTask(TASK &Task);
CThreadPool & GetThreadPool();
LONG AtomicIncrementTaskId();
//下面三个函数为调度器的使用者使用
void SpawnLocalTask(TASK &Task);
void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
};
类中的主要三个接口为
void SpawnLocalTask(TASK &Task);
void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
SpawnLocalTask()的主要作用是将动态生成的任务放入线程的本地队列中;SpawnTask()的作用是将动态产生的任务放入分布式队列中,当然任务有可能被放入本地队列,也有可能被放入共享队列中;BeginRootThread()的作用是启动初始的任务。
1) BeginRootTask()的处理流程
BeginRootTask()的处理流程较简单,它先创建线程池,接着将一个原始任务放入到第0个线程的本地队列中,然后执行第0个线程,最后等待所有线程执行完。处理流程如下图所示:
图1 嵌套型任务BeginRootTask()处理流程图
BeginRootTask()的代码如下:
/** 嵌套任务调度的开始根线程函数
@param TASK &Task - 要执行的最初任务
@return void - 无
*/
void CNestTaskScheduler::BeginRootThread(TASK &Task)
{
m_lTaskId = 0;
m_ThreadPool.CreateThreadPool(m_StartFunc, this, 0);
m_DQueue.PushToLocalQueue(Task, 0);
m_ThreadPool.ExecThread( 0 );
m_ThreadPool.WaitAllThread();
}
BeginRootTask()执行后,只有第0个线程被执行了,线程池中的其他线程都是处于挂起状态。实际上在第0个线程的处理过程中,它会继续调用SpawnTask(),SpawnTask()中需要判断是否有线程被挂起,如果有则需要唤醒挂起的线程,下面就来看看SpawnTask()的详细处理过程。
2) SpawnTask()的处理流程
SpawnTask()的功能主要是将任务放入到分布式队列中。由于在BeginRootThread()中只执行了第0个线程,其他线程都处于挂起状态,因此这个函数中还需要唤醒其他被挂起的线程,整个处理流程如下图所示:
图2 嵌套型任务SpawnLocalTask()处理流程图
根据上面的处理流程,SpawnLocalTask()的代码实现如下:
/** 嵌套任务调度的生成任务函数
生成的任务被放入到分布式队列中
@param TASK &Task - 待执行的任务
@return void - 无
*/
void CNestTaskScheduler::SpawnTask(TASK &Task)
{
if ( m_lTaskId < m_ThreadPool.GetThreadCount() )
{
//依次唤醒各个挂起的线程
LONG Id = AtomicIncrement(&m_lTaskId);
if ( Id < m_ThreadPool.GetThreadCount() )
{
//下面之所以可以对其他线程的本地队列进行无同步的操作,是因为
// 访问这些队列的线程在进队操作之后才开始运行
m_DQueue.PushToLocalQueue(Task, Id);
m_ThreadPool.ExecThread(Id);
}
else
{
m_DQueue.EnQueue(Task);
}
}
else
{
//先判断偷取队列是否满,如果未满则放入偷取队列中
//如果满了则放入本地队列中
m_DQueue.EnQueue(Task);
}
};
在处理唤醒其他线程的过程中,采用了原子操作来实现,当变量m_lTaskId的值小于给定线程数量时,表明还有线程被挂起,因此将任务放入对应被挂起线程的本地队列中,然后再唤醒并执行对应被挂起的线程。
当任务被放入分布式队列后,线程池中的各个线程是如何处理分布式队列中的任务的呢?下面就来看看线程池的入口函数的处理过程。
3、CNestTaskScheduler使用方法
注:完整的CNestTaskScheduler的源代码,请到CAPI开源项目进行下载,下载地址为:http://gforge.osdn.net.cn/projects/capi
下面以一个区间递归分拆为例讲解如何使用CNestTaskScheduler。首先需要写一个任务处理入口函数,代码如下:
struct RANGE {
int begin;
int end;
};
CNestTaskScheduler *pTaskSched = NULL;
/** 任务处理入口函数
将一个大的区间均分成两个更小的区间
@param void *args - 参数,实际为RANGE类型
@return unsigned int WINAPI - 总是返回CAPI_SUCCESS
*/
unsigned int WINAPI RootTask(void *args)
{
RANGE *p = (RANGE *)args;
if ( p != NULL )
{
printf("Range: %ld - %ld\n", p->begin, p->end);
if ( p->end - p->begin < 128 )
{
//当区间大小小于时,不再进行分拆
delete p;
return 0;
}
int mid = (p->begin + p->end + 1) / 2;
RANGE *range1, *range2;
range1 = new RANGE;
range2 = new RANGE;
range1->begin = p->begin;
range1->end = mid - 1;
range2->begin = mid;
range2->end = p->end;
TASK t1, t2;
t1.pArg = range1;
t2.pArg = range2;
t1.func = RootTask;
t2.func = RootTask;
pTaskSched->SpawnLocalTask(t1);
pTaskSched->SpawnTask(t2);
delete p;
}
return 1;
}
任务处理函数RootTask()中,先将一个大区间拆分成两个更小的区间,然后将每个区间看成一个新的任务,得到两个新的任务t1、t2,然后调用SpawnLocalTask()将任务t1放进任务调度器的分布式队列的本地队列中。如果拆分后的区间小于给定的大小,就不再分拆。
下面的代码演示了如何调用CNestTaskScheduler类来对一个0~1023的区间进行并行拆分。
void main(void)
{
TASK task;
RANGE *pRange = new RANGE;
pRange->begin = 0;
pRange->end = 1023;
task.func = RootTask;
task.pArg = pRange;
pTaskSched = new CNestTaskScheduler;
pTaskSched->BeginRootThread(task);
delete pTaskSched;
}
上面程序执行后,打印的结果如下,从打印结果可以看出整个程序执行中进行的分拆过程。
Range: 0 - 1023
Range: 0 - 511
Range: 512 - 1023
Range: 0 - 255
Range: 512 - 767
Range: 0 - 127
Range: 512 - 639
Range: 256 - 511
Range: 768 - 1023
Range: 256 - 383
Range: 768 - 895
Range: 128 - 255
Range: 640 - 767
Range: 384 - 511
Range: 896 – 1023
当然,我们需要用任务调度来实现并行计算,下面就来讲一个具体的用任务调度进行并行快速排序的实例。
3) 线程池入口函数处理流程
线程池入口函数的处理在一个循环中进行,每次循环中,从分布式队列中获取任务,然后执行任务的启动入口函数,如果从分布式队列中获取任务失败,则认为所有任务被处理完,此时需要判断是否还有挂起的线程,有则需要将挂起线程执行起来让其退出,然后退出循环并结束当前线程。
图3 线程池入口函数处理流程图
/** 嵌套任务调度的获取任务函数
@param TASK &Task - 接收从分布式队列中获取的任务
@return int - 成功返回CAPI_SUCCESS, 失败返回CAPI_FAILED.
*/
int CNestTaskScheduler::GetTask(TASK &Task)
{
//先从本地队列获取任务
//本地获取任务失败后从偷取队列获取任务
return m_DQueue.DeQueue(Task);
};
/** 嵌套任务调度的线程池入口函数
@param void *pArgs - CNestTaskScheduler类型的参数
@return unsigned int WINAPI - 返回
*/
unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)
{
CNestTaskScheduler *pSched = (CNestTaskScheduler *)pArgs;
TASK Task;
int nRet;
for ( ;; )
{
nRet = pSched->GetTask(Task);
if ( nRet == CAPI_FAILED )
{
CThreadPool &ThreadPool = pSched->GetThreadPool();
// 唤醒一个挂起的线程,防止任务数量小于CPU核数时,
// 仍然有任务处于挂起状态,从而导致WaitAllThread()处于死等状态
// 这个唤醒过程是一个串行的过程,被唤醒的任务会继续唤醒一个挂起线程
LONG Id = pSched->AtomicIncrementTaskId();
if ( Id < ThreadPool.GetThreadCount() )
{
ThreadPool.ExecThread(Id);
}
break;
}
(*(Task.func))(Task.pArg);
}
return 0;
}
在上面的线程入口处理函数NestTaskScheduler_StartFunc()中,当获取任务失败时,表明所有任务都处理完毕。此时需要考虑一种特殊情况,即任务总数量小于线程数量的情况。由于线程池CThreadPool采用预创建线程的方法,所有预创建的线程初始处于挂起状态,获取任务失败后,可能还有若干线程没有被分配到任务,仍然处于挂起状态。必须将这些挂起的任务恢复执行让其退出,否则WaitAllThread()函数将处于死等状态。
NestTaskScheduler_StartFunc()在处理唤醒挂起的线程的方法是逐个唤醒的方法,当有某个执行线程获取任务失败后,它先唤醒一个被挂起的线程,然后这个被唤醒的线程执行后,它也会执行NestTaskScheduler_StartFunc()函数,当然它获取任务会失败,接着它也会唤醒一个被挂起的线程,这样一直下去,所有被挂起线程都会被唤醒并被退出。
分享到:
相关推荐
在多核处理器系统中,任务调度算法是系统性能的关键所在。基于改进EDF(Earliest Deadline First, earliest deadline first)的多核处理器混合任务调度算法,是一种高效的任务调度方法。在这篇文章中,我们将详细...
为更好地解决多核系统实时任务调度问题,针对基本蚁群算法求解最短路径过程中容易陷入局部最优的情况,对基本蚁群算法进行了改进。改进算法根据系统的实际情况对概率选择公式做出调整,同时根据相应策略对信息素进行...
本文主要从操作系统层面出发,探讨了处理结构变化下的多核任务调度。系统采用了混合调度策略,包括簇间独立调度和簇内统一调度。这种策略允许在不同层次上进行资源管理和任务分配,以实现最佳性能。 调度模式是多核...
《多核处理器中任务调度与负载均衡的研究》这篇文章聚焦于多核处理器环境下如何有效地进行任务分配和调度,以实现系统的高效运行和负载均衡。多核处理器由于其并行计算能力的提升,已经成为现代计算机系统的核心组成...
总的来说,这篇论文提供的混合粒子群算法在异构多核处理器的任务调度中展现出优越的性能,为优化多核处理器平台的任务分配策略提供了新的思路,对于提升数据中心、高性能计算以及嵌入式系统等领域的数据处理能力具有...
异构多核处理器的任务调度算法.pdf
而“重定时有向无环图”则为任务调度提供了高效的建模工具。 总结起来,本文通过深入研究多核处理器系统节能调度技术,特别是结合DVFS和DAG模型,提出了新的节能策略,为嵌入式系统的能效提升提供了新的思路。随着...
面向多核多任务场景的Linux任务调度算法设计 Linux操作系统中,任务调度算法是影响系统性能的关键组件之一。在多核多任务场景下,Linux任务调度算法面临着实时响应不足的问题。本文提出了基于GAS模型的改进任务调度...
针对多核处理器在调度多个任务时效率不高的问题,提出了一种基于粒子群优化算法的嵌入式多核多线程系统任务调度算法,用来找寻任务调度过程中的最优解,以求取任务的最短完成时间。在算法中通过针对多核多线程任务...
提出了基于粒子群优化算法的嵌入式多核多线程系统任务调度算法,用于寻找任务调度过程中的最优解,以求取任务的最短完成时间。在算法中,通过针对多核多线程任务模型而选择粒子群算法的适应度函数,综合利用局部最优...
Linux操作系统在多核任务调度中,为每个核心建立活动就绪队列,并将任务分为140个优先级,其中100个供实时任务使用,40个供普通用户任务。每个任务有时间片,用完后会移到等待队列重新分配。当一个核心的活动队列...
本文将深入探讨由裴颂文、宁静和张俊格等人提出的针对CPU-GPU异构多核系统的动态任务调度算法,该算法旨在解决负载均衡问题,提高系统效率。 首先,理解CPU-GPU异构计算系统的基本概念至关重要。CPU擅长于执行复杂...
针对此问题,提出了一种CPU-GPU异构多核系统的动态任务调度算法。该算法充分利用CPU的线程资源和GPU的计算资源,准确测量CPU和GPU的计算能力,从而动态调整分配到CPU和GPU上的数据块大小,减小负载的总执行时间,...
高实时性异构多核处理器任务调度算法.pdf 本论文提出了一种高实时性任务调度算法,名为HRS A(High Real-time Scheduling Algorithm),旨在解决异构多核处理器环境下的任务调度问题。该算法通过融合Min-Min算法、L...
针对此问题,提出了一种CPU-GPU异构多核系统的动态任务调度算法。该算法充分利用CPU的线程资源和GPU的计算资源,准确测量CPU和GPU的计算能力,从而动态调整分配到CPU和GPU上的数据块大小,减小负载的总执行时间,...
ARINC653分区操作系统多核处理器任务调度设计 本文主要讨论了ARINC653分区操作系统多核处理器任务调度设计问题。ARINC653是一种标准的实时操作系统,广泛应用于航空电子设备中。随着多核处理器的发展,如何高效地...