溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

PostgreSQL中ReceiveXlogStream有什么作用

發(fā)布時(shí)間:2021-11-09 15:13:50 來源:億速云 閱讀:171 作者:iii 欄目:關(guān)系型數(shù)據(jù)庫(kù)

這篇文章主要介紹“PostgreSQL中ReceiveXlogStream有什么作用”,在日常操作中,相信很多人在PostgreSQL中ReceiveXlogStream有什么作用問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”PostgreSQL中ReceiveXlogStream有什么作用”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

本節(jié)簡(jiǎn)單介紹了PostgreSQL的備份工具pg_basebackup源碼中實(shí)際執(zhí)行備份邏輯的BaseBackup中對(duì)WAL數(shù)據(jù)進(jìn)行備份的實(shí)現(xiàn)函數(shù)StartLogStreamer->LogStreamerMain及其主要的實(shí)現(xiàn)函數(shù)ReceiveXlogStream.

一、數(shù)據(jù)結(jié)構(gòu)

logstreamer_param
WAL data streamer參數(shù).

typedef struct
{
     ////后臺(tái)連接
    PGconn     *bgconn;
    //開始位置
    XLogRecPtr  startptr;
    //目錄或者tar文件,依賴于使用的模式
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    //系統(tǒng)標(biāo)識(shí)符
    char       *sysidentifier;
    //時(shí)間線
    int         timeline;
} logstreamer_param;

StreamCtl
接收xlog流數(shù)據(jù)時(shí)的全局參數(shù)

/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 * 接收xlog流數(shù)據(jù)時(shí)的全局參數(shù).
 * 每個(gè)域字段的詳細(xì)解釋,參見ReceiveXlogStream()函數(shù)注釋.
 */
typedef struct StreamCtl
{
    //streaming的開始位置
    XLogRecPtr  startpos;       /* Start position for streaming */
    //時(shí)間線
    TimeLineID  timeline;       /* Timeline to stream data from */
    //系統(tǒng)標(biāo)識(shí)符
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    //standby超時(shí)信息
    int         standby_message_timeout;    /* Send status messages this often */
    //是否同步(寫入時(shí)是否馬上Flush WAL data)
    bool        synchronous;    /* Flush immediately WAL data on write */
    //在已歸檔的數(shù)據(jù)中標(biāo)記segment為已完成
    bool        mark_done;      /* Mark segment as done in generated archive */
    //刷新到磁盤上以確保數(shù)據(jù)的一致性狀態(tài)(是否已刷新到磁盤上)
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */
    //在返回T時(shí)停止streaming
    stream_stop_callback stream_stop;   /* Stop streaming when returns true */
    //如有效,監(jiān)測(cè)該socket中的輸入并檢查stream_stop()的返回
    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */
    //如何寫WAL
    WalWriteMethod *walmethod;  /* How to write the WAL */
    //附加到部分接受文件的后綴
    char       *partial_suffix; /* Suffix appended to partially received files */
    //使用的replication slot,如無則為NULL
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

二、源碼解讀

LogStreamerMain
WAL流復(fù)制主函數(shù),用于fork后的子進(jìn)程調(diào)用

static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;//接收xlog流數(shù)據(jù)時(shí)的全局參數(shù)
    in_log_streamer = true;
    //初始化StreamCtl結(jié)構(gòu)體
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;
    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
    //接收數(shù)據(jù)
    if (!ReceiveXlogStream(param->bgconn, &stream))
        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         * 在函數(shù)執(zhí)行過程中出現(xiàn)的錯(cuò)誤已通過警告的方式發(fā)出,
         * 但仍需要告知父進(jìn)程不能優(yōu)雅的關(guān)閉本進(jìn)程.
         */
        return 1;
    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }
    //結(jié)束連接
    PQfinish(param->bgconn);
    //普通文件格式
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    //是否內(nèi)存
    pg_free(stream.walmethod);
    return 0;
}

ReceiveXlogStream
在指定的開始位置接收log stream

/*
 * Receive a log stream starting at the specified position.
 * 在指定的開始位置接收log stream
 *
 * Individual parameters are passed through the StreamCtl structure.
 * 通過StreamCtl結(jié)構(gòu)體傳遞參數(shù).
 *
 * If sysidentifier is specified, validate that both the system
 * identifier and the timeline matches the specified ones
 * (by sending an extra IDENTIFY_SYSTEM command)
 * 如指定了系統(tǒng)標(biāo)識(shí)符,驗(yàn)證系統(tǒng)標(biāo)識(shí)符和timeline是否匹配指定的信息.
 * (通過發(fā)送額外的IDENTIFY_SYSTEM命令)
 *
 * All received segments will be written to the directory
 * specified by basedir. This will also fetch any missing timeline history
 * files.
 * 所有接收到的segments會(huì)寫入到basedir中.
 * 這同時(shí)會(huì)提前所有缺失的timeline history文件.
 *
 * The stream_stop callback will be called every time data
 * is received, and whenever a segment is completed. If it returns
 * true, the streaming will stop and the function
 * return. As long as it returns false, streaming will continue
 * indefinitely.
 * stream_stop回調(diào)函數(shù)在每次接收到數(shù)據(jù)以及segment完成傳輸后調(diào)用.
 * 如返回T,streaming會(huì)停止,函數(shù)返回.
 * 如返回F,streaming會(huì)一直繼續(xù).
 *
 * If stream_stop() checks for external input, stop_socket should be set to
 * the FD it checks.  This will allow such input to be detected promptly
 * rather than after standby_message_timeout (which might be indefinite).
 * Note that signals will interrupt waits for input as well, but that is
 * race-y since a signal received while busy won't interrupt the wait.
 * 如stream_stop()用于檢測(cè)額外的輸入,stop_socket變量應(yīng)設(shè)置為該函數(shù)需檢查的FD.
 * 這會(huì)允許立即檢測(cè)此類輸入,而不是在standby_message_timeout之后(可能會(huì)無限循環(huán)).
 * 注意信號(hào)也會(huì)中斷輸入等待,但這是存在競(jìng)爭(zhēng)的,因?yàn)樵诿r(shí)接收到信號(hào)不會(huì)中斷等待.
 *
 * standby_message_timeout controls how often we send a message
 * back to the master letting it know our progress, in milliseconds.
 * Zero means no messages are sent.
 * This message will only contain the write location, and never
 * flush or replay.
 * standby_message_timeout控制發(fā)送進(jìn)度消息回master的頻度,單位為ms.
 * 0意味著沒有消息會(huì)發(fā)送.
 * 該消息只保存寫入位置,永遠(yuǎn)不會(huì)flush或replay.
 *
 * If 'partial_suffix' is not NULL, files are initially created with the
 * given suffix, and the suffix is removed once the file is finished. That
 * allows you to tell the difference between partial and completed files,
 * so that you can continue later where you left.
 * 如'partial_suffix'不為NULL,文件已通過給定的suffix創(chuàng)建,
 *   一旦文件完成傳輸,則suffix會(huì)被清除.
 * 這是部分和完整完成文件的異同,以便在離開后可以繼續(xù).
 *
 * If 'synchronous' is true, the received WAL is flushed as soon as written,
 * otherwise only when the WAL file is closed.
 * 如'synchronous'為T,接收到的WAL會(huì)刷新為寫入,否則的話只會(huì)在WAL file關(guān)閉時(shí)才寫入.
 *
 * Note: The WAL location *must* be at a log segment start!
 * 注意:WAL位置必須是log segment的起始位置.
 */
bool
ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
    char        query[128];
    char        slotcmd[128];
    PGresult   *res;
    XLogRecPtr  stoppos;
    /*
     * The caller should've checked the server version already, but doesn't do
     * any harm to check it here too.
     * 調(diào)用者已完成版本校驗(yàn),但這里重復(fù)校驗(yàn)并沒有什么問題.
     */
    if (!CheckServerVersionForStreaming(conn))
        return false;
    /*
     * Decide whether we want to report the flush position. If we report the
     * flush position, the primary will know what WAL we'll possibly
     * re-request, and it can then remove older WAL safely. We must always do
     * that when we are using slots.
     * 確定是否需要報(bào)告flush位置.
     * 如果我們報(bào)告了flush位置,主服務(wù)器將會(huì)知道可能重復(fù)請(qǐng)求的WAL file,
     *   這樣可以安全的移除更老的WAL.
     * 如使用slots,應(yīng)經(jīng)常執(zhí)行該操作.
     *
     * Reporting the flush position makes one eligible as a synchronous
     * replica. People shouldn't include generic names in
     * synchronous_standby_names, but we've protected them against it so far,
     * so let's continue to do so unless specifically requested.
     * 報(bào)告flush位置使其符合同步副本的條件.
     * DBA不應(yīng)該在synchronous_standby_names中包含常規(guī)的名稱,但我們截止目前位置已很好的保護(hù)了它們,
     *   因此可以繼續(xù)這樣執(zhí)行除非特別請(qǐng)求.
     */
    if (stream->replication_slot != NULL)
    {
        //存在slot
        reportFlushPosition = true;
        sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
    }
    else
    {
        if (stream->synchronous)
            reportFlushPosition = true;//同步
        else
            reportFlushPosition = false;//異步
        slotcmd[0] = 0;//ASCII 0
    }
    if (stream->sysidentifier != NULL)
    {
        //系統(tǒng)標(biāo)識(shí)符不為NULL
        /* Validate system identifier hasn't changed */
        //驗(yàn)證系統(tǒng)標(biāo)識(shí)符沒有改變
        //發(fā)送IDENTIFY_SYSTEM命令
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not send replication command \"%s\": %s"),
                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
            PQclear(res);
            return false;
        }
        if (PQntuples(res) != 1 || PQnfields(res) < 3)
        {
            fprintf(stderr,
                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d r more fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 3);
            PQclear(res);
            return false;
        }
        if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
        {
            fprintf(stderr,
                    _("%s: system identifier does not match between base backup and streaming onnection\n"),
                    progname);
            PQclear(res);
            return false;
        }
        if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
                    _("%s: starting timeline %u is not present in the server\n"),
                    progname, stream->timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }
    /*
     * initialize flush position to starting point, it's the caller's
     * responsibility that that's sane.
     * 初始化flush位置為開始點(diǎn),這是調(diào)用者的責(zé)任.
     */
    lastFlushPosition = stream->startpos;
    while (1)
    {
        /*
         * Fetch the timeline history file for this timeline, if we don't have
         * it already. When streaming log to tar, this will always return
         * false, as we are never streaming into an existing file and
         * therefore there can be no pre-existing timeline history file.
         * 為該timeline提前timeline history,如我們已不需要.
         * 如streaming日志為tar格式,這通常會(huì)返回F,這如同從來沒有streaming到已存在的文件中,
         *   因此沒有已存在的timeline history文件.
         */
        if (!existsTimeLineHistoryFile(stream))
        {
            //如不存在history文件
            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
            //發(fā)送TIMELINE_HISTORY命令
            res = PQexec(conn, query);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                /* FIXME: we might send it ok, but get an error */
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
                PQclear(res);
                return false;
            }
            /*
             * The response to TIMELINE_HISTORY is a single row result set
             * with two fields: filename and content
             * TIMELINE_HISTORY的響應(yīng)是一個(gè)單行結(jié)果集,有兩個(gè)字段:filename和content
             */
            if (PQnfields(res) != 2 || PQntuples(res) != 1)
            {
                fprintf(stderr,
                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d ields, expected %d rows and %d fields\n"),
                        progname, PQntuples(res), PQnfields(res), 1, 2);
            }
            /* Write the history file to disk */
            //寫入history文件到磁盤上
            writeTimeLineHistoryFile(stream,
                                     PQgetvalue(res, 0, 0),
                                     PQgetvalue(res, 0, 1));
            PQclear(res);
        }
        /*
         * Before we start streaming from the requested location, check if the
         * callback tells us to stop here.
         * 從請(qǐng)求的位置開始streaming前,檢查回調(diào)函數(shù)告訴我們?cè)谀耐V?
         */
        if (stream->stream_stop(stream->startpos, stream->timeline, false))
            return true;
        /* Initiate the replication stream at specified location */
        //在指定的位置初始化復(fù)制流
        snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
                 slotcmd,
                 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
                 stream->timeline);
        //發(fā)送START_REPLICATION命令
        res = PQexec(conn, query);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "START_REPLICATION", PQresultErrorMessage(res));
            PQclear(res);
            return false;
        }
        PQclear(res);
        /* Stream the WAL */
        //流化WAL
        res = HandleCopyStream(conn, stream, &stoppos);
        if (res == NULL)
            goto error;
        /*
         * Streaming finished.
         *
         * There are two possible reasons for that: a controlled shutdown, or
         * we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
         * finished, containing information about the next timeline. Read
         * that, and restart streaming from the next timeline. In case of
         * controlled shutdown, stop here.
         * Streaming完成.
         * 這里有兩個(gè)可能的原因:可控的shutdown或者到達(dá)了當(dāng)前時(shí)間線的末尾.
         * 在end-of-timeline這種情況下,服務(wù)器在Copy完成后發(fā)送結(jié)果集,
         *   含有關(guān)于下一個(gè)時(shí)間線的相關(guān)信息.
         * 讀取這些信息,在下一個(gè)時(shí)間線開始重新啟動(dòng)streaming.
         * 如為可控的關(guān)閉,可以停止了.
         */
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
             * End-of-timeline. Read the next timeline's ID and starting
             * position. Usually, the starting position will match the end of
             * the previous timeline, but there are corner cases like if the
             * server had sent us half of a WAL record, when it was promoted.
             * The new timeline will begin at the end of the last complete
             * record in that case, overlapping the partial WAL record on the
             * old timeline.
             * 這是End-of-timeline的情況.
             * 讀取下一個(gè)時(shí)間線ID和開始位置.通常來說,開始位置將匹配先前時(shí)間線的末尾,
             *   但會(huì)存在特殊的情況比如服務(wù)器已經(jīng)傳輸了WAL Record的一部分.
             * 這種情況下,新的時(shí)間線會(huì)在上次已完成的記錄末尾開始,與舊時(shí)間線的部分WAL Record重疊.
             */
            uint32      newtimeline;//新的時(shí)間線
            bool        parsed;//是否解析
            //讀取結(jié)果集的末尾
            parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
            PQclear(res);
            if (!parsed)
                goto error;
            /* Sanity check the values the server gave us */
            //執(zhí)行校驗(yàn)和堅(jiān)持
            if (newtimeline <= stream->timeline)
            {
                //新的時(shí)間線不可能小于等于stream中的時(shí)間線
                fprintf(stderr,
                        _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
                        progname, newtimeline, stream->timeline);
                goto error;
            }
            if (stream->startpos > stoppos)
            {
                //開始位置大于結(jié)束位置
                fprintf(stderr,
                        _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline u to begin at %X/%X\n"),
                        progname,
                        stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
                        newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
                goto error;
            }
            /* Read the final result, which should be CommandComplete. */
            //讀取最后的結(jié)果,應(yīng)為命令結(jié)束
            res = PQgetResult(conn);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                fprintf(stderr,
                        _("%s: unexpected termination of replication stream: %s"),
                        progname, PQresultErrorMessage(res));
                PQclear(res);
                goto error;
            }
            PQclear(res);
            /*
             * Loop back to start streaming from the new timeline. Always
             * start streaming at the beginning of a segment.
             * 從新時(shí)間線開始循環(huán),通常會(huì)在segment的開始出開始streaming
             */
            stream->timeline = newtimeline;
            stream->startpos = stream->startpos -
                XLogSegmentOffset(stream->startpos, WalSegSz);
            continue;//繼續(xù)循環(huán)
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);
            /*
             * End of replication (ie. controlled shut down of the server).
             * replication完成(比如服務(wù)器關(guān)閉了復(fù)制)
             *
             * Check if the callback thinks it's OK to stop here. If not,
             * complain.
             * 檢查是否回調(diào)函數(shù)認(rèn)為在這里停止就OK了,如果不是,則報(bào)警.
             */
            if (stream->stream_stop(stoppos, stream->timeline, false))
                return true;
            else
            {
                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
                        progname);
                goto error;
            }
        }
        else
        {
            /* Server returned an error. */
            //返回錯(cuò)誤
            fprintf(stderr,
                    _("%s: unexpected termination of replication stream: %s"),
                    progname, PQresultErrorMessage(res));
            PQclear(res);
            goto error;
        }
    }
error:
    if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
                progname, current_walfile_name, stream->walmethod->getlasterror());
    walfile = NULL;
    return false;
}
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 * ReceiveXlogStream中的主循環(huán)實(shí)現(xiàn)函數(shù).
 * 在使用START_REPLICATION命令初始化streaming后處理COPY stream.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 * 如COPY由于服務(wù)器端的原因終止,返回PGresult并設(shè)置*stoppos為最后寫入的字節(jié).
 * 如出現(xiàn)錯(cuò)誤,則返回NULL.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
                 XLogRecPtr *stoppos)
{
    char       *copybuf = NULL;
    TimestampTz last_status = -1;
    XLogRecPtr  blockpos = stream->startpos;
    still_sending = true;
    while (1)
    {
        //循環(huán)處理
        int         r;
        TimestampTz now;//時(shí)間戳
        long        sleeptime;
        /*
         * Check if we should continue streaming, or abort at this point.
         * 檢查我們是否應(yīng)該繼續(xù)streaming,或者在當(dāng)前就退出
         */
        if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
            goto error;
        now = feGetCurrentTimestamp();
        /*
         * If synchronous option is true, issue sync command as soon as there
         * are WAL data which has not been flushed yet.
         * 如同步選項(xiàng)為T,只要存在未flushed的WAL data,馬上執(zhí)行sync命令.
         */
        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
        {
            if (stream->walmethod->sync(walfile) != 0)
            {
                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
                        progname, current_walfile_name, stream->walmethod->getlasterror());
                goto error;
            }
            lastFlushPosition = blockpos;
            /*
             * Send feedback so that the server sees the latest WAL locations
             * immediately.
             * 發(fā)送反饋以便服務(wù)器馬上可看到最后的WAL位置.
             */
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Potentially send a status message to the master
         * 可能向主服務(wù)器發(fā)送狀態(tài)消息
         */
        if (still_sending && stream->standby_message_timeout > 0 &&
            feTimestampDifferenceExceeds(last_status, now,
                                         stream->standby_message_timeout))
        {
            /* Time to send feedback! */
            //是時(shí)候發(fā)送反饋了.
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }
        /*
         * Calculate how long send/receive loops should sleep
         * 計(jì)算send/receive循環(huán)應(yīng)該睡眠多長(zhǎng)時(shí)間
         */
        sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                 last_status);
        //拷貝stream中接收到的內(nèi)容
        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, ?buf);
        while (r != 0)
        {
            if (r == -1)
                goto error;//出錯(cuò)
            if (r == -2)
            {
                //已完結(jié)或出錯(cuò)
                PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
                if (res == NULL)
                    goto error;
                else
                    return res;
            }
            /* Check the message type. */
            //檢查消息類型
            if (copybuf[0] == 'k')
            {
                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                         &last_status))
                    goto error;
            }
            else if (copybuf[0] == 'w')
            {
                if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                    goto error;
                /*
                 * Check if we should continue streaming, or abort at this
                 * point.
                 * 檢查我們是否應(yīng)該繼續(xù)streaming或者在此就停止
                 */
                if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                    goto error;
            }
            else
            {
                fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                        progname, copybuf[0]);
                goto error;
            }
            /*
             * Process the received data, and any subsequent data we can read
             * without blocking.
             * 處理接收到的數(shù)據(jù),后續(xù)的數(shù)據(jù)可以無阻塞的讀取.
             */
            r = CopyStreamReceive(conn, 0, stream->stop_socket, ?buf);
        }
    }
error:
    if (copybuf != NULL)
        PQfreemem(copybuf);
    return NULL;
}
/*
 * Check if we should continue streaming, or abort at this point.
 */
static bool
CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                    XLogRecPtr *stoppos)
{
    if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    {
        if (!close_walfile(stream, blockpos))
        {
            /* Potential error message is written by close_walfile */
            return false;
        }
        if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
        {
            fprintf(stderr, _("%s: could not send copy-end packet: %s"),
                    progname, PQerrorMessage(conn));
            return false;
        }
        still_sending = false;
    }
    return true;
}
/*
 * Receive CopyData message available from XLOG stream, blocking for
 * maximum of 'timeout' ms.
 * 接收從XLOG stream中可用的CopyData消息,如超出最大的'timeout'毫秒,需要阻塞.
 *
 * If data was received, returns the length of the data. *buffer is set to
 * point to a buffer holding the received message. The buffer is only valid
 * until the next CopyStreamReceive call.
 * 如接收到數(shù)據(jù),則返回?cái)?shù)據(jù)的大小.
 * 變量*buffer設(shè)置為指向含有接收到消息的buffer.buffer在下一個(gè)CopyStreamReceive調(diào)用才會(huì)生效.
 *
 * Returns 0 if no data was available within timeout, or if wait was
 * interrupted by signal or stop_socket input.
 * -1 on error. -2 if the server ended the COPY.
 * 如在timeout時(shí)間內(nèi)沒有數(shù)據(jù)返回,或者如果因?yàn)樾盘?hào)等待/stop_socket輸入中斷,則返回0.
 * -1:表示出現(xiàn)錯(cuò)誤.-2表示服務(wù)器完成了COPY
 */
static int
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                  char **buffer)
{
    char       *copybuf = NULL;
    int         rawlen;
    if (*buffer != NULL)
        PQfreemem(*buffer);
    *buffer = NULL;
    /* Try to receive a CopyData message */
    rawlen = PQgetCopyData(conn, ?buf, 1);
    if (rawlen == 0)
    {
        int         ret;
        /*
         * No data available.  Wait for some to appear, but not longer than
         * the specified timeout, so that we can ping the server.  Also stop
         * waiting if input appears on stop_socket.
         */
        ret = CopyStreamPoll(conn, timeout, stop_socket);
        if (ret <= 0)
            return ret;
        /* Now there is actually data on the socket */
        if (PQconsumeInput(conn) == 0)
        {
            fprintf(stderr,
                    _("%s: could not receive data from WAL stream: %s"),
                    progname, PQerrorMessage(conn));
            return -1;
        }
        /* Now that we've consumed some input, try again */
        rawlen = PQgetCopyData(conn, ?buf, 1);
        if (rawlen == 0)
            return 0;
    }
    if (rawlen == -1)           /* end-of-streaming or error */
        return -2;
    if (rawlen == -2)
    {
        fprintf(stderr, _("%s: could not read COPY data: %s"),
                progname, PQerrorMessage(conn));
        return -1;
    }
    /* Return received messages to caller */
    *buffer = copybuf;
    return rawlen;
}

三、跟蹤分析

備份命令

pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

啟動(dòng)gdb跟蹤(跟蹤fork的子進(jìn)程)

[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) set follow-fork-mode child
(gdb) b LogStreamerMain
Breakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password: 
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_1604"
[New process 2036]
[Thread debugging using libthread_db enabled]backup/backup_label          )
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]
Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490
490     in_log_streamer = true;
305153/305153 kB (100%), 1/1 tablespace                                          )
pg_basebackup: write-ahead log end point: 0/5A0000F8
pg_basebackup: waiting for background process to finish streaming ...
(gdb)

輸入?yún)?shù)

(gdb) n
492     MemSet(&stream, 0, sizeof(stream));
(gdb) p *param
$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, 
  sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)

設(shè)置StreamCtl結(jié)構(gòu)體

(gdb) n
493     stream.startpos = param->startptr;
(gdb) 
494     stream.timeline = param->timeline;
(gdb) 
495     stream.sysidentifier = param->sysidentifier;
(gdb) 
496     stream.stream_stop = reached_end_position;
(gdb) 
498     stream.stop_socket = bgpipe[0];
(gdb) 
502     stream.standby_message_timeout = standby_message_timeout;
(gdb) 
503     stream.synchronous = false;
(gdb) 
504     stream.do_sync = do_sync;
(gdb) 
505     stream.mark_done = true;
(gdb) 
506     stream.partial_suffix = NULL;
(gdb) 
507     stream.replication_slot = replication_slot;
(gdb) 
509     if (format == 'p')
(gdb) 
510         stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
(gdb)

進(jìn)入ReceiveXlogStream函數(shù)

(gdb) 
514     if (!ReceiveXlogStream(param->bgconn, &stream))
(gdb) step
ReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458
458     if (!CheckServerVersionForStreaming(conn))
(gdb) 
(gdb) n
472     if (stream->replication_slot != NULL)
(gdb) p *stream
$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474", 
  standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true, 
  stream_stop = 0x403953 <reached_end_position>, stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0, 
  replication_slot = 0x62a1e0 "pg_basebackup_1604"}
(gdb)

判斷系統(tǒng)標(biāo)識(shí)符和時(shí)間線

(gdb) n
474         reportFlushPosition = true;
(gdb) 
475         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
(gdb) 
486     if (stream->sysidentifier != NULL)
(gdb) 
489         res = PQexec(conn, "IDENTIFY_SYSTEM");
(gdb) 
490         if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
498         if (PQntuples(res) != 1 || PQnfields(res) < 3)
(gdb) 
506         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
(gdb) p PQgetvalue(res, 0, 0)
$3 = 0x633500 "6666964067616600474"
(gdb) n
514         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
(gdb) 
522         PQclear(res);
(gdb) p PQgetvalue(res, 0, 1)
$4 = 0x633514 "16"
(gdb)

不存在時(shí)間線history文件,生成history文件

(gdb) n
529     lastFlushPosition = stream->startpos;
(gdb) 
539         if (!existsTimeLineHistoryFile(stream))
(gdb) 
541             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
(gdb) 
542             res = PQexec(conn, query);
(gdb) 
543             if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb) 
556             if (PQnfields(res) != 2 || PQntuples(res) != 1)
(gdb) 
564             writeTimeLineHistoryFile(stream,
(gdb) 
568             PQclear(res);
(gdb)

調(diào)用START_REPLICATION命令初始化

(gdb) 
575         if (stream->stream_stop(stream->startpos, stream->timeline, false))
(gdb) n
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb) 
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb) 
583         res = PQexec(conn, query);
(gdb) 
584         if (PQresultStatus(res) != PGRES_COPY_BOTH)
(gdb) 
591         PQclear(res);
(gdb)

執(zhí)行命令,處理stream WAL,完成調(diào)用

595         if (res == NULL)
(gdb) p *res
$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0, 
  resultStatus = PGRES_COMMAND_OK, 
  cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' <repeats 19 times>, "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 <defaultNoticeReceiver>, 
    noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 <defaultNoticeProcessor>, noticeProcArg = 0x0}, events = 0x0, 
  nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, 
  curOffset = 0, spaceLeft = 0}
(gdb) n
608         if (PQresultStatus(res) == PGRES_TUPLES_OK)
(gdb) 
666         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
(gdb) 
668             PQclear(res);
(gdb) 
676             if (stream->stream_stop(stoppos, stream->timeline, false))
(gdb) 
677                 return true;
(gdb) 
702 }
(gdb) 
LogStreamerMain (param=0x629db0) at pg_basebackup.c:523
523     if (!stream.walmethod->finish())
(gdb)

到此,關(guān)于“PostgreSQL中ReceiveXlogStream有什么作用”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI