溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

TARS C++客戶端是什么

發(fā)布時(shí)間:2021-11-25 15:52:00 來(lái)源:億速云 閱讀:168 作者:iii 欄目:云計(jì)算

本篇內(nèi)容介紹了“TARS C++客戶端是什么”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

什么是TARS

TARS是騰訊使用十年的微服務(wù)開(kāi)發(fā)框架,目前支持C++、Java、PHP、Node.js、Go語(yǔ)言。該開(kāi)源項(xiàng)目為用戶提供了涉及到開(kāi)發(fā)、運(yùn)維、以及測(cè)試的一整套微服務(wù)平臺(tái)PaaS解決方案,幫助一個(gè)產(chǎn)品或者服務(wù)快速開(kāi)發(fā)、部署、測(cè)試、上線。目前該框架應(yīng)用在騰訊各大核心業(yè)務(wù),基于該框架部署運(yùn)行的服務(wù)節(jié)點(diǎn)規(guī)模達(dá)到數(shù)十萬(wàn)。 TARS的通信模型中包含客戶端和服務(wù)端??蛻舳朔?wù)端之間主要是利用RPC進(jìn)行通信。本系列文章分上下兩篇,對(duì)RPC調(diào)用部分進(jìn)行源碼解析。本文是上篇,我們將以C++語(yǔ)言為載體,帶大家了解一下TARS的客戶端。

初識(shí)客戶端

TARS的客戶端最重要的類是Communicator,一個(gè)客戶端只能聲明出一個(gè)Communicator類實(shí)例,用戶可以通過(guò)CommunicatorPtr& Application::getCommunicator()獲取線程安全的Communicator類單例。Communicator類聚合了兩個(gè)比較重要的類,一個(gè)是CommunicatorEpoll,負(fù)責(zé)網(wǎng)絡(luò)線程的建立與通過(guò)ObjectProxyFactory生成ObjectProxy;另一個(gè)是ServantProxyFactory,生成不同的RPC服務(wù)句柄,即ServantProxy,用戶通過(guò)ServantProxy調(diào)用RPC服務(wù)。下面簡(jiǎn)單介紹幾個(gè)類的作用。

Communicator

一個(gè)Communicator實(shí)例就是一個(gè)客戶端,負(fù)責(zé)與服務(wù)端建立連接,生成RPC服務(wù)句柄,可以通過(guò)CommunicatorPtr& Application::getCommunicator()獲取Communicator實(shí)例,用戶最后不要自己聲明定義新的Communicator實(shí)例。

ServantProxy與ServantProxyFactory

ServantProxy就是一個(gè)服務(wù)代理,ServantProxy可以通過(guò)ServantProxyFactory工廠類生成,用戶往往通過(guò)Communicator的template<class T> void stringToProxy()接口間接調(diào)用ServantProxyFactory的ServantPrx::element_type* getServantProxy()接口以獲取服務(wù)代理,通過(guò)服務(wù)代理ServantProxy,用戶就可以進(jìn)行RPC調(diào)用了。ServantProxy內(nèi)含多個(gè)服務(wù)實(shí)體ObjectProxy(詳見(jiàn)下文第4小點(diǎn)),能夠幫助用戶在同一個(gè)服務(wù)代理內(nèi)進(jìn)行負(fù)載均衡。

CommunicatorEpoll

CommunicatorEpoll類代表客戶端的網(wǎng)絡(luò)模塊,內(nèi)含TC_Epoller作為IO復(fù)用,能夠同時(shí)處理不同主調(diào)線程(caller線程)的多個(gè)請(qǐng)求。CommunicatorEpoll內(nèi)含服務(wù)實(shí)體工廠類ObjectProxyFactory(詳見(jiàn)下文第4小點(diǎn)),意味著在同一網(wǎng)絡(luò)線程中,能夠產(chǎn)生不同服務(wù)的實(shí)體,能夠完成不同的RPC服務(wù)調(diào)用。CommunicatorEpoll還聚合了異步調(diào)用處理線程AsyncProcThread,負(fù)責(zé)接收到異步的響應(yīng)包之后,將響應(yīng)包交給該線程處理。

ObjectProxy與ObjectProxyFactory

ObjectProxy類是一個(gè)服務(wù)實(shí)體,注意與ServantProxy類是一個(gè)服務(wù)代理相區(qū)別,前者表示一個(gè)網(wǎng)絡(luò)線程上的某個(gè)服務(wù)實(shí)體A,后者表示對(duì)所有網(wǎng)絡(luò)線程上的某服務(wù)實(shí)體A的總代理,更詳細(xì)的介紹可見(jiàn)下文。ObjectProxy通過(guò)ObjectProxyFactory生成,而ObjectProxyFactory類的實(shí)例是CommunicatorEpoll的成員變量,意味著一個(gè)網(wǎng)絡(luò)線程CommunicatorEpoll能夠產(chǎn)生各種各樣的服務(wù)實(shí)體ObjectProxy,發(fā)起不同的RPC服務(wù)。ObjectProxy通過(guò)AdapterProxy來(lái)管理對(duì)服務(wù)端的連接。 好了,介紹完所有的類之后,先通過(guò)類圖理一理他們之間的關(guān)系,這個(gè)類圖在之后的文章中將會(huì)再次出現(xiàn)。

TARS C++客戶端是什么

TARS的客戶端最重要的類是Communicator,一個(gè)客戶端只能聲明出一個(gè)Communicator類實(shí)例,用戶可以通過(guò)CommunicatorPtr& Application::getCommunicator()獲取線程安全的Communicator類單例。Communicator類聚合了兩個(gè)比較重要的類,一個(gè)是CommunicatorEpoll,負(fù)責(zé)網(wǎng)絡(luò)線程的建立與通過(guò)ObjectProxyFactory生成ObjectProxy;另一個(gè)是ServantProxyFactory,生成不同的RPC服務(wù)句柄,即ServantProxy,用戶通過(guò)ServantProxy調(diào)用RPC服務(wù)。 根據(jù)用戶配置,Communicator擁有n個(gè)網(wǎng)絡(luò)線程,即n個(gè)CommunicatorEpoll。每個(gè)CommunicatorEpoll擁有一個(gè)ObjectProxyFactory類,每個(gè)ObjectProxyFactory可以生成一系列的不同服務(wù)的實(shí)體對(duì)象ObjectProxy,因此,假如Communicator擁有兩個(gè)CommunicatorEpoll,并有foo與bar這兩類不同的服務(wù)實(shí)體對(duì)象,那么如下圖(1-2)所示,每個(gè)CommunicatorEpoll可以通過(guò) ObjectProxyFactory創(chuàng)建兩類ObjectProxy,這是TARS客戶端的第一層負(fù)載均衡,每個(gè)線程都可以分擔(dān)所有服務(wù)的RPC請(qǐng)求,因此,一個(gè)服務(wù)的阻塞可能會(huì)影響其他服務(wù),因?yàn)榫W(wǎng)絡(luò)線程是多個(gè)服務(wù)實(shí)體ObjectProxy所共享的。

TARS C++客戶端是什么

Communicator類下另一個(gè)比較重要的ServantProxyFactory類的作用是依據(jù)實(shí)際服務(wù)端的信息(如服務(wù)器的socket標(biāo)志)與Communicator中客戶端的信息(如網(wǎng)絡(luò)線程數(shù))而生成ServantProxy句柄,通過(guò)句柄調(diào)用RPC服務(wù)。舉個(gè)例子,如下圖(1-3)所示,Communicator實(shí)例通過(guò)ServantProxyFactory成員變量的getServantProxy()接口在構(gòu)造fooServantProxy句柄的時(shí)候,會(huì)獲取Communicator實(shí)例下的所有CommunicatorEpoll(即CommunicatorEpoll-1與CommunicatorEpoll-2)中的fooObjectProxy(即fooObjectProxy-1與fooObjectProxy-2),并作為構(gòu)造fooServantProxy的參數(shù)。Communicator通過(guò)ServantProxyFactory能夠獲取foo與bar這兩類ServantProxy,ServantProxy與相應(yīng)的ObjectProxy存在相應(yīng)的聚合關(guān)系:

TARS C++客戶端是什么

另外,每個(gè)ObjectProxy都擁有一個(gè)EndpointManager,例如,fooObjectProxy 的EndpointManager管理fooObjectProxy 下面的所有fooAdapterProxy,每個(gè)AdapterProxy連接到一個(gè)提供相應(yīng)foo服務(wù)的服務(wù)端物理機(jī)socket上。通過(guò)EndpointManager還可以以不同的負(fù)載均衡方式獲取連接AdapterProxy。假如foo服務(wù)有兩臺(tái)物理機(jī),bar服務(wù)有一臺(tái)物理機(jī),那么ObjectProxy,EndpointManager與AdapterProxy關(guān)系如下圖(1-4)所示。上面提到,不同的網(wǎng)絡(luò)線程CommunicatorEpoll均可以發(fā)起同一RPC請(qǐng)求,對(duì)于同一RPC服務(wù),選取不同的ObjectProxy(或可認(rèn)為選取不同的網(wǎng)絡(luò)線程CommunicatorEpoll)是第一層的負(fù)載均衡,而對(duì)于同一個(gè)被選中的ObjectProxy,通過(guò)EndpointManager選擇不同的socket連接AdapterProxy(假如ObjectProxy有大于1個(gè)的AdapterProxy,如圖(1-4)的fooObjectProxy)是第二層的負(fù)載均衡。

TARS C++客戶端是什么

在客戶端進(jìn)行初始化時(shí),必須建立上面介紹的關(guān)系,因此相應(yīng)的類圖如圖(1-5)所示,通過(guò)類圖可以看出各類的關(guān)系,以及初始化需要用到的函數(shù)。

TARS C++客戶端是什么

初始化代碼

現(xiàn)在,通過(guò)代碼跟蹤來(lái)看看,在客戶端初始化過(guò)程中,各個(gè)類是如何被初始化出來(lái)并建立上述的架構(gòu)關(guān)系的。在簡(jiǎn)述之前,可以先看看函數(shù)的調(diào)用流程圖,若看不清晰,可以將圖片保存下來(lái),用看圖軟件放大查看,強(qiáng)烈建議結(jié)合文章的代碼解析以TARS源碼一起查看,文章后面的所有代碼流程圖均如此。 接下來(lái),將會(huì)按照函數(shù)調(diào)用流程圖來(lái)一步一步分析客戶端代理是如何被初始化出來(lái)的:

TARS C++客戶端是什么

1. 執(zhí)行stringToProxy

在客戶端程序中,一開(kāi)始會(huì)執(zhí)行下面的代碼進(jìn)行整個(gè)客戶端代理的初始化:

Communicator comm;
HelloPrx prx;
comm.stringToProxy("TestApp.HelloServer.HelloObj@tcp -h 1.1.1.1 -p 20001" , prx);

先聲明一個(gè)Communicator變量comm(其實(shí)不建議這么做)以及一個(gè)ServantProxy類的指針變量prx,在此處,服務(wù)為Hello,因此聲明一個(gè)HelloPrx prx。注意一個(gè)客戶端只能擁有一個(gè)Communicator。為了能夠獲得RPC的服務(wù)句柄,我們調(diào)用Communicator::stringToProxy(),并傳入服務(wù)端的信息與prx變量,函數(shù)返回后,prx就是RPC服務(wù)的句柄。 進(jìn)入Communicator::stringToProxy()函數(shù)中,我們通過(guò)Communicator::getServantProxy()來(lái)依據(jù)objectName與setName獲取服務(wù)代理ServantProxy:

/**
* 生成代理
* @param T
* @param objectName
* @param setName 指定set調(diào)用的setid
* @param proxy
*/
template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="")
{
    ServantProxy * pServantProxy = getServantProxy(objectName,setName);
    proxy = (typename T::element_type*)(pServantProxy);
}

2.執(zhí)行Communicator的初始化函數(shù)

進(jìn)入Communicator::getServantProxy(),首先會(huì)執(zhí)行Communicator::initialize()來(lái)初始化Communicator,需要注意一點(diǎn),Communicator:: initialize()只會(huì)被執(zhí)行一次,下一次執(zhí)行Communicator::getServantProxy()將不會(huì)再次執(zhí)行Communicator:: initialize()函數(shù):

void Communicator::initialize()
{
    TC_LockT<TC_ThreadRecMutex> lock(*this);
    if (_initialized) //已經(jīng)被初始化則直接返回
        return;
    ......
}

進(jìn)入Communicator::initialize()函數(shù)中,在這里,將會(huì)new出上文介紹的與Communicator密切相關(guān)的類ServantProxyFactory與n個(gè)CommunicatorEpoll,n為客戶端的網(wǎng)絡(luò)線程數(shù),最小為1,最大為MAX_CLIENT_THREAD_NUM:

void Communicator::initialize()
{
    ......
    _servantProxyFactory = new ServantProxyFactory(this);
    ......
    for(size_t i = 0; i < _clientThreadNum; ++i)
    {
        _communicatorEpoll[i] = new CommunicatorEpoll(this, i);
        _communicatorEpoll[i]->start(); //啟動(dòng)網(wǎng)絡(luò)線程
    }
    ......
}

在CommunicatorEpoll的構(gòu)造函數(shù)中,ObjectProxyFactory被創(chuàng)建出來(lái),這是構(gòu)造圖(1-2)關(guān)系的前提。除此之外,還可以看到獲取相應(yīng)配置,創(chuàng)建并啟動(dòng)若干個(gè)異步回調(diào)后的處理線程。創(chuàng)建完成后,調(diào)用CommunicatorEpoll::start()啟動(dòng)網(wǎng)絡(luò)線程。至此,Communicator::initialize()順利執(zhí)行。通過(guò)下圖回顧上面的過(guò)程:

TARS C++客戶端是什么

3.嘗試獲取ServantProxy

代碼回到Communicator::getServantProxy()中 Communicator::getServantProxy()會(huì)執(zhí)行ServantProxyFactory::getServantProxy()并返回相應(yīng)的服務(wù)代理:

ServantProxy* Communicator::getServantProxy(const string& objectName, const string& setName)
{
    ……
    return _servantProxyFactory->getServantProxy(objectName,setName);
}

進(jìn)入ServantProxyFactory::getServantProxy(),首先會(huì)加鎖,從map<string, ServantPrx> _servantProxy中查找目標(biāo),若查找成功直接返回。若查找失敗,TARS需要構(gòu)造出相應(yīng)的ServantProxy,ServantProxy的構(gòu)造需要如圖(1-3)所示的相對(duì)應(yīng)的ObjectProxy作為構(gòu)造函數(shù)的參數(shù),由此可見(jiàn),在ServantProxyFactory::getServantProxy()中有如下獲取ObjectProxy指針數(shù)組的代碼:

ObjectProxy ** ppObjectProxy = new ObjectProxy * [_comm->getClientThreadNum()];
assert(ppObjectProxy != NULL);
for(size_t i = 0; i < _comm->getClientThreadNum(); ++i)
{
    ppObjectProxy[i] = _comm->getCommunicatorEpoll(i)->getObjectProxy(name, setName);
}

4.獲取ObjectProxy

代碼來(lái)到ObjectProxyFactory::getObjectProxy(),同樣,會(huì)首先加鎖,再?gòu)膍ap<string,ObjectProxy*> _objectProxys中查找是否已經(jīng)擁有目標(biāo)ObjectProxy,若查找成功直接返回。若查找失敗,需要新建一個(gè)新的ObjectProxy,通過(guò)類圖可知,ObjectProxy需要一個(gè)CommunicatorEpoll對(duì)象進(jìn)行初始化,由此關(guān)聯(lián)管理自己的CommunicatorEpoll,CommunicatorEpoll之后便可以通過(guò)getObjectProxy()接口獲取屬于自己的ObjectProxy。詳細(xì)過(guò)程可見(jiàn)下圖:

TARS C++客戶端是什么

5.建立ObjectProxy與AdapterProxy的關(guān)系

新建ObjectProxy的過(guò)程同樣非常值得關(guān)注,在ObjectProxy::ObjectProxy()中,關(guān)鍵代碼是:

_endpointManger.reset(new EndpointManager(this, _communicatorEpoll->getCommunicator(), sObjectProxyName, pCommunicatorEpoll->isFirstNetThread(), setName));

每個(gè)ObjectProxy都有屬于自己的EndpointManager負(fù)責(zé)管理到服務(wù)端的所有socket連接AdapterProxy,每個(gè)AdapterProxy連接到一個(gè)提供相應(yīng)服務(wù)的服務(wù)端物理機(jī)socket上。通過(guò)EndpointManager還可以以不同的負(fù)載均衡方式獲取與服務(wù)器的socket連接AdapterProxy。 ObjectProxy:: ObjectProxy()是圖(1-6)或者圖(1-8)中的略1,具體的代碼流程如圖(1-9)所示。ObjectProxy創(chuàng)建一個(gè)EndpointManager對(duì)象,在EndpointManager的初始化過(guò)程中,依據(jù)客戶端提供的信息,直接創(chuàng)建連接到服務(wù)端物理機(jī)的TCP/UDP連接AdapterProxy或者從代理中獲取服務(wù)端物理機(jī)socket列表后再創(chuàng)建TCP/UDP連接AdapterProxy。

TARS C++客戶端是什么

按照?qǐng)D(1-9)的程序流程執(zhí)行完成后,便會(huì)建立如圖(2-3)所示的一個(gè)ObjectProxy對(duì)多個(gè)AdapterProxy的關(guān)系。 新建ObjectProxy之后,就可以調(diào)用其ObjectProxy::initialize()函數(shù)進(jìn)行ObjectProxy對(duì)象的初始化了,當(dāng)然,需要將ObjectProxy對(duì)象插入ObjectProxyFactory的成員變量_objectProxys與_vObjectProxys中,方便下次直接返回ObjectProxy對(duì)象。

6.繼續(xù)完成ServantProxy的創(chuàng)建

退出層層的函數(shù)調(diào)用棧,代碼再次回 ServantProxyFactory::getServantProxy(),此時(shí),ServantProxyFactory已經(jīng)獲得相應(yīng)的ObjectProxy數(shù)組ObjectProxy** ppObjectProxy,接著便可以調(diào)用:

ServantPrx sp = new ServantProxy(_comm, ppObjectProxy, _comm->getClientThreadNum());

進(jìn)行ServantProxy的構(gòu)造。構(gòu)造完成便可以呈現(xiàn)出如圖(2-1)的關(guān)系。在ServantProxy的構(gòu)造函數(shù)中可以看到,ServantProxy在新建一個(gè)EndpointManagerThread變量,這是對(duì)外獲取路由請(qǐng)求的類,是TARS為調(diào)用邏輯而提供的多種解決跨地區(qū)調(diào)用等問(wèn)題的方案。同時(shí)可以看到:

for(size_t i = 0;i < _objectProxyNum; ++i)
{
   (*(_objectProxy + i))->setServantProxy(this);
}

建立了ServantProxy與ObjectProxy的相互關(guān)聯(lián)關(guān)系。剩下的是讀取配置文件,獲取相應(yīng)的信息。 構(gòu)造ServantProxy變量完成后,ServantProxyFactory::getServantProxy()獲取一些超時(shí)參數(shù),賦值給ServantProxy變量,同時(shí)將其放進(jìn)map<string, ServantPrx> _servantProxy中,方便下次直接查找獲取。 ServantProxyFactory::getServantProxy()將剛剛構(gòu)造的ServantProxy指針變量返回給調(diào)用他的Communicator::getServantProxy(),在Communicator::getServantProxy()中:

ServantProxy * Communicator::getServantProxy(const string& objectName,const string& setName)
{
    ……
    return _servantProxyFactory->getServantProxy(objectName,setName);
}

直接將返回值返回給調(diào)用起Communicator::getServantProxy()的Communicator::stringToProxy()。可以看到:

template<class T> void stringToProxy(const string& objectName, T& proxy,const string& setName="")
{
    ServantProxy * pServantProxy = getServantProxy(objectName,setName);
    proxy = (typename T::element_type*)(pServantProxy);
}

Communicator::stringToProxy()將返回值強(qiáng)制轉(zhuǎn)換為客戶端代碼中與HelloPrx prx同樣的類型HelloPrx。由于函數(shù)參數(shù)proxy就是prx的引用。那么實(shí)際就是將句柄prx成功初始化了,用戶可以利用句柄prx進(jìn)行RPC調(diào)用了。

同步調(diào)用

當(dāng)我們獲得一個(gè)ServantProxy句柄后,便可以進(jìn)行RPC調(diào)用了。Tars提供四種RPC調(diào)用方式,分別是同步調(diào)用,異步調(diào)用,promise調(diào)用與協(xié)程調(diào)用。其中最簡(jiǎn)單最常見(jiàn)的RPC調(diào)用方式是同步調(diào)用,接下來(lái),將簡(jiǎn)單分析Tars的同步調(diào)用。

現(xiàn)假設(shè)有一個(gè)MyDemo.StringServer.StringServantObj的服務(wù),提供一個(gè)RPC接口是append,傳入兩個(gè)string類型的變量,返回兩個(gè)string類型變量的拼接結(jié)果。而且假設(shè)有兩臺(tái)服務(wù)器,socket標(biāo)識(shí)分別是192.168.106.129:34132與192.168.106.130:34132,設(shè)置客戶端的網(wǎng)絡(luò)線程數(shù)為3,那么執(zhí)行如下代碼:

Communicator _comm;
StringServantPrx _proxy;
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.129 -p 34132", _proxy);
_comm.stringToProxy("MyDemo.StringServer.StringServantObj@tcp -h 192.168.106.130 -p 34132", _proxy);

經(jīng)過(guò)上文關(guān)于客戶端初始化的分析介紹可知,可以得出如下圖所示的關(guān)系圖:

TARS C++客戶端是什么

獲取StringServantPrx _proxy后,直接調(diào)用:

string str1(abc-), str2(defg), rStr;
int retCode = _proxy->append(str1, str2, rStr);

成功進(jìn)行RPC同步調(diào)用后,返回的結(jié)果是rStr = “abc-defg”。

同樣,我們先看看與同步調(diào)用相關(guān)的類圖,如下圖所示:

TARS C++客戶端是什么

StringServantProxy是繼承自ServantProxy的,StringServantProxy提供了RPC同步調(diào)用的接口Int32 append(),當(dāng)用戶發(fā)起同步調(diào)用_proxy->append(str1, str2, rStr)時(shí),所進(jìn)行的函數(shù)調(diào)用過(guò)程如下圖所示。

TARS C++客戶端是什么

在函數(shù)StringServantProxy::append()中,程序會(huì)先構(gòu)造ServantProxy::tars_invoke()所需要的參數(shù),如請(qǐng)求包類型,RPC方法名,方法參數(shù)等,需要值得注意的是,傳遞參數(shù)中有一個(gè)ResponsePacket類型的變量,在同步調(diào)用中,最終的返回結(jié)果會(huì)放置在這個(gè)變量上。接下來(lái)便直接調(diào)用了ServantProxy::tars_invoke()方法:

tars_invoke(tars::TARSNORMAL, "append", _os.getByteBuffer(), context, _mStatus, rep);

在ServantProxy::tars_invoke()方法中,先創(chuàng)建一個(gè)ReqMessage變量msg,初始化msg變量,給變量賦值,如Tars版本號(hào),數(shù)據(jù)包類型,服務(wù)名,RPC方法名,Tars的上下文容器,同步調(diào)用的超時(shí)時(shí)間(單位為毫秒)等。最后,調(diào)用ServantProxy::invoke()進(jìn)行遠(yuǎn)程方法調(diào)用。

無(wú)論同步調(diào)用還是各種異步調(diào)用,ServantProxy::invoke()都是RPC調(diào)用的必經(jīng)之地。在ServantProxy::invoke()中,繼續(xù)填充傳遞進(jìn)來(lái)的變量ReqMessage msg。此外,還需要獲取調(diào)用者caller線程的線程私有數(shù)據(jù)ServantProxyThreadData,用來(lái)指導(dǎo)RPC調(diào)用。客戶端的每個(gè)caller線程都有屬于自己的維護(hù)調(diào)用上下文的線程私有數(shù)據(jù),如hash屬性,消息染色信息。最關(guān)鍵的還是每條caller線程與每條客戶端網(wǎng)絡(luò)線程CommunicatorEpoll進(jìn)行信息交互的橋梁——通信隊(duì)列ReqInfoQueue數(shù)組,數(shù)組中的每個(gè)ReqInfoQueue元素負(fù)責(zé)與一條網(wǎng)絡(luò)線程進(jìn)行交互,如圖(1-13)所示,圖中橙色陰影代表數(shù)組ReqInfoQueue[],陰影內(nèi)的圓柱體代表數(shù)組元素ReqInfoQueue。假如客戶端create兩條線程(下稱caller線程)發(fā)起StringServant服務(wù)的RPC請(qǐng)求,且客戶端網(wǎng)絡(luò)線程數(shù)設(shè)置為2,那么兩條caller線程各自有屬于自己的線程私有數(shù)據(jù)請(qǐng)求隊(duì)列數(shù)組ReqInfoQueue[],數(shù)組里面的ReqInfoQueue元素便是該數(shù)組對(duì)應(yīng)的caller線程與兩條網(wǎng)絡(luò)線程的通信橋梁,一條網(wǎng)絡(luò)線程對(duì)應(yīng)著數(shù)組里面的一個(gè)元素,通過(guò)網(wǎng)絡(luò)線程ID進(jìn)行數(shù)組索引。整個(gè)關(guān)系有點(diǎn)像生產(chǎn)者消費(fèi)者模型,生產(chǎn)者Caller線程向自己的線程私有數(shù)據(jù)**ReqInfoQueue[]**中的第N個(gè)元素ReqInfoQueue[N] push請(qǐng)求包,消費(fèi)者客戶端第N個(gè)網(wǎng)絡(luò)線程就會(huì)從這個(gè)隊(duì)列中pop請(qǐng)求包。

TARS C++客戶端是什么

閱讀代碼可能會(huì)發(fā)現(xiàn)幾個(gè)常量值,如MAX_CLIENT_THREAD_NUM=64,這是最大網(wǎng)絡(luò)線程數(shù),在圖(1-13)中為單個(gè)請(qǐng)求隊(duì)列數(shù)組ReqInfoQueue[]的最大size;MAX_CLIENT_NOTIFYEVENT_NUM=2048,在圖(1-13)中,可以看作caller線程的最大數(shù)量,或者請(qǐng)求隊(duì)列數(shù)組ReqInfoQueue[]的最大數(shù)量(反正兩者一一對(duì)應(yīng),每個(gè)caller線程都有自己的線程私有數(shù)據(jù)ReqInfoQueue[])。

接著依據(jù)caller線程的線程私有數(shù)據(jù)進(jìn)行第一次的負(fù)載均衡——選取ObjectProxy(即選擇網(wǎng)絡(luò)線程CommunicatorEpoll)和與之相對(duì)應(yīng)的ReqInfoQueue:

ObjectProxy * pObjProxy = NULL;
ReqInfoQueue * pReqQ = NULL;
//選擇網(wǎng)絡(luò)線程
selectNetThreadInfo(pSptd, pObjProxy, pReqQ);

在ServantProxy::selectNetThreadInfo()中,通過(guò)輪詢的形式來(lái)選取ObjectProxy與ReqInfoQueue。

退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy_類型的pObjProxy及其對(duì)應(yīng)的ReqInfoQueue_類型的ReqInfoQueue,稍后通過(guò)pObjectProxy來(lái)發(fā)送RPC請(qǐng)求,請(qǐng)求信息會(huì)暫存在ReqInfoQueue中。

由于是同步調(diào)用,需要新建一個(gè)條件變量去監(jiān)聽(tīng)RPC的完成,可見(jiàn):

//同步調(diào)用 new 一個(gè)ReqMonitor
assert(msg->pMonitor == NULL);
if(msg->eType == ReqMessage::SYNC_CALL)
{
	msg->bMonitorFin = false;
	if(pSptd->_sched)
	{
    	msg->bCoroFlag = true;
    	msg->sched 	= pSptd->_sched;
    	msg->iCoroId   = pSptd->_sched->getCoroutineId();
	}
	else
	{
    	msg->pMonitor = new ReqMonitor;
	}
}

創(chuàng)建完條件變量,接下來(lái)往ReqInfoQueue中push_back()請(qǐng)求信息包msg,并通知pObjProxy所屬的CommunicatorEpoll進(jìn)行數(shù)據(jù)發(fā)送:

if(!pReqQ->push_back(msg,bEmpty))
{
	TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
	delete msg;
	msg = NULL;
	pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
	throw TarsClientQueueException("client queue full");
}
 
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

來(lái)到CommunicatorEpoll::notify()中,往請(qǐng)求事件通知數(shù)組NotifyInfo _notify[]中添加請(qǐng)求事件,通知CommunicatorEpoll進(jìn)行請(qǐng)求包的發(fā)送。注意了,這個(gè)函數(shù)的作用僅僅是通知網(wǎng)絡(luò)線程準(zhǔn)備發(fā)送數(shù)據(jù),通過(guò)TC_Epoller::mod()或者TC_Epoller::add()觸發(fā)一個(gè)EPOLLIN事件,從而促使阻塞在TC_Epoller::wait()(在CommunicatorEpoll::run()中阻塞)的網(wǎng)絡(luò)線程CommunicatorEpoll被喚醒,并設(shè)置喚醒后的epoll_event中的聯(lián)合體epoll_data變量為&_notify[iSeq].stFDInfo:

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{
	assert(iSeq < MAX_CLIENT_NOTIFYEVENT_NUM);
 
	if(_notify[iSeq].bValid)
	{
    	_ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
    	assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
	}
	else
	{
    	_notify[iSeq].stFDInfo.iType   = FDInfo::ET_C_NOTIFY;
    	_notify[iSeq].stFDInfo.p   	= (void*)msgQueue;
	    _notify[iSeq].stFDInfo.fd  	= _notify[iSeq].eventFd;
    	_notify[iSeq].stFDInfo.iSeq	= iSeq;
    	_notify[iSeq].notify.createSocket();
    	_notify[iSeq].bValid       	= true;
 
    	_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
	}
}

就是經(jīng)過(guò)這么一個(gè)操作,網(wǎng)絡(luò)線程就可以被喚醒,喚醒后通過(guò)epoll_event變量可獲得&_notify[iSeq].stFDInfo。接下來(lái)的請(qǐng)求發(fā)送與響應(yīng)的接收會(huì)在后面會(huì)詳細(xì)介紹。

隨后,代碼再次回到ServantProxy::invoke(),阻塞于:

if(!msg->bMonitorFin)
{
	TC_ThreadLock::Lock lock(*(msg->pMonitor));
	//等待直到網(wǎng)絡(luò)線程通知過(guò)來(lái)
	if(!msg->bMonitorFin)
   {
    	msg->pMonitor->wait();
	}
}

等待網(wǎng)絡(luò)線程接收到數(shù)據(jù)后,對(duì)其進(jìn)行喚醒。 接收到響應(yīng)后,檢查是否成功獲取響應(yīng),是則直接退出函數(shù)即可,響應(yīng)信息在傳入的參數(shù)msg中:

if(msg->eStatus == ReqMessage::REQ_RSP && msg->response.iRet == TARSSERVERSUCCESS)
{
	snprintf(pSptd->_szHost, sizeof(pSptd->_szHost), "%s", msg->adapter->endpoint().desc().c_str());
	//成功
	return;
}

若接收失敗,會(huì)拋出異常,并刪除msg:

TarsException::throwException(ret, os.str());

若接收成功,退出ServantProxy::invoke()后,回到ServantProxy::tars_invoke(),獲取ResponsePacket類型的響應(yīng)信息,并刪除msg包:

rsp = msg->response;
delete msg;
msg = NULL;

代碼回到StringServantProxy::append(),此時(shí)經(jīng)過(guò)同步調(diào)用,可以直接獲取RPC返回值并回到客戶端中。

網(wǎng)絡(luò)線程發(fā)送請(qǐng)求

上面提到,當(dāng)在ServantProxy::invoke()中,調(diào)用CommunicatorEpoll::notify()通知網(wǎng)絡(luò)線程進(jìn)行請(qǐng)求發(fā)送后,接下來(lái),網(wǎng)絡(luò)線程的具體執(zhí)行流程如下圖所示:

TARS C++客戶端是什么

由于CommunicatorEpoll繼承自TC_Thread,在上文1.2.2節(jié)中的第2小點(diǎn)的初始化CommunicatorEpoll之后便調(diào)用其CommunicatorEpoll::start()函數(shù)啟動(dòng)網(wǎng)絡(luò)線程,網(wǎng)絡(luò)線程在CommunicatorEpoll::run()中一直等待_ep.wait(iTimeout)。由于在上一節(jié)的描述中,在CommunicatorEpoll::notify(),caller線程發(fā)起了通知notify,網(wǎng)絡(luò)線程在CommunicatorEpoll::run()就會(huì)調(diào)用CommunicatorEpoll::handle()處理通知:

void CommunicatorEpoll::run()
{
	......
    	try
    	{
        	int iTimeout = ((_waitTimeout < _timeoutCheckInterval) ? _waitTimeout : _timeoutCheckInterval);
 
        	int num = _ep.wait(iTimeout);
 
        	if(_terminate)
        	{
            	break;
        	}
 
        	//先處理epoll的網(wǎng)絡(luò)事件
        	for (int i = 0; i < num; ++i)
        	{
            	//獲取epoll_event變量的data,就是1.3.1節(jié)中提過(guò)的&_notify[iSeq].stFDInfo
            	const epoll_event& ev = _ep.get(i);
            	uint64_t data = ev.data.u64;
 
            	if(data == 0)
            	{
                	continue; //data非指針, 退出循環(huán)
            	}
            	handle((FDInfo*)data, ev.events);
        	}
    	}
	......
   
}

在CommunicatorEpoll::handle()中,通過(guò)傳遞進(jìn)來(lái)的epoll_event中的data成員變量獲取前面被選中的ObjectProxy并調(diào)用其ObjectProxy::invoke()函數(shù):

void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
{
	try
	{
    	assert(pFDInfo != NULL);
 
    	//隊(duì)列有消息通知過(guò)來(lái)
    	if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)
    	{
        	ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;
        	ReqMessage * msg = NULL;
 
        	try
        	{
            	while(pInfoQueue->pop_front(msg))
            	{
             	......
 
                	try
                	{
                    	msg->pObjectProxy->invoke(msg);
                	}
                	......
            	}
        	}
        	......
    	}
    	......
}

在ObjectProxy::invoke()中將進(jìn)行第二次的負(fù)載均衡,像圖(1-4)所示,每個(gè)ObjectProxy通過(guò)EndpointManager可以以不同的負(fù)載均衡方式對(duì)AdapterProxy進(jìn)行選取選擇:

void ObjectProxy::invoke(ReqMessage * msg)
{
	......
	//選擇一個(gè)遠(yuǎn)程服務(wù)的Adapter來(lái)調(diào)用
	AdapterProxy * pAdapterProxy = NULL;
	bool bFirst = _endpointManger->selectAdapterProxy(msg, pAdapterProxy);
	......
}

在EndpointManager:: selectAdapterProxy()中,有多種負(fù)載均衡的方式來(lái)選取AdapterProxy,如getHashProxy(),getWeightedProxy(),getNextValidProxy()等。

獲取AdapterProxy之后,便將選擇到的AdapterProxy賦值給EndpointManager:: selectAdapterProxy()函數(shù)中的引用參數(shù)pAdapterProxy,隨后執(zhí)行:

void ObjectProxy::invoke(ReqMessage * msg)
{
   ......
	msg->adapter = pAdapterProxy;
	pAdapterProxy->invoke(msg);
}

調(diào)用pAdapterProxy將請(qǐng)求信息發(fā)送出去。而在AdapterProxy::invoke()中,AdapterProxy將調(diào)用Transceiver::sendRequset()進(jìn)行請(qǐng)求的發(fā)送。 至此,對(duì)應(yīng)同步調(diào)用的網(wǎng)絡(luò)線程發(fā)送請(qǐng)求的工作就結(jié)束了,網(wǎng)絡(luò)線程會(huì)回到CommunicatorEpoll::run()中,繼續(xù)等待數(shù)據(jù)的收發(fā)。

網(wǎng)絡(luò)線程接收響應(yīng)

當(dāng)網(wǎng)絡(luò)線程CommunicatorEpoll接收到響應(yīng)數(shù)據(jù)之后,如同之前發(fā)送請(qǐng)求那樣, 在CommunicatorEpoll::run()中,程序獲取活躍的epoll_event的變量,并將其中的epoll_data_t data傳遞給CommunicatorEpoll::handle():

//先處理epoll的網(wǎng)絡(luò)事件
for (int i = 0; i < num; ++i)
{
	const epoll_event& ev = _ep.get(i);
	uint64_t data = ev.data.u64;
 
	if(data == 0)
   {
   	continue; //data非指針, 退出循環(huán)
	}
	handle((FDInfo*)data, ev.events);
}

接下來(lái)的程序流程如下圖所示:

TARS C++客戶端是什么

在CommunicatorEpoll::handle()中,從epoll_data::data中獲取Transceiver指針,并調(diào)用CommunicatorEpoll::handleInputImp():

Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;
//先收包
if (events & EPOLLIN)
{
	try
	{
handleInputImp(pTransceiver);
	}
	catch(exception & e)
	{
TLOGERROR("[TARS]CommunicatorEpoll::handle exp:"<<e.what()<<" ,line:"<<__LINE__<<endl);
	}
	catch(...)
	{
TLOGERROR("[TARS]CommunicatorEpoll::handle|"<<__LINE__<<endl);
	}
}

在CommunicatorEpoll::handleInputImp()中,除了對(duì)連接的判斷外,主要做兩件事,調(diào)用Transceiver::doResponse()以及AdapterProxy::finishInvoke(ResponsePacket&),前者的工作是從socket連接中獲取響應(yīng)數(shù)據(jù)并判斷接收的數(shù)據(jù)是否為一個(gè)完整的RPC響應(yīng)包。后者的作用是將響應(yīng)結(jié)果返回給客戶端,同步調(diào)用的會(huì)喚醒阻塞等待在條件變量中的caller線程,異步調(diào)用的會(huì)在異步回調(diào)處理線程中執(zhí)行回調(diào)函數(shù)。 在AdapterProxy::finishInvoke(ResponsePacket&)中,需要注意一點(diǎn),假如是同步調(diào)用的,需要獲取響應(yīng)包rsp對(duì)應(yīng)的ReqMessage信息,在Tars中,執(zhí)行:

ReqMessage * msg = NULL;
// 獲取響應(yīng)包rsp對(duì)應(yīng)的msg信息,并在超時(shí)隊(duì)列中剔除該msg
bool retErase = _timeoutQueue->erase(rsp.iRequestId, msg);

在找回響應(yīng)包對(duì)應(yīng)的請(qǐng)求信息msg的同時(shí),將其在超時(shí)隊(duì)列中剔除出來(lái)。接著執(zhí)行:

msg->eStatus = ReqMessage::REQ_RSP;
msg->response = rsp;
finishInvoke(msg);

程序調(diào)用另一個(gè)重載函數(shù)AdapterProxy::finishInvoke(ReqMessage*),在AdapterProxy::finishInvoke(ReqMessage*)中,不同的RPC調(diào)用方式會(huì)執(zhí)行不同的動(dòng)作,例如同步調(diào)用會(huì)喚醒對(duì)應(yīng)的caller線程:

//同步調(diào)用,喚醒ServantProxy線程
if(msg->eType == ReqMessage::SYNC_CALL)
{
	if(!msg->bCoroFlag)
	{
    	assert(msg->pMonitor);
 
    	TC_ThreadLock::Lock sync(*(msg->pMonitor));
    	msg->pMonitor->notify();
    	msg->bMonitorFin = true;
	}
	else
	{
    	msg->sched->put(msg->iCoroId);
	}
 
	return ;
}

至此,對(duì)應(yīng)同步調(diào)用的網(wǎng)絡(luò)線程接收響應(yīng)的工作就結(jié)束了,網(wǎng)絡(luò)線程會(huì)回到CommunicatorEpoll::run()中,繼續(xù)等待數(shù)據(jù)的收發(fā)。 綜上,客戶端同步調(diào)用的過(guò)程如下圖所示。

TARS C++客戶端是什么

異步調(diào)用

在Tars中,除了最常見(jiàn)的同步調(diào)用之外,還可以進(jìn)行異步調(diào)用,異步調(diào)用可分三種:普通的異步調(diào)用,promise異步調(diào)用與協(xié)程異步調(diào)用,這里簡(jiǎn)單介紹普通的異步調(diào)用,看看其與上文介紹的同步調(diào)用有何異同。

異步調(diào)用不會(huì)阻塞整個(gè)客戶端程序,調(diào)用完成(請(qǐng)求發(fā)送)之后,用戶可以繼續(xù)處理其他事情,等接收到響應(yīng)之后,Tars會(huì)在異步處理線程當(dāng)中執(zhí)行用戶實(shí)現(xiàn)好的回調(diào)函數(shù)。在這里,會(huì)用到《Effective C++》中條款35所介紹的“藉由Non-Virtual Interface手法實(shí)現(xiàn)Template Method模式”,用戶需要繼承一個(gè)XXXServantPrxCallback基類,并實(shí)現(xiàn)里面的虛函數(shù),異步回調(diào)線程會(huì)在收到響應(yīng)包之后回調(diào)這些虛函數(shù),具體的異步調(diào)用客戶端示例這里不作詳細(xì)介紹,在Tars的Example中會(huì)找到相應(yīng)的示例代碼。

初始化

本文第一章已經(jīng)詳細(xì)介紹了客戶端的初始化,這里再簡(jiǎn)單提一下,在第一章的“1.2.2初始化代碼跟蹤- 2.執(zhí)行Communicator的初始化函數(shù)”中,已經(jīng)提到說(shuō),在每一個(gè)網(wǎng)絡(luò)線程CommunicatorEpoll的初始化過(guò)程中,會(huì)創(chuàng)建_asyncThreadNum條異步線程,等待異步調(diào)用的時(shí)候處理響應(yīng)數(shù)據(jù):

CommunicatorEpoll::CommunicatorEpoll(Communicator * pCommunicator,size_t netThreadSeq)
{
 ......
   //異步線程數(shù)
	_asyncThreadNum = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncthread", "3"));
 
	if(_asyncThreadNum == 0)
	{
    	_asyncThreadNum = 3;
	}
 
	if(_asyncThreadNum > MAX_CLIENT_ASYNCTHREAD_NUM)
	{
    	_asyncThreadNum = MAX_CLIENT_ASYNCTHREAD_NUM;
	}
 ......
	//異步隊(duì)列的大小
	size_t iAsyncQueueCap = TC_Common::strto<size_t>(pCommunicator->getProperty("asyncqueuecap", "10000"));
	if(iAsyncQueueCap < 10000)
	{
    	iAsyncQueueCap = 10000;
	}
 ......
	//創(chuàng)建異步線程
	for(size_t i = 0; i < _asyncThreadNum; ++i)
	{
    	_asyncThread[i] = new AsyncProcThread(iAsyncQueueCap);
    	_asyncThread[i]->start();
	}
 ......
}

在開(kāi)始講述異步調(diào)用與接收響應(yīng)之前,先看看大致的調(diào)用過(guò)程,與圖(1-16)的同步調(diào)用來(lái)個(gè)對(duì)比。

TARS C++客戶端是什么

跟同步調(diào)用的示例一樣,現(xiàn)在有一MyDemo.StringServer.StringServantObj的服務(wù),提供一個(gè)RPC接口是append,傳入兩個(gè)string類型的變量,返回兩個(gè)string類型變量的拼接結(jié)果。在執(zhí)行tars2cpp而生成的文件中,定義了回調(diào)函數(shù)基類StringServantPrxCallback,用戶需要public繼承這個(gè)基類并實(shí)現(xiàn)自己的方法,例如:

class asyncClientCallback : public StringServantPrxCallback {
public:
  void callback_append(Int32 ret, const string& rStr) {
	cout <<  "append: async callback success and retCode is " << ret << " ,rStr is " << rStr << "\n";
  }
  void callback_append_exception(Int32 ret) {
	cout <<  "append: async callback fail and retCode is " << ret << "\n";
  }
};

然后,用戶就可以通過(guò)proxy->async_append(new asyncClientCallback(), str1, str2)進(jìn)行異步調(diào)用了,調(diào)用過(guò)程與上文的同步調(diào)用差不多,函數(shù)調(diào)用流程如下圖所示,可以與圖(1-12)進(jìn)行比較,看看同步調(diào)用與異步調(diào)用的異同。

TARS C++客戶端是什么

在異步調(diào)用中,客戶端發(fā)起異步調(diào)用_proxy->async_append(new asyncClientCallback(), str1, str2)后,在函數(shù)StringServantProxy::async_append()中,程序同樣會(huì)先構(gòu)造ServantProxy::tars_invoke_async()所需要的參數(shù),如請(qǐng)求包類型,RPC方法名,方法參數(shù)等,與同步調(diào)用的一個(gè)區(qū)別是,還傳遞了承載回調(diào)函數(shù)的派生類實(shí)例。接下來(lái)便直接調(diào)用了ServantProxy::tars_invoke_async()方法:

tars_invoke_async(tars::TARSNORMAL,"append", _os.getByteBuffer(), context, _mStatus, callback)

在ServantProxy::tars_invoke_async()方法中,先創(chuàng)建一個(gè)ReqMessage變量msg,初始化msg變量,給變量賦值,如Tars版本號(hào),數(shù)據(jù)包類型,服務(wù)名,RPC方法名,Tars的上下文容器,異步調(diào)用的超時(shí)時(shí)間(單位為毫秒)以及異步調(diào)用后的回調(diào)函數(shù)ServantProxyCallbackPtr callback(等待異步調(diào)用返回響應(yīng)后回調(diào)里面的函數(shù))等。最后,與同步調(diào)用一樣,調(diào)用ServantProxy::invoke()進(jìn)行遠(yuǎn)程方法調(diào)用。

在ServantProxy::invoke()中,繼續(xù)填充傳遞進(jìn)來(lái)的變量ReqMessage msg。此外,還需要獲取調(diào)用者caller線程的線程私有數(shù)據(jù)ServantProxyThreadData,用來(lái)指導(dǎo)RPC調(diào)用。與同步調(diào)用一樣,利用ServantProxy::selectNetThreadInfo()來(lái)輪詢選取ObjectProxy(網(wǎng)絡(luò)線程CommunicatorEpoll)與對(duì)應(yīng)的ReqInfoQueue,詳細(xì)可看同步調(diào)用中的介紹,注意區(qū)分客戶端中的調(diào)用者caller線程與網(wǎng)絡(luò)線程,以及之間的通信橋梁。

退出ServantProxy::selectNetThreadInfo()后,便得到ObjectProxy_類型的pObjProxy及其對(duì)應(yīng)的ReqInfoQueue_類型的ReqInfoQueue,在異步調(diào)用中,不需要建立條件變量來(lái)阻塞進(jìn)程,直接通過(guò)pObjectProxy來(lái)發(fā)送RPC請(qǐng)求,請(qǐng)求信息會(huì)暫存在ReqInfoQueue中:

if(!pReqQ->push_back(msg,bEmpty))
{
	TLOGERROR("[TARS][ServantProxy::invoke msgQueue push_back error num:" << pSptd->_netSeq << "]" << endl);
 
	delete msg;
	msg = NULL;
 
	pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);
 
	throw TarsClientQueueException("client queue full");
}
 
pObjProxy->getCommunicatorEpoll()->notify(pSptd->_reqQNo, pReqQ);

在之后,就不需要做任何的工作,退出層層函數(shù)調(diào)用,回到客戶端中,程序可以繼續(xù)執(zhí)行其他動(dòng)作。

接收響應(yīng)與函數(shù)回調(diào)

異步調(diào)用的請(qǐng)求發(fā)送過(guò)程與同步調(diào)用的一致,都是在網(wǎng)絡(luò)線程中通過(guò)ObjectProxy去調(diào)用AdapterProxy來(lái)發(fā)送數(shù)據(jù)。但是在接收到響應(yīng)之后,通過(guò)圖(1-15)可以看到,在函數(shù)AdapterProxy::finishInvoke(ReqMessage*)中,同步調(diào)用會(huì)通過(guò)msg->pMonitor->notify()喚醒客戶端的caller線程來(lái)接收響應(yīng)包,而在異步調(diào)用中,則是如圖(1-19)所示,CommunicatorEpoll與AsyncProcThread的關(guān)系如圖(1-20)所示。

TARS C++客戶端是什么

TARS C++客戶端是什么

在函數(shù)AdapterProxy::finishInvoke(ReqMessage*)中,程序通過(guò):

//異步回調(diào),放入回調(diào)處理線程中
_objectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);

將信息包msg(帶響應(yīng)信息)放到異步回調(diào)處理線程中,在CommunicatorEpoll::pushAsyncThreadQueue()中,通過(guò)輪詢的方式選擇異步回調(diào)處理線程處理接收到的響應(yīng)包,異步處理線程數(shù)默認(rèn)是3,最大是1024。

void CommunicatorEpoll::pushAsyncThreadQueue(ReqMessage * msg)
{
	//先不考慮每個(gè)線程隊(duì)列數(shù)目不一致的情況
	_asyncThread[_asyncSeq]->push_back(msg);
	_asyncSeq ++;
 
	if(_asyncSeq == _asyncThreadNum)
	{
    	_asyncSeq = 0;
	}
}

選取之后,通過(guò)AsyncProcThread::push_back(),將msg包放在響應(yīng)包隊(duì)列AsyncProcThread::_msgQueue中,然后通過(guò)AsyncProcThread:: notify()函數(shù)通知本異步回調(diào)處理線程進(jìn)行處理,AsyncProcThread:: notify()函數(shù)可以令阻塞在AsyncProcThread:: run()中的AsyncProcThread::timedWait()的異步處理線程被喚醒。

在AsyncProcThread::run()中,主要執(zhí)行下面的程序進(jìn)行函數(shù)回調(diào):

if (_msgQueue->pop_front(msg))
{
 ......
 
	try
	{
    	ReqMessagePtr msgPtr = msg;
    	msg->callback->onDispatch(msgPtr);
	}
	catch (exception& e)
	{
    	TLOGERROR("[TARS][AsyncProcThread exception]:" << e.what() << endl);
	}
	catch (...)
	{
    	TLOGERROR("[TARS][AsyncProcThread exception.]" << endl);
	}
}

通過(guò)msg->callback,程序可以調(diào)用回調(diào)函數(shù)基類StringServantPrxCallback里面的onDispatch()函數(shù)。在StringServantPrxCallback:: onDispatch()中,分析此次響應(yīng)所對(duì)應(yīng)的RPC方法名,獲取響應(yīng)結(jié)果,并通過(guò)動(dòng)態(tài)多態(tài),執(zhí)行用戶所定義好的派生類的虛函數(shù)。通過(guò)ReqMessagePtr的引用計(jì)數(shù),還可以將ReqNessage* msg刪除掉,與同步調(diào)用不同,同步調(diào)用的msg的新建與刪除都在caller線程中,而異步調(diào)用的msg在caller線程上構(gòu)造,在異步回調(diào)處理線程中析構(gòu)。

“TARS C++客戶端是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI