注:本文主要內(nèi)容摘自筆者所著的《多核計(jì)算與程序設(shè)計(jì)》一書,略有修改,后續(xù)還會繼續(xù)發(fā)布系列文章,如有需要,可以考慮將一下地址加入到您的瀏覽器收藏夾中:http://software.intel.com/zh-cn/blogs/category/multicore/。
1、基本思想
動態(tài)任務(wù)調(diào)度可以將一系列分解好的任務(wù)進(jìn)行并行運(yùn)行,并取得一定程度的負(fù)載均衡。動態(tài)任務(wù)調(diào)度的最大作用就是用它來做并行計(jì)算。動態(tài)任務(wù)調(diào)度有多種方法,一般可以使用分布式隊(duì)列【1】來實(shí)現(xiàn),下面講解一種最簡單的嵌套型任務(wù)調(diào)度的實(shí)現(xiàn)方法。
對于嵌套型任務(wù),通常都有一個(gè)或多個(gè)開始任務(wù),其他任務(wù)的產(chǎn)生都源于這些開始任務(wù)。
調(diào)度的方法為,每個(gè)線程由一個(gè)本地隊(duì)列,另外由一個(gè)所有線程共享的隊(duì)列。當(dāng)每個(gè)線程產(chǎn)生n個(gè)新任務(wù)后,先檢查本地隊(duì)列是否為空,如果為空,則放入一個(gè)任務(wù)到本地隊(duì)列中。然后檢查共享隊(duì)列是否滿,如果未滿則將其他任務(wù)放入共享隊(duì)列中,否則放入到本地隊(duì)列中。
上面這個(gè)調(diào)度方法實(shí)際上和CDistributeQueue【1】中的進(jìn)隊(duì)操作方法是一樣的,因此可以使用CDistributeQueue來實(shí)現(xiàn)嵌套型動態(tài)任務(wù)的調(diào)度。
一般來說,嵌套型動態(tài)任務(wù)調(diào)度會遇到以下一些問題 :
- 1、 初始時(shí)可能只有一個(gè)任務(wù)運(yùn)行,此種情況下只能有一個(gè)線程運(yùn)行,其他線程必須掛起。當(dāng)動態(tài)任務(wù)產(chǎn)生后,需要喚醒掛起的線程進(jìn)行執(zhí)行。
- 2、 由于每個(gè)任務(wù)中會產(chǎn)生新的任務(wù),因此每個(gè)任務(wù)既是消費(fèi)者,同時(shí)也是生產(chǎn)者。在操作本地隊(duì)列時(shí),比非嵌套型任務(wù)調(diào)度更加方便,如何將本地隊(duì)列的操作最大化是首要考慮的問題。
根據(jù)上面的思想,下面設(shè)計(jì)一個(gè)CNestTaskScheduler類來實(shí)現(xiàn)對嵌套型動態(tài)任務(wù)的調(diào)度。
2、CNestTaskScheduler類的設(shè)計(jì)和實(shí)現(xiàn)
CNestTaskScheduler類的定義如下:
class CNestTaskScheduler {
private:
??? CThreadPool???? m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);
??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;
??? THREADFUNC????? m_StartFunc;? //為線程池使用的線程入口函數(shù)指針
??? LONG? volatile? m_lTaskId;??? //Task Id,用于判斷是否喚醒對應(yīng)的線程
?
public:
??? CNestTaskScheduler();
??? virtual ~CNestTaskScheduler();
?
??? //下面兩個(gè)函數(shù)為調(diào)度器本身直接使用
??? void SetStartFunc(THREADFUNC StartFunc);
int GetTask(TASK &Task);
??? CThreadPool & GetThreadPool();
??? LONG AtomicIncrementTaskId();
?
??? //下面三個(gè)函數(shù)為調(diào)度器的使用者使用
??? void SpawnLocalTask(TASK &Task);
??? void SpawnTask(TASK &Task);
??? void BeginRootThread(TASK &Task);
};
類中的主要三個(gè)接口為
??? void SpawnLocalTask(TASK &Task);
??? void SpawnTask(TASK &Task);
void BeginRootThread(TASK &Task);
?
SpawnLocalTask()的主要作用是將動態(tài)生成的任務(wù)放入線程的本地隊(duì)列中;SpawnTask()的作用是將動態(tài)產(chǎn)生的任務(wù)放入分布式隊(duì)列中,當(dāng)然任務(wù)有可能被放入本地隊(duì)列,也有可能被放入共享隊(duì)列中;BeginRootThread()的作用是啟動初始的任務(wù)。
1) BeginRootTask() 的處理流程
BeginRootTask() 的處理流程較簡單,它先創(chuàng)建線程池,接著將一個(gè)原始任務(wù)放入到第 0 個(gè)線程的本地隊(duì)列中,然后執(zhí)行第 0 個(gè)線程,最后等待所有線程執(zhí)行完。處理流程如下圖所示:
?
?
? 圖 1 嵌套型任務(wù) BeginRootTask() 處理流程圖
BeginRootTask() 的代碼如下:
/** ?? 嵌套任務(wù)調(diào)度的開始根線程函數(shù)
?
???????? @param ? TASK &Task - 要執(zhí)行的最初任務(wù)
???????? @return ? void - 無 ?
*/
void CNestTaskScheduler::BeginRootThread(TASK &Task)
{
??? m_lTaskId = 0;
?
??? m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);
??? m_DQueue . PushToLocalQueue ( Task , 0);
?? ? m_ThreadPool . ExecThread ( 0 ); ?
??? m_ThreadPool . WaitAllThread ();
}
?
BeginRootTask() 執(zhí)行后,只有第 0 個(gè)線程被執(zhí)行了,線程池中的其他線程都是處于掛起狀態(tài)。實(shí)際上在第 0 個(gè)線程的處理過程中,它會繼續(xù)調(diào)用 SpawnTask() , SpawnTask() 中需要判斷是否有線程被掛起,如果有則需要喚醒掛起的線程,下面就來看看 SpawnTask() 的詳細(xì)處理過程。
2) SpawnTask() 的處理流程
SpawnTask() 的功能主要是將任務(wù)放入到分布式隊(duì)列中。由于在 BeginRootThread() 中只執(zhí)行了第 0 個(gè)線程,其他線程都處于掛起狀態(tài),因此這個(gè)函數(shù)中還需要喚醒其他被掛起的線程,整個(gè)處理流程如下圖所示:
? 圖 2 嵌套型任務(wù) SpawnLocalTask() 處理流程圖
根據(jù)上面的處理流程, SpawnLocalTask() 的代碼實(shí)現(xiàn)如下:
/** ?? 嵌套任務(wù)調(diào)度的生成任務(wù)函數(shù)
??? 生成的任務(wù)被放入到分布式隊(duì)列中
?
???????? @param ? TASK &Task - 待執(zhí)行的任務(wù)
???????? @return ? void - 無 ?
*/
void CNestTaskScheduler::SpawnTask(TASK &Task)
{
??? if ( m_lTaskId < m_ThreadPool . GetThreadCount () )
??? {
??????? // 依次喚醒各個(gè)掛起的線程
??????? LONG Id = AtomicIncrement (& m_lTaskId );
??????? if ( Id < m_ThreadPool . GetThreadCount () )
??????? {
??????????? // 下面之所以可以對其他線程的本地隊(duì)列進(jìn)行無同步的操作,是因?yàn)?
??????????? // 訪問這些隊(duì)列的線程在進(jìn)隊(duì)操作之后才開始運(yùn)行
??????????? m_DQueue . PushToLocalQueue ( Task , Id );
??????????? m_ThreadPool . ExecThread ( Id );
??????? }
??????? else
??????? {
??????????? m_DQueue . EnQueue ( Task );
??????? }
??? }
??? else
??? {
??????? // 先判斷偷取隊(duì)列是否滿,如果未滿則放入偷取隊(duì)列中
??????? // 如果滿了則放入本地隊(duì)列中
??????? m_DQueue . EnQueue ( Task );
??? }
};
在處理喚醒其他線程的過程中,采用了原子操作來實(shí)現(xiàn),當(dāng)變量 m_lTaskId 的值小于給定線程數(shù)量時(shí),表明還有線程被掛起,因此將任務(wù)放入對應(yīng)被掛起線程的本地隊(duì)列中,然后再喚醒并執(zhí)行對應(yīng)被掛起的線程。
當(dāng)任務(wù)被放入分布式隊(duì)列后,線程池中的各個(gè)線程是如何處理分布式隊(duì)列中的任務(wù)的呢?下面就來看看線程池的入口函數(shù)的處理過程。
?
?
?
?
3、 CNestTaskScheduler 使用方法
注:完整的 CNestTaskScheduler 的源代碼,請到 CAPI 開源項(xiàng)目進(jìn)行下載,下載地址為: http://gforge.osdn.net.cn/projects/capi
?
下面以一個(gè)區(qū)間遞歸分拆為例講解如何使用 CNestTaskScheduler 。首先需要寫一個(gè)任務(wù)處理入口函數(shù),代碼如下:
struct RANGE {
??? int begin ;
??? int end ;
};
?
CNestTaskScheduler ? * pTaskSched = NULL ;
?
/** ?? 任務(wù)處理入口函數(shù)
???????? 將一個(gè)大的區(qū)間均分成兩個(gè)更小的區(qū)間
?
???????? @param ? void *args - 參數(shù),實(shí)際為 RANGE 類型 ??????
???????? @return ? unsigned int WINAPI - 總是返回 CAPI_SUCCESS ???
*/
unsigned int WINAPI RootTask (void * args )
{
??? RANGE ? * p = ( RANGE *) args ;
??? if ( p != NULL )
??? {
????? ? ? printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );
??????? if ( p -> end - p -> begin < 128 )
??????? {
??????????? // 當(dāng)區(qū)間大小小于時(shí),不再進(jìn)行分拆
??????????? delete p ;
??????????? return 0;
??????? }
??????? int mid = ( p -> begin + p -> end + 1) / 2;
??????? RANGE * range1 , * range2 ;
?
??????? range1 = new RANGE ;
??????? range2 = new RANGE ;
?
????? ?? range1 -> begin = p -> begin ;
??????? range1 -> end = mid - 1;
??????? range2 -> begin = mid ;
??????? range2 -> end = p -> end ;
?
??????? TASK t1 , t2 ;
??????? t1 . pArg = range1 ;
??????? t2 . pArg = range2 ;
??????? t1 . func = RootTask ;
??????? t2 . func = RootTask ;
?
?????? ? pTaskSched -> SpawnLocalTask ( t1 );
??????? pTaskSched -> SpawnTask ( t2 );
?
??????? delete p ;
??? }
??? return 1;
}
?
任務(wù)處理函數(shù) RootTask() 中,先將一個(gè)大區(qū)間拆分成兩個(gè)更小的區(qū)間,然后將每個(gè)區(qū)間看成一個(gè)新的任務(wù),得到兩個(gè)新的任務(wù) t1 、 t2 ,然后調(diào)用 SpawnLocalTask() 將任務(wù) t1 放進(jìn)任務(wù)調(diào)度器的分布式隊(duì)列的本地隊(duì)列中。如果拆分后的區(qū)間小于給定的大小,就不再分拆。
下面的代碼演示了如何調(diào)用 CNestTaskScheduler 類來對一個(gè) 0 ~ 1023 的區(qū)間進(jìn)行并行拆分。
void main ( void )
{
?
??? TASK ??? task ;
??? RANGE ?? * pRange = new RANGE ;
?
??? pRange -> begin = 0;
??? pRange -> end = 1023;
?
??? task.func = RootTask ;
??? task . pArg = pRange ;
?
??? pTaskSched = new CNestTaskScheduler ;
???
??? pTaskSched -> BeginRootThread ( task );
?
??? delete pTaskSched ;
?
}
上面程序執(zhí)行后,打印的結(jié)果如下,從打印結(jié)果可以看出整個(gè)程序執(zhí)行中進(jìn)行的分拆過程。
Range: 0 - 1023
Range: 0 - 511
Range: 512 - 1023
Range: 0 - 255
Range: 512 - 767
Range: 0 - 127
Range: 512 - 639
Range: 256 - 511
Range: 768 - 1023
Range: 256 - 383
Range: 768 - 895
Range: 128 - 255
Range: 640 - 767
Range: 384 - 511
Range: 896 – 1023
?
當(dāng)然,我們需要用任務(wù)調(diào)度來實(shí)現(xiàn)并行計(jì)算,下面就來講一個(gè)具體的用任務(wù)調(diào)度進(jìn)行并行快速排序的實(shí)例。
?
?
3) 線程池入口函數(shù)處理流程
線程池入口函數(shù)的處理在一個(gè)循環(huán)中進(jìn)行,每次循環(huán)中,從分布式隊(duì)列中獲取任務(wù),然后執(zhí)行任務(wù)的啟動入口函數(shù),如果從分布式隊(duì)列中獲取任務(wù)失敗,則認(rèn)為所有任務(wù)被處理完,此時(shí)需要判斷是否還有掛起的線程,有則需要將掛起線程執(zhí)行起來讓其退出,然后退出循環(huán)并結(jié)束當(dāng)前線程。
? 圖 3 ? 線程池入口函數(shù)處理流程圖
?
/** ?? 嵌套任務(wù)調(diào)度的獲取任務(wù)函數(shù)
?
???????? @param ? TASK &Task - 接收從分布式隊(duì)列中獲取的任務(wù)
???????? @return ? int - 成功返回 CAPI_SUCCESS, 失敗返回 CAPI_FAILED. ??????
*/
int CNestTaskScheduler::GetTask(TASK &Task)
{
??? // 先從本地隊(duì)列獲取任務(wù)
??? // 本地獲取任務(wù)失敗后從偷取隊(duì)列獲取任務(wù)
??? return m_DQueue . DeQueue ( Task );
};
?
/** ?? 嵌套任務(wù)調(diào)度的線程池入口函數(shù)
?
???????? @param ? void *pArgs - CNestTaskScheduler 類型的參數(shù) ?????
???????? @return ? unsigned int WINAPI - 返回 ???
*/
unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)
{
??? CNestTaskScheduler ? * pSched = ( CNestTaskScheduler *) pArgs ;
?
??? TASK ??? Task ;
??? int ???? nRet ;
?
??? for ( ;; )
??? {
??????? nRet = pSched -> GetTask ( Task );
??????? if ( nRet == CAPI_FAILED )
??????? {
??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();
???????????
??????????? // 喚醒一個(gè)掛起的線程 , 防止任務(wù)數(shù)量小于 CPU 核數(shù)時(shí),
??????????? // 仍然有任務(wù)處于掛起狀態(tài) , 從而導(dǎo)致 WaitAllThread() 處于死等狀態(tài)
??????????? // 這個(gè)喚醒過程是一個(gè)串行的過程,被喚醒的任務(wù)會繼續(xù)喚醒一個(gè)掛起線程
??????????? LONG Id = pSched->AtomicIncrementTaskId();
??????????? if ( Id < ThreadPool.GetThreadCount() )
??????????? {
??????????????? ThreadPool.ExecThread(Id);
??????????? }
??????? ???? break;
??????? }
??????? (*( Task . func ))( Task . pArg );
??? }
??? return 0;
}
?
在上面的線程入口處理函數(shù) NestTaskScheduler_StartFunc() 中,當(dāng)獲取任務(wù)失敗時(shí),表明所有任務(wù)都處理完畢。此時(shí)需要考慮一種特殊情況,即任務(wù)總數(shù)量小于線程數(shù)量的情況。由于線程池 CThreadPool 采用預(yù)創(chuàng)建線程的方法,所有預(yù)創(chuàng)建的線程初始處于掛起狀態(tài),獲取任務(wù)失敗后,可能還有若干線程沒有被分配到任務(wù),仍然處于掛起狀態(tài)。必須將這些掛起的任務(wù)恢復(fù)執(zhí)行讓其退出,否則 WaitAllThread() 函數(shù)將處于死等狀態(tài)。
NestTaskScheduler_StartFunc() 在處理喚醒掛起的線程的方法是逐個(gè)喚醒的方法,當(dāng)有某個(gè)執(zhí)行線程獲取任務(wù)失敗后,它先喚醒一個(gè)被掛起的線程,然后這個(gè)被喚醒的線程執(zhí)行后,它也會執(zhí)行 NestTaskScheduler_StartFunc() 函數(shù),當(dāng)然它獲取任務(wù)會失敗,接著它也會喚醒一個(gè)被掛起的線程,這樣一直下去,所有被掛起線程都會被喚醒并被退出。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
