您好,登錄后才能下訂單哦!
這篇文章主要介紹“如何使用ePump框架編寫TCP服務器”,在日常操作中,相信很多人在如何使用ePump框架編寫TCP服務器問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何使用ePump框架編寫TCP服務器”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
基于非阻塞、多線程、事件驅動模型的 ePump 框架可以很好地解決復雜的線程調度、高效通信等問題,使用 ePump 框架可快速開發(fā)各類通信服務器系統(tǒng),像常見的HTTP Web服務器、RTMP流媒體服務器、及時通信消息系統(tǒng)等等。
在使用ePump框架編程前,首先需要從GitHub下載并安裝C語言基礎庫adif 數(shù)據(jù)結構和基礎算法庫和ePump框架,ePump框架依賴于adif基礎庫,adif 基礎庫 和 ePump 框架都是標準C語言開發(fā),并以庫的形式集成到應用程序中。下載這兩個開源系統(tǒng)的代碼到本地,make && make install 后,編譯成功的動態(tài)庫和靜態(tài)庫缺省地被安裝到 /usr/local/lib 目錄下,頭文件則安裝到 /usr/local/include/adif 和 /usr/local/include 中。
下面講解如何使用ePump框架來開發(fā)一個echo功能的TCP服務器程序。
使用這兩個庫編程時,需要包含adif基礎庫和ePump框架的頭文件:
#include "adifall.ext" #include "epump.h" #include <signal.h>
其中adifall.ext文件包含了adif庫中所有基礎數(shù)據(jù)結構和算法模塊的頭文件,具體功能可以參考開源項目adif 數(shù)據(jù)結構和基礎算法庫。epump.h是調用ePump框架功能模塊的頭文件。由于需要處理信號,包含了signal.h文件。
大家都知道,創(chuàng)建TCP監(jiān)聽服務器時,最基本的通信開發(fā)啟蒙知識三部曲,首先要創(chuàng)建socket_t文件描述符,綁定本地IP地址和端口,在指定端口上啟動監(jiān)聽,等待客戶端發(fā)起TCP連接請求。但是對于高級程序員或商業(yè)級服務器需求的系統(tǒng)來說,高性能是必須的終極需求,開發(fā)人員還需要嚴肅地考慮如下問題:
TCP服務器系統(tǒng)既要支持IPv4地址,也要支持IPv6地址;
如何采用多線程或多進程來處理每一個連接請求;
短時間內產(chǎn)生大量的TCP連接請求時,如何在多個線程或多個進程間負載均衡;
由于線程或進程總數(shù)有限(小于1024),單臺機器處理幾十萬并發(fā)的TCP連接時,如何采用多路復用技術解決大并發(fā)I/O;
這些問題直接考驗了商業(yè)級通信服務器系統(tǒng)其性能的高低、吞吐能力的大小、CPU處理能力的高效運用等等,能解決好這些問題,無疑是一個能經(jīng)受實戰(zhàn)的好系統(tǒng),ePump框架天生就是解決這些問題的好手。
使用ePump框架,先調用API接口,創(chuàng)建ePump框架實例,
epcore_t * pcore = NULL; void * mlisten = NULL; int listenport = 8080; gpcore = pcore = epcore_new(65536, 1);
其中第一個參數(shù)是服務器系統(tǒng)能同時允許打開的文件描述符的最大數(shù)量,也就是這款TCP服務器能支撐的最大并發(fā)數(shù)量,第二個參數(shù)一般設置為1,指的是創(chuàng)建iodev_t等基礎設備對象時,盡量將其派送到當前工作線程。
使用ePump框架,啟動TCP監(jiān)聽服務很簡單,調用eptcp_mlisten接口即可,
mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore); if (!mlisten) goto exit; printf("EchoSrv TCP Port: %d being listened\n", listenport);
按照頭文件中的描述,該接口函數(shù)定義如下:
/* Note: automatically detect if Linux kernel supported REUSEPORT. if supported, create listen socket for every current running epump threads and future-started epump threads. if not, create only one listen socket for all epump threads to bind. */ void * eptcp_mlisten (void * vpcore, char * localip, int port, void * para, IOHandler * cb, void * cbpara);
最新的Linux操作系統(tǒng)對于通信編程做了很多優(yōu)化,其中對于內核對象Socket使用REUSEPORT選項,來解決端口復用問題,這樣使得每一個線程或進程都可以監(jiān)聽同一個端口,并接受新的連接請求。那么大量的客戶端同時對監(jiān)聽端口發(fā)起TCP三路握手,想建立到達服務器的TCP連接時,這些連接到底交給哪一個啟動了監(jiān)聽服務的線程或進程?這個問題大家自己做功課,這里不贅述。
ePump框架中采用 epoll / select等多路復用接口來監(jiān)聽連接到來和讀寫事件,監(jiān)聽設備和定時器并產(chǎn)生事件的線程是epump線程,處理事件的線程是worker線程。eptcp_mlisten自動地為每一個epump線程創(chuàng)建listen socket(支持REUSEPORT時),或創(chuàng)建一個listen socket但綁定到每一個epump線程中。這樣大量的連接請求到來時,將會由epump線程處理其負載均衡。
eptcp_mlisten函數(shù)的第一個參數(shù)是ePump框架實例;第二個參數(shù)為綁定的本機IP地址,如果綁定所有本機IP地址,該值為NULL;第三個參數(shù)為監(jiān)聽的端口號;第四個參數(shù)是設置當前正在創(chuàng)建的監(jiān)聽設備的綁定參數(shù),一般跟當前監(jiān)聽設備iodev_t有關的實例對象;第五個參數(shù)是設置當前正在創(chuàng)建的監(jiān)聽設備iodev_t對象當有連接請求過來時的回調函數(shù);第六個參數(shù)是傳遞給回調函數(shù)的參數(shù)變量。
作者在程序中習慣為所有的事件回調處理設置為一個統(tǒng)一的回調函數(shù)echo_pump,當然大家可以根據(jù)自己的愛好和習慣,為每個iodev_t對象的讀寫事件設置不同的回調函數(shù)。
這樣,啟動這個TCP服務器監(jiān)聽服務時,前面提到的各種商業(yè)TCP服務器需面對的問題,這里都解決了。
這個sample程序中,給大家示范了定時器的用法。使用定時器可以定期做一些檢查或校驗工作,譬如一個TCP連接長時間沒有數(shù)據(jù)往來,通過定時器機制來關閉著這些不活躍的TCP連接。
iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
這是啟動一個90秒后發(fā)送超時TIMEOUT事件的定時器,超時事件將由echo_pump函數(shù)來處理。本例的回調函數(shù)中沒有處理不活躍TCP連接的代碼,大家感興趣可自行添加。
創(chuàng)建了TCP監(jiān)聽服務后,需要啟動ePump框架的兩類線程組:epump線程組 和 worker線程組,代碼如下:
cpunum = get_cpu_num(); epumpnum = cpunum * 0.2; if (epumpnum < 3) epumpnum = 3; workernum = cpunum - epumpnum; if (workernum < 3) workernum = 3; /* start 3 worker threads */ epcore_start_worker(pcore, workernum); /* start 3 epump threads */ epcore_start_epump(pcore, epumpnum - 1); /* main thread executing the epump_main_proc as an epump thread */ epump_main_start(pcore, 0); epcore_clean(pcore); printf("main thread exited\n"); return 0; exit: epcore_clean(pcore); printf("main thread exception exited\n"); return -1;
以上介紹的是主程序,下面需要介紹的回調函數(shù)echo_pump的實現(xiàn).
回調函數(shù)的原型定義如下:
typedef int IOHandler (void * vmgmt, void * pobj, int event, int fdtype);
第一個參數(shù)是設置回調函數(shù)時給定的參數(shù),第二個是當前產(chǎn)生事件的對象,或者是iodev_t對象,或者是iotimer_t對象,第三個參數(shù)是event事件類型,第四個參數(shù)是文件描述符類型。
其中的event事件類型如下:
/* define the event type, including getting connected, connection accepted, readable, * writable, timeout. the working threads will be driven by these events */ #define IOE_CONNECTED 1 #define IOE_CONNFAIL 2 #define IOE_ACCEPT 3 #define IOE_READ 4 #define IOE_WRITE 5 #define IOE_INVALID_DEV 6 #define IOE_TIMEOUT 100 #define IOE_DNS_RECV 200 #define IOE_USER_DEFINED 10000
其中的文件描述符類型fdtype預定義值共有:
/* the definition of FD type in the EventPump device */ #define FDT_LISTEN 0x01 #define FDT_CONNECTED 0x02 #define FDT_ACCEPTED 0x04 #define FDT_UDPSRV 0x08 #define FDT_UDPCLI 0x10 #define FDT_USOCK_LISTEN 0x20 #define FDT_USOCK_CONNECTED 0x40 #define FDT_USOCK_ACCEPTED 0x80 #define FDT_RAWSOCK 0x100 #define FDT_FILEDEV 0x200 #define FDT_TIMER 0x10000 #define FDT_USERCMD 0x20000 #define FDT_LINGER_CLOSE 0x40000 #define FDT_STDIN 0x100000 #define FDT_STDOUT 0x200000
當被監(jiān)聽端口8080上收到一個TCP連接請求時,echo_pump函數(shù)會被回調,回調參數(shù)中event為IOE_ACCEPT,fdtype為FDT_LISTEN,其中pobj就是監(jiān)聽設備對象。
switch (event) { case IOE_ACCEPT: if (fdtype != FDT_LISTEN) return -1; while (1) { pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_ONE_EPUMP); if (!pdev) break; printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu " " ==> Accept NewFD=%d EPumpID=%lu\n", get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)), workerid(worker_thread_self(pcore)), iodev_fd(pdev), epumpid(iodev_epump(pdev))); } break;
這里使用了一個while循環(huán)來調用eptcp_accept函數(shù),目的是解決多個TCP連接同時到來,ePump框架使用一個事件通知驅動回調函數(shù)去處理和執(zhí)行的情況,不使用循環(huán)處理,就會漏掉某些客戶的TCP連接請求。
函數(shù) eptcp_accept 接受TCP連接請求,并創(chuàng)建新連接對應的iodev_t設備對象pdev,設置該對象在數(shù)據(jù)可讀時的回調函數(shù),這個函數(shù)會自動處理多線程之間的連接設備對象的負載均衡。函數(shù)成功執(zhí)行完后的結果是一個新的客戶端TCP連接建立起來了,針對該新連接進行數(shù)據(jù)讀取操作的回調函數(shù)也都設置了。
eptcp_accept函數(shù)的原型如下:
void * eptcp_accept (void * vpcore, void * vld, void * para, int * retval, IOHandler * cb, void * cbpara, int bindtype);
第一個參數(shù)為ePump框架實例,第二個參數(shù)是監(jiān)聽設備iodev_t對象,由回調函數(shù)攜帶進來,第三個參數(shù)是新創(chuàng)建的客戶端TCP連接設備iodev_t對象的內置參數(shù),第四個參數(shù)為返回值,大于等于0表示連接建立成功,小于0失敗,第五個和第六個參數(shù)為新創(chuàng)建的連接對象的回調函數(shù),第七個參數(shù)是設置綁定epump線程的指令類型,共有如下幾種:
/* bind type specifying how iodev_t devices are bound to the underlying epump thread */ #define BIND_NONE 0 #define BIND_ONE_EPUMP 1 #define BIND_GIVEN_EPUMP 2 #define BIND_ALL_EPUMP 3 #define BIND_NEW_FOR_EPUMP 4 #define BIND_CURRENT_EPUMP 5
綁定epump線程的指令類型共有6個,其含義如下:
BIND_NONE是初始值,不綁定任何epump線程;
BIND_ONE_EPUMP是從當前epump線程中找一個負載最低的線程來綁定;
BIND_GIVEN_EPUMP是指定一個epump線程來建立綁定;
BIND_ALL_EPUMP是綁定所有的epump線程。這中情況一般是在監(jiān)聽設備對象創(chuàng)建后,一般在Linux內核版本低于3.9版本情況下,即不支持REUSEPORT功能時,使用這個類型。
BIND_NEW_FOR_EPUMP一般用于ePump框架內部,應用程序不建議使用。
BIND_CURRENT_EPUMP是綁定產(chǎn)生當前連接事件的epump線程。系統(tǒng)內部和操作系統(tǒng)內核對負載會實現(xiàn)均衡分配,一般建議應用開發(fā)時使用這個類型。
一旦綁定了epump線程,就可能立即產(chǎn)生可讀事件,并驅動回調函數(shù)來處理。如果新的的pdev對象是由另外一個工作線程來處理時,上述這個例子中就可能出現(xiàn)打印語句還沒結束,該新連接設備對象可讀事件的回調函數(shù)就已經(jīng)執(zhí)行了。在商業(yè)級系統(tǒng)開發(fā)過程中,調用本函數(shù)接受客戶端新連接并創(chuàng)建新的設備對象pdev后,需要做很多跟連接設備對象相關聯(lián)的數(shù)據(jù)結構的初始化工作,在這些初始化操作完成之后,再調用 iodev_bind_epump 函數(shù)來綁定epump線程,所以,這種情況下接受新連接時,一般不設置綁定關系,而是將第七個參數(shù)設置為 BIND_NONE。
pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_NONE); /* do some initialization of related objects, examples as following */ /* pcon = http_con_fetch(mgmt); pcon->pdev = pdev; iodev_para_set(pdev, (void *)pcon->conid); pcon->hl = hl; pcon->casetype = HTTP_SERVER; pcon->reqdiag = hl->reqdiag; pcon->reqdiagobj = hl->reqdiagobj; pcon->ssl_link = hl->ssl_link; str_cpy(pcon->srcip, iodev_rip(pcon->pdev)); str_cpy(pcon->dstip, iodev_lip(pcon->pdev)); pcon->srcport = iodev_rport(pcon->pdev); pcon->dstport = iodev_lport(pcon->pdev); pcon->createtime = time(&pcon->stamp); pcon->transbgn = pcon->stamp;*/ iodev_bind_epump(pdev, BIND_CURRENT_EPUMP, NULL);
建立好TCP連接之后,客戶端會發(fā)送數(shù)據(jù)到服務器,ePump框架中對所有socket文件描述符設置成了非阻塞模式,數(shù)據(jù)到達本機時,內核會產(chǎn)生可讀事件,由ePump框架驅動回調函數(shù)來處理數(shù)據(jù)讀操作。
case IOE_READ: ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num); if (ret < 0) { printf("Client %s:%d close the connection while receiving, epump: %lu\n", iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) ); iodev_close(vobj); return -100; } ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum); if (ret < 0) { printf("Client %s:%d close the connection while sending, epump: %lu\n", iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj))); iodev_close(vobj); return -100; } break;
采用非阻塞模式的讀數(shù)據(jù)函數(shù),讀取客戶端請求內容。這個讀函數(shù) tcp_nb_recv 是在adif基礎庫中實現(xiàn)的,調用系統(tǒng)調用read并一直讀到出現(xiàn) EAGAIN 錯誤為止,表示此次可讀事件的所有數(shù)據(jù)都被讀完。開發(fā)者需要注意的是,在回調函數(shù)中處理ePump框架的可讀事件時,一定要將所有的位于內核緩沖區(qū)中的數(shù)據(jù)讀取完,不建議讀一部分數(shù)據(jù)、留一部分數(shù)據(jù)。
由于本sample程序實現(xiàn)的是echo回彈功能,讀取了客戶端多少數(shù)據(jù),就返回客戶端多少數(shù)據(jù)。所以立即使用 tcp_nb_send 函數(shù)發(fā)送這些數(shù)據(jù)到客戶端。
本例中示范了定時器的啟動和超時處理,當定時器給定的時間逝去后,會產(chǎn)生TIMEOUT事件,并驅動回調函數(shù)來處理。ePump框架的定時器實例對象存活周期僅僅是在創(chuàng)建定時器到超時處理完成這段時間,即ePump框架的定時器是一次性的,超時處理完后,系統(tǒng)會自動銷毀該定時器對象。對于循環(huán)定時器,需要在處理超時事件時,重新啟動新的定時器實例。
case IOE_TIMEOUT: cmdid = iotimer_cmdid(vobj); if (cmdid == 1001) { printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n", get_threadid(), iotimer_id(vobj), epumpid(iotimer_epump(vobj)), time(0)); epcore_print(pcore); iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore); } break;
定時器的用例非常廣泛,開發(fā)人員可以根據(jù)實際需求來使用該功能。
以上詳細介紹了如何運用ePump框架實現(xiàn)一個完整的具有echo回彈功能的TCP服務器,代碼詳細如下:
/* * Copyright (c) 2003-2021 Ke Hengzhong <kehengzhong@hotmail.com> * All rights reserved. */ #include "adifall.ext" #include <signal.h> #include "epump.h" epcore_t * gpcore = NULL; int echo_pump (void * vpcore, void * vobj, int event, int fdtype); static void signal_handler(int sig) { switch(sig) { case SIGHUP: printf("hangup signal catched\n"); break; case SIGTERM: case SIGKILL: case SIGINT: printf("terminate signal catched, now exiting...\n"); epcore_stop_epump(gpcore); epcore_stop_worker(gpcore); usleep(1000); break; } } int main (int argc, char ** argv) { epcore_t * pcore = NULL; void * mlisten = NULL; int listenport = 8080; signal(SIGCHLD, SIG_IGN); /* ignore child */ signal(SIGTSTP, SIG_IGN); /* ignore tty signals */ signal(SIGTTOU, SIG_IGN); signal(SIGPIPE, SIG_IGN); signal(SIGTTIN, SIG_IGN); signal(SIGHUP, signal_handler); /* catch hangup signal */ signal(SIGTERM, signal_handler); /* catch kill signal */ signal(SIGINT, signal_handler); /* catch SIGINT signal */ gpcore = pcore = epcore_new(65536, 1); /* do some initialization */ mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore); if (!mlisten) goto exit; printf("EchoSrv TCP Port: %d being listened\n\n", listenport); iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore); /* start 2 worker threads */ epcore_start_worker(pcore, 2); /* start 1 epump threads */ epcore_start_epump(pcore, 1); /* main thread executing the epump_main_proc as an epump thread */ epump_main_start(pcore, 0); epcore_clean(pcore); printf("main thread exited\n"); return 0; exit: epcore_clean(pcore); printf("main thread exception exited\n"); return -1; } int echo_pump (void * vpcore, void * vobj, int event, int fdtype) { epcore_t * pcore = (epcore_t *)vpcore; iodev_t * pdev = NULL; int cmdid; int ret = 0, sndnum = 0; char rcvbuf[2048]; int num = 0; switch (event) { case IOE_ACCEPT: if (fdtype != FDT_LISTEN) return -1; while (1) { pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_ONE_EPUMP); if (!pdev) break; printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu " " ==> Accept NewFD=%d EPumpID=%lu\n", get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)), workerid(worker_thread_self(pcore)), iodev_fd(pdev), epumpid(iodev_epump(pdev))); } break; case IOE_READ: ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num); if (ret < 0) { printf("Client %s:%d close the connection while receiving, epump: %lu\n", iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) ); iodev_close(vobj); return -100; } printf("\nThreadID=%lu FD=%d EPumpID=%lu WorkerID=%lu Recv %d bytes from %s:%d\n", get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)), workerid(worker_thread_self(pcore)), num, iodev_rip(vobj), iodev_rport(vobj)); printOctet(stderr, rcvbuf, 0, num, 2); ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum); if (ret < 0) { printf("Client %s:%d close the connection while sending, epump: %lu\n", iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj))); iodev_close(vobj); return -100; } break; case IOE_WRITE: case IOE_CONNECTED: break; case IOE_TIMEOUT: cmdid = iotimer_cmdid(vobj); if (cmdid == 1001) { printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n", get_threadid(), iotimer_id(vobj), epumpid(iotimer_epump(vobj)), time(0)); epcore_print(pcore); iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore); } break; case IOE_INVALID_DEV: break; default: break; } printf("ThreadID=%lu event: %d fdtype: %d WorkerID=%lu\n\n", get_threadid(), event, fdtype, workerid(worker_thread_self(pcore))); return 0; }
這個示例中使用大量的多余的打印代碼,看起沒那么美觀,有潔癖的程序員可以去掉。
使用gcc編譯以上代碼的命令如下:
gcc -g -O3 -Wall -DUNIX -I/usr/local/include -I/usr/local/include/adif -L/usr/local/lib -lm -lpthread -ladif -lepump echosrv.c -o echosrv
編譯完成后大家執(zhí)行并調試,享受編程樂趣。
以上用一個TCP服務器程序來展示如何使用ePump框架進行編程的實例,管中窺豹,以一概全,感興趣的程序員可以下載和體驗。
使用ePump框架最成功的案例是eJet Web服務器開源項目,這是一個輕量級、高性能、嵌入式Web服務器,各項功能不遜于Nginx。研究這個項目可以有助于理解ePump框架的工作原理。
簡單總結ePump框架的功能特點:
ePump框架封裝了很多瑣碎的容易出錯誤的細節(jié),讓開發(fā)人員將更多時間花在業(yè)務處理上;
將復雜的各個操作系統(tǒng)都互不兼容的多路復用技術封裝后,提供了標準的接口給程序員,大大節(jié)省了應用開發(fā)周期;
高效利用事件驅動、多線程調度機制來實現(xiàn)多核CPU的并行運算能力;
使用ePump開發(fā)高性能程序,代碼簡單干練,可靠性高;
對IPv6、DNS等頭提供了支持;
到此,關于“如何使用ePump框架編寫TCP服務器”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。