溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

epoll封裝reactor原理是什么

發(fā)布時間:2022-08-13 15:30:59 來源:億速云 閱讀:202 作者:iii 欄目:開發(fā)技術

這篇“epoll封裝reactor原理是什么”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“epoll封裝reactor原理是什么”文章吧。

reactor是什么?

reactor是一種高并發(fā)服務器模型,是一種框架,一個概念,所以reactor沒有一個固定的代碼,可以有很多變種,后續(xù)會介紹到。

組成:?阻塞的IO(如果是阻塞IO,發(fā)送緩沖區(qū)滿了怎么辦,就阻塞了) + io多路復?;特征:基于事件循環(huán),以事件驅動或者事件回調的?式來實現(xiàn)業(yè)務邏輯。

reactor中的IO使用的是select,poll,epoll這些IO多路復用,使用IO多路復用系統(tǒng)不必創(chuàng)建維護大量線程,只使用一個線程、一個選擇器就可同時處理成千上萬連接,大大減少了系統(tǒng)開銷。

reactor中文譯為反應堆,將epoll中的IO變成事件驅動,比如讀事件,寫事件。來了個讀事件,立馬進行反應,執(zhí)行提前注冊好的事件回調函數(shù)。

回想一下普通函數(shù)調用的機制:程序調用某函數(shù),函數(shù)執(zhí)行,程序等待,函數(shù)將結果和控制權返回給程序,程序繼續(xù)處理。reactor反應堆,是一種事件驅動機制,和普通函數(shù)調用的不同之處在于:應用程序不是主動的調用某個 API 完成處理,而是恰恰相反,reactor逆置了事件處理流程,應用程序需要提供相應的接口并注冊到 reactor上,如果相應的事件發(fā)生,reactor將主動調用應用程序注冊的接口,這些接口又稱為“回調函數(shù)”。

說白了,reactor就是對epoll進行封裝,進行網(wǎng)絡IO與業(yè)務的解耦,將epoll管理IO變成管理事件,整個程序由事件進行驅動執(zhí)行。就像下圖一樣,有就緒事件返回,reactor:由事件驅動執(zhí)行對應的回調函數(shù);epoll:需要自己判斷。

epoll封裝reactor原理是什么

reactor模型三個重要組件與流程分析

reactor是處理并發(fā) I/O 比較常見的一種模式,用于同步 I/O,中心思想是將所有要處理的 I/O 事件注冊到一個中心 I/O 多路復用器(epoll)上,同時主線程/進程阻塞在多路復用器上;

一旦有 I/O 事件到來或是準備就緒(文件描述符或 socket 可讀、寫),多路復用器返回并將事先注冊的相應 I/O 事件分發(fā)到對應的處理器中。

組件

reactor模型有三個重要的組件

多路復用器:由操作系統(tǒng)提供,在 linux 上一般是 select, poll, epoll 等系統(tǒng)調用。

epoll封裝reactor原理是什么

事件分發(fā)器:將多路復用器中返回的就緒事件分到對應的處理函數(shù)中。

epoll封裝reactor原理是什么

事件處理器:負責處理特定事件的處理函數(shù)。

epoll封裝reactor原理是什么

流程

具體流程:

  • 注冊相應的事件處理器(剛開始listenfd注冊都就緒事件)

  • 多路復用器等待事件

  • 事件到來,激活事件分發(fā)器,分發(fā)器調用事件到對應的處理器

  • 事件處理器處理事件,然后注冊新的事件(比如讀事件,完成讀操作后,根據(jù)業(yè)務處理數(shù)據(jù),注冊寫事件,寫事件根據(jù)業(yè)務響應請求;比如listen讀事件,肯定要給新的連接注冊讀事件)

將epoll封裝成reactor事件驅動

封裝每一個連接sockfd變成ntyevent

我們知道一個連接對應一個文件描述符fd,對于這個連接(fd)來說,它有自己的事件(讀,寫)。我們將fd都設置成非阻塞的,所以這里我們需要添加兩個buffer,至于大小就是看業(yè)務需求了。

struct ntyevent {
    int fd;//socket fd
    int events;//事件
    char sbuffer[BUFFER_LENGTH];//寫緩沖buffer
    int slength;
    char rbuffer[BUFFER_LENGTH];//讀緩沖buffer
    int rlength;
//    typedef int (*NtyCallBack)(int, int, void *);
    NtyCallBack callback;//回調函數(shù)
    void *arg;
    int status;//1MOD 0 null
};

封裝epfd和ntyevent變成ntyreactor

我們知道socket fd已經(jīng)被封裝成了ntyevent,那么有多少個ntyevent呢?這里demo初始化reactor的時候其實是將*events指向了一個1024的ntyevent數(shù)組(按照道理來說客戶端連接可以一直連,不止1024個客戶端,后續(xù)文章有解決方案,這里從簡)。epfd肯定要封裝進行,不用多說。

struct ntyreactor {
    int epfd;
    struct ntyevent *events;
    //struct ntyevent events[1024];
};

封裝讀、寫、接收連接等事件對應的操作變成callback

前面已經(jīng)說了,把事件寫成回調函數(shù),這里的參數(shù)fd肯定要知道自己的哪個連接,events是什么事件的意思,arg傳的是ntyreactor (考慮到后續(xù)多線程多進程,如果將ntyreactor設為全局感覺不太好 )

typedef int (*NtyCallBack)(int, int, void *);
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void *arg);

給每個客戶端的ntyevent設置屬性

具兩個例子,我們知道第一個socket一定是listenfd,用來監(jiān)聽用的,那么首先肯定是設置ntyevent的各項屬性。 本來是讀事件,讀完后要改成寫事件,那么必然要把原來的讀回調函數(shù)設置成寫事件回調。

void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
}

將ntyevent加入到epoll中由內核監(jiān)聽

int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
    struct epoll_event ev = {0, {0}};
    ev.data.ptr = ntyev;
    ev.events = ntyev->events = events;
    int op;
    if (ntyev->status == 1) {
        op = EPOLL_CTL_MOD;
    }
    else {
        op = EPOLL_CTL_ADD;
        ntyev->status = 1;
    }
    if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
        printf("event add failed [fd=%d], events[%d],err:%s,err:%d\n", ntyev->fd, events, strerror(errno), errno);
        return -1;
    }
    return 0;
}

將ntyevent從epoll中去除

int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    //epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
    return 0;
}

讀事件回調函數(shù)

這里就是被觸發(fā)的回調函數(shù),具體代碼要與業(yè)務結合,這里的參考意義不大(這里就是讀一次,改成寫事件)

int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
    nty_event_del(reactor->epfd, ntyev);
    if (len > 0) {
        ntyev->length = len;
        ntyev->buffer[len] = '\0';
        printf("C[%d]:%s\n", fd, ntyev->buffer);
        nty_event_set(ntyev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
    }
    else if (len == 0) {
        close(ntyev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, ntyev - reactor->events);
    }
    else {
        close(ntyev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}

寫事件回調函數(shù)

這里就是被觸發(fā)的回調函數(shù),具體代碼要與業(yè)務結合,這里的參考意義不大(將讀事件讀的數(shù)據(jù)寫回,再改成讀事件,相當于echo)

int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = send(fd, ntyev->buffer, ntyev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ntyev->buffer);
        nty_event_del(reactor->epfd, ntyev);
        nty_event_set(ntyev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ntyev);
    }
    else {
        close(ntyev->fd);
        nty_event_del(reactor->epfd, ntyev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}

接受新連接事件回調函數(shù)

本質上就是accept,然后將其加入到epoll監(jiān)聽

int accept_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    printf("client fd = %d\n", clientfd);
    if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
    printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
           reactor->events[clientfd].last_active, clientfd);
    return 0;
}

reactor運行

就是將原來的epoll_wait從main函數(shù)中封裝到ntyreactor_run函數(shù)中

int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->events == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS];
    int checkpos = 0, i;
    while (1) {
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
            ev->callback(ev->fd, events[i].events, ev->arg);
        }
    }
}

reactor簡單版代碼與測試

后續(xù)會出一篇測試百萬連接數(shù)量的文章

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#define BUFFER_LENGTH           4096
#define MAX_EPOLL_EVENTS        1024
#define SERVER_PORT             8082
typedef int (*NtyCallBack)(int, int, void *);
struct ntyevent {
    int fd;
    int events;
    void *arg;
    NtyCallBack callback;
    int status;//1MOD 0 null
    char buffer[BUFFER_LENGTH];
    int length;
    long last_active;
};
struct ntyreactor {
    int epfd;
    struct ntyevent *events;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void *arg);
void nty_event_set(struct ntyevent *ev, int fd, NtyCallBack callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
    ev->last_active = time(NULL);
}
int nty_event_add(int epfd, int events, struct ntyevent *ntyev) {
    struct epoll_event ev = {0, {0}};
    ev.data.ptr = ntyev;
    ev.events = ntyev->events = events;
    int op;
    if (ntyev->status == 1) {
        op = EPOLL_CTL_MOD;
    }
    else {
        op = EPOLL_CTL_ADD;
        ntyev->status = 1;
    }
    if (epoll_ctl(epfd, op, ntyev->fd, &ev) < 0) {
        printf("event add failed [fd=%d], events[%d],err:%s,err:%d\n", ntyev->fd, events, strerror(errno), errno);
        return -1;
    }
    return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    //epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, NULL);
    return 0;
}
int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = recv(fd, ntyev->buffer, BUFFER_LENGTH, 0);
    nty_event_del(reactor->epfd, ntyev);
    if (len > 0) {
        ntyev->length = len;
        ntyev->buffer[len] = '\0';
        printf("C[%d]:%s\n", fd, ntyev->buffer);
        nty_event_set(ntyev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ntyev);
    }
    else if (len == 0) {
        close(ntyev->fd);
        printf("[fd=%d] pos[%ld], closed\n", fd, ntyev - reactor->events);
    }
    else {
        close(ntyev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}
int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ntyev = &reactor->events[fd];
    int len = send(fd, ntyev->buffer, ntyev->length, 0);
    if (len > 0) {
        printf("send[fd=%d], [%d]%s\n", fd, len, ntyev->buffer);
        nty_event_del(reactor->epfd, ntyev);
        nty_event_set(ntyev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ntyev);
    }
    else {
        close(ntyev->fd);
        nty_event_del(reactor->epfd, ntyev);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}
int accept_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    printf("client fd = %d\n", clientfd);
    if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    nty_event_set(&reactor->events[clientfd], clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[clientfd]);
    printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
           reactor->events[clientfd].last_active, clientfd);
    return 0;
}
int init_sock(short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);
    bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
    }
    return fd;
}
int ntyreactor_init(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
    reactor->events = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    memset(reactor->events, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (reactor->events == NULL) {
        printf("create epll events in %s err %s\n", __func__, strerror(errno));
        close(reactor->epfd);
        return -3;
    }
    return 0;
}
int ntyreactor_destory(struct ntyreactor *reactor) {
    close(reactor->epfd);
    free(reactor->events);
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NtyCallBack acceptor) {
    if (reactor == NULL) return -1;
    if (reactor->events == NULL) return -1;
    nty_event_set(&reactor->events[sockfd], sockfd, acceptor, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, &reactor->events[sockfd]);
    return 0;
}
_Noreturn int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->events == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS];
    int checkpos = 0, i;
    while (1) {
        //心跳包 60s 超時則斷開連接
        long now = time(NULL);
        for (i = 0; i < 100; i++, checkpos++) {
            if (checkpos == MAX_EPOLL_EVENTS) {
                checkpos = 0;
            }
            if (reactor->events[checkpos].status != 1 || checkpos == 3) {
                continue;
            }
            long duration = now - reactor->events[checkpos].last_active;
            if (duration >= 60) {
                close(reactor->events[checkpos].fd);
                printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
                nty_event_del(reactor->epfd, &reactor->events[checkpos]);
            }
        }
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
            ev->callback(ev->fd, events[i].events, ev->arg);
        }
    }
}
int main(int argc, char *argv[]) {
    int sockfd = init_sock(SERVER_PORT);
    struct ntyreactor *reactor = (struct ntyreactor *) malloc(sizeof(struct ntyreactor));
    if (ntyreactor_init(reactor) != 0) {
        return -1;
    }
    ntyreactor_addlistener(reactor, sockfd, accept_cb);
    ntyreactor_run(reactor);
    ntyreactor_destory(reactor);
    close(sockfd);
    return 0;
}

epoll封裝reactor原理是什么

reactor優(yōu)點

reactor模式是編寫高性能網(wǎng)絡服務器的必備技術之一,它具有如下優(yōu)點:

  • 響應快,不必為單個同步時間所阻塞,雖然 reactor本身依然是同步的

  • 編程相對簡單,可以最大程度的避免復雜的多線程及同步問題,并且避免了多線程/進程的切換開銷

  • 可擴展性,可以方便的通過增加 reactor實例個數(shù)來充分利用 CPU 資源

  • 可復用性,reactor 框架本身與具體事件處理邏輯無關,具有很高的復用性

reactor模型開發(fā)效率上比起直接使用 IO 復用要高,它通常是單線程的,設計目標是希望單線程使用一顆 CPU 的全部資源,但也有附帶優(yōu)點,即每個事件處理中很多時候可以不考慮共享資源的互斥訪問??墒侨秉c也是明顯的,現(xiàn)在的硬件發(fā)展,已經(jīng)不再遵循摩爾定律,CPU 的頻率受制于材料的限制不再有大的提升,而改為是從核數(shù)的增加上提升能力,當程序需要使用多核資源時,reactor模型就會悲劇。

如果程序業(yè)務很簡單,例如只是簡單的訪問一些提供了并發(fā)訪問的服務,就可以直接開啟多個反應堆,每個反應堆對應一顆 CPU 核心,這些反應堆上跑的請求互不相關,這是完全可以利用多核的。例如 Nginx 這樣的 http 靜態(tài)服務器。

reactor多種模型

單reactor + 單線程模型

單reactor單線程模型,指的是所有的 IO 操作(讀,寫,建立連接)都在同一個線程上面完成

epoll封裝reactor原理是什么

缺點:

  • 由于只有一個線程,因此事件是順序處理的,一個線程同時只能做一件事情,事件的優(yōu)先級得不到保證

  • 不能充分利用多核CPU

單reactor + 線程池(Thread Pool)模型

相比于單reactor單線程模型,此模型中收到請求后,不在reactor線程計算,而是使用線程池來計算,這會充分的利用多核CPU。

采用此模式時有可能存在多個線程同時計算同一個連接上的多個請求,算出的結果的次序是不確定的, 所以需要網(wǎng)絡框架在設計協(xié)議時帶一個id標示,以便以便讓客戶端區(qū)分response對應的是哪個request。

epoll封裝reactor原理是什么

多reactor + 多線程模型

此模式的特點是每個線程一個循環(huán), 有一個main reactor負責accept連接, 然后把該連接掛在某個sub reactor中,這樣該連接的所有操作都在那個sub reactor所處的線程中完成。

多個連接可能被分配到多個線程中,充分利用CPU。在應用場景中,reactor的個數(shù)可以采用 固定的個數(shù),比如跟CPU數(shù)目一致。

此模型與單reactor多線程模型相比,減少了進出thread pool兩次上下文切換,小規(guī)模的計算可以在當前IO線程完成并且返回結果,降低響應的延遲。

并可以有效防止當IO壓力過大時一個reactor處理能力飽和問題。

epoll封裝reactor原理是什么

多reactor + 線程池(Thread Pool)模型

此模型是上面兩個的混合體,它既使用多個 reactors 來處理 IO,又使用線程池來處理計算。此模式適適合既有突發(fā)IO(利用Multiple Reactor分擔),又有突發(fā)計算的應用(利用線程池把一個連接上的計算任務分配給多個線程)。

epoll封裝reactor原理是什么

注意點

注意:

前面介紹的四種reactor 模式在具體實現(xiàn)時為了簡應該遵循的原則是:每個文件描述符只由一個線程操作。

這樣可以輕輕松松解決消息收發(fā)的順序性問題,也避免了關閉文件描述符的各種race condition。一個線程可以操作多個文件描述符,但是一個線程不能操作別的線程擁有的文件描述符。

這一點不難做到。epoll也遵循了相同的原則。Linux文檔中并沒有說明,當一個線程證阻塞在epoll_wait時,另一個線程往epoll fd添加一個新的監(jiān)控fd會發(fā)生什么。

新fd上的事件會不會在此次epoll_wait調用中返回?為了穩(wěn)妥起見,我們應該吧對同一個 epoll fd的操作(添加、刪除、修改等等)都放到同一個線程中執(zhí)行。

reactor完善版代碼

由于fd的數(shù)量未知,這里設計ntyreactor 里面包含 eventblock ,eventblock 包含1024個fd。每個fd通過 fd/1024定位到在第幾個eventblock,通過fd%1024定位到在eventblock第幾個位置。

epoll封裝reactor原理是什么

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define BUFFER_LENGTH           4096
#define MAX_EPOLL_EVENTS        1024
#define SERVER_PORT             8081
#define PORT_COUNT              100
typedef int (*NCALLBACK)(int, int, void *);
struct ntyevent {
    int fd;
    int events;
    void *arg;
    NCALLBACK callback;
    int status;
    char buffer[BUFFER_LENGTH];
    int length;
};
struct eventblock {
    struct eventblock *next;
    struct ntyevent *events;
};
struct ntyreactor {
    int epfd;
    int blkcnt;
    struct eventblock *evblk;
};
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
struct ntyevent *ntyreactor_find_event_idx(struct ntyreactor *reactor, int sockfd);
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK *callback, void *arg) {
    ev->fd = fd;
    ev->callback = callback;
    ev->events = 0;
    ev->arg = arg;
}
int nty_event_add(int epfd, int events, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    ep_ev.data.ptr = ev;
    ep_ev.events = ev->events = events;
    int op;
    if (ev->status == 1) {
        op = EPOLL_CTL_MOD;
    }
    else {
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }
    if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
        printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
        return -1;
    }
    return 0;
}
int nty_event_del(int epfd, struct ntyevent *ev) {
    struct epoll_event ep_ev = {0, {0}};
    if (ev->status != 1) {
        return -1;
    }
    ep_ev.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
    return 0;
}
int recv_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ev = ntyreactor_find_event_idx(reactor, fd);
    int len = recv(fd, ev->buffer, BUFFER_LENGTH, 0); //
    nty_event_del(reactor->epfd, ev);
    if (len > 0) {
        ev->length = len;
        ev->buffer[len] = '\0';
//        printf("recv[%d]:%s\n", fd, ev->buffer);
        printf("recv fd=[%d\n", fd);
        nty_event_set(ev, fd, send_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLOUT, ev);
    }
    else if (len == 0) {
        close(ev->fd);
        //printf("[fd=%d] pos[%ld], closed\n", fd, ev-reactor->events);
    }
    else {
        close(ev->fd);
//        printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
    }
    return len;
}
int send_cb(int fd, int events, void *arg) {
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    struct ntyevent *ev = ntyreactor_find_event_idx(reactor, fd);
    int len = send(fd, ev->buffer, ev->length, 0);
    if (len > 0) {
//        printf("send[fd=%d], [%d]%s\n", fd, len, ev->buffer);
        printf("send fd=[%d\n]", fd);
        nty_event_del(reactor->epfd, ev);
        nty_event_set(ev, fd, recv_cb, reactor);
        nty_event_add(reactor->epfd, EPOLLIN, ev);
    }
    else {
        nty_event_del(reactor->epfd, ev);
        close(ev->fd);
        printf("send[fd=%d] error %s\n", fd, strerror(errno));
    }
    return len;
}
int accept_cb(int fd, int events, void *arg) {//非阻塞
    struct ntyreactor *reactor = (struct ntyreactor *) arg;
    if (reactor == NULL) return -1;
    struct sockaddr_in client_addr;
    socklen_t len = sizeof(client_addr);
    int clientfd;
    if ((clientfd = accept(fd, (struct sockaddr *) &client_addr, &len)) == -1) {
        printf("accept: %s\n", strerror(errno));
        return -1;
    }
    if ((fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
        printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
        return -1;
    }
    struct ntyevent *event = ntyreactor_find_event_idx(reactor, clientfd);
    nty_event_set(event, clientfd, recv_cb, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
    printf("new connect [%s:%d], pos[%d]\n",
           inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
    return 0;
}
int init_sock(short port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(fd, F_SETFL, O_NONBLOCK);
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port);
    bind(fd, (struct sockaddr *) &server_addr, sizeof(server_addr));
    if (listen(fd, 20) < 0) {
        printf("listen failed : %s\n", strerror(errno));
    }
    return fd;
}
int ntyreactor_alloc(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
    struct eventblock *blk = reactor->evblk;
    while (blk->next != NULL) {
        blk = blk->next;
    }
    struct ntyevent *evs = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    struct eventblock *block = (struct eventblock *) malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
    block->events = evs;
    block->next = NULL;
    blk->next = block;
    reactor->blkcnt++; //
    return 0;
}
struct ntyevent *ntyreactor_find_event_idx(struct ntyreactor *reactor, int sockfd) {
    int blkidx = sockfd / MAX_EPOLL_EVENTS;
    while (blkidx >= reactor->blkcnt) {
        ntyreactor_alloc(reactor);
    }
    int i = 0;
    struct eventblock *blk = reactor->evblk;
    while (i++ < blkidx && blk != NULL) {
        blk = blk->next;
    }
    return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}
int ntyreactor_init(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    memset(reactor, 0, sizeof(struct ntyreactor));
    reactor->epfd = epoll_create(1);
    if (reactor->epfd <= 0) {
        printf("create epfd in %s err %s\n", __func__, strerror(errno));
        return -2;
    }
    struct ntyevent *evs = (struct ntyevent *) malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    if (evs == NULL) {
        printf("ntyreactor_alloc ntyevents failed\n");
        return -2;
    }
    memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
    struct eventblock *block = (struct eventblock *) malloc(sizeof(struct eventblock));
    if (block == NULL) {
        printf("ntyreactor_alloc eventblock failed\n");
        return -2;
    }
    memset(block, 0, sizeof(struct eventblock));
    block->events = evs;
    block->next = NULL;
    reactor->evblk = block;
    reactor->blkcnt = 1;
    return 0;
}
int ntyreactor_destory(struct ntyreactor *reactor) {
    close(reactor->epfd);
    //free(reactor->events);
    struct eventblock *blk = reactor->evblk;
    struct eventblock *blk_next = NULL;
    while (blk != NULL) {
        blk_next = blk->next;
        free(blk->events);
        free(blk);
        blk = blk_next;
    }
    return 0;
}
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor) {
    if (reactor == NULL) return -1;
    if (reactor->evblk == NULL) return -1;
    struct ntyevent *event = ntyreactor_find_event_idx(reactor, sockfd);
    nty_event_set(event, sockfd, acceptor, reactor);
    nty_event_add(reactor->epfd, EPOLLIN, event);
    return 0;
}
_Noreturn int ntyreactor_run(struct ntyreactor *reactor) {
    if (reactor == NULL) return -1;
    if (reactor->epfd < 0) return -1;
    if (reactor->evblk == NULL) return -1;
    struct epoll_event events[MAX_EPOLL_EVENTS + 1];
    int i;
    while (1) {
        int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
        if (nready < 0) {
            printf("epoll_wait error, exit\n");
            continue;
        }
        for (i = 0; i < nready; i++) {
            struct ntyevent *ev = (struct ntyevent *) events[i].data.ptr;
            if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
            if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
                ev->callback(ev->fd, events[i].events, ev->arg);
            }
        }
    }
}
// <remoteip, remoteport, localip, localport,protocol>
int main(int argc, char *argv[]) {
    unsigned short port = SERVER_PORT; // listen 8081
    if (argc == 2) {
        port = atoi(argv[1]);
    }
    struct ntyreactor *reactor = (struct ntyreactor *) malloc(sizeof(struct ntyreactor));
    ntyreactor_init(reactor);
    int i = 0;
    int sockfds[PORT_COUNT] = {0};
    for (i = 0; i < PORT_COUNT; i++) {
        sockfds[i] = init_sock(port + i);
        ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
    }
    ntyreactor_run(reactor);
    ntyreactor_destory(reactor);
    for (i = 0; i < PORT_COUNT; i++) {
        close(sockfds[i]);
    }
    free(reactor);
    return 0;
}

以上就是關于“epoll封裝reactor原理是什么”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI