溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis命令處理過程實(shí)例源碼分析

發(fā)布時(shí)間:2022-02-11 09:15:45 來源:億速云 閱讀:124 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹“Redis命令處理過程實(shí)例源碼分析”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Redis命令處理過程實(shí)例源碼分析”文章能幫助大家解決問題。

本文基于社區(qū)版Redis 4.0.8

Redis命令處理過程實(shí)例源碼分析

1、命令解析

Redis服務(wù)器接收到的命令請(qǐng)求首先存儲(chǔ)在客戶端對(duì)象的querybuf輸入緩沖區(qū),然后解析命令請(qǐng)求的各個(gè)參數(shù),并存儲(chǔ)在客戶端對(duì)象的argv和argc字段。

客戶端解析命令請(qǐng)求的入口函數(shù)為readQueryFromClient,會(huì)讀取socket數(shù)據(jù)存儲(chǔ)到客戶端對(duì)象的輸入緩沖區(qū),并調(diào)用函數(shù)processInputBuffer解析命令請(qǐng)求。

注:內(nèi)聯(lián)命令:使用telnet會(huì)話輸入命令的方式

void processInputBuffer(client *c) {
    ......
    //循環(huán)遍歷輸入緩沖區(qū),獲取命令參數(shù),調(diào)用processMultibulkBuffer解析命令參數(shù)和長(zhǎng)度
    while(sdslen(c->querybuf)) {
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;//處理telnet方式的內(nèi)聯(lián)命令
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break; //解析命令參數(shù)和長(zhǎng)度暫存到客戶端結(jié)構(gòu)體中
        } else {
            serverPanic("Unknown request type");
        }
    }    
}

//解析命令參數(shù)和長(zhǎng)度暫存到客戶端結(jié)構(gòu)體中
int processMultibulkBuffer(client *c) {
    //定位到行尾
    newline = strchr(c->querybuf,'\r');
    //解析命令請(qǐng)求參數(shù)數(shù)目,并存儲(chǔ)在客戶端對(duì)象的c->multibulklen字段
    serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
    ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
    c->multibulklen = ll;
    pos = (newline-c->querybuf)+2;//記錄已解析命令的請(qǐng)求長(zhǎng)度resp的長(zhǎng)度
    /* Setup argv array on client structure */
    //分配請(qǐng)求參數(shù)存儲(chǔ)空間
    c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    
    // 開始循環(huán)解析每個(gè)請(qǐng)求參數(shù)
    while(c->multibulklen) {
        ......
        newline = strchr(c->querybuf+pos,'\r');
        if (c->querybuf[pos] != '$') {
            return C_ERR;
        ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
        pos += newline-(c->querybuf+pos)+2;
        c->bulklen = ll;//字符串參數(shù)長(zhǎng)度暫存在客戶端對(duì)象的bulklen字段
        
        //讀取該長(zhǎng)度的參數(shù)內(nèi)容,并創(chuàng)建字符串對(duì)象,同時(shí)更新待解析參數(shù)multibulklen
        c->argv[c->argc++] =createStringObject(c->querybuf+pos,c->bulklen);
        pos += c->bulklen+2;
        c->multibulklen--;
    }

2、命令調(diào)用

當(dāng)multibulklen的值更新為0時(shí),表示參數(shù)解析完成,開始調(diào)用processCommand來處理命令,處理命令前有很多校驗(yàn)邏輯,如下:

void processInputBuffer(client *c) {
    
    ......
     //調(diào)用processCommand來處理命令
     if (processCommand(c) == C_OK) {
         ......
     }
}

//處理命令函數(shù)
int processCommand(client *c) {
    //校驗(yàn)是否是quit命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //調(diào)用lookupCommand,查看該命令是否存在
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return C_OK;
    //檢查用戶權(quán)限
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        addReply(c,shared.noautherr);
    //還有很多檢查,不一一列舉,比如集群/持久化/復(fù)制等
    /* 真正執(zhí)行命令 */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        queueMultiCommand(c);
        //將結(jié)果寫入outbuffer
        addReply(c,shared.queued);
    } 
// 調(diào)用execCommand執(zhí)行命令
void execCommand(client *c) {
    call(c,CMD_CALL_FULL);//調(diào)用call執(zhí)行命令
//調(diào)用execCommand調(diào)用call執(zhí)行命令
void call(client *c, int flags) {
    start = ustime();
    c->cmd->proc(c);//執(zhí)行命令
    duration = ustime()-start;
    //如果是慢查詢,記錄慢查詢
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        //記錄到慢日志中
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    //更新統(tǒng)計(jì)信息:當(dāng)前命令執(zhí)行時(shí)間和調(diào)用次數(shù)
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;

3、返回結(jié)果

Redis返回結(jié)果并不是直接返回給客戶端,而是先寫入到輸出緩沖區(qū)(buf字段)或者輸出鏈表(reply字段)

int processCommand(client *c) {
    ......
    //將結(jié)果寫入outbuffer
    addReply(c,shared.queued);
    ......
    
}
//將結(jié)果寫入outbuffer
void addReply(client *c, robj *obj) {
    //調(diào)用listAddNodeHead將客戶端添加到服務(wù)端結(jié)構(gòu)體的client_pending_write鏈表,以便后續(xù)能快速查找出哪些客戶端有數(shù)據(jù)需要發(fā)送
    if (prepareClientToWrite(c) != C_OK) return;
    
    //然后添加字符串到輸出緩沖區(qū)
    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
        //如果添加失敗,則添加到輸出鏈表中
        _addReplyObjectToList(c,obj); 
}

addReply函數(shù)只是將待發(fā)送給客戶端的數(shù)據(jù)暫存在輸出鏈表或者輸出緩沖區(qū),那么什么時(shí)候?qū)⑦@些數(shù)據(jù)發(fā)送給客戶端呢?答案是開啟事件循環(huán)時(shí),調(diào)用的beforesleep函數(shù),該函數(shù)專門執(zhí)行一些不是很費(fèi)時(shí)的操作,如過期鍵刪除,向客戶端返回命令回復(fù)等

void beforeSleep(struct aeEventLoop *eventLoop) {
    ......
     /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
}

//回復(fù)客戶端命令函數(shù)
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        /* 發(fā)送客戶端數(shù)據(jù) */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;
        /* If there is nothing left, do nothing. Otherwise install
         * the write handler. */
         //如果數(shù)據(jù)量很大,一次性沒有發(fā)送完成,則進(jìn)行添加文件事件,監(jiān)聽當(dāng)前客戶端socket文件描述符的可寫事件即可
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    return processed;

關(guān)于“Redis命令處理過程實(shí)例源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注億速云行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI