溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

PostgreSQL中的ProcessRepliesIfAny函數(shù)分析

發(fā)布時(shí)間:2021-11-09 14:58:39 來源:億速云 閱讀:192 作者:iii 欄目:關(guān)系型數(shù)據(jù)庫

本篇內(nèi)容主要講解“PostgreSQL中的ProcessRepliesIfAny函數(shù)分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“PostgreSQL中的ProcessRepliesIfAny函數(shù)分析”吧!

調(diào)用棧如下:

(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, 
    nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, 
    wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, 
    wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")
    at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")
    at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、數(shù)據(jù)結(jié)構(gòu)

N/A

二、源碼解讀

ProcessRepliesIfAny
在streaming期間,處理接收到的消息,同時(shí)檢查遠(yuǎn)程終端是否關(guān)閉了連接,執(zhí)行相關(guān)處理.
代碼不多也不復(fù)雜,可自行閱讀.

/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 * 在streaming期間,處理接收到的消息.
 * 同時(shí)檢查遠(yuǎn)程終端是否關(guān)閉了連接,執(zhí)行相關(guān)處理.
 */
static void
ProcessRepliesIfAny(void)
{
    unsigned char firstchar;
    int         r;
    bool        received = false;
    //當(dāng)前時(shí)間
    last_processing = GetCurrentTimestamp();
    for (;;)
    {
        //---------- 循環(huán)接收相關(guān)消息
        pq_startmsgread();
        r = pq_getbyte_if_available(&firstchar);
        if (r < 0)
        {
            /* unexpected error or EOF */
            //未知異?;蛘逧OF
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            //進(jìn)程退出
            proc_exit(0);
        }
        if (r == 0)
        {
            /* no data available without blocking */
            //已無阻塞的消息數(shù)據(jù),退出
            pq_endmsgread();
            break;
        }
        /* Read the message contents */
        //讀取消息內(nèi)容
        resetStringInfo(&reply_message);
        if (pq_getmessage(&reply_message, 0))
        {
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            proc_exit(0);
        }
        /*
         * If we already received a CopyDone from the frontend, the frontend
         * should not send us anything until we've closed our end of the COPY.
         * XXX: In theory, the frontend could already send the next command
         * before receiving the CopyDone, but libpq doesn't currently allow
         * that.
         * 如果已在前臺接收到CopyDone消息,前臺不應(yīng)該再發(fā)送消息,直至關(guān)閉COPY.
         * XXX:理論上來說,在接收到CopyDone前,前臺可能已經(jīng)發(fā)送了下一個(gè)命令,但libpq不允許這種情況發(fā)生
         */
        if (streamingDoneReceiving && firstchar != 'X')
            ereport(FATAL,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
                            firstchar)));
        /* Handle the very limited subset of commands expected in this phase */
        //處理有限幾個(gè)命令
        switch (firstchar)
        {
                /*
                 * 'd' means a standby reply wrapped in a CopyData packet.
                 * 'd'意味著standby節(jié)點(diǎn)的應(yīng)答封裝了CopyData包
                 */
            case 'd':
                ProcessStandbyMessage();
                received = true;
                break;
                /*
                 * CopyDone means the standby requested to finish streaming.
                 * Reply with CopyDone, if we had not sent that already.
                 * CopyDone意味著standby節(jié)點(diǎn)請求結(jié)束streaming.
                 * 如尚未發(fā)送,則使用CopyDone應(yīng)答.
                 */
            case 'c':
                if (!streamingDoneSending)
                {
                    pq_putmessage_noblock('c', NULL, 0);
                    streamingDoneSending = true;
                }
                streamingDoneReceiving = true;
                received = true;
                break;
                /*
                 * 'X' means that the standby is closing down the socket.
                 * 'X'意味著standby節(jié)點(diǎn)正在關(guān)閉socket
                 */
            case 'X':
                proc_exit(0);
            default:
                ereport(FATAL,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("invalid standby message type \"%c\"",
                                firstchar)));
        }
    }
    /*
     * Save the last reply timestamp if we've received at least one reply.
     * 如接收到至少一條應(yīng)答信息,則保存最后的應(yīng)答時(shí)間戳.
     */
    if (received)
    {
        last_reply_timestamp = last_processing;
        waiting_for_ping_response = false;
    }
}

二、跟蹤分析

在主節(jié)點(diǎn)上用gdb跟蹤postmaster,在PostgresMain上設(shè)置斷點(diǎn)后啟動standby節(jié)點(diǎn),進(jìn)入斷點(diǎn)

(gdb) set follow-fork-mode child
(gdb) b ProcessRepliesIfAny
Breakpoint 2 at 0x85343b: file walsender.c, line 1597.
(gdb) c
Continuing.
Breakpoint 2, ProcessRepliesIfAny () at walsender.c:1597
1597        bool        received = false;
(gdb)

查看進(jìn)程信息

[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger   
xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer   
xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer   
xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter   
xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher  
xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver   
xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector   
xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher  
xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle

循環(huán)接收相關(guān)消息

(gdb) n
1599        last_processing = GetCurrentTimestamp();
(gdb) 
1603            pq_startmsgread();
(gdb) 
1604            r = pq_getbyte_if_available(&firstchar);
(gdb) 
1605            if (r < 0)
(gdb) p r
$1 = 1
(gdb) p firstchar
$2 = 100 'd'
(gdb)

命令是’d’,執(zhí)行相關(guān)處理

(gdb) n
1613            if (r == 0)
(gdb) 
1621            resetStringInfo(&reply_message);
(gdb) 
1622            if (pq_getmessage(&reply_message, 0))
(gdb) 
1637            if (streamingDoneReceiving && firstchar != 'X')
(gdb) 
1644            switch (firstchar)
(gdb) 
1650                    ProcessStandbyMessage();
(gdb) 
1651                    received = true;
(gdb) 
1652                    break;
(gdb) 
1681        }
(gdb)

設(shè)置斷點(diǎn)

(gdb) b walsender.c:1643
Breakpoint 3 at 0x8535b6: file walsender.c, line 1643.
(gdb) b walsender.c:1672
Breakpoint 4 at 0x85361a: file walsender.c, line 1672.
(gdb) c
Continuing.
Breakpoint 3, ProcessRepliesIfAny () at walsender.c:1644
1644            switch (firstchar)
(gdb) 
Continuing.
...
Breakpoint 4, ProcessRepliesIfAny () at walsender.c:1673
1673                    proc_exit(0);
(gdb)

進(jìn)程即將退出,查看進(jìn)程信息

[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres
xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger   
xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer   
xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer   
xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter   
xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher  
xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver   
xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector   
xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher  
xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle
xdb       1637  1376  0 17:27 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40530) streaming 0/5D075248
[xdb@localhost ~]$

進(jìn)程退出(PID=1633),啟動了新的進(jìn)程(PID=1637)

(gdb) n
[Inferior 2 (process 1633) exited normally]
(gdb)

到此,相信大家對“PostgreSQL中的ProcessRepliesIfAny函數(shù)分析”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI