日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

多核中的動態(tài)任務(wù)調(diào)度

系統(tǒng) 1812 0

注:本文主要內(nèi)容摘自筆者所著的《多核計(jì)算與程序設(shè)計(jì)》一書,略有修改,后續(xù)還會繼續(xù)發(fā)布系列文章,如有需要,可以考慮將一下地址加入到您的瀏覽器收藏夾中:http://software.intel.com/zh-cn/blogs/category/multicore/。

1、基本思想

動態(tài)任務(wù)調(diào)度可以將一系列分解好的任務(wù)進(jìn)行并行運(yùn)行,并取得一定程度的負(fù)載均衡。動態(tài)任務(wù)調(diào)度的最大作用就是用它來做并行計(jì)算。動態(tài)任務(wù)調(diào)度有多種方法,一般可以使用分布式隊(duì)列【1】來實(shí)現(xiàn),下面講解一種最簡單的嵌套型任務(wù)調(diào)度的實(shí)現(xiàn)方法。

對于嵌套型任務(wù),通常都有一個(gè)或多個(gè)開始任務(wù),其他任務(wù)的產(chǎn)生都源于這些開始任務(wù)。

調(diào)度的方法為,每個(gè)線程由一個(gè)本地隊(duì)列,另外由一個(gè)所有線程共享的隊(duì)列。當(dāng)每個(gè)線程產(chǎn)生n個(gè)新任務(wù)后,先檢查本地隊(duì)列是否為空,如果為空,則放入一個(gè)任務(wù)到本地隊(duì)列中。然后檢查共享隊(duì)列是否滿,如果未滿則將其他任務(wù)放入共享隊(duì)列中,否則放入到本地隊(duì)列中。

上面這個(gè)調(diào)度方法實(shí)際上和CDistributeQueue【1】中的進(jìn)隊(duì)操作方法是一樣的,因此可以使用CDistributeQueue來實(shí)現(xiàn)嵌套型動態(tài)任務(wù)的調(diào)度。

一般來說,嵌套型動態(tài)任務(wù)調(diào)度會遇到以下一些問題

  • 1、 初始時(shí)可能只有一個(gè)任務(wù)運(yùn)行,此種情況下只能有一個(gè)線程運(yùn)行,其他線程必須掛起。當(dāng)動態(tài)任務(wù)產(chǎn)生后,需要喚醒掛起的線程進(jìn)行執(zhí)行。
  • 2、 由于每個(gè)任務(wù)中會產(chǎn)生新的任務(wù),因此每個(gè)任務(wù)既是消費(fèi)者,同時(shí)也是生產(chǎn)者。在操作本地隊(duì)列時(shí),比非嵌套型任務(wù)調(diào)度更加方便,如何將本地隊(duì)列的操作最大化是首要考慮的問題。

根據(jù)上面的思想,下面設(shè)計(jì)一個(gè)CNestTaskScheduler類來實(shí)現(xiàn)對嵌套型動態(tài)任務(wù)的調(diào)度。

2、CNestTaskScheduler類的設(shè)計(jì)和實(shí)現(xiàn)

CNestTaskScheduler類的定義如下:

class CNestTaskScheduler {

private:

??? CThreadPool???? m_ThreadPool;//(TaskScheduler_StartFunc, NULL, 0);

??? CDistributedQueue<TASK, CLocalQueue<TASK>, CStealQueue<TASK>> m_DQueue;

??? THREADFUNC????? m_StartFunc;? //為線程池使用的線程入口函數(shù)指針

??? LONG? volatile? m_lTaskId;??? //Task Id,用于判斷是否喚醒對應(yīng)的線程

?

public:

??? CNestTaskScheduler();

??? virtual ~CNestTaskScheduler();

?

??? //下面兩個(gè)函數(shù)為調(diào)度器本身直接使用

??? void SetStartFunc(THREADFUNC StartFunc);

int GetTask(TASK &Task);

??? CThreadPool & GetThreadPool();

??? LONG AtomicIncrementTaskId();

?

??? //下面三個(gè)函數(shù)為調(diào)度器的使用者使用

??? void SpawnLocalTask(TASK &Task);

??? void SpawnTask(TASK &Task);

??? void BeginRootThread(TASK &Task);

};

類中的主要三個(gè)接口為

??? void SpawnLocalTask(TASK &Task);

??? void SpawnTask(TASK &Task);

void BeginRootThread(TASK &Task);

?

SpawnLocalTask()的主要作用是將動態(tài)生成的任務(wù)放入線程的本地隊(duì)列中;SpawnTask()的作用是將動態(tài)產(chǎn)生的任務(wù)放入分布式隊(duì)列中,當(dāng)然任務(wù)有可能被放入本地隊(duì)列,也有可能被放入共享隊(duì)列中;BeginRootThread()的作用是啟動初始的任務(wù)。

1) BeginRootTask() 的處理流程

BeginRootTask() 的處理流程較簡單,它先創(chuàng)建線程池,接著將一個(gè)原始任務(wù)放入到第 0 個(gè)線程的本地隊(duì)列中,然后執(zhí)行第 0 個(gè)線程,最后等待所有線程執(zhí)行完。處理流程如下圖所示:

?

?

? 1 嵌套型任務(wù) BeginRootTask() 處理流程圖

BeginRootTask() 的代碼如下:

/** ?? 嵌套任務(wù)調(diào)度的開始根線程函數(shù)

?

???????? @param ? TASK &Task - 要執(zhí)行的最初任務(wù)

???????? @return ? void - ?

*/

void CNestTaskScheduler::BeginRootThread(TASK &Task)

{

??? m_lTaskId = 0;

?

??? m_ThreadPool . CreateThreadPool ( m_StartFunc , this, 0);

??? m_DQueue . PushToLocalQueue ( Task , 0);

?? ? m_ThreadPool . ExecThread ( 0 ); ?

??? m_ThreadPool . WaitAllThread ();

}

?

BeginRootTask() 執(zhí)行后,只有第 0 個(gè)線程被執(zhí)行了,線程池中的其他線程都是處于掛起狀態(tài)。實(shí)際上在第 0 個(gè)線程的處理過程中,它會繼續(xù)調(diào)用 SpawnTask() SpawnTask() 中需要判斷是否有線程被掛起,如果有則需要喚醒掛起的線程,下面就來看看 SpawnTask() 的詳細(xì)處理過程。

2) SpawnTask() 的處理流程

SpawnTask() 的功能主要是將任務(wù)放入到分布式隊(duì)列中。由于在 BeginRootThread() 中只執(zhí)行了第 0 個(gè)線程,其他線程都處于掛起狀態(tài),因此這個(gè)函數(shù)中還需要喚醒其他被掛起的線程,整個(gè)處理流程如下圖所示:

?

? 2 嵌套型任務(wù) SpawnLocalTask() 處理流程圖

根據(jù)上面的處理流程, SpawnLocalTask() 的代碼實(shí)現(xiàn)如下:

/** ?? 嵌套任務(wù)調(diào)度的生成任務(wù)函數(shù)

??? 生成的任務(wù)被放入到分布式隊(duì)列中

?

???????? @param ? TASK &Task - 待執(zhí)行的任務(wù)

???????? @return ? void - ?

*/

void CNestTaskScheduler::SpawnTask(TASK &Task)

{

??? if ( m_lTaskId < m_ThreadPool . GetThreadCount () )

??? {

??????? // 依次喚醒各個(gè)掛起的線程

??????? LONG Id = AtomicIncrement (& m_lTaskId );

??????? if ( Id < m_ThreadPool . GetThreadCount () )

??????? {

??????????? // 下面之所以可以對其他線程的本地隊(duì)列進(jìn)行無同步的操作,是因?yàn)?

??????????? // 訪問這些隊(duì)列的線程在進(jìn)隊(duì)操作之后才開始運(yùn)行

??????????? m_DQueue . PushToLocalQueue ( Task , Id );

??????????? m_ThreadPool . ExecThread ( Id );

??????? }

??????? else

??????? {

??????????? m_DQueue . EnQueue ( Task );

??????? }

??? }

??? else

??? {

??????? // 先判斷偷取隊(duì)列是否滿,如果未滿則放入偷取隊(duì)列中

??????? // 如果滿了則放入本地隊(duì)列中

??????? m_DQueue . EnQueue ( Task );

??? }

};

在處理喚醒其他線程的過程中,采用了原子操作來實(shí)現(xiàn),當(dāng)變量 m_lTaskId 的值小于給定線程數(shù)量時(shí),表明還有線程被掛起,因此將任務(wù)放入對應(yīng)被掛起線程的本地隊(duì)列中,然后再喚醒并執(zhí)行對應(yīng)被掛起的線程。

當(dāng)任務(wù)被放入分布式隊(duì)列后,線程池中的各個(gè)線程是如何處理分布式隊(duì)列中的任務(wù)的呢?下面就來看看線程池的入口函數(shù)的處理過程。

?

?

?

?

3、 CNestTaskScheduler 使用方法

注:完整的 CNestTaskScheduler 的源代碼,請到 CAPI 開源項(xiàng)目進(jìn)行下載,下載地址為: http://gforge.osdn.net.cn/projects/capi

?

下面以一個(gè)區(qū)間遞歸分拆為例講解如何使用 CNestTaskScheduler 。首先需要寫一個(gè)任務(wù)處理入口函數(shù),代碼如下:

struct RANGE {

??? int begin ;

??? int end ;

};

?

CNestTaskScheduler ? * pTaskSched = NULL ;

?

/** ?? 任務(wù)處理入口函數(shù)

???????? 將一個(gè)大的區(qū)間均分成兩個(gè)更小的區(qū)間

?

???????? @param ? void *args - 參數(shù),實(shí)際為 RANGE 類型 ??????

???????? @return ? unsigned int WINAPI - 總是返回 CAPI_SUCCESS ???

*/

unsigned int WINAPI RootTask (void * args )

{

??? RANGE ? * p = ( RANGE *) args ;

??? if ( p != NULL )

??? {

????? ? ? printf ( "Range: %ld - %ld\n" , p -> begin , p -> end );

??????? if ( p -> end - p -> begin < 128 )

??????? {

??????????? // 當(dāng)區(qū)間大小小于時(shí),不再進(jìn)行分拆

??????????? delete p ;

??????????? return 0;

??????? }

??????? int mid = ( p -> begin + p -> end + 1) / 2;

??????? RANGE * range1 , * range2 ;

?

??????? range1 = new RANGE ;

??????? range2 = new RANGE ;

?

????? ?? range1 -> begin = p -> begin ;

??????? range1 -> end = mid - 1;

??????? range2 -> begin = mid ;

??????? range2 -> end = p -> end ;

?

??????? TASK t1 , t2 ;

??????? t1 . pArg = range1 ;

??????? t2 . pArg = range2 ;

??????? t1 . func = RootTask ;

??????? t2 . func = RootTask ;

?

?????? ? pTaskSched -> SpawnLocalTask ( t1 );

??????? pTaskSched -> SpawnTask ( t2 );

?

??????? delete p ;

??? }

??? return 1;

}

?

任務(wù)處理函數(shù) RootTask() 中,先將一個(gè)大區(qū)間拆分成兩個(gè)更小的區(qū)間,然后將每個(gè)區(qū)間看成一個(gè)新的任務(wù),得到兩個(gè)新的任務(wù) t1 t2 ,然后調(diào)用 SpawnLocalTask() 將任務(wù) t1 放進(jìn)任務(wù)調(diào)度器的分布式隊(duì)列的本地隊(duì)列中。如果拆分后的區(qū)間小于給定的大小,就不再分拆。

下面的代碼演示了如何調(diào)用 CNestTaskScheduler 類來對一個(gè) 0 1023 的區(qū)間進(jìn)行并行拆分。

void main ( void )

{

?

??? TASK ??? task ;

??? RANGE ?? * pRange = new RANGE ;

?

??? pRange -> begin = 0;

??? pRange -> end = 1023;

?

??? task.func = RootTask ;

??? task . pArg = pRange ;

?

??? pTaskSched = new CNestTaskScheduler ;

???

??? pTaskSched -> BeginRootThread ( task );

?

??? delete pTaskSched ;

?

}

上面程序執(zhí)行后,打印的結(jié)果如下,從打印結(jié)果可以看出整個(gè)程序執(zhí)行中進(jìn)行的分拆過程。

Range: 0 - 1023

Range: 0 - 511

Range: 512 - 1023

Range: 0 - 255

Range: 512 - 767

Range: 0 - 127

Range: 512 - 639

Range: 256 - 511

Range: 768 - 1023

Range: 256 - 383

Range: 768 - 895

Range: 128 - 255

Range: 640 - 767

Range: 384 - 511

Range: 896 – 1023

?

當(dāng)然,我們需要用任務(wù)調(diào)度來實(shí)現(xiàn)并行計(jì)算,下面就來講一個(gè)具體的用任務(wù)調(diào)度進(jìn)行并行快速排序的實(shí)例。

?

?

3) 線程池入口函數(shù)處理流程

線程池入口函數(shù)的處理在一個(gè)循環(huán)中進(jìn)行,每次循環(huán)中,從分布式隊(duì)列中獲取任務(wù),然后執(zhí)行任務(wù)的啟動入口函數(shù),如果從分布式隊(duì)列中獲取任務(wù)失敗,則認(rèn)為所有任務(wù)被處理完,此時(shí)需要判斷是否還有掛起的線程,有則需要將掛起線程執(zhí)行起來讓其退出,然后退出循環(huán)并結(jié)束當(dāng)前線程。

?

? 3 ? 線程池入口函數(shù)處理流程圖

?

/** ?? 嵌套任務(wù)調(diào)度的獲取任務(wù)函數(shù)

?

???????? @param ? TASK &Task - 接收從分布式隊(duì)列中獲取的任務(wù)

???????? @return ? int - 成功返回 CAPI_SUCCESS, 失敗返回 CAPI_FAILED. ??????

*/

int CNestTaskScheduler::GetTask(TASK &Task)

{

??? // 先從本地隊(duì)列獲取任務(wù)

??? // 本地獲取任務(wù)失敗后從偷取隊(duì)列獲取任務(wù)

??? return m_DQueue . DeQueue ( Task );

};

?

/** ?? 嵌套任務(wù)調(diào)度的線程池入口函數(shù)

?

???????? @param ? void *pArgs - CNestTaskScheduler 類型的參數(shù) ?????

???????? @return ? unsigned int WINAPI - 返回 ???

*/

unsigned int WINAPI NestTaskScheduler_StartFunc(void *pArgs)

{

??? CNestTaskScheduler ? * pSched = ( CNestTaskScheduler *) pArgs ;

?

??? TASK ??? Task ;

??? int ???? nRet ;

?

??? for ( ;; )

??? {

??????? nRet = pSched -> GetTask ( Task );

??????? if ( nRet == CAPI_FAILED )

??????? {

??????????? CThreadPool &ThreadPool = pSched->GetThreadPool();

???????????

??????????? // 喚醒一個(gè)掛起的線程 , 防止任務(wù)數(shù)量小于 CPU 核數(shù)時(shí),

??????????? // 仍然有任務(wù)處于掛起狀態(tài) , 從而導(dǎo)致 WaitAllThread() 處于死等狀態(tài)

??????????? // 這個(gè)喚醒過程是一個(gè)串行的過程,被喚醒的任務(wù)會繼續(xù)喚醒一個(gè)掛起線程

??????????? LONG Id = pSched->AtomicIncrementTaskId();

??????????? if ( Id < ThreadPool.GetThreadCount() )

??????????? {

??????????????? ThreadPool.ExecThread(Id);

??????????? }

??????? ???? break;

??????? }

??????? (*( Task . func ))( Task . pArg );

??? }

??? return 0;

}

?

在上面的線程入口處理函數(shù) NestTaskScheduler_StartFunc() 中,當(dāng)獲取任務(wù)失敗時(shí),表明所有任務(wù)都處理完畢。此時(shí)需要考慮一種特殊情況,即任務(wù)總數(shù)量小于線程數(shù)量的情況。由于線程池 CThreadPool 采用預(yù)創(chuàng)建線程的方法,所有預(yù)創(chuàng)建的線程初始處于掛起狀態(tài),獲取任務(wù)失敗后,可能還有若干線程沒有被分配到任務(wù),仍然處于掛起狀態(tài)。必須將這些掛起的任務(wù)恢復(fù)執(zhí)行讓其退出,否則 WaitAllThread() 函數(shù)將處于死等狀態(tài)。

NestTaskScheduler_StartFunc() 在處理喚醒掛起的線程的方法是逐個(gè)喚醒的方法,當(dāng)有某個(gè)執(zhí)行線程獲取任務(wù)失敗后,它先喚醒一個(gè)被掛起的線程,然后這個(gè)被喚醒的線程執(zhí)行后,它也會執(zhí)行 NestTaskScheduler_StartFunc() 函數(shù),當(dāng)然它獲取任務(wù)會失敗,接著它也會喚醒一個(gè)被掛起的線程,這樣一直下去,所有被掛起線程都會被喚醒并被退出。

多核中的動態(tài)任務(wù)調(diào)度


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 海淀区| 方山县| 丰县| 罗甸县| 黄大仙区| 察隅县| 石狮市| 玉田县| 静乐县| 永安市| 潼关县| 柯坪县| 辉南县| 石台县| 辽源市| 博白县| 偏关县| 壤塘县| 隆回县| 德化县| 休宁县| 当涂县| 宁陕县| 白山市| 盐池县| 东丰县| 蒲城县| 拉孜县| 潞城市| 永寿县| 上林县| 中阳县| 湘阴县| 永丰县| 宝坻区| 武功县| 尼勒克县| 谷城县| 保靖县| 万载县| 南京市|