溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Nginx事件驅(qū)動框架處理流程是什么

發(fā)布時間:2022-04-29 17:25:08 來源:億速云 閱讀:205 作者:zzz 欄目:大數(shù)據(jù)

這篇文章主要介紹“Nginx事件驅(qū)動框架處理流程是什么”,在日常操作中,相信很多人在Nginx事件驅(qū)動框架處理流程是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Nginx事件驅(qū)動框架處理流程是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

ngx_event_core_module模塊的ngx_event_process_init方法對事件模塊做了一些初始化。其中包括將“請求連接”這樣一個讀事件對應(yīng)的處理方法(handler)設(shè)置為ngx_event_accept函數(shù),并將此事件添加到epoll模塊中。當(dāng)有新連接事件發(fā)生時,ngx_event_accept就會被調(diào)用。大致流程是這樣:

worker進(jìn)程在ngx_worker_process_cycle方法中不斷循環(huán)調(diào)用ngx_process_events_and_timers函數(shù)處理事件,這個函數(shù)是事件處理的總?cè)肟凇?/p>

ngx_process_events_and_timers會調(diào)用ngx_process_events,這是一個宏,相當(dāng)于ngx_event_actions.process_events,ngx_event_actions是個全局的結(jié)構(gòu)體,存儲了對應(yīng)事件驅(qū)動模塊(這里是epoll模塊)的10個函數(shù)接口。所以這里就是調(diào)用了ngx_epoll_module_ctx.actions.process_events函數(shù),也就是ngx_epoll_process_events函數(shù)來處理事件。

ngx_epoll_process_events調(diào)用linux函數(shù)接口epoll_wait獲得“有新連接”這個事件,然后調(diào)用這個事件的handler處理函數(shù)來對這個事件進(jìn)行處理。

在上面已經(jīng)說過handler已經(jīng)被設(shè)置成了ngx_event_accept函數(shù),所以就調(diào)用ngx_event_accept進(jìn)行實(shí)際的處理。

下面分析ngx_event_accept方法,它的流程圖如下所示:

Nginx事件驅(qū)動框架處理流程是什么

經(jīng)過精簡的代碼如下,注釋中的序號對應(yīng)上圖的序號:

void
ngx_event_accept(ngx_event_t *ev)
{
 socklen_t  socklen;
 ngx_err_t  err;
 ngx_log_t  *log;
 ngx_uint_t  level;
 ngx_socket_t  s;
 ngx_event_t  *rev, *wev;
 ngx_listening_t  *ls;
 ngx_connection_t *c, *lc;
 ngx_event_conf_t *ecf;
 u_char  sa[ngx_sockaddrlen];
 
 if (ev->timedout) {
  if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != ngx_ok) {
   return;
  }
 
  ev->timedout = 0;
 }
 
 ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
 
 if (ngx_event_flags & ngx_use_rtsig_event) {
  ev->available = 1;
 
 } else if (!(ngx_event_flags & ngx_use_kqueue_event)) {
  ev->available = ecf->multi_accept;
 }
 
 lc = ev->data;
 ls = lc->listening;
 ev->ready = 0;
 
 do {
  socklen = ngx_sockaddrlen;
 
  /* 1、accept方法試圖建立連接,非阻塞調(diào)用 */
  s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
 
  if (s == (ngx_socket_t) -1)
  {
   err = ngx_socket_errno;
 
   if (err == ngx_eagain)
   {
    /* 沒有連接,直接返回 */
    return;
   }
 
   level = ngx_log_alert;
 
   if (err == ngx_econnaborted) {
    level = ngx_log_err;
 
   } else if (err == ngx_emfile || err == ngx_enfile) {
    level = ngx_log_crit;
   }
 
   if (err == ngx_econnaborted) {
    if (ngx_event_flags & ngx_use_kqueue_event) {
     ev->available--;
    }
 
    if (ev->available) {
     continue;
    }
   }
 
   if (err == ngx_emfile || err == ngx_enfile) {
    if (ngx_disable_accept_events((ngx_cycle_t *) ngx_cycle)
     != ngx_ok)
    {
     return;
    }
 
    if (ngx_use_accept_mutex) {
     if (ngx_accept_mutex_held) {
      ngx_shmtx_unlock(&ngx_accept_mutex);
      ngx_accept_mutex_held = 0;
     }
 
     ngx_accept_disabled = 1;
 
    } else {
     ngx_add_timer(ev, ecf->accept_mutex_delay);
    }
   }
 
   return;
  }
 
  /* 2、設(shè)置負(fù)載均衡閾值 */
  ngx_accept_disabled = ngx_cycle->connection_n / 8
        - ngx_cycle->free_connection_n;
 
  /* 3、從連接池獲得一個連接對象 */
  c = ngx_get_connection(s, ev->log);
 
  /* 4、為連接創(chuàng)建內(nèi)存池 */
  c->pool = ngx_create_pool(ls->pool_size, ev->log);
 
  c->sockaddr = ngx_palloc(c->pool, socklen);
 
  ngx_memcpy(c->sockaddr, sa, socklen);
 
  log = ngx_palloc(c->pool, sizeof(ngx_log_t));
 
  /* set a blocking mode for aio and non-blocking mode for others */
  /* 5、設(shè)置套接字屬性為阻塞或非阻塞 */
  if (ngx_inherited_nonblocking) {
   if (ngx_event_flags & ngx_use_aio_event) {
    if (ngx_blocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_blocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
 
  } else {
   if (!(ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event))) {
    if (ngx_nonblocking(s) == -1) {
     ngx_log_error(ngx_log_alert, ev->log, ngx_socket_errno,
         ngx_nonblocking_n " failed");
     ngx_close_accepted_connection(c);
     return;
    }
   }
  }
 
  *log = ls->log;
 
  c->recv = ngx_recv;
  c->send = ngx_send;
  c->recv_chain = ngx_recv_chain;
  c->send_chain = ngx_send_chain;
 
  c->log = log;
  c->pool->log = log;
 
  c->socklen = socklen;
  c->listening = ls;
  c->local_sockaddr = ls->sockaddr;
  c->local_socklen = ls->socklen;
 
  c->unexpected_eof = 1;
 
  rev = c->read;
  wev = c->write;
 
  wev->ready = 1;
 
  if (ngx_event_flags & (ngx_use_aio_event|ngx_use_rtsig_event)) {
   /* rtsig, aio, iocp */
   rev->ready = 1;
  }
 
  if (ev->deferred_accept) {
   rev->ready = 1;
 
  }
 
  rev->log = log;
  wev->log = log;
 
  /*
   * todo: mt: - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   *
   * todo: mp: - allocated in a shared memory
   *   - ngx_atomic_fetch_add()
   *  or protection by critical section or light mutex
   */
 
  c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
 
  if (ls->addr_ntop) {
   c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
   if (c->addr_text.data == null) {
    ngx_close_accepted_connection(c);
    return;
   }
 
   c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
            c->addr_text.data,
            ls->addr_text_max_len, 0);
   if (c->addr_text.len == 0) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  /* 6、將新連接對應(yīng)的讀寫事件添加到epoll對象中 */
  if (ngx_add_conn && (ngx_event_flags & ngx_use_epoll_event) == 0) {
   if (ngx_add_conn(c) == ngx_error) {
    ngx_close_accepted_connection(c);
    return;
   }
  }
 
  log->data = null;
  log->handler = null;
 
  /* 7、tcp建立成功調(diào)用的方法,這個方法在ngx_listening_t結(jié)構(gòu)體中 */
  ls->handler(c);
 
 } while (ev->available); /* available標(biāo)志表示一次盡可能多的建立連接,由配置項(xiàng)multi_accept決定 */
}

