溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

服務(wù)器端編程心得(三)—— 一個服務(wù)器程序的架構(gòu)介紹

發(fā)布時間:2020-05-23 15:21:56 來源:網(wǎng)絡(luò) 閱讀:2972 作者:張小方32 欄目:建站服務(wù)器

本文將介紹我曾經(jīng)做過的一個項(xiàng)目的服務(wù)器架構(gòu)和服務(wù)器編程的一些重要細(xì)節(jié)。

一、程序運(yùn)行環(huán)境

操作系統(tǒng):centos 7.0

編譯器:gcc/g++ 4.8.3 ? ? cmake 2.8.11

mysql數(shù)據(jù)庫:5.5.47

項(xiàng)目代碼管理工具:VS2013

一、程序結(jié)構(gòu)

該程序總共有17個線程,其中分為9個數(shù)據(jù)庫工作線程D和一個日志線程L,6個普通工作線程W,一個主線程M。(以下會用這些字母來代指這些線程)

(一)、數(shù)據(jù)庫工作線程的用途

?9個數(shù)據(jù)庫工作線程在線程啟動之初,與mysql建立連接,也就是說每個線程都與mysql保持一路連接,共9個數(shù)據(jù)庫連接。

?每個數(shù)據(jù)庫工作線程同時存在兩個任務(wù)隊(duì)列,第一個隊(duì)列A存放需要執(zhí)行數(shù)據(jù)庫增刪查改操作的任務(wù)sqlTask,第二個隊(duì)列B存放sqlTask執(zhí)行完成后的結(jié)果。sqlTask執(zhí)行完成后立即放入結(jié)果隊(duì)列中,因而結(jié)果隊(duì)列中任務(wù)也是一個個的需要執(zhí)行的任務(wù)。大致偽代碼如下:

void db_thread_func()
{
    while (!m_bExit)
    {
        if (NULL != (pTask = m_sqlTask.Pop()))
        {
            //從m_sqlTask中取出的任務(wù)先執(zhí)行完成后,pTask將攜帶結(jié)果數(shù)據(jù)
            pTask->Execute();           
            //得到結(jié)果后,立刻將該任務(wù)放入結(jié)果任務(wù)隊(duì)列
            m_resultTask.Push(pTask);
            continue;
        }

        sleep(1000);
    }
}

現(xiàn)在的問題來了:

  1. 任務(wù)隊(duì)列A中的任務(wù)從何而來,目前只有消費(fèi)者,沒有生產(chǎn)者,那么生產(chǎn)者是誰?

  2. 任務(wù)隊(duì)列B中的任務(wù)將去何方,目前只有生產(chǎn)者沒有消費(fèi)者。

這兩個問題先放一會兒,等到后面我再來回答。

(二)工作線程和主線程

在介紹主線程和工作線程具體做什么時,我們介紹下服務(wù)器編程中常常抽象出來的幾個概念(這里以tcp連接為例):

  1. ?TcpServer 即Tcp服務(wù),服務(wù)器需要綁定ip地址和端口號,并在該端口號上偵聽客戶端的連接(往往由一個成員變量TcpListener來管理偵聽細(xì)節(jié))。所以一個TcpServer要做的就是這些工作。除此之外,每當(dāng)有新連接到來時,TcpServer需要接收新連接,當(dāng)多個新連接存在時,TcpServer需要有條不紊地管理這些連接:連接的建立、斷開等,即產(chǎn)生和管理下文中說的TcpConnection對象。

2.一個連接對應(yīng)一個TcpConnection對象,TcpConnection對象管理著這個連接的一些信息:如連接狀態(tài)、本端和對端的ip地址和端口號等。

3.數(shù)據(jù)通道對象Channel,Channel記錄了socket的句柄,因而是一個連接上執(zhí)行數(shù)據(jù)收發(fā)的真正執(zhí)行者,Channel對象一般作為TcpConnection的成員變量。

  1. TcpSession對象,是將Channel收取的數(shù)據(jù)進(jìn)行解包,或者對準(zhǔn)備好的數(shù)據(jù)進(jìn)行裝包,并傳給Channel發(fā)送。

歸納起來:一個TcpServer依靠TcpListener對新連接的偵聽和處理,依靠TcpConnection對象對連接上的數(shù)據(jù)進(jìn)行管理,TcpConnection實(shí)際依靠Channel對數(shù)據(jù)進(jìn)行收發(fā),依靠TcpSession對數(shù)據(jù)進(jìn)行裝包和解包。也就是說一個TcpServer存在一個TcpListener,對應(yīng)多個TcpConnection,有幾個TcpConnection就有幾個TcpSession,同時也就有幾個Channel。

以上說的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是服務(wù)器框架的網(wǎng)絡(luò)層。一個好的網(wǎng)絡(luò)框架,應(yīng)該做到與業(yè)務(wù)代碼脫耦。即上層代碼只需要拿到數(shù)據(jù),執(zhí)行業(yè)務(wù)邏輯,而不用關(guān)注數(shù)據(jù)的收發(fā)和網(wǎng)絡(luò)數(shù)據(jù)包的封包和解包以及網(wǎng)絡(luò)狀態(tài)的變化(比如網(wǎng)絡(luò)斷開與重連)。

拿數(shù)據(jù)的發(fā)送來說:

當(dāng)業(yè)務(wù)邏輯將數(shù)據(jù)交給TcpSession,TcpSession將數(shù)據(jù)裝好包后(裝包過程后可以有一些加密或壓縮操作),交給TcpConnection::SendData(),而TcpConnection::SendData()實(shí)際是調(diào)用Channel::SendData(),因?yàn)镃hannel含有socket句柄,所以Channel::SendData()真正調(diào)用send()/sendto()/write()方法將數(shù)據(jù)發(fā)出去。

對于數(shù)據(jù)的接收,稍微有一點(diǎn)不同:

通過select()/poll()/epoll()等IO multiplex技術(shù),確定好了哪些TcpConnection上有數(shù)據(jù)到來后,激活該TcpConnection的Channel對象去調(diào)用recv()/recvfrom()/read()來收取數(shù)據(jù)。數(shù)據(jù)收到以后,將數(shù)據(jù)交由TcpSession來處理,最終交給業(yè)務(wù)層。注意數(shù)據(jù)收取、解包乃至交給業(yè)務(wù)層是一定要分開的。我的意思是:最好不要解包并交給業(yè)務(wù)層和數(shù)據(jù)收取的邏輯放在一起。因?yàn)閿?shù)據(jù)收取是IO操作,而解包和交給業(yè)務(wù)層是邏輯計(jì)算操作。IO操作一般比邏輯計(jì)算要慢。到底如何安排要根據(jù)服務(wù)器業(yè)務(wù)來取舍,也就是說你要想好你的服務(wù)器程序的性能瓶頸在網(wǎng)絡(luò)IO還是邏輯計(jì)算,即使是網(wǎng)絡(luò)IO,也可以分為上行操作和下行操作,上行操作即客戶端發(fā)數(shù)據(jù)給服務(wù)器,下行即服務(wù)器發(fā)數(shù)據(jù)給客戶端。有時候數(shù)據(jù)上行少,下行大。(如游戲服務(wù)器,一個npc移動了位置,上行是該客戶端通知服務(wù)器自己最新位置,而下行確是服務(wù)器要告訴在場的每個客戶端)。

在我的博文《服務(wù)器端編程心得(一)—— 主線程與工作線程的分工》中介紹了,工作線程的流程:

while (!m_bQuit)
{
    epoll_or_select_func();

    handle_io_events();

    handle_other_things();
}

其中epoll_or_select_func()即是上文所說的通過select()/poll()/epoll()等IO multiplex技術(shù),確定好了哪些TcpConnection上有數(shù)據(jù)到來。我的服務(wù)器代碼中一般只會監(jiān)測socket可讀事件,而不會監(jiān)測socket可寫事件。至于如何發(fā)數(shù)據(jù),文章后面會介紹。所以對于可讀事件,以epoll為例,這里需要設(shè)置的標(biāo)識位是:

EPOLLIN 普通可讀事件(當(dāng)連接正常時,產(chǎn)生這個事件,recv()/read()函數(shù)返回收到的字節(jié)數(shù);當(dāng)連接關(guān)閉,這兩個函數(shù)返回0,也就是說我們設(shè)置這個標(biāo)識已經(jīng)可以監(jiān)測到新來數(shù)據(jù)和對端關(guān)閉事件)

EPOLLRDHUP 對端關(guān)閉事件(linux man手冊上說這個事件可以監(jiān)測對端關(guān)閉,但我實(shí)際調(diào)試時發(fā)送即使對端關(guān)閉也沒觸發(fā)這個事件,仍然是EPOLLIN,只不過此時調(diào)用recv()/read()函數(shù),返回值會為0,所以實(shí)際項(xiàng)目中是否可以通過設(shè)置這個標(biāo)識來監(jiān)測對端關(guān)閉,仍然待考證)

EPOLLPRI 帶外數(shù)據(jù)

muduo里面將epoll_wait的超時事件設(shè)置為1毫秒,我的另一個項(xiàng)目將epoll_wait超時時間設(shè)置為10毫秒。這兩個數(shù)值供大家參考。

這個項(xiàng)目中,工作線程和主線程都是上文代碼中的邏輯,主線程監(jiān)聽偵聽socket上的可讀事件,也就是監(jiān)測是否有新連接來了。主線程和每個工作線程上都存在一個epollfd。如果新連接來了,則在主線程的handle_io_events()中接收新連接。產(chǎn)生的新連接的socket句柄掛接到哪個線程的epollfd上呢?這里采取的做法是round-robin算法,即存在一個對象CWorkerThreadManager記錄了各個工作線程上工作狀態(tài)。偽碼大致如下:

void attach_new_fd(int newsocketfd)
{
    workerthread = get_next_worker_thread(next);
    workerthread.attach_to_epollfd(newsocketfd);
    ++next;
    if (next > max_worker_thread_num)
        next = 0;
}

即先從第一個工作線程的epollfd開始掛接新來socket,接著累加索引,這樣下次就是第二個工作線程了。如果所以超出工作線程數(shù)目,則從第一個工作重新開始。這里解決了新連接socket“負(fù)載均衡”的問題。在實(shí)際代碼中還有個需要注意的細(xì)節(jié)就是:epoll_wait的函數(shù)中的struct epoll_event 數(shù)量開始到底要設(shè)置多少個才合理?存在的顧慮是,多了浪費(fèi),少了不夠用,我在曾經(jīng)一個項(xiàng)目中直接用的是4096:

const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);

我在陳碩的muduo網(wǎng)絡(luò)庫中發(fā)現(xiàn)作者才用了一個比較好的思路,即動態(tài)擴(kuò)張數(shù)量:開始是n個,當(dāng)發(fā)現(xiàn)有事件的fd數(shù)量已經(jīng)到達(dá)n個后,將struct epoll_event數(shù)量調(diào)整成2n個,下次如果還不夠,則變成4n個,以此類推,作者巧妙地利用stl::vector在內(nèi)存中的連續(xù)性來實(shí)現(xiàn)了這種思路:

//初始化代碼
std::vector<struct epoll_event> events_(16);

//線程循環(huán)里面的代碼
while (m_bExit)
{
    int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
    if (numEvents > 0)
    {
        if (static_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
}

讀到這里,你可能覺得工作線程所做的工作也不過就是調(diào)用handle_io_events()來接收網(wǎng)絡(luò)數(shù)據(jù),其實(shí)不然,工作線程也可以做程序業(yè)務(wù)邏輯上的一些工作。也就是在handle_other_things()里面。那如何將這些工作加到handle_other_things()中去做呢?寫一個隊(duì)列,任務(wù)先放入隊(duì)列,再讓handle_other_things()從隊(duì)列中取出來做?我在該項(xiàng)目中也借鑒了muduo庫的做法。即handle_other_things()中調(diào)用一系列函數(shù)指針,偽碼如下:

void do_other_things()
{
    somefunc();
}

//m_functors是一個stl::vector,其中每一個元素為一個函數(shù)指針
void somefunc()
{
    for (size_t i = 0; i < m_functors.size(); ++i)
    {
        m_functors[i]();
    }

    m_functors.clear();
}

當(dāng)任務(wù)產(chǎn)生時,只要我們將執(zhí)行任務(wù)的函數(shù)push_back到m_functors這個stl::vector對象中即可。但是問題來了,如果是其他線程產(chǎn)生的任務(wù),兩個線程同時操作m_functors,必然要加鎖,這也會影響效率。muduo是這樣做的:

void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors.push_back(cb);   
}

void do_task()
{
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(m_functors);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
}

看到?jīng)]有,利用一個棧變量functors將m_functors中的任務(wù)函數(shù)指針倒換(swap)過來了,這樣大大減小了對m_functors操作時的加鎖粒度。前后變化:變化前,相當(dāng)于原來A給B多少東西,B消耗多少,A給的時候,B不能消耗;B消耗的時候A不能給?,F(xiàn)在變成A將東西放到籃子里面去,B從籃子里面拿,B如果拿去一部分后,只有消耗完了才會來拿,或者A通知B去籃子里面拿,而B忙碌時,A是不會通知B來拿,這個時候A只管將東西放在籃子里面就可以了。

bool bBusy = false;
void add_task(const Functor& cb)
{
    std::unique_lock<std::mutex> lock(mutex_);
    m_functors_.push_back(cb);

    //B不忙碌時只管往籃子里面加,不要通知B
    if (!bBusy)
    {
        wakeup_to_do_task();
    }
}

void do_task()
{
    bBusy = true;
    std::vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }

    bBusy = false;
}

看,多巧妙的做法!

因?yàn)槊總€工作線程都存在一個m_functors,現(xiàn)在問題來了,如何將產(chǎn)生的任務(wù)均衡地分配給每個工作線程。這個做法類似上文中如何將新連接的socket句柄掛載到工作線程的epollfd上,也是round-robin算法。上文已經(jīng)描述,此處不再贅述。

還有種情況,就是希望任務(wù)產(chǎn)生時,工作線程能夠立馬執(zhí)行這些任務(wù),而不是等epoll_wait超時返回之后。這個時候的做法,就是使用一些技巧喚醒epoll_wait,linux系統(tǒng)可以使用socketpair或timerevent、eventfd等技巧(這個細(xì)節(jié)在我的博文《服務(wù)器端編程心得(一)—— 主線程與工作線程的分工》已經(jīng)詳細(xì)介紹過了)。

上文中留下三個問題:

  1. 數(shù)據(jù)庫線程任務(wù)隊(duì)列A中的任務(wù)從何而來,目前只有消費(fèi)者,沒有生產(chǎn)者,那么生產(chǎn)者是誰?

2.數(shù)據(jù)庫線程任務(wù)隊(duì)列B中的任務(wù)將去何方,目前只有生產(chǎn)者沒有消費(fèi)者。

3.業(yè)務(wù)層的數(shù)據(jù)如何發(fā)送出去?

問題1的答案是:業(yè)務(wù)層產(chǎn)生任務(wù)可能會交給數(shù)據(jù)庫任務(wù)隊(duì)列A,這里的業(yè)務(wù)層代碼可能就是工作線程中do_other_things()函數(shù)執(zhí)行體中的調(diào)用。至于交給這個9個數(shù)據(jù)庫線程的哪一個的任務(wù)隊(duì)列,同樣采用了round-robin算法。所以就存在一個對象CDbThreadManager來管理這九個數(shù)據(jù)庫線程。下面的偽碼是向數(shù)據(jù)庫工作線程中加入任務(wù):

bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
    if (m_index >= m_dwThreadsCount)
    {
        m_index = 0;
    }

    return m_aoMysqlThreads[m_index++].AddTask(poTask);
}

同理問題2中的消費(fèi)者也可能就是do_other_things()函數(shù)執(zhí)行體中的調(diào)用。

現(xiàn)在來說問題3,業(yè)務(wù)層的數(shù)據(jù)產(chǎn)生后,經(jīng)過TcpSession裝包后,需要發(fā)送的話,產(chǎn)生任務(wù)丟給工作線程的do_other_things(),然后在相關(guān)的Channel里面發(fā)送,因?yàn)闆]有監(jiān)測該socket上的可寫事件,所以該數(shù)據(jù)可能調(diào)用send()或者write()時會阻塞,沒關(guān)系,sleep()一會兒,繼續(xù)發(fā)送,一直嘗試,到數(shù)據(jù)發(fā)出去。偽碼如下:

bool Channel::Send()
{
    int offset = 0;
    while (true)
    {
        int n = ::send(socketfd, buf + offset, length - offset);
        if (n == -1)
        {
            if (errno == EWOULDBLOCK)
            {
                ::sleep(100);
                continue;
            }
        }
        //對方關(guān)閉了socket,這端建議也關(guān)閉
        else if (n == 0)
        {
            close(socketfd);
            return false;
        }

        offset += n;
        if (offset >= length)
            break;

    }

    return true;    
}

最后,還有一個日志線程沒有介紹,高性能的日志實(shí)現(xiàn)方案目前并不常見。限于文章篇幅,下次再介紹。

zhangyl 2016.12.02晚12:35

服務(wù)器端編程心得(三)—— 一個服務(wù)器程序的架構(gòu)介紹

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI