PostgreSQL Source Code Walkthrough (154) - Background Processes #6 (walsender #2)

Published: 2020-08-17 10:38:21  Source: ITPUB Blog  Author: husthxd  Category: Relational Databases

This installment continues with PostgreSQL's walsender background process. It focuses on two functions from the call stack: exec_replication_command and StartReplication.
The call stack is:


(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

I. Data Structures

StringInfo
The StringInfoData struct holds information about an extensible string.


/*-------------------------
 * StringInfoData holds information about an extensible string.
 *      data    is the current buffer for the string (allocated with palloc).
 *      len     is the current string length.  There is guaranteed to be
 *              a terminating '\0' at data[len], although this is not very
 *              useful when the string holds binary data rather than text.
 *      maxlen  is the allocated size in bytes of 'data', i.e. the maximum
 *              string size (including the terminating '\0' char) that we can
 *              currently store in 'data' without having to reallocate
 *              more space.  We must always have maxlen > len.
 *      cursor  is initialized to zero by makeStringInfo or initStringInfo,
 *              but is not otherwise touched by the stringinfo.c routines.
 *              Some routines use it to scan through a StringInfo.
 *-------------------------
 */
typedef struct StringInfoData
{
    char       *data;
    int         len;
    int         maxlen;
    int         cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;
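
The sketch below makes the data/len/maxlen/cursor invariants concrete with a minimal standalone extensible buffer. It is not PostgreSQL's stringinfo.c. MiniStringInfo and the mini_* functions are hypothetical, malloc/realloc stand in for palloc/repalloc, and allocation-failure handling is omitted. After every append it keeps data[len] == '\0' and maxlen > len. It starts at 1024 bytes, as initStringInfo does, and grows by doubling.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for StringInfoData (malloc/realloc instead of palloc). */
typedef struct MiniStringInfo
{
    char       *data;       /* buffer; data[len] is always '\0' */
    int         len;        /* current string length */
    int         maxlen;     /* allocated size of data; invariant: maxlen > len */
    int         cursor;     /* not touched by these routines */
} MiniStringInfo;

static void
mini_init(MiniStringInfo *str)
{
    str->maxlen = 1024;                 /* same initial size as initStringInfo */
    str->data = malloc(str->maxlen);    /* error handling omitted */
    str->data[0] = '\0';
    str->len = 0;
    str->cursor = 0;
}

/* Ensure room for 'needed' more bytes plus the terminating '\0'. */
static void
mini_enlarge(MiniStringInfo *str, int needed)
{
    int         newlen = str->maxlen;

    needed += str->len + 1;             /* total bytes required */
    if (needed <= str->maxlen)
        return;                         /* already big enough */
    while (needed > newlen)
        newlen *= 2;                    /* grow by doubling */
    str->data = realloc(str->data, newlen);
    str->maxlen = newlen;
}

/* Append n arbitrary bytes (they may include '\0'). */
static void
mini_append_binary(MiniStringInfo *str, const void *bytes, int n)
{
    mini_enlarge(str, n);
    memcpy(str->data + str->len, bytes, n);
    str->len += n;
    str->data[str->len] = '\0';         /* keep the terminator, even for binary data */
    assert(str->maxlen > str->len);
}

static void
mini_free(MiniStringInfo *str)
{
    free(str->data);
    str->data = NULL;
    str->len = str->maxlen = 0;
}
```

Appending the three zero bytes that StartReplication writes leaves len == 3 and maxlen == 1024. This matches the buf shown in the gdb session of section III.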

II. Source Code Walkthrough

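exec_replication_command
exec_replication_command executes a replication command. It returns true if cmd_string was recognized as a WalSender command, and false otherwise.
Its main logic is as follows:
1. Perform initialization and sanity checks
2. Switch the memory context
3. Initialize the replication scanner
4. Perform transaction-related checks
5. Initialize the input/output message buffers
6. Execute the command according to its type
6.1 For T_StartReplicationCmd, call StartReplication (physical replication) or StartLogicalReplication (logical replication)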
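
The dispatch in step 6 depends on what the parser returns. The walsender handles replication keywords itself. Anything else becomes a SQLCmd, and exec_replication_command returns false so the caller can run it as ordinary SQL. The sketch below models only that contract. The ReplCmdKind enum, KeywordEntry and classify_command() are hypothetical. A plain keyword-prefix match stands in for the real flex/bison parser (repl_scanner.l / repl_gram.y).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified command kinds (the server uses NodeTags). */
typedef enum ReplCmdKind
{
    CMD_IDENTIFY_SYSTEM,
    CMD_BASE_BACKUP,
    CMD_CREATE_REPLICATION_SLOT,
    CMD_DROP_REPLICATION_SLOT,
    CMD_START_REPLICATION,
    CMD_TIMELINE_HISTORY,
    CMD_SHOW,
    CMD_SQL                     /* anything that is not a replication command */
} ReplCmdKind;

typedef struct KeywordEntry
{
    const char *kw;
    ReplCmdKind kind;
} KeywordEntry;

static const KeywordEntry keywords[] = {
    {"IDENTIFY_SYSTEM", CMD_IDENTIFY_SYSTEM},
    {"BASE_BACKUP", CMD_BASE_BACKUP},
    {"CREATE_REPLICATION_SLOT", CMD_CREATE_REPLICATION_SLOT},
    {"DROP_REPLICATION_SLOT", CMD_DROP_REPLICATION_SLOT},
    {"START_REPLICATION", CMD_START_REPLICATION},
    {"TIMELINE_HISTORY", CMD_TIMELINE_HISTORY},
    {"SHOW", CMD_SHOW},
};

/* True if cmd starts with kw followed by the end of the string or a space. */
static bool
starts_with_keyword(const char *cmd, const char *kw)
{
    size_t      n = strlen(kw);

    return strncmp(cmd, kw, n) == 0 && (cmd[n] == '\0' || cmd[n] == ' ');
}

static ReplCmdKind
classify_command(const char *cmd)
{
    assert(cmd != NULL);
    for (size_t i = 0; i < sizeof(keywords) / sizeof(keywords[0]); i++)
        if (starts_with_keyword(cmd, keywords[i].kw))
            return keywords[i].kind;
    return CMD_SQL;
}

/* Same contract as exec_replication_command's return value. */
static bool
is_walsender_command(const char *cmd)
{
    return classify_command(cmd) != CMD_SQL;
}
```

For example, is_walsender_command("START_REPLICATION 0/5D000000 TIMELINE 16") is true. is_walsender_command("SELECT 1") is false, which corresponds to the T_SQLCmd branch returning false.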


/*
 * Execute an incoming replication command.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;
    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);
    /*
     * Throw error if in stopping mode.  We need prevent commands that could
     * generate WAL while the shutdown checkpoint is being written.  To be
     * safe, we just prohibit all new commands.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives. Clean up the old stuff if there's anything.
     */
    SnapBuildClearExportedSnapshot();
    // check for pending interrupts
    CHECK_FOR_INTERRUPTS();
    // create a short-lived memory context for this command
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);
    // initialize the replication scanner and parse the command
    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));
    cmd_node = replication_parse_result;
    /*
     * Log replication command if log_replication_commands is enabled. Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility. Note that SQL commands are not logged here, and will be
     * logged later if log_statement is enabled.
     */
    if (cmd_node->type != T_SQLCmd)
        ereport(log_replication_commands ? LOG : DEBUG1,
                (errmsg("received replication command: %s", cmd_string)));
    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
     * called outside of transaction the snapshot should be cleared here.
     */
    if (!IsTransactionBlock())
        SnapBuildClearExportedSnapshot();
    /*
     * For aborted transactions, don't allow anything except pure SQL, the
     * exec_simple_query() will handle it correctly.
     */
    if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));
    CHECK_FOR_INTERRUPTS();
    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);
    /* Report to pgstat that this process is running */
    pgstat_report_activity(STATE_RUNNING, NULL);
    // dispatch on the command type
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            // IDENTIFY_SYSTEM
            IdentifySystem();
            break;
        case T_BaseBackupCmd:
            //BASE_BACKUP
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;
        case T_CreateReplicationSlotCmd:
            // CREATE_REPLICATION_SLOT
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;
        case T_DropReplicationSlotCmd:
            // DROP_REPLICATION_SLOT
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;
        case T_StartReplicationCmd:
            //START_REPLICATION
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
                PreventInTransactionBlock(true, "START_REPLICATION");
                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }
        case T_TimeLineHistoryCmd:
            // TIMELINE_HISTORY: send a timeline history file
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;
        case T_VariableShowStmt:
            // SHOW
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;
                GetPGVariable(n->name, dest);
            }
            break;
        case T_SQLCmd:
            // plain SQL command
            if (MyDatabaseId == InvalidOid)
                ereport(ERROR,
                        (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
            /* Report to pgstat that this process is now idle */
            pgstat_report_activity(STATE_IDLE, NULL);
            /* Tell the caller that this wasn't a WalSender command. */
            return false;
        default:
            // anything else is an error
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }
    /* done */
    // done: switch back to the previous memory context and free this command's context
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);
    /* Send CommandComplete message */
    EndCommand("SELECT", DestRemote);
    /* Report to pgstat that this process is now idle */
    pgstat_report_activity(STATE_IDLE, NULL);
    return true;
}
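
Switching to cmd_context at the start and deleting it at the end follows a common PostgreSQL pattern. Everything allocated while handling one command lives in one context, and that context is freed in a single call. A bump arena is a rough analogue. The sketch below is hypothetical: Arena, current_arena and the arena_* functions are made up. Unlike an AllocSet context, the arena neither grows nor frees individual chunks.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical bump arena standing in for a MemoryContext. */
typedef struct Arena
{
    char       *base;
    size_t      used;
    size_t      size;
} Arena;

static Arena *current_arena = NULL;     /* analogue of CurrentMemoryContext */

static Arena *
arena_create(size_t size)
{
    Arena      *a = malloc(sizeof(Arena));  /* error handling omitted */

    a->base = malloc(size);
    a->used = 0;
    a->size = size;
    return a;
}

/* Like MemoryContextSwitchTo(): make 'a' current and return the previous one. */
static Arena *
arena_switch_to(Arena *a)
{
    Arena      *old = current_arena;

    current_arena = a;
    return old;
}

static void *
arena_alloc(Arena *a, size_t n)
{
    void       *p;

    n = (n + 7) & ~(size_t) 7;          /* keep 8-byte alignment */
    if (a->size - a->used < n)
        return NULL;                    /* a real context would add another block */
    p = a->base + a->used;
    a->used += n;
    return p;
}

static char *
arena_strdup(Arena *a, const char *s)
{
    size_t      n = strlen(s) + 1;
    char       *p = arena_alloc(a, n);

    if (p != NULL)
        memcpy(p, s, n);
    return p;
}

/* Like MemoryContextDelete(): everything allocated in 'a' is freed at once. */
static void
arena_delete(Arena *a)
{
    assert(a != current_arena);         /* switch away first, as the walsender does */
    free(a->base);
    free(a);
}
```

The call order mirrors exec_replication_command: create the arena, switch to it, allocate, switch back, then delete.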

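StartReplication
StartReplication handles the START_REPLICATION command for physical replication.
Its main logic is as follows:
1. Perform initialization and sanity checks
2. Select the timeline
3. Enter COPY mode
3.1 Set the walsender state
3.2 Send a CopyBothResponse message and start streaming
3.3 Initialize related variables, such as the shared-memory status
3.4 Enter the main loop (WalSndLoop)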
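
Timeline selection and the "nothing to stream" test can be modelled as pure functions over the values StartReplication works with. This is a simplified sketch, not the server code. choose_timeline(), should_stream(), startpoint_ahead_of_flush() and TimelineChoice are hypothetical. The switchpoint argument stands in for what tliSwitchPoint() would return after reading the timeline history file.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical bundle of the sendTimeLine* globals set by StartReplication(). */
typedef struct TimelineChoice
{
    TimeLineID  sendTimeLine;
    bool        sendTimeLineIsHistoric;
    XLogRecPtr  sendTimeLineValidUpto;
    bool        not_in_history;         /* true where the server raises an ERROR */
} TimelineChoice;

/*
 * requested:   cmd->timeline (0 = not given by the client)
 * switchpoint: what tliSwitchPoint() would return for 'requested'
 */
static TimelineChoice
choose_timeline(TimeLineID requested, TimeLineID this_tli,
                XLogRecPtr switchpoint, XLogRecPtr startpoint)
{
    TimelineChoice c = {this_tli, false, InvalidXLogRecPtr, false};

    assert(this_tli != 0);              /* IDENTIFY_SYSTEM must have run first */
    if (requested == 0 || requested == this_tli)
        return c;                       /* current timeline, no upper bound */

    c.sendTimeLine = requested;
    c.sendTimeLineIsHistoric = true;
    /* loose check: only reject a start point after the fork-off point */
    if (switchpoint != InvalidXLogRecPtr && switchpoint < startpoint)
        c.not_in_history = true;
    c.sendTimeLineValidUpto = switchpoint;
    return c;
}

/* The "is there anything to stream?" test that guards entering COPY mode. */
static bool
should_stream(const TimelineChoice *c, XLogRecPtr startpoint)
{
    return !c->sendTimeLineIsHistoric || startpoint < c->sendTimeLineValidUpto;
}

/* Later check: refuse a start point beyond what this server has flushed. */
static bool
startpoint_ahead_of_flush(XLogRecPtr flush_ptr, XLogRecPtr startpoint)
{
    return flush_ptr < startpoint;
}
```

The gdb session in section III uses requested timeline 16, ThisTimeLineID 16 and startpoint 1560281088. With those values the choice is non-historic and sendTimeLineValidUpto is 0, so should_stream() is true. The flush check passes because FlushPtr is 1560397696.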


/*
 * Handle START_REPLICATION command.
 *
 * At the moment, this never returns, but an ereport(ERROR) will take us back
 * to the main loop.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
    StringInfoData buf;
    XLogRecPtr  FlushPtr;
    if (ThisTimeLineID == 0)
        // IDENTIFY_SYSTEM must have run first to set ThisTimeLineID
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     */
    if (cmd->slotname)
    {
        ReplicationSlotAcquire(cmd->slotname, true);
        // SlotIsLogical(slot) is defined as ((slot)->data.database != InvalidOid)
        if (SlotIsLogical(MyReplicationSlot))
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     (errmsg("cannot use a logical replication slot for physical replication"))));
    }
    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which is
     * kept in ThisTimeLineID.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();
    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;
        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;
            sendTimeLineIsHistoric = true;
            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);
            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We
             * don't check that we switched *to* it before the requested
             * starting point. This is because the client can legitimately
             * request to start replication from the beginning of the WAL
             * segment that contains switchpoint, but on the new timeline, so
             * that it doesn't end up with a partial segment. If you ask for
             * too old a starting point, you'll get an error later when we
             * fail to find the requested WAL segment in pg_wal.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X.",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }
    streamingDoneSending = streamingDoneReceiving = false;
    /* If there is nothing to stream, don't even enter COPY mode */
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the
         * primary. For some applications, for example synchronous
         * replication, it is important to have a clear state for this initial
         * catchup mode, so we can trigger actions when we change streaming
         * state later. We may stay in this state for a long time, which is
         * exactly why we want to be able to monitor whether or not we are
         * still here.
         */
        // enter the catchup state
        WalSndSetState(WALSNDSTATE_CATCHUP);
        /* Send a CopyBothResponse message, and start streaming */
        pq_beginmessage(&buf, 'W');     /* 'W' = CopyBothResponse */
        pq_sendbyte(&buf, 0);           /* overall copy format: 0 = text */
        pq_sendint16(&buf, 0);          /* number of columns: 0 */
        pq_endmessage(&buf);
        pq_flush();
        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }
        /* Start streaming from the requested point */
        sentPtr = cmd->startpoint;
        /* Initialize shared memory status, too */
        SpinLockAcquire(&MyWalSnd->mutex);
        MyWalSnd->sentPtr = sentPtr;
        SpinLockRelease(&MyWalSnd->mutex);
        SyncRepInitConfig();
        /* Main loop of walsender */
        // mark replication as active while the main loop runs
        replication_active = true;
        WalSndLoop(XLogSendPhysical);
        // the loop has returned: replication is no longer active
        replication_active = false;
        if (got_STOPPING)
            proc_exit(0);
        // return to the startup state
        WalSndSetState(WALSNDSTATE_STARTUP);
        Assert(streamingDoneSending && streamingDoneReceiving);
    }
    if (cmd->slotname)
        ReplicationSlotRelease();
    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     */
    if (sendTimeLineIsHistoric)
    {
        char        startpos_str[8 + 1 + 8 + 1];
        DestReceiver *dest;
        TupOutputState *tstate;
        TupleDesc   tupdesc;
        Datum       values[2];
        bool        nulls[2];
        snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
                 (uint32) (sendTimeLineValidUpto >> 32),
                 (uint32) sendTimeLineValidUpto);
        dest = CreateDestReceiver(DestRemoteSimple);
        MemSet(nulls, false, sizeof(nulls));
        /*
         * Need a tuple descriptor representing two columns. int8 may seem
         * like a surprising data type for this, but in theory int4 would not
         * be wide enough for this, as TimeLineID is unsigned.
         */
        tupdesc = CreateTemplateTupleDesc(2);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
                                  INT8OID, -1, 0);
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
                                  TEXTOID, -1, 0);
        /* prepare for projection of tuple */
        tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
        values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
        values[1] = CStringGetTextDatum(startpos_str);
        /* send it to dest */
        do_tup_output(tstate, values, nulls);
        end_tup_output(tstate);
    }
    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}
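
LSNs appear twice above: in error messages and in the next_tli_startpos column. Both are printed as %X/%X from the high and low 32 bits. Each half needs at most 8 hex digits, so startpos_str[8 + 1 + 8 + 1] is always large enough. The helpers below, format_lsn() and next_tli_as_int8(), are hypothetical standalone versions of that formatting and of the int8 widening for next_tli.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

#define LSN_STR_SIZE (8 + 1 + 8 + 1)    /* same size as startpos_str */

/* Print an LSN as "%X/%X": high 32 bits, '/', low 32 bits. */
static void
format_lsn(char *buf, size_t buflen, XLogRecPtr lsn)
{
    assert(buflen >= LSN_STR_SIZE);
    snprintf(buf, buflen, "%X/%X",
             (unsigned int) (uint32_t) (lsn >> 32),
             (unsigned int) (uint32_t) lsn);
    assert(strlen(buf) < LSN_STR_SIZE); /* never truncated */
}

/* next_tli goes out as int8: a uint32 TimeLineID may not fit in int4. */
static int64_t
next_tli_as_int8(TimeLineID tli)
{
    return (int64_t) tli;
}
```

For example, 1560281088 (0x5D000000) formats as 0/5D000000. The FlushPtr value 1560397696 from the gdb session formats as 0/5D01C780.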

III. Tracing Analysis

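On the primary, attach gdb to the postmaster and set a breakpoint on exec_replication_command. Then start the standby node. The breakpoint is hit in the forked walsender process.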


[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

The first command is IDENTIFY_SYSTEM. The second, START_REPLICATION, is the one we want to trace.


(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)
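
The command string carries the start LSN and timeline that later appear in *cmd as startpoint = 1560281088 and timeline = 16. The hypothetical parse_start_replication() below uses sscanf instead of the real grammar. It handles only the bare physical form START_REPLICATION X/X [TIMELINE n], without the SLOT or PHYSICAL keywords. Its purpose is to show how the X/X text maps to a 64-bit XLogRecPtr.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
typedef uint32_t TimeLineID;

/*
 * Hypothetical parser for "START_REPLICATION X/X [TIMELINE n]" only.
 * Returns false if the string does not have that shape.
 */
static bool
parse_start_replication(const char *cmd, XLogRecPtr *startpoint,
                        TimeLineID *timeline)
{
    unsigned int hi, lo, tli;
    int         consumed = 0;

    assert(cmd != NULL && startpoint != NULL && timeline != NULL);
    /* %n records how many characters were consumed so far */
    if (sscanf(cmd, "START_REPLICATION %X/%X%n", &hi, &lo, &consumed) != 2)
        return false;
    *startpoint = ((XLogRecPtr) hi << 32) | lo;     /* "hi/lo" -> 64-bit LSN */
    *timeline = 0;                                  /* 0 = not specified */
    if (cmd[consumed] == '\0')
        return true;
    if (sscanf(cmd + consumed, " TIMELINE %u", &tli) != 1)
        return false;
    *timeline = tli;
    return true;
}
```

Parsing "START_REPLICATION 0/5D000000 TIMELINE 16" yields startpoint 0x5D000000 (1560281088) and timeline 16, the values shown by p *cmd later in the trace.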

1. Perform initialization and sanity checks


(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb) 
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, 
  writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2. Switch the memory context


(gdb) 
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb) 
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)

3. Initialize the replication scanner


(gdb) 
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb) 
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb) 
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb) 
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4. Perform transaction-related checks


(gdb) n
1487        if (!IsTransactionBlock())
(gdb) 
1488            SnapBuildClearExportedSnapshot();
(gdb) 
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb) 
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5. Initialize the input/output message buffers


(gdb) 
1506        initStringInfo(&output_message);
(gdb) 
1507        initStringInfo(&reply_message);
(gdb) 
1508        initStringInfo(&tmpbuf);
(gdb) 
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6. Execute the command according to its type
6.1 The command type is T_StartReplicationCmd with a physical kind, so StartReplication is called


(gdb) n
1513        switch (cmd_node->type)
(gdb) 
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb) 
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb) 
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb) 
1539                        StartReplication(cmd);

Step into StartReplication


1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1. Perform initialization and sanity checks


(gdb) n
546     if (cmd->slotname)
(gdb) 
560     if (am_cascading_walsender)
(gdb)

2. Select the timeline


(gdb) n
568     if (cmd->timeline != 0)
(gdb) 
572         sendTimeLine = cmd->timeline;
(gdb) 
573         if (sendTimeLine == ThisTimeLineID)
(gdb) 
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb) 
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, 
  startpoint = 1560281088, options = 0x0}
(gdb)

3. Enter COPY mode


(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1 Set the state


648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)

3.2 Send the CopyBothResponse message and start streaming


(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb) 
652         pq_sendbyte(&buf, 0);
(gdb) 
653         pq_sendint16(&buf, 0);
(gdb) 
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)
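
The three bytes in buf.data (0, 0, 0) are only the message body. The 127s after them lie beyond len and are not part of the message. pq_beginmessage() stashes the message type in buf.cursor, which is why cursor shows 87 ('W'). pq_endmessage() prepends the type byte and the Int32 length when the message is sent. The hypothetical build_copy_both_response() below assembles the full CopyBothResponse as it appears on the wire under the frontend/backend protocol layout.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Build a complete CopyBothResponse ('W') with zero columns into out[].
 * Layout: type byte, Int32 length (counts itself, not the type byte),
 * Int8 overall format, Int16 column count. Returns bytes written.
 */
static size_t
build_copy_both_response(unsigned char *out, size_t outlen)
{
    /* the body StartReplication writes into its StringInfo */
    const unsigned char body[3] = {0 /* format: text */, 0, 0 /* Int16 ncols = 0 */};
    uint32_t    len = 4 + sizeof(body);  /* the length field counts itself */

    assert(outlen >= 1 + len);
    out[0] = 'W';
    out[1] = (unsigned char) (len >> 24);   /* network byte order */
    out[2] = (unsigned char) (len >> 16);
    out[3] = (unsigned char) (len >> 8);
    out[4] = (unsigned char) len;
    memcpy(out + 5, body, sizeof(body));
    return 1 + len;
}
```

The result is 8 bytes: 'W', then the length 7 (4 for the length field itself, 1 format byte and 2 for the column count), then 0, 0, 0.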

3.3 Initialize related variables, such as the shared-memory status


(gdb) n
655         pq_flush();
(gdb) 
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb) 
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb) 
676         MyWalSnd->sentPtr = sentPtr;
(gdb) 
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb) 
679         SyncRepInitConfig();
(gdb) 
682         replication_active = true;

3.4 Enter the main loop (WalSndLoop)


(gdb) 
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

IV. References

PG Source Code