nginx中的“驚群”問題

nginx一般會運(yùn)行多個worker進(jìn)程,這些進(jìn)程會同時監(jiān)聽同一端口。當(dāng)有新連接到來時,內(nèi)核將這些進(jìn)程全部喚醒,但只有一個進(jìn)程能夠和客戶端連接成功,導(dǎo)致其它進(jìn)程在喚醒時浪費(fèi)了大量開銷,這被稱為“驚群”現(xiàn)象。nginx解決“驚群”的方法是,讓進(jìn)程獲得互斥鎖ngx_accept_mutex,讓進(jìn)程互斥地進(jìn)入某一段臨界區(qū)。在該臨界區(qū)中,進(jìn)程將它所要監(jiān)聽的連接對應(yīng)的讀事件添加到epoll模塊中,使得當(dāng)有“新連接”事件發(fā)生時,該worker進(jìn)程會作出反應(yīng)。這段加鎖并添加事件的過程是在函數(shù)ngx_trylock_accept_mutex中完成的。而當(dāng)其它進(jìn)程也進(jìn)入該函數(shù)想要添加讀事件時,發(fā)現(xiàn)互斥鎖被另外一個進(jìn)程持有,所以它只能返回,它所監(jiān)聽的事件也無法添加到epoll模塊,從而無法響應(yīng)“新連接”事件。但這會出現(xiàn)一個問題:持有互斥鎖的那個進(jìn)程在什么時候釋放互斥鎖呢?如果需要等待它處理完所有的事件才釋放鎖的話,那么會需要相當(dāng)長的時間。而在這段時間內(nèi),其它worker進(jìn)程無法建立新連接,這顯然是不可取的。nginx的解決辦法是:通過ngx_trylock_accept_mutex獲得了互斥鎖的進(jìn)程,在獲得就緒讀/寫事件并從epoll_wait返回后,將這些事件歸類放入隊列中:

新連接事件放入ngx_posted_accept_events隊列
已有連接事件放入ngx_posted_events隊列

代碼如下:

if (flags & ngx_post_events)
{
 /* 延后處理這批事件 */
 queue = (ngx_event_t **) (rev->accept ? &ngx_posted_accept_events : &ngx_posted_events);
 
 /* 將事件添加到延后執(zhí)行隊列中 */
 ngx_locked_post_event(rev, queue);
}
else
{
 rev->handler(rev); /* 不需要延后,則立即處理事件 */
}

寫事件做類似處理。進(jìn)程接下來處理ngx_posted_accept_events隊列中的事件,處理完后立即釋放互斥鎖,使該進(jìn)程占用鎖的時間降到了最低。

nginx中的負(fù)載均衡問題

nginx中每個進(jìn)程使用了一個處理負(fù)載均衡的閾值ngx_accept_disabled,它在上圖的第2步中被初始化:

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

它的初值為一個負(fù)數(shù),該負(fù)數(shù)的絕對值等于總連接數(shù)的7/8.當(dāng)閾值小于0時正常響應(yīng)新連接事件,當(dāng)閾值大于0時不再響應(yīng)新連接事件,并將ngx_accept_disabled減1,代碼如下:

if (ngx_accept_disabled > 0)
{
  ngx_accept_disabled--;
}
else
{
 if (ngx_trylock_accept_mutex(cycle) == ngx_error)
 {
  return;
 }
 ....
}

這說明,當(dāng)某個進(jìn)程當(dāng)前的連接數(shù)達(dá)到能夠處理的總連接數(shù)的7/8時,負(fù)載均衡機(jī)制被觸發(fā),進(jìn)程停止響應(yīng)新連接。

到此,關(guān)于“Nginx事件驅(qū)動框架處理流程是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI