前面
已經(jīng)介紹了ITcp接口,而AgileTcp就是ESFramework給出的ITcp的參考實現(xiàn)。在之前,我曾經(jīng)講解過
模擬完成端口的Tcp組件實現(xiàn)和異步Tcp組件
實現(xiàn),在它們的基礎(chǔ)之上,我更改了處理策略,而形成了AgileTcp,目的是更清晰的結(jié)構(gòu)、更高的效率。這個策略會在后面講到。
Tcp組件主要控制著系統(tǒng)與終端用戶的所有消息的進出,ITcp接口描述了這個組件的外貌,告訴外部如何使用Tcp組件、如何與Tcp組件交互。而從實現(xiàn)的角度來看,我們必須理清Tcp組件的職責(zé):
(1)管理所有已經(jīng)建立的Tcp連接
(2)管理與每個連接相對應(yīng)接收緩沖區(qū)
(3)管理所有的工作者線程
(4)處理長度大于接收緩沖區(qū)的消息
我們來看看如何滿足這些職責(zé)。
由于每個連接都對應(yīng)著一個接收緩沖區(qū),所以可以將它們封裝在一起形成ContextKey(連接上下文):

ContextKey中封裝的是ISafeNetworkStream而不是NetworkStream,原因可
參見這里
。
IsDataManaging屬性表明工作線程是否正在處理本連接對應(yīng)的緩沖區(qū)中的數(shù)據(jù),F(xiàn)irstMessageExist屬性用于標(biāo)志接收到的第一條消息,因為系統(tǒng)可能需要對接收到的第一條消息做一些特殊的處理。
任何時刻,可能都有成千上萬個連接存活著;任何時刻,都可能有新的連接建立、或已有的連接被釋放。所有這些ContextKey對象需要被管理起來,這就是上下文管理器IContextKeyManager:
{
void InsertContextKey(ContextKeycontext_key);
void DisposeAllContextKey();
void RemoveContextKey( int streamHashCode);
ISafeNetworkStreamGetNetStream( int streamHashCode);
int ConnectionCount{ get ;}
ICollectionContextKeyList{ get ;}
event CbSimpleIntStreamCountChanged;
}
說到上下文管理器,先要講講如何標(biāo)志每個不同的上下文了?使用連接字,連接字是Tcp連接的Hashcode,它們與連接一一對應(yīng),并且不會重復(fù)。所以在源碼中你經(jīng)常會看到名為“streamHashCode”的參數(shù)和變量。由于Tcp連接與streamHashCode一一對應(yīng),所以GetNetStream方法的實現(xiàn)就非常簡單。不知道你是否記得,RoundedMessage類中有個ConnectID字段,它就是連接字,與streamHashCode意義一樣。根據(jù)此字段,你可以清楚的知道這個RoundedMessage來自于哪個連接。
關(guān)于工作者線程,很幸運的是,我們可以直接使用.NET提供的后臺線程池,而不必要再去手動管理,這可以省卻一些麻煩。當(dāng)然你也可以使用ThreadPool類,甚至你可以從頭開始實現(xiàn)自己的線程池組件,這也是不困難的。
我經(jīng)常被問到,接收緩沖區(qū)應(yīng)該開辟多大?這取決于你的應(yīng)用,但是有一點是錯不了的――緩沖區(qū)的大小至少要大于消息頭Header的大小,否則麻煩就多了。根據(jù)我的經(jīng)驗,一般緩沖區(qū)的大小至少應(yīng)該能容納所有接收消息中的60%-80%。對于大于緩沖區(qū)大小的消息,ESFramework采用的策略是使用緩沖區(qū)池IBufferPool。
{
byte []RentBuffer( int minSize);
void GivebackBuffer( byte []buffer);
}
通過上面的介紹我們已經(jīng)知道如何滿足Tcp組件的職責(zé),現(xiàn)在我們來看看更細(xì)的實現(xiàn)策略:
(1)使用Checker線程。
使用Checker線程是AgileTcp組件的區(qū)別于模擬完成端口的Tcp組件實現(xiàn)和異步Tcp組件的主要特色。當(dāng)AgileTcp啟動時,Checker線程也隨之啟動,這個線程的主要工作就是檢查已經(jīng)存在的每個連接上是否有數(shù)據(jù)要接收(還記得Select網(wǎng)絡(luò)模型),這可以通過NetworkStream.DataAvailable屬性知道。如果發(fā)現(xiàn)某個連接上有待接收的數(shù)據(jù),就將其放到工作者線程中去處理,并設(shè)置前面提到的ContextKey.IsDataManaging屬性,然后再判斷下個連接,如此循環(huán)下去。
{
while ( ! this .stop)
{
foreach (ContextKeykey in this .contextKeyManager.ContextKeyList)
{
if ( this .stop)
{
break ;
}
if (( ! key. IsDataManaging ) && key.NetStream. DataAvailable )
{
key.IsDataManaging = true ;
CbContextKeycb = new CbContextKey( this .DataManaging);
cb. BeginInvoke (key, null , null );
}
}
System.Threading.Thread.Sleep( 50 );
}
}
(2)將消息頭的解析置于Tcp組件之中
將消息頭解析置于Tcp組件之中這個方案我層考慮了非常久,原因是,這會破壞Tcp組件的單純性,使得Tcp組件與協(xié)議(Contract)有所關(guān)聯(lián)。最終采用這個策略的第一個理由是清晰,第二個理由是效率。清晰在于簡化了ContextKey結(jié)構(gòu),避免了使用消息分裂器這樣復(fù)雜的算法組件(如果大家看過我以前關(guān)于通信方案的文章,一定會得到這樣的答案)。效率在于,當(dāng)在此解析了Header之后,后面所有的處理器都可以使用這個Header對象了,而不用在自己去解析。這也是NetMessage類中有個Header字段的原因。
(3)針對于某個連接,只有當(dāng)上一個消息處理完并將回復(fù)發(fā)送后(如果有回復(fù)的話),才會去接收下一個消息。
這個策略會使很多事情變得簡單,而且不會影響任何有用的特性。由于不會在處理消息的時候去接收下一個消息,所以可以直接處理接收緩沖區(qū)中的數(shù)據(jù),而不需要將數(shù)據(jù)從接收緩沖區(qū)拷貝到另外的地方去處理。這又對效率提高有所幫助。
綜上所述,我們可以總結(jié)工作者線程要做的事情:首先,從連接上接收MessageHeaderSize個字節(jié),解析成Header,然后在接收Header. MessageBodyLength個字節(jié),即是Body,接著構(gòu)造成RoundedMessage對象交給消息分配器去處理,最后將得到的處理結(jié)果發(fā)送出去。代碼如下所示:

AgileTcp組件的主要原理差不多就這些了,這種實現(xiàn)有個缺點,不知大家發(fā)現(xiàn)沒有。那就是當(dāng)客戶端主動斷開連接或掉線時,AgileTcp組件可能感受不到(除非對應(yīng)的連接上正在發(fā)送或接收數(shù)據(jù),此時會拋出異常),因為當(dāng)連接斷開時,key.NetStream.DataAvailable不會拋出異常,而是仍然返回false。這是個問題,幸好有補救的辦法,一是要求客戶端下線的時候給服務(wù)器發(fā)送Logoff消息,二是使用定時掉線檢查器(IUserOnLineChecker)。當(dāng)服務(wù)器檢查或發(fā)現(xiàn)某用戶下線時,即可調(diào)用ITcpClientsController.DisposeOneConnection方法來釋放對應(yīng)的連接和Context。(你應(yīng)該還記得ITcp接口是從ITcpClientsController繼承的)。關(guān)于這個問題,你有更好的解決辦法嗎?
感謝關(guān)注!
上一篇文章:
ESFramework介紹之(21)-- Tcp組件接口ITcp介紹
轉(zhuǎn)到:
ESFramework 可復(fù)用的通信框架(序)
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
