In this post I'd like to share an analysis of Redis's I/O multiplexing technique. Many people are not very familiar with how it works, so I'm sharing this walkthrough for reference; I hope you get a lot out of it. Let's dive in.
Redis is a single-threaded yet very fast in-memory database, used mainly as a caching system. It relies on network I/O multiplexing to sustain high throughput when serving many connections at once.
For multiplexing, Redis supports several backends — select, epoll, evport, and kqueue — and one of them is chosen at compile time.
Since the servers we run are mostly Linux, and I am not very familiar with Solaris or macOS, this article focuses on comparing three multiplexing mechanisms: select, poll, and epoll.
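The compile-time choice is made in Redis's ae.c with a cascade of conditional includes, picking the best backend the platform supports. The fragment below is paraphrased from the Redis source and shows the idea (the exact file contents vary between Redis versions):

```c
/* Include the best multiplexing layer supported by this system.
 * Ordered by performance, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"       /* Solaris event ports */
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"    /* Linux epoll */
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"  /* BSD / macOS kqueue */
        #else
        #include "ae_select.c"  /* portable fallback */
        #endif
    #endif
#endif
```

Each ae_*.c file implements the same small internal API (aeApiCreate, aeApiAddEvent, aeApiPoll, ...), so the rest of the event loop is backend-agnostic.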
select: the number of descriptors a single process can monitor is capped by the FD_SETSIZE macro, typically 1024; performance degrades sharply as the number of FDs grows; and the readiness information must be copied between kernel and user space.
Performance problems:
(1) Every call to select copies the fd set from user space into the kernel, which becomes expensive when there are many FDs.
(2) Every call also makes the kernel scan all the FDs passed in, which is likewise expensive when there are many FDs.
poll: essentially the same as select; the difference is that there is no hard FD limit, because the descriptors are passed as an array of pollfd records with an explicit count rather than a fixed-size fd_set bitmap.
The select interface on Linux:
#include <sys/select.h>
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
select's parameters:
- nfds: the highest-numbered file descriptor in any of the three sets, plus 1 (each fd_set records FD membership as a bitmap);
- readfds: the set of descriptors monitored for readability; select returns once any of them can be read without blocking;
- writefds: the set of descriptors monitored for writability;
- exceptfds: the set of descriptors monitored for exceptional conditions;
- timeout: the longest time select may block; if both fields of the timeval are 0 it returns immediately, and if the argument is NULL it blocks indefinitely;
select returns the number of ready descriptors when one or more read, write, or exception operations can proceed without blocking; it returns 0 if the timeout expires with no descriptor ready, and -1 on error.
The FD_XXX macros are helpers for adding descriptors to, removing them from, clearing, and testing an fd_set.
Pseudocode for a select loop:
while (1) {
    int ret = select(streams[]);
    if (ret > 0) {
        for i in streams[] {
            if i has data {
                read or write streams[i];
            }
        }
    } else if (ret == 0) {
        handle timeout FDs;
    } else {
        handle error;
    }
}
The epoll interface on Linux:
#include <sys/epoll.h>
// predefined events
enum EPOLL_EVENTS
{
EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN
EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
EPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUP
EPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOT
EPOLLET = 1u << 31
#define EPOLLET EPOLLET
};
int epoll_create(int size);
// Create an epoll object and return its file descriptor.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// Add a file descriptor to the epoll object for the kernel to manage, and set its trigger conditions.
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// Wait until a registered event fires or the timeout expires.
epoll offers both edge-triggered and level-triggered modes. In edge-triggered mode, epoll_wait returns only when a new event first arrives on the epoll object; in level-triggered mode, epoll_wait keeps firing for as long as the event's condition persists.
For example, suppose a pipe registered with epoll receives data: epoll_wait returns and signals that data is ready to read. Now suppose only part of the buffered data is read and processed. In level-triggered mode, every subsequent call to epoll_wait returns immediately until the buffer has been fully drained; in edge-triggered mode, epoll_wait returns only when new data arrives (that is, when more data is written into the pipe).
Pseudocode for an epoll loop:
epollfd = epoll_create()
while (1) {
    active_stream[] = epoll_wait(epollfd)
    for (i = 0; i < len(active_stream[]); i++) {
        read or write active_stream[i]
    }
}
Next, let's look at how Redis implements multiplexing. Redis's main function consists of three parts:
1. Initialize the Redis server parameters; this is done in initServerConfig.
2. Initialize the Redis server; this code lives in initServer.
3. Start the event loop.
The first part simply initializes the server object's fields from the configuration file; it is not closely related to this article's topic, so we skip it.
The second part creates the event loop (the poller), along with a time-event queue and a file-event array.
void initServer(void) {
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
    }
    ...
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
        {
            serverPanic(
                "Unrecoverable error creating server.ipfd file event.");
        }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
        serverPanic(
            "Error registering the readable event for the module "
            "blocked clients subsystem.");
    }
    ...
}
The third part is the event loop itself:
int main() {
    // part 1: initialize server parameters (initServerConfig)
    // part 2: initialize the server
    initServer();
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
/* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ...
    /* Call the multiplexing API, will return only on timeout or when
     * some event fires. */
    numevents = aeApiPoll(eventLoop, tvp);

    /* After sleep callback. */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);

    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
         * event removed an element that fired and we still didn't
         * processed, so we check if the event is still valid. */
        if (fe->mask & mask & AE_READABLE) {
            rfired = 1;
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
        if (fe->mask & mask & AE_WRITABLE) {
            if (!rfired || fe->wfileProc != fe->rfileProc)
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        }
        processed++;
    }

    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
The backend-specific code lives in files like ae_epoll.c, which map the abstract event API onto epoll_ctl calls. For example, removing an event with the epoll backend:
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer even for
* EPOLL_CTL_DEL. */
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
That concludes this analysis of Redis's I/O multiplexing. Thanks for reading! I hope you now have a clearer picture, and that the material shared here proves useful.