溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

libevent業(yè)務(wù)數(shù)據(jù)處理的方法

發(fā)布時(shí)間:2022-05-09 14:31:32 來源:億速云 閱讀:278 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“l(fā)ibevent業(yè)務(wù)數(shù)據(jù)處理的方法”,在日常操作中,相信很多人在libevent業(yè)務(wù)數(shù)據(jù)處理的方法問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”libevent業(yè)務(wù)數(shù)據(jù)處理的方法”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

一個(gè) loop 的主要結(jié)構(gòu)一般如下所示:

while (!m_bQuitFlag)
{
epoll_or_select_func();

handle_io_events();

handle_other_things();
}
 

對(duì)于一些業(yè)務(wù)邏輯處理比較簡(jiǎn)單、不會(huì)太耗時(shí)的應(yīng)用來說,handle_io_events() 方法除了收發(fā)數(shù)據(jù)也可以直接用來直接做業(yè)務(wù)的處理,即其結(jié)構(gòu)如下:

void handle_io_events()
{
//收發(fā)數(shù)據(jù)
recv_or_send_data();

//解包并處理數(shù)據(jù)
decode_packages_and_process();
}
 

其中 recv_or_send_data() 方法中調(diào)用 send/recv API 進(jìn)行實(shí)際的網(wǎng)絡(luò)數(shù)據(jù)收發(fā)。以收數(shù)據(jù)為例,收完數(shù)據(jù)存入接收緩沖區(qū)后,接下來進(jìn)行解包處理,然后進(jìn)行業(yè)務(wù)處理,例如一個(gè)登陸數(shù)據(jù)包,其業(yè)務(wù)就是驗(yàn)證登陸的賬戶密碼是否正確、記錄其登陸行為等等。從程序函數(shù)調(diào)用堆棧來看,這些業(yè)務(wù)處理邏輯其實(shí)是直接在網(wǎng)絡(luò)收發(fā)數(shù)據(jù)線程中處理的。我的意思是:網(wǎng)絡(luò)線程調(diào)用 handle_io_events() 方法,handle_io_events() 方法調(diào)用 decode_packages_and_process() 方法,decode_packages_and_process() 方法做具體的業(yè)務(wù)邏輯處理。

需要注意的是,為了讓網(wǎng)絡(luò)層與業(yè)務(wù)層脫耦,網(wǎng)絡(luò)層中通常會(huì)提供一些回調(diào)函數(shù)的接口,這些回調(diào)函數(shù)我們將其指向具體的業(yè)務(wù)處理函數(shù)。以 libevent 網(wǎng)絡(luò)庫的用法為例:

int main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;

struct sockaddr_in sin;

base = event_base_new();

memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);

//listener_cb是我們自定義回調(diào)函數(shù)
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin,
sizeof(sin));

if (!listener) {
fprintf(stderr, "Could not create a listener!\n");
return 1;
}

//signal_cb是我們自定義回調(diào)函數(shù)
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);

if (!signal_event || event_add(signal_event, NULL)<0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}

//啟動(dòng)loop
event_base_dispatch(base);

evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);

printf("done\n");
return 0;
}
 

上述代碼根據(jù) libevent 自帶的 helloworld 示例修改而來,其中 listener_cbsignal_cb 是自定義的回調(diào)函數(shù),有相應(yīng)的事件觸發(fā)后,libevent 的事件循環(huán)會(huì)調(diào)用我們?cè)O(shè)置的回調(diào),在這些回調(diào)函數(shù)中,我們可以編寫自己的業(yè)務(wù)邏輯代碼。

這種基本的服務(wù)器結(jié)構(gòu),我們可以繪制成如下流程圖:

libevent業(yè)務(wù)數(shù)據(jù)處理的方法

這是這個(gè)結(jié)構(gòu)的最基本邏輯,在這基礎(chǔ)上可以延伸出很多變體。不知道讀者有沒有發(fā)現(xiàn),上述流程圖中第三步解包和業(yè)務(wù)邏輯處理這一步中(位于 handle_io_events() 中的 decode_packages_and_process() 方法中),如果業(yè)務(wù)邏輯處理過程比較耗時(shí)(例如,從數(shù)據(jù)庫取大量數(shù)據(jù)、寫文件),那么會(huì)導(dǎo)致 網(wǎng)絡(luò)線程在這個(gè)步驟停留時(shí)間很長(zhǎng),導(dǎo)致很久以后才能執(zhí)行下一次循環(huán),影響網(wǎng)絡(luò)數(shù)據(jù)的檢測(cè)和收發(fā),最終導(dǎo)致整個(gè)程序的效率低下。

因此,對(duì)于這種情形,我們需要將業(yè)務(wù)處理邏輯單獨(dú)拆出來交給另外的業(yè)務(wù)工作線程處理,業(yè)務(wù)工作線程可以是一個(gè)線程池,這個(gè)過程業(yè)務(wù)數(shù)據(jù)從網(wǎng)絡(luò)線程組流向業(yè)務(wù)線程組。

這樣的程序結(jié)構(gòu)圖如下圖所示:

libevent業(yè)務(wù)數(shù)據(jù)處理的方法

上圖中,對(duì)于網(wǎng)絡(luò)線程將業(yè)務(wù)數(shù)據(jù)包交給業(yè)務(wù)線程,可以使用一個(gè)共享的業(yè)務(wù)數(shù)據(jù)隊(duì)列來實(shí)現(xiàn),此時(shí)網(wǎng)絡(luò)線程是生產(chǎn)者,業(yè)務(wù)線程從業(yè)務(wù)數(shù)據(jù)隊(duì)列中取出任務(wù)去處理,業(yè)務(wù)線程是消費(fèi)者。業(yè)務(wù)線程處理完成后如果需要將結(jié)果數(shù)據(jù)發(fā)出去,則再將數(shù)據(jù)交給網(wǎng)絡(luò)線程。這里處理后的數(shù)據(jù)從業(yè)務(wù)線程再次流向網(wǎng)絡(luò)線程,那么如何將數(shù)據(jù)從業(yè)務(wù)線程交給網(wǎng)絡(luò)線程呢?這里以發(fā)數(shù)據(jù)為例,一般有三種方法:

方法一

直接調(diào)用相應(yīng)的的發(fā)數(shù)據(jù)的方法,如果你的網(wǎng)絡(luò)線程本身也會(huì)調(diào)用這些發(fā)數(shù)據(jù)的方法,那么此時(shí)就可能會(huì)出現(xiàn)網(wǎng)絡(luò)線程和業(yè)務(wù)線程同時(shí)對(duì)發(fā)方法進(jìn)行調(diào)用,相當(dāng)于多個(gè)線程同時(shí)調(diào)用 socket send 函數(shù),這樣可能會(huì)導(dǎo)致同一個(gè)連接上的數(shù)據(jù)順序有問題,此時(shí)的做法時(shí),利用鎖機(jī)制,同一時(shí)刻只有一個(gè)線程可以調(diào)用 socket send 方法。這里給出一段偽代碼,假設(shè) TcpConnection 對(duì)象表示某路連接,無論網(wǎng)絡(luò)線程還是業(yè)務(wù)線程處理完數(shù)據(jù)后需要發(fā)送數(shù)據(jù),則使用:

void TcpConnection::sendData(const std::string& data)
{
//加上鎖
std::lock_guard<std::mutex> scoped_lock(m_mutexForConnection);
//在這里調(diào)用 send
}
 

方法一的做法在設(shè)計(jì)上來說,存在讓人不滿意的地方,即數(shù)據(jù)發(fā)送應(yīng)該屬于網(wǎng)絡(luò)層自己的事情,而不是其他模塊(這里指的是業(yè)務(wù)線程)強(qiáng)行搶奪過來越俎代庖。

方法二

前面章節(jié)介紹了存在定時(shí)器結(jié)構(gòu)的情況,網(wǎng)絡(luò)線程結(jié)構(gòu)變成如下流程:

while (!m_bQuitFlag)
{
check_and_handle_timers();

epoll_or_select_func();

handle_io_events();
}
 

業(yè)務(wù)線程可以將需要發(fā)送的數(shù)據(jù)放入另外一個(gè)共享區(qū)域中(例如相應(yīng)的 TcpConnection 對(duì)象的一個(gè)成員變量中),定時(shí)器定時(shí)從這個(gè)共享區(qū)域取出來,再發(fā)送出去,這種方案的優(yōu)點(diǎn)是網(wǎng)絡(luò)線程做了它該做的事情,缺點(diǎn)是需要添加定時(shí)器,讓程序邏輯變得復(fù)雜,且定時(shí)器是每隔一段時(shí)間才會(huì)觸發(fā),發(fā)送的數(shù)據(jù)可能會(huì)有一定的延遲。

方法三

利用線程執(zhí)行流中的 handle_other_things() 方法,再來看下前面章節(jié)中介紹的基本結(jié)構(gòu):

while (!m_bQuitFlag)
{
epoll_or_select_func();

handle_io_events();

handle_other_things();
}

到此,關(guān)于“l(fā)ibevent業(yè)務(wù)數(shù)據(jù)處理的方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI