溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

nginx如何動(dòng)態(tài)修改upstream ngx_http_dyups_module

發(fā)布時(shí)間:2021-11-19 17:13:06 來(lái)源:億速云 閱讀:335 作者:小新 欄目:網(wǎng)絡(luò)管理

小編給大家分享一下nginx如何動(dòng)態(tài)修改upstream ngx_http_dyups_module,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

簡(jiǎn)介

nginx 動(dòng)態(tài)修改upstream不reload nginx模塊,ngx_http_dyups_module分析。

主要圍繞https://github.com/yzprofile/ngx_http_dyups_module/blob/master/ngx_http_dyups_module.c進(jìn)行分析記錄下來(lái)。

開(kāi)整......

在create_main_conf的時(shí)候初始化這個(gè)數(shù)組

static void * ngx_http_dyups_create_main_conf(ngx_conf_t *cf)

{

...

if (ngx_array_init(&dmcf->dy_upstreams, cf->pool, 1024, sizeof(ngx_http_dyups_srv_conf_t)) != NGX_OK)

{

return NULL;

}

...

}

ngx_http_dyups_init

static ngx_http_module_t  ngx_http_dyups_module_ctx = {

ngx_http_dyups_pre_conf,          /* preconfiguration */

ngx_http_dyups_init,              /* postconfiguration */

ngx_http_dyups_create_main_conf,  /* create main configuration */

ngx_http_dyups_init_main_conf,    /* init main configuration */

ngx_http_dyups_create_srv_conf,   /* create server configuration */

NULL,                             /* merge server configuration */

NULL,                             /* create location configuration */

NULL                              /* merge location configuration */

};

在dyups init的時(shí)把upstream中的conf取出來(lái)放進(jìn)去。

初始化dy_upstream鏈以及全局ngx_http_dyups_deleted_upstream。

static ngx_int_t ngx_http_dyups_init(ngx_conf_t *cf)

{

...

dmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_dyups_module);

umcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_upstream_module);

uscfp = umcf->upstreams.elts;

for (i = 0; i < umcf->upstreams.nelts; i++) {

duscf = ngx_array_push(&dmcf->dy_upstreams);

// 清零

ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));

duscf->pool = NULL;

// 賦值

duscf->upstream = uscfp[i];

duscf->dynamic = (uscfp[i]->port == 0

&& uscfp[i]->srv_conf && uscfp[i]->servers

&& uscfp[i]->flags & NGX_HTTP_UPSTREAM_CREATE);

duscf->deleted = 0;

// 賦值index

duscf->idx = i;

}

...

}

dyups share memory同步機(jī)制

shm初始化是在ngx_http_dyups_init_main_conf函數(shù)中實(shí)現(xiàn)的,同時(shí)設(shè)置了read_mesg的超時(shí)時(shí)間,并且指定了大小。

static char *ngx_http_dyups_init_main_conf(ngx_conf_t *cf, void *conf)

{

...

if (dmcf->read_msg_timeout == NGX_CONF_UNSET_MSEC) {

// 一秒一次

dmcf->read_msg_timeout = 1000;

}

if (dmcf->shm_size == NGX_CONF_UNSET_UINT) {

dmcf->shm_size = 2 * 1024 * 1024;

}

return ngx_http_dyups_init_shm(cf, conf);

...

}

static char *ngx_http_dyups_init_shm(ngx_conf_t *cf, void *conf)

{

...

shm_zone = ngx_shared_memory_add(cf, &dmcf->shm_name, dmcf->shm_size,

&ngx_http_dyups_module);

shm_zone->data = cf->pool;

// 加進(jìn)去的這個(gè)名頭的共享內(nèi)存塊的init函數(shù)會(huì)在初始化的時(shí)候統(tǒng)一調(diào)用

shm_zone->init = ngx_http_dyups_init_shm_zone;

...

}

static ngx_int_t ngx_http_dyups_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data)

{

...

shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;

sh = ngx_slab_alloc(shpool, sizeof(ngx_dyups_shctx_t));

if (sh == NULL) {

return NGX_ERROR;

}

// 全局變量,sh和shpool

ngx_dyups_global_ctx.sh = sh;

ngx_dyups_global_ctx.shpool = shpool;

// 初始化msg->queue

ngx_queue_init(&sh->msg_queue);

sh->version = 0;

sh->status = NULL;

...

}

ngx_http_dyups_init_process

該函數(shù)在啟動(dòng)進(jìn)程時(shí)候調(diào)用,設(shè)定了一些定時(shí)器。

初始化共享內(nèi)存,判斷如果是非正常退出的,那么重新加載upstream配置。

static ngx_int_t ngx_http_dyups_init_process(ngx_cycle_t *cycle)

{

...

// 設(shè)定定時(shí)器來(lái)定時(shí)read msg,同步信息

timer = &ngx_dyups_global_ctx.msg_timer;

timer->handler = ngx_http_dyups_read_msg;

ngx_add_timer(timer, dmcf->read_msg_timeout);

// 拿到全局的pool和sh

shpool = ngx_dyups_global_ctx.shpool;

sh = ngx_dyups_global_ctx.sh;

ngx_shmtx_lock(&shpool->mutex);

// 初始化的時(shí)候肯定是NULL,,申請(qǐng)對(duì)應(yīng)數(shù)量進(jìn)程數(shù)的內(nèi)存

if (sh->status == NULL) {

sh->status = ngx_slab_alloc_locked(shpool,

sizeof(ngx_dyups_status_t) * ccf->worker_processes);

if (sh->status == NULL) {

ngx_shmtx_unlock(&shpool->mutex);

return NGX_ERROR;

}

ngx_memzero(sh->status,

sizeof(ngx_dyups_status_t) * ccf->worker_processes);

ngx_shmtx_unlock(&shpool->mutex);

return NGX_OK;

}

ngx_shmtx_unlock(&shpool->mutex);

// 判斷version,如果不是0的話,說(shuō)明version已經(jīng)在同步中被++了,所以是進(jìn)程掛掉再被拉起來(lái)

if (sh->version != 0) {

//...

}

最核心的是ngx_http_dyups_read_msg函數(shù),里面的是ngx_http_dyups_read_msg_locked函數(shù)

static void ngx_http_dyups_read_msg_locked(ngx_event_t *ev)

{

...

sh = ngx_dyups_global_ctx.sh;

shpool = ngx_dyups_global_ctx.shpool;

for (i = 0; i < ccf->worker_processes; i++) {

status = &sh->status[i];

if (status->pid == 0 || status->pid == ngx_pid) {

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0,

"[dyups] process %P update time %ui",

status->pid, status->time);

// 遍歷全部進(jìn)程,將對(duì)應(yīng)的pid賦值

status->pid = ngx_pid;

status->time = now;

break;

}

}

// 遍歷消息隊(duì)列

for (q = ngx_queue_last(&sh->msg_queue);

q != ngx_queue_sentinel(&sh->msg_queue);

q = ngx_queue_prev(q))

{

// 如果該msg的count和進(jìn)程數(shù)一致,就是大家都同步過(guò)了,把這個(gè)msg刪掉

if (msg->count == ccf->worker_processes) {

t = ngx_queue_next(q); ngx_queue_remove(q); q = t;

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] destroy msg %V:%V",&msg->name, &msg->content);

ngx_dyups_destroy_msg(shpool, msg);

continue;

}

found = 0;

for (i = 0; i < msg->count; i++) {

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, 0,"[dyups] msg pids [%P]", msg->pid[i]);

if (msg->pid[i] == ngx_pid) {

found = 1;

break;

}

}

// 如果發(fā)現(xiàn)該進(jìn)程了,就說(shuō)明已經(jīng)同步過(guò)了。

if (found) {

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui found", &msg->name, msg->count);

continue;

}

// 如果沒(méi)發(fā)現(xiàn)的話,count++,pid更新

msg->pid[i] = ngx_pid;

msg->count++;

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ev->log, 0, "[dyups] msg %V count %ui", &msg->name, msg->count);

// 取出來(lái)name和content

name = msg->name;

content = msg->content;

// 執(zhí)行同步

rc = ngx_dyups_sync_cmd(pool, &name, &content, msg->flag);

if (rc != NGX_OK) {

ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "[dyups] read msg error, may cause the " "config inaccuracy,name:%V, content:%V",&name, &content);

}

}

...

}

static ngx_int_t ngx_dyups_sync_cmd(ngx_pool_t *pool, ngx_str_t *name, ngx_str_t *content, ngx_uint_t flag)

{

...

} else if (flag == NGX_DYUPS_ADD) {

body.start = body.pos = content->data;

body.end = body.last = content->data + content->len;

body.temporary = 1;

rc = ngx_dyups_do_update(name, &body, &rv);

ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] sync add: %V rv: %V rc: %i", name, &rv, rc);

if (rc != NGX_HTTP_OK) {

return NGX_ERROR;

}

return NGX_OK;

}

...

}

同步其他進(jìn)程接受的信息,如果是當(dāng)前進(jìn)程處理的就要把信息添加到消息隊(duì)列中。


ngx_dyups_update_upstream

ngx_int_t ngx_dyups_update_upstream(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)

{

...

ngx_http_dyups_read_msg_locked(timer);

// 沙箱測(cè)試配置

status = ngx_dyups_sandbox_update(buf, rv);

if (status != NGX_HTTP_OK) {

goto finish;

}

status = ngx_dyups_do_update(name, buf, rv);

if (status == NGX_HTTP_OK) {

//把操作發(fā)到隊(duì)列中去

if (ngx_http_dyups_send_msg(name, buf, NGX_DYUPS_ADD)) {

ngx_str_set(rv, "alert: update success "

"but not sync to other process");

status = NGX_HTTP_INTERNAL_SERVER_ERROR;

}

}

...

}

ngx_http_dyups_send_msg函數(shù)分析

static ngx_int_t ngx_http_dyups_send_msg(ngx_str_t *name, ngx_buf_t *body, ngx_uint_t flag)

{

...

// 初始化整個(gè)msg,將name和body填充進(jìn)去

sh->version++;

ngx_queue_insert_head(&sh->msg_queue, &msg->queue);

...

}

ngx_dyups_do_update

在update之前先f(wàn)ind尋找對(duì)應(yīng)的upstream。

static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)

{

...

duscfs = dumcf->dy_upstreams.elts;

for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {

duscf = &duscfs[i];

uscf = duscf->upstream;

if (uscf->host.len != name->len

|| ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)

!= 0)

{

continue;

}

*idx = i;

return duscf;

}

...

}

如果尋找到了idx賦值。

一旦發(fā)現(xiàn)尋找到了對(duì)應(yīng)name的dy_upstream就先判斷。

然后調(diào)用的是ngx_dyups_mark_upstream_delete函數(shù)

static void ngx_dyups_mark_upstream_delete(ngx_http_dyups_srv_conf_t *duscf)

{

...

// 獲取umcf和uscf

uscf = duscf->upstream;

umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,

ngx_http_upstream_module);

// us獲取這個(gè)dynamic upstream下的servers

us = uscf->servers->elts;

for (i = 0; i < uscf->servers->nelts; i++) {

// 標(biāo)志位置1

us[i].down = 1;

#if (NGX_HTTP_UPSTREAM_CHECK)

if (us[i].addrs) {

// 關(guān)閉peer,看宏定義主要關(guān)閉健康檢查的peer

ngx_http_upstream_check_delete_dynamic_peer(&uscf->host,

us[i].addrs);

}

#endif

}

// 將upstream對(duì)應(yīng)的index的配置變成一個(gè)dummy配置

uscfp[duscf->idx] = &ngx_http_dyups_deleted_upstream;

#if (NGX_HTTP_UPSTREAM_RBTREE)

ngx_rbtree_delete(&umcf->rbtree, &uscf->node);

#endif

duscf->deleted = NGX_DYUPS_DELETING;

...

}

其中最重要的是check_delete_dynamic_peer

void ngx_http_upstream_check_delete_dynamic_peer(ngx_str_t *name,ngx_addr_t *peer_addr)

{

...

/* 一堆比較 找到choosen*/

chosen = &peer[i];

chosen->shm->ref--;

if (chosen->shm->ref <= 0 && chosen->shm->delete != PEER_DELETED) {

ngx_http_upstream_check_clear_dynamic_peer_shm(chosen->shm);

chosen->shm->delete = PEER_DELETED;

}

ngx_shmtx_unlock(&chosen->shm->mutex);

ngx_http_upstream_check_clear_peer(chosen);

...

}

這樣子刪完一次之后,再find一次,idx大概率就變成-1了,就可以進(jìn)行創(chuàng)建

static ngx_int_t ngx_dyups_do_update(ngx_str_t *name, ngx_buf_t *buf, ngx_str_t *rv)

{

...

if (idx == -1) {

duscf = ngx_array_push(&dumcf->dy_upstreams);

uscfp = ngx_array_push(&umcf->upstreams);

ngx_memzero(duscf, sizeof(ngx_http_dyups_srv_conf_t));

// 這里為了獲取在umcf中的新upstream的index值。

idx = umcf->upstreams.nelts - 1;

}

duscf->idx = idx;

rc = ngx_dyups_init_upstream(duscf, name, idx);

rc = ngx_dyups_add_server(duscf, buf);

...

}

最重要的就是init_upstream和add_server。

首先是init ipstream,他的傳參其實(shí)是dy_srv_conf_t。upstream的name,以及upstream鏈表中對(duì)應(yīng)的index

static ngx_int_t ngx_dyups_init_upstream(ngx_http_dyups_srv_conf_t *duscf, ngx_str_t *name, ngx_uint_t index)

{

...

umcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,

ngx_http_upstream_module);

uscfp = umcf->upstreams.elts;

/*初始化uscf 也就是upstream的各個(gè)結(jié)構(gòu)體*/

uscfp[index] = uscf; // 賦值

duscf->dynamic = 1;

duscf->upstream = uscf;

ctx = ngx_pcalloc(duscf->pool, sizeof(ngx_http_conf_ctx_t));

// 存放ctx

duscf->ctx = ctx;

// insert進(jìn)去uscf

uscf->node.key = ngx_crc32_short(uscf->host.data, uscf->host.len);

ngx_rbtree_insert(&umcf->rbtree, &uscf->node);

...

}

static ngx_int_t ngx_dyups_add_server(ngx_http_dyups_srv_conf_t *duscf, ngx_buf_t *buf)

{

...

ngx_dyups_parse_upstream(&cf, buf)

...

}

static char * ngx_dyups_parse_upstream(ngx_conf_t *cf, ngx_buf_t *buf)

{

...

b = *buf;

ngx_memzero(&conf_file, sizeof(ngx_conf_file_t));

conf_file.file.fd = NGX_INVALID_FILE;

conf_file.buffer = &ampb;

cf->conf_file = &conf_file;

return ngx_conf_parse(cf, NULL);

...

}

ngx_dyups_do_delete

static ngx_int_t ngx_dyups_do_delete(ngx_str_t *name, ngx_str_t *rv)

{

...

duscf = ngx_dyups_find_upstream(name, &dumy);

// 如果查出來(lái)的是NULL,或者是一個(gè)已經(jīng)被標(biāo)記刪除,或者徹底刪除的,就說(shuō)明要?jiǎng)h這個(gè)有異常

if (duscf == NULL || duscf->deleted) {

ngx_log_error(NGX_LOG_DEBUG, ngx_cycle->log, 0, "[dyups] not find upstream %V %p", name, duscf);

ngx_str_set(rv, "not found uptream");

return NGX_HTTP_NOT_FOUND;

}

// 沒(méi)問(wèn)題的話就執(zhí)行正常刪除

ngx_dyups_mark_upstream_delete(duscf);

...

}

ngx_dyups_find_upstream

find upstream做了很多事還做了一部分的刪除操作。

static ngx_http_dyups_srv_conf_t * ngx_dyups_find_upstream(ngx_str_t *name, ngx_int_t *idx)

{

...

dumcf = ngx_http_cycle_get_module_main_conf(ngx_cycle, ngx_http_dyups_module);

duscfs = dumcf->dy_upstreams.elts;

for (i = 0; i < dumcf->dy_upstreams.nelts; i++) {

// 這里是在mark_upstream中被標(biāo)記的

if (duscf->deleted == NGX_DYUPS_DELETING) {

// 確認(rèn)可以刪除,主要看這個(gè)ref的引用計(jì)數(shù)

if (*(duscf->ref) == 0) {

ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "[dyups] free dynamic upstream in find upstream" " %ui", duscf->idx);

duscf->deleted = NGX_DYUPS_DELETED;

if (duscf->pool) {

ngx_destroy_pool(duscf->pool);

duscf->pool = NULL;

}

}

}

// 如果是deleted或者是deleting,算是沒(méi)有找到,除非遍歷完沒(méi)找到返回一個(gè)deleted。

if (duscf->deleted == NGX_DYUPS_DELETING) {

continue;

}

if (duscf->deleted == NGX_DYUPS_DELETED) {

*idx = i;

duscf_del = duscf;

continue;

}

// 如果找到了就正常返回

if (uscf->host.len != name->len

|| ngx_strncasecmp(uscf->host.data, name->data, uscf->host.len)

!= 0)

{

continue;

}

*idx = i;

return duscf;

}

...

}

引用計(jì)數(shù)ref

dyups的peer init get和free函數(shù)。

static ngx_int_t ngx_http_dyups_init_peer(ngx_http_request_t *r,

ngx_http_upstream_srv_conf_t *us)

{

...

// 設(shè)置上下文

ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_dyups_ctx_t));

if (ctx == NULL) {

return NGX_ERROR;

}

// scf指向?qū)?yīng)的dscf,ctx的data指向了自己

ctx->scf = dscf;

ctx->data = r->upstream->peer.data;

ctx->get = r->upstream->peer.get;

ctx->free = r->upstream->peer.free;

r->upstream->peer.data = ctx;

r->upstream->peer.get = ngx_http_dyups_get_peer;

r->upstream->peer.free = ngx_http_dyups_free_peer;

// 與客戶端的連接的這個(gè)pool注冊(cè)一個(gè)銷(xiāo)毀函數(shù)

cln = ngx_pool_cleanup_add(r->pool, 0);

if (cln == NULL) {

return NGX_ERROR;

}

// 引用計(jì)數(shù)加一

dscf->ref++;

// 等調(diào)用這個(gè)就會(huì)將ref--。

cln->handler = ngx_http_dyups_clean_request;

cln->data = &dscf->ref;

...

}

// 這個(gè)函數(shù)是在ngx_dyups_add_server初始化upstream中被賦值的

uscf->peer.init = ngx_http_dyups_init_peer;

// 調(diào)用的位置是在這里

static void ngx_http_upstream_init_request(ngx_http_request_t *r)

{

...

if (uscf->peer.init(r, uscf) != NGX_OK) {

ngx_http_upstream_finalize_request(r, u,

NGX_HTTP_INTERNAL_SERVER_ERROR);

return;

}

ngx_http_upstream_connect(r, u);

...

}

static ngx_int_t ngx_http_dyups_get_peer(ngx_peer_connection_t *pc, void *data)

{

ngx_http_dyups_ctx_t  *ctx = data;

// 就用之前的peer get來(lái)。

return ctx->get(pc, ctx->data);

}

static void ngx_http_dyups_free_peer(ngx_peer_connection_t *pc, void *data,

ngx_uint_t state)

{

ngx_http_dyups_ctx_t  *ctx = data;

ngx_pool_cleanup_t  *cln;

/* upstream connect failed */

if (pc->connection == NULL) {

goto done;

}

if (pc->cached) {

goto done;

}

// free的時(shí)候先給ref++,等調(diào)用handler后ref--

ctx->scf->ref++;

// pool設(shè)置一個(gè)銷(xiāo)毀pool的數(shù)據(jù)結(jié)構(gòu),賦值給pool->cleanup

cln = ngx_pool_cleanup_add(pc->connection->pool, 0);

if (cln == NULL) {

ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "[dyups] dynamic upstream free peer may cause memleak %i",ctx->scf->ref);

goto done;

}

// 銷(xiāo)毀的遞歸函數(shù)

cln->handler = ngx_http_dyups_clean_request;

cln->data = &ctx->scf->ref;

done:

// 結(jié)束后調(diào)用之前保存的free

ctx->free(pc, ctx->data, state);

}

static void ngx_http_dyups_clean_request(void *data)

{

ngx_uint_t  *ref = data;

(*ref)--;

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0, "[dyups] http clean request count %i", *ref);

}

pool在被destroy的時(shí)候,會(huì)調(diào)用這個(gè)handler將引用計(jì)數(shù)減掉。

void ngx_destroy_pool(ngx_pool_t *pool)

{

ngx_pool_t          *p, *n;

ngx_pool_large_t    *l;

ngx_pool_cleanup_t  *c;

for (c = pool->cleanup; c; c = c->next) {

if (c->handler) {

ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, pool->log, 0,

"run cleanup: %p", c);

c->handler(c->data);

}

}

...

}

以上是“nginx如何動(dòng)態(tài)修改upstream ngx_http_dyups_module”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI