您好,登錄后才能下訂單哦!
本文將介紹我曾經(jīng)做過的一個項(xiàng)目的服務(wù)器架構(gòu)和服務(wù)器編程的一些重要細(xì)節(jié)。
一、程序運(yùn)行環(huán)境
操作系統(tǒng):centos 7.0
編譯器:gcc/g++ 4.8.3 ? ? cmake 2.8.11
mysql數(shù)據(jù)庫:5.5.47
項(xiàng)目代碼管理工具:VS2013
一、程序結(jié)構(gòu)
該程序總共有17個線程,其中分為9個數(shù)據(jù)庫工作線程D和一個日志線程L,6個普通工作線程W,一個主線程M。(以下會用這些字母來代指這些線程)
(一)、數(shù)據(jù)庫工作線程的用途
?9個數(shù)據(jù)庫工作線程在線程啟動之初,與mysql建立連接,也就是說每個線程都與mysql保持一路連接,共9個數(shù)據(jù)庫連接。
?每個數(shù)據(jù)庫工作線程同時存在兩個任務(wù)隊(duì)列,第一個隊(duì)列A存放需要執(zhí)行數(shù)據(jù)庫增刪查改操作的任務(wù)sqlTask,第二個隊(duì)列B存放sqlTask執(zhí)行完成后的結(jié)果。sqlTask執(zhí)行完成后立即放入結(jié)果隊(duì)列中,因而結(jié)果隊(duì)列中任務(wù)也是一個個的需要執(zhí)行的任務(wù)。大致偽代碼如下:
void db_thread_func()
{
while (!m_bExit)
{
if (NULL != (pTask = m_sqlTask.Pop()))
{
//從m_sqlTask中取出的任務(wù)先執(zhí)行完成后,pTask將攜帶結(jié)果數(shù)據(jù)
pTask->Execute();
//得到結(jié)果后,立刻將該任務(wù)放入結(jié)果任務(wù)隊(duì)列
m_resultTask.Push(pTask);
continue;
}
sleep(1000);
}
}
現(xiàn)在的問題來了:
任務(wù)隊(duì)列A中的任務(wù)從何而來,目前只有消費(fèi)者,沒有生產(chǎn)者,那么生產(chǎn)者是誰?
這兩個問題先放一會兒,等到后面我再來回答。
(二)工作線程和主線程
在介紹主線程和工作線程具體做什么時,我們介紹下服務(wù)器編程中常常抽象出來的幾個概念(這里以tcp連接為例):
2.一個連接對應(yīng)一個TcpConnection對象,TcpConnection對象管理著這個連接的一些信息:如連接狀態(tài)、本端和對端的ip地址和端口號等。
3.數(shù)據(jù)通道對象Channel,Channel記錄了socket的句柄,因而是一個連接上執(zhí)行數(shù)據(jù)收發(fā)的真正執(zhí)行者,Channel對象一般作為TcpConnection的成員變量。
歸納起來:一個TcpServer依靠TcpListener對新連接的偵聽和處理,依靠TcpConnection對象對連接上的數(shù)據(jù)進(jìn)行管理,TcpConnection實(shí)際依靠Channel對數(shù)據(jù)進(jìn)行收發(fā),依靠TcpSession對數(shù)據(jù)進(jìn)行裝包和解包。也就是說一個TcpServer存在一個TcpListener,對應(yīng)多個TcpConnection,有幾個TcpConnection就有幾個TcpSession,同時也就有幾個Channel。
以上說的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服務(wù)器框架的網(wǎng)絡(luò)層。一個好的網(wǎng)絡(luò)框架,應(yīng)該做到與業(yè)務(wù)代碼脫耦。即上層代碼只需要拿到數(shù)據(jù),執(zhí)行業(yè)務(wù)邏輯,而不用關(guān)注數(shù)據(jù)的收發(fā)和網(wǎng)絡(luò)數(shù)據(jù)包的封包和解包以及網(wǎng)絡(luò)狀態(tài)的變化(比如網(wǎng)絡(luò)斷開與重連)。
拿數(shù)據(jù)的發(fā)送來說:
當(dāng)業(yè)務(wù)邏輯將數(shù)據(jù)交給TcpSession,TcpSession將數(shù)據(jù)裝好包后(裝包過程后可以有一些加密或壓縮操作),交給TcpConnection::SendData(),而TcpConnection::SendData()實(shí)際是調(diào)用Channel::SendData(),因?yàn)镃hannel含有socket句柄,所以Channel::SendData()真正調(diào)用send()/sendto()/write()方法將數(shù)據(jù)發(fā)出去。
對于數(shù)據(jù)的接收,稍微有一點(diǎn)不同:
通過select()/poll()/epoll()等IO multiplex技術(shù),確定好了哪些TcpConnection上有數(shù)據(jù)到來后,激活該TcpConnection的Channel對象去調(diào)用recv()/recvfrom()/read()來收取數(shù)據(jù)。數(shù)據(jù)收到以后,將數(shù)據(jù)交由TcpSession來處理,最終交給業(yè)務(wù)層。注意數(shù)據(jù)收取、解包乃至交給業(yè)務(wù)層是一定要分開的。我的意思是:最好不要解包并交給業(yè)務(wù)層和數(shù)據(jù)收取的邏輯放在一起。因?yàn)閿?shù)據(jù)收取是IO操作,而解包和交給業(yè)務(wù)層是邏輯計(jì)算操作。IO操作一般比邏輯計(jì)算要慢。到底如何安排要根據(jù)服務(wù)器業(yè)務(wù)來取舍,也就是說你要想好你的服務(wù)器程序的性能瓶頸在網(wǎng)絡(luò)IO還是邏輯計(jì)算,即使是網(wǎng)絡(luò)IO,也可以分為上行操作和下行操作,上行操作即客戶端發(fā)數(shù)據(jù)給服務(wù)器,下行即服務(wù)器發(fā)數(shù)據(jù)給客戶端。有時候數(shù)據(jù)上行少,下行大。(如游戲服務(wù)器,一個npc移動了位置,上行是該客戶端通知服務(wù)器自己最新位置,而下行確是服務(wù)器要告訴在場的每個客戶端)。
在我的博文《服務(wù)器端編程心得(一)—— 主線程與工作線程的分工》中介紹了,工作線程的流程:
while (!m_bQuit)
{
epoll_or_select_func();
handle_io_events();
handle_other_things();
}
其中epoll_or_select_func()即是上文所說的通過select()/poll()/epoll()等IO multiplex技術(shù),確定好了哪些TcpConnection上有數(shù)據(jù)到來。我的服務(wù)器代碼中一般只會監(jiān)測socket可讀事件,而不會監(jiān)測socket可寫事件。至于如何發(fā)數(shù)據(jù),文章后面會介紹。所以對于可讀事件,以epoll為例,這里需要設(shè)置的標(biāo)識位是:
EPOLLIN 普通可讀事件(當(dāng)連接正常時,產(chǎn)生這個事件,recv()/read()函數(shù)返回收到的字節(jié)數(shù);當(dāng)連接關(guān)閉,這兩個函數(shù)返回0,也就是說我們設(shè)置這個標(biāo)識已經(jīng)可以監(jiān)測到新來數(shù)據(jù)和對端關(guān)閉事件)
EPOLLRDHUP 對端關(guān)閉事件(linux man手冊上說這個事件可以監(jiān)測對端關(guān)閉,但我實(shí)際調(diào)試時發(fā)送即使對端關(guān)閉也沒觸發(fā)這個事件,仍然是EPOLLIN,只不過此時調(diào)用recv()/read()函數(shù),返回值會為0,所以實(shí)際項(xiàng)目中是否可以通過設(shè)置這個標(biāo)識來監(jiān)測對端關(guān)閉,仍然待考證)
EPOLLPRI 帶外數(shù)據(jù)
muduo里面將epoll_wait的超時事件設(shè)置為1毫秒,我的另一個項(xiàng)目將epoll_wait超時時間設(shè)置為10毫秒。這兩個數(shù)值供大家參考。
這個項(xiàng)目中,工作線程和主線程都是上文代碼中的邏輯,主線程監(jiān)聽偵聽socket上的可讀事件,也就是監(jiān)測是否有新連接來了。主線程和每個工作線程上都存在一個epollfd。如果新連接來了,則在主線程的handle_io_events()中接收新連接。產(chǎn)生的新連接的socket句柄掛接到哪個線程的epollfd上呢?這里采取的做法是round-robin算法,即存在一個對象CWorkerThreadManager記錄了各個工作線程上工作狀態(tài)。偽碼大致如下:
void attach_new_fd(int newsocketfd)
{
workerthread = get_next_worker_thread(next);
workerthread.attach_to_epollfd(newsocketfd);
++next;
if (next > max_worker_thread_num)
next = 0;
}
即先從第一個工作線程的epollfd開始掛接新來socket,接著累加索引,這樣下次就是第二個工作線程了。如果所以超出工作線程數(shù)目,則從第一個工作重新開始。這里解決了新連接socket“負(fù)載均衡”的問題。在實(shí)際代碼中還有個需要注意的細(xì)節(jié)就是:epoll_wait的函數(shù)中的struct epoll_event 數(shù)量開始到底要設(shè)置多少個才合理?存在的顧慮是,多了浪費(fèi),少了不夠用,我在曾經(jīng)一個項(xiàng)目中直接用的是4096:
const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);
我在陳碩的muduo網(wǎng)絡(luò)庫中發(fā)現(xiàn)作者才用了一個比較好的思路,即動態(tài)擴(kuò)張數(shù)量:開始是n個,當(dāng)發(fā)現(xiàn)有事件的fd數(shù)量已經(jīng)到達(dá)n個后,將struct epoll_event數(shù)量調(diào)整成2n個,下次如果還不夠,則變成4n個,以此類推,作者巧妙地利用stl::vector在內(nèi)存中的連續(xù)性來實(shí)現(xiàn)了這種思路:
//初始化代碼
std::vector<struct epoll_event> events_(16);
//線程循環(huán)里面的代碼
while (m_bExit)
{
int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
if (numEvents > 0)
{
if (static_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
}
讀到這里,你可能覺得工作線程所做的工作也不過就是調(diào)用handle_io_events()來接收網(wǎng)絡(luò)數(shù)據(jù),其實(shí)不然,工作線程也可以做程序業(yè)務(wù)邏輯上的一些工作。也就是在handle_other_things()里面。那如何將這些工作加到handle_other_things()中去做呢?寫一個隊(duì)列,任務(wù)先放入隊(duì)列,再讓handle_other_things()從隊(duì)列中取出來做?我在該項(xiàng)目中也借鑒了muduo庫的做法。即handle_other_things()中調(diào)用一系列函數(shù)指針,偽碼如下:
void do_other_things()
{
somefunc();
}
//m_functors是一個stl::vector,其中每一個元素為一個函數(shù)指針
void somefunc()
{
for (size_t i = 0; i < m_functors.size(); ++i)
{
m_functors[i]();
}
m_functors.clear();
}
當(dāng)任務(wù)產(chǎn)生時,只要我們將執(zhí)行任務(wù)的函數(shù)push_back到m_functors這個stl::vector對象中即可。但是問題來了,如果是其他線程產(chǎn)生的任務(wù),兩個線程同時操作m_functors,必然要加鎖,這也會影響效率。muduo是這樣做的:
void add_task(const Functor& cb)
{
std::unique_lock<std::mutex> lock(mutex_);
m_functors.push_back(cb);
}
void do_task()
{
std::vector<Functor> functors;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(m_functors);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
}
看到?jīng)]有,利用一個棧變量functors將m_functors中的任務(wù)函數(shù)指針倒換(swap)過來了,這樣大大減小了對m_functors操作時的加鎖粒度。前后變化:變化前,相當(dāng)于原來A給B多少東西,B消耗多少,A給的時候,B不能消耗;B消耗的時候A不能給?,F(xiàn)在變成A將東西放到籃子里面去,B從籃子里面拿,B如果拿去一部分后,只有消耗完了才會來拿,或者A通知B去籃子里面拿,而B忙碌時,A是不會通知B來拿,這個時候A只管將東西放在籃子里面就可以了。
bool bBusy = false;
void add_task(const Functor& cb)
{
std::unique_lock<std::mutex> lock(mutex_);
m_functors_.push_back(cb);
//B不忙碌時只管往籃子里面加,不要通知B
if (!bBusy)
{
wakeup_to_do_task();
}
}
void do_task()
{
bBusy = true;
std::vector<Functor> functors;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
bBusy = false;
}
看,多巧妙的做法!
因?yàn)槊總€工作線程都存在一個m_functors,現(xiàn)在問題來了,如何將產(chǎn)生的任務(wù)均衡地分配給每個工作線程。這個做法類似上文中如何將新連接的socket句柄掛載到工作線程的epollfd上,也是round-robin算法。上文已經(jīng)描述,此處不再贅述。
還有種情況,就是希望任務(wù)產(chǎn)生時,工作線程能夠立馬執(zhí)行這些任務(wù),而不是等epoll_wait超時返回之后。這個時候的做法,就是使用一些技巧喚醒epoll_wait,linux系統(tǒng)可以使用socketpair或timerevent、eventfd等技巧(這個細(xì)節(jié)在我的博文《服務(wù)器端編程心得(一)—— 主線程與工作線程的分工》已經(jīng)詳細(xì)介紹過了)。
上文中留下三個問題:
2.數(shù)據(jù)庫線程任務(wù)隊(duì)列B中的任務(wù)將去何方,目前只有生產(chǎn)者沒有消費(fèi)者。
3.業(yè)務(wù)層的數(shù)據(jù)如何發(fā)送出去?
問題1的答案是:業(yè)務(wù)層產(chǎn)生任務(wù)可能會交給數(shù)據(jù)庫任務(wù)隊(duì)列A,這里的業(yè)務(wù)層代碼可能就是工作線程中do_other_things()函數(shù)執(zhí)行體中的調(diào)用。至于交給這個9個數(shù)據(jù)庫線程的哪一個的任務(wù)隊(duì)列,同樣采用了round-robin算法。所以就存在一個對象CDbThreadManager來管理這九個數(shù)據(jù)庫線程。下面的偽碼是向數(shù)據(jù)庫工作線程中加入任務(wù):
bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
if (m_index >= m_dwThreadsCount)
{
m_index = 0;
}
return m_aoMysqlThreads[m_index++].AddTask(poTask);
}
同理問題2中的消費(fèi)者也可能就是do_other_things()函數(shù)執(zhí)行體中的調(diào)用。
現(xiàn)在來說問題3,業(yè)務(wù)層的數(shù)據(jù)產(chǎn)生后,經(jīng)過TcpSession裝包后,需要發(fā)送的話,產(chǎn)生任務(wù)丟給工作線程的do_other_things(),然后在相關(guān)的Channel里面發(fā)送,因?yàn)闆]有監(jiān)測該socket上的可寫事件,所以該數(shù)據(jù)可能調(diào)用send()或者write()時會阻塞,沒關(guān)系,sleep()一會兒,繼續(xù)發(fā)送,一直嘗試,到數(shù)據(jù)發(fā)出去。偽碼如下:
bool Channel::Send()
{
int offset = 0;
while (true)
{
int n = ::send(socketfd, buf + offset, length - offset);
if (n == -1)
{
if (errno == EWOULDBLOCK)
{
::sleep(100);
continue;
}
}
//對方關(guān)閉了socket,這端建議也關(guān)閉
else if (n == 0)
{
close(socketfd);
return false;
}
offset += n;
if (offset >= length)
break;
}
return true;
}
最后,還有一個日志線程沒有介紹,高性能的日志實(shí)現(xiàn)方案目前并不常見。限于文章篇幅,下次再介紹。
zhangyl 2016.12.02晚12:35
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。