溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

PostgreSQL 源碼解讀(97)- 查詢語(yǔ)句#79(ExecHashJoin函數(shù)#5-H...

發(fā)布時(shí)間:2020-08-13 12:45:01 來(lái)源:ITPUB博客 閱讀:360 作者:husthxd 欄目:關(guān)系型數(shù)據(jù)庫(kù)

本節(jié)是ExecHashJoin函數(shù)介紹的第五部分,主要介紹了ExecHashJoin中依賴的其他函數(shù)的實(shí)現(xiàn)邏輯,這些函數(shù)在HJ_NEED_NEW_BATCH階段中使用,主要的函數(shù)是ExecHashJoinNewBatch。

一、數(shù)據(jù)結(jié)構(gòu)

JoinState
Hash/NestLoop/Merge Join的基類

/* ----------------
 *   JoinState information
 *
 *      Superclass for state nodes of join plans.
 *      Hash/NestLoop/Merge Join的基類
 * ----------------
 */
typedef struct JoinState
{
    PlanState   ps;//基類PlanState
    JoinType    jointype;//連接類型
    //在找到一個(gè)匹配inner tuple的時(shí)候,如需要跳轉(zhuǎn)到下一個(gè)outer tuple,則該值為T
    bool        single_match;   /* True if we should skip to next outer tuple
                                 * after finding one inner match */
    //連接條件表達(dá)式(除了ps.qual)
    ExprState  *joinqual;       /* JOIN quals (in addition to ps.qual) */
} JoinState;

HashJoinState
Hash Join運(yùn)行期狀態(tài)結(jié)構(gòu)體

/* these structs are defined in executor/hashjoin.h: */
typedef struct HashJoinTupleData *HashJoinTuple;
typedef struct HashJoinTableData *HashJoinTable;

typedef struct HashJoinState
{
    JoinState   js;             /* 基類;its first field is NodeTag */
    ExprState  *hashclauses;//hash連接條件
    List       *hj_OuterHashKeys;   /* 外表?xiàng)l件鏈表;list of ExprState nodes */
    List       *hj_InnerHashKeys;   /* 內(nèi)表連接條件;list of ExprState nodes */
    List       *hj_HashOperators;   /* 操作符OIDs鏈表;list of operator OIDs */
    HashJoinTable hj_HashTable;//Hash表
    uint32      hj_CurHashValue;//當(dāng)前的Hash值
    int         hj_CurBucketNo;//當(dāng)前的bucket編號(hào)
    int         hj_CurSkewBucketNo;//行傾斜bucket編號(hào)
    HashJoinTuple hj_CurTuple;//當(dāng)前元組
    TupleTableSlot *hj_OuterTupleSlot;//outer relation slot
    TupleTableSlot *hj_HashTupleSlot;//Hash tuple slot
    TupleTableSlot *hj_NullOuterTupleSlot;//用于外連接的outer虛擬slot
    TupleTableSlot *hj_NullInnerTupleSlot;//用于外連接的inner虛擬slot
    TupleTableSlot *hj_FirstOuterTupleSlot;//
    int         hj_JoinState;//JoinState狀態(tài)
    bool        hj_MatchedOuter;//是否匹配
    bool        hj_OuterNotEmpty;//outer relation是否為空
} HashJoinState;

HashJoinTable
Hash表數(shù)據(jù)結(jié)構(gòu)

typedef struct HashJoinTableData
{
    int         nbuckets;       /* 內(nèi)存中的hash桶數(shù);# buckets in the in-memory hash table */
    int         log2_nbuckets;  /* 2的對(duì)數(shù)(nbuckets必須是2的冪);its log2 (nbuckets must be a power of 2) */

    int         nbuckets_original;  /* 首次hash時(shí)的桶數(shù);# buckets when starting the first hash */
    int         nbuckets_optimal;   /* 優(yōu)化后的桶數(shù)(每個(gè)批次);optimal # buckets (per batch) */
    int         log2_nbuckets_optimal;  /* 2的對(duì)數(shù);log2(nbuckets_optimal) */

    /* buckets[i] is head of list of tuples in i'th in-memory bucket */
    //bucket [i]是內(nèi)存中第i個(gè)桶中的元組鏈表的head item
    union
    {
        /* unshared array is per-batch storage, as are all the tuples */
        //未共享數(shù)組是按批處理存儲(chǔ)的,所有元組均如此
        struct HashJoinTupleData **unshared;
        /* shared array is per-query DSA area, as are all the tuples */
        //共享數(shù)組是每個(gè)查詢的DSA區(qū)域,所有元組均如此
        dsa_pointer_atomic *shared;
    }           buckets;

    bool        keepNulls;      /*如不匹配則存儲(chǔ)NULL元組,該值為T;true to store unmatchable NULL tuples */

    bool        skewEnabled;    /*是否使用傾斜優(yōu)化?;are we using skew optimization? */
    HashSkewBucket **skewBucket;    /* 傾斜的hash表桶數(shù);hashtable of skew buckets */
    int         skewBucketLen;  /* skewBucket數(shù)組大小;size of skewBucket array (a power of 2!) */
    int         nSkewBuckets;   /* 活動(dòng)的傾斜桶數(shù);number of active skew buckets */
    int        *skewBucketNums; /* 活動(dòng)傾斜桶數(shù)組索引;array indexes of active skew buckets */

    int         nbatch;         /* 批次數(shù);number of batches */
    int         curbatch;       /* 當(dāng)前批次,第一輪為0;current batch #; 0 during 1st pass */

    int         nbatch_original;    /* 在開(kāi)始inner掃描時(shí)的批次;nbatch when we started inner scan */
    int         nbatch_outstart;    /* 在開(kāi)始o(jì)uter掃描時(shí)的批次;nbatch when we started outer scan */

    bool        growEnabled;    /* 關(guān)閉nbatch增加的標(biāo)記;flag to shut off nbatch increases */

    double      totalTuples;    /* 從inner plan獲得的元組數(shù);# tuples obtained from inner plan */
    double      partialTuples;  /* 通過(guò)hashjoin獲得的inner元組數(shù);# tuples obtained from inner plan by me */
    double      skewTuples;     /* 傾斜元組數(shù);# tuples inserted into skew tuples */

    /*
     * These arrays are allocated for the life of the hash join, but only if
     * nbatch > 1.  A file is opened only when we first write a tuple into it
     * (otherwise its pointer remains NULL).  Note that the zero'th array
     * elements never get used, since we will process rather than dump out any
     * tuples of batch zero.
     * 這些數(shù)組在散列連接的生命周期內(nèi)分配,但僅當(dāng)nbatch > 1時(shí)分配。
     * 只有當(dāng)?shù)谝淮螌⒃M寫入文件時(shí),文件才會(huì)打開(kāi)(否則它的指針將保持NULL)。
     * 注意,第0個(gè)數(shù)組元素永遠(yuǎn)不會(huì)被使用,因?yàn)榕?的元組永遠(yuǎn)不會(huì)轉(zhuǎn)儲(chǔ).
     */
    BufFile   **innerBatchFile; /* 每個(gè)批次的inner虛擬臨時(shí)文件緩存;buffered virtual temp file per batch */
    BufFile   **outerBatchFile; /* 每個(gè)批次的outer虛擬臨時(shí)文件緩存;buffered virtual temp file per batch */

    /*
     * Info about the datatype-specific hash functions for the datatypes being
     * hashed. These are arrays of the same length as the number of hash join
     * clauses (hash keys).
     * 有關(guān)正在散列的數(shù)據(jù)類型的特定于數(shù)據(jù)類型的散列函數(shù)的信息。
     * 這些數(shù)組的長(zhǎng)度與散列連接子句(散列鍵)的數(shù)量相同。
     */
    FmgrInfo   *outer_hashfunctions;    /* outer hash函數(shù)FmgrInfo結(jié)構(gòu)體;lookup data for hash functions */
    FmgrInfo   *inner_hashfunctions;    /* inner hash函數(shù)FmgrInfo結(jié)構(gòu)體;lookup data for hash functions */
    bool       *hashStrict;     /* 每個(gè)hash操作符是嚴(yán)格?is each hash join operator strict? */

    Size        spaceUsed;      /* 元組使用的當(dāng)前內(nèi)存空間大小;memory space currently used by tuples */
    Size        spaceAllowed;   /* 空間使用上限;upper limit for space used */
    Size        spacePeak;      /* 峰值的空間使用;peak space used */
    Size        spaceUsedSkew;  /* 傾斜哈希表的當(dāng)前空間使用情況;skew hash table's current space usage */
    Size        spaceAllowedSkew;   /* 傾斜哈希表的使用上限;upper limit for skew hashtable */

    MemoryContext hashCxt;      /* 整個(gè)散列連接存儲(chǔ)的上下文;context for whole-hash-join storage */
    MemoryContext batchCxt;     /* 該批次存儲(chǔ)的上下文;context for this-batch-only storage */

    /* used for dense allocation of tuples (into linked chunks) */
    //用于密集分配元組(到鏈接塊中)
    HashMemoryChunk chunks;     /* 整個(gè)批次使用一個(gè)鏈表;one list for the whole batch */

    /* Shared and private state for Parallel Hash. */
    //并行hash使用的共享和私有狀態(tài)
    HashMemoryChunk current_chunk;  /* 后臺(tái)進(jìn)程的當(dāng)前chunk;this backend's current chunk */
    dsa_area   *area;           /* 用于分配內(nèi)存的DSA區(qū)域;DSA area to allocate memory from */
    ParallelHashJoinState *parallel_state;//并行執(zhí)行狀態(tài)
    ParallelHashJoinBatchAccessor *batches;//并行訪問(wèn)器
    dsa_pointer current_chunk_shared;//當(dāng)前chunk的開(kāi)始指針
} HashJoinTableData;

typedef struct HashJoinTableData *HashJoinTable;

HashJoinTupleData
Hash連接元組數(shù)據(jù)

/* ----------------------------------------------------------------
 *              hash-join hash table structures
 *
 * Each active hashjoin has a HashJoinTable control block, which is
 * palloc'd in the executor's per-query context.  All other storage needed
 * for the hashjoin is kept in private memory contexts, two for each hashjoin.
 * This makes it easy and fast to release the storage when we don't need it
 * anymore.  (Exception: data associated with the temp files lives in the
 * per-query context too, since we always call buffile.c in that context.)
 * 每個(gè)活動(dòng)的hashjoin都有一個(gè)可散列的控制塊,它在執(zhí)行程序的每個(gè)查詢上下文中都是通過(guò)palloc分配的。
 * hashjoin所需的所有其他存儲(chǔ)都保存在私有內(nèi)存上下文中,每個(gè)hashjoin有兩個(gè)。
 * 當(dāng)不再需要它的時(shí)候,這使得釋放它變得簡(jiǎn)單和快速。
 * (例外:與臨時(shí)文件相關(guān)的數(shù)據(jù)也存在于每個(gè)查詢上下文中,因?yàn)樵谶@種情況下總是調(diào)用buffile.c。)
 *
 * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
 * hashtable上下文是每個(gè)查詢上下文的子上下文,確保在語(yǔ)句結(jié)束時(shí)丟棄它們,即使連接因錯(cuò)誤而提前中止。
 *   (同樣,如果出現(xiàn)錯(cuò)誤,虛擬文件管理器將清理創(chuàng)建的任何臨時(shí)文件。)
 *
 * Storage that should live through the entire join is allocated from the
 * "hashCxt", while storage that is only wanted for the current batch is
 * allocated in the "batchCxt".  By resetting the batchCxt at the end of
 * each batch, we free all the per-batch storage reliably and without tedium.
 * 通過(guò)整個(gè)連接的存儲(chǔ)空間應(yīng)從“hashCxt”分配,而只需要當(dāng)前批處理的存儲(chǔ)空間在“batchCxt”中分配。
 * 通過(guò)在每個(gè)批處理結(jié)束時(shí)重置batchCxt,可以可靠地釋放每個(gè)批處理的所有存儲(chǔ),而不會(huì)感到單調(diào)乏味。
 * 
 * During first scan of inner relation, we get its tuples from executor.
 * If nbatch > 1 then tuples that don't belong in first batch get saved
 * into inner-batch temp files. The same statements apply for the
 * first scan of the outer relation, except we write tuples to outer-batch
 * temp files.  After finishing the first scan, we do the following for
 * each remaining batch:
 *  1. Read tuples from inner batch file, load into hash buckets.
 *  2. Read tuples from outer batch file, match to hash buckets and output.
 * 在內(nèi)部關(guān)系的第一次掃描中,從執(zhí)行者那里得到了它的元組。
 * 如果nbatch > 1,那么不屬于第一批的元組將保存到批內(nèi)臨時(shí)文件中。
 * 相同的語(yǔ)句適用于外關(guān)系的第一次掃描,但是我們將元組寫入外部批處理臨時(shí)文件。
 * 完成第一次掃描后,我們對(duì)每批剩余的元組做如下處理: 
 * 1.從內(nèi)部批處理文件讀取元組,加載到散列桶中。
 * 2.從外部批處理文件讀取元組,匹配哈希桶和輸出。 
 *
 * It is possible to increase nbatch on the fly if the in-memory hash table
 * gets too big.  The hash-value-to-batch computation is arranged so that this
 * can only cause a tuple to go into a later batch than previously thought,
 * never into an earlier batch.  When we increase nbatch, we rescan the hash
 * table and dump out any tuples that are now of a later batch to the correct
 * inner batch file.  Subsequently, while reading either inner or outer batch
 * files, we might find tuples that no longer belong to the current batch;
 * if so, we just dump them out to the correct batch file.
 * 如果內(nèi)存中的哈希表太大,可以動(dòng)態(tài)增加nbatch。
 * 散列值到批處理的計(jì)算是這樣安排的:
 *   這只會(huì)導(dǎo)致元組進(jìn)入比以前認(rèn)為的更晚的批處理,而不會(huì)進(jìn)入更早的批處理。
 * 當(dāng)增加nbatch時(shí),重新掃描哈希表,并將現(xiàn)在屬于后面批處理的任何元組轉(zhuǎn)儲(chǔ)到正確的內(nèi)部批處理文件。
 * 隨后,在讀取內(nèi)部或外部批處理文件時(shí),可能會(huì)發(fā)現(xiàn)不再屬于當(dāng)前批處理的元組;
 *   如果是這樣,只需將它們轉(zhuǎn)儲(chǔ)到正確的批處理文件即可。
 * ----------------------------------------------------------------
 */

/* these are in nodes/execnodes.h: */
/* typedef struct HashJoinTupleData *HashJoinTuple; */
/* typedef struct HashJoinTableData *HashJoinTable; */

typedef struct HashJoinTupleData
{
    /* link to next tuple in same bucket */
    //link同一個(gè)桶中的下一個(gè)元組
    union
    {
        struct HashJoinTupleData *unshared;
        dsa_pointer shared;
    }           next;
    uint32      hashvalue;      /* 元組的hash值;tuple's hash code */
    /* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
}           HashJoinTupleData;

#define HJTUPLE_OVERHEAD  MAXALIGN(sizeof(HashJoinTupleData))
#define HJTUPLE_MINTUPLE(hjtup)  \
    ((MinimalTuple) ((char *) (hjtup) + HJTUPLE_OVERHEAD))

二、源碼解讀

ExecHashJoinNewBatch
切換到新的hashjoin批次,如成功,則返回T;已完成,返回F



/*----------------------------------------------------------------------------------------------------
                                    HJ_FILL_OUTER_TUPLE 階段
----------------------------------------------------------------------------------------------------*/
//參見(jiàn)ExecHashJoin

/*----------------------------------------------------------------------------------------------------
                                    HJ_FILL_INNER_TUPLES 階段
----------------------------------------------------------------------------------------------------*/
//參見(jiàn)ExecHashJoin

/*----------------------------------------------------------------------------------------------------
                                    HJ_NEED_NEW_BATCH 階段
----------------------------------------------------------------------------------------------------*/
/*
 * ExecHashJoinNewBatch
 *      switch to a new hashjoin batch
 *      切換到新的hashjoin批次
 *
 * Returns true if successful, false if there are no more batches.
 * 如成功,則返回T;已完成,返回F
 */
static bool
ExecHashJoinNewBatch(HashJoinState *hjstate)
{
    HashJoinTable hashtable = hjstate->hj_HashTable;//Hash表
    int         nbatch;//批次數(shù)
    int         curbatch;//當(dāng)前批次
    BufFile    *innerFile;//inner relation緩存文件
    TupleTableSlot *slot;//slot
    uint32      hashvalue;//hash值

    nbatch = hashtable->nbatch;
    curbatch = hashtable->curbatch;

    if (curbatch > 0)
    {
        /*
         * We no longer need the previous outer batch file; close it right
         * away to free disk space.
         * 不再需要以前的外批處理文件;關(guān)閉它以釋放磁盤空間。
         */
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
    }
    else                        /* curbatch ==0,剛剛完成了第一個(gè)批次;we just finished the first batch */
    {
        /*
         * Reset some of the skew optimization state variables, since we no
         * longer need to consider skew tuples after the first batch. The
         * memory context reset we are about to do will release the skew
         * hashtable itself.
         * 重置一些傾斜優(yōu)化狀態(tài)變量,因?yàn)樵诘谝慌笪覀儾辉傩枰紤]傾斜元組。
         * 我們將要進(jìn)行的內(nèi)存上下文重置將釋放傾斜散鏈表本身。
         */
        hashtable->skewEnabled = false;
        hashtable->skewBucket = NULL;
        hashtable->skewBucketNums = NULL;
        hashtable->nSkewBuckets = 0;
        hashtable->spaceUsedSkew = 0;
    }

    /*
     * We can always skip over any batches that are completely empty on both
     * sides.  We can sometimes skip over batches that are empty on only one
     * side, but there are exceptions:
     * 可以跳過(guò)任何兩邊都是空的批次。有時(shí)我們可以跳過(guò)只在一側(cè)為空的批處理,但也有例外:
     *
     * 1. In a left/full outer join, we have to process outer batches even if
     * the inner batch is empty.  Similarly, in a right/full outer join, we
     * have to process inner batches even if the outer batch is empty.
     * 1、在左/全外連接中,即使內(nèi)部批是空的,我們也必須處理外批數(shù)據(jù)。
     *    類似地,在右/完整外部連接中,即使外批數(shù)據(jù)為空,也必須處理內(nèi)批數(shù)據(jù)。
     *
     * 2. If we have increased nbatch since the initial estimate, we have to
     * scan inner batches since they might contain tuples that need to be
     * reassigned to later inner batches.
     * 2、如果在初始估算之后增加了nbatch,必須掃描內(nèi)部批處理,
     *   因?yàn)樗鼈兛赡馨枰匦路峙涞胶竺娴膬?nèi)部批處理的元組。
     *
     * 3. Similarly, if we have increased nbatch since starting the outer
     * scan, we have to rescan outer batches in case they contain tuples that
     * need to be reassigned.
     * 3、類似地,如果在開(kāi)始外部掃描之后增加了nbatch,必須重新掃描外部批處理,
     *   以防它們包含需要重新分配的元組。
     */
    curbatch++;
    while (curbatch < nbatch &&
           (hashtable->outerBatchFile[curbatch] == NULL ||
            hashtable->innerBatchFile[curbatch] == NULL))
    {
        if (hashtable->outerBatchFile[curbatch] &&
            HJ_FILL_OUTER(hjstate))
            break;              /* 符合規(guī)則1,需要處理;must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            HJ_FILL_INNER(hjstate))
            break;              /* 符合規(guī)則1,需要處理;must process due to rule 1 */
        if (hashtable->innerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_original)
            break;              /* 符合規(guī)則2,需要處理;must process due to rule 2 */
        if (hashtable->outerBatchFile[curbatch] &&
            nbatch != hashtable->nbatch_outstart)
            break;              /* 符合規(guī)則3,需要處理;must process due to rule 3 */
        /* We can ignore this batch. */
        /* Release associated temp files right away. */
        //均不符合規(guī)則1-3,則可以忽略這個(gè)批次了
        //釋放臨時(shí)文件
        if (hashtable->innerBatchFile[curbatch])
            BufFileClose(hashtable->innerBatchFile[curbatch]);
        hashtable->innerBatchFile[curbatch] = NULL;
        if (hashtable->outerBatchFile[curbatch])
            BufFileClose(hashtable->outerBatchFile[curbatch]);
        hashtable->outerBatchFile[curbatch] = NULL;
        curbatch++;//下一個(gè)批次
    }

    if (curbatch >= nbatch)
        return false;           /* 已完成處理所有批次;no more batches */

    hashtable->curbatch = curbatch;//下一個(gè)批次

    /*
     * Reload the hash table with the new inner batch (which could be empty)
     * 使用新的內(nèi)部批處理數(shù)據(jù)(有可能是空的)重新加載哈希表
     */
    ExecHashTableReset(hashtable);//重置Hash表
    //inner relation文件
    innerFile = hashtable->innerBatchFile[curbatch];
    //批次文件不為NULL
    if (innerFile != NULL)
    {
        if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))//掃描innerFile,不成功,則報(bào)錯(cuò)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));

        while ((slot = ExecHashJoinGetSavedTuple(hjstate,
                                                 innerFile,
                                                 &hashvalue,
                                                 hjstate->hj_HashTupleSlot)))//
        {
            /*
             * NOTE: some tuples may be sent to future batches.  Also, it is
             * possible for hashtable->nbatch to be increased here!
             * 注意:一些元組可能被發(fā)送到未來(lái)的批次中。
             * 另外,這里也可以增加hashtable->nbatch !
             */
            ExecHashTableInsert(hashtable, slot, hashvalue);
        }

        /*
         * after we build the hash table, the inner batch file is no longer
         * needed
         * 構(gòu)建哈希表之后,內(nèi)部批處理臨時(shí)文件就不再需要了,關(guān)閉之
         */
        BufFileClose(innerFile);
        hashtable->innerBatchFile[curbatch] = NULL;
    }

    /*
     * Rewind outer batch file (if present), so that we can start reading it.
     */
    if (hashtable->outerBatchFile[curbatch] != NULL)
    {
        if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L, SEEK_SET))
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not rewind hash-join temporary file: %m")));
    }

    return true;
}


/*
 * ExecHashJoinGetSavedTuple
 *      read the next tuple from a batch file.  Return NULL if no more.
 *      從批次文件中讀取下一個(gè)元組,如無(wú)則返回NULL
 *
 * On success, *hashvalue is set to the tuple's hash value, and the tuple
 * itself is stored in the given slot.
 * 如成功,*hashvalue參數(shù)設(shè)置為元組的Hash值,元組存儲(chǔ)在給定的slot中
 */
static TupleTableSlot *
ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
                          BufFile *file,
                          uint32 *hashvalue,
                          TupleTableSlot *tupleSlot)
{
    uint32      header[2];
    size_t      nread;
    MinimalTuple tuple;

    /*
     * We check for interrupts here because this is typically taken as an
     * alternative code path to an ExecProcNode() call, which would include
     * such a check.
     * 在這里檢查中斷,因?yàn)檫@通常被作為ExecProcNode()調(diào)用的替代代碼路徑,通常包含這樣的檢查。
     */
    CHECK_FOR_INTERRUPTS();

    /*
     * Since both the hash value and the MinimalTuple length word are uint32,
     * we can read them both in one BufFileRead() call without any type
     * cheating.
     * 因?yàn)楣V岛妥钚¢L(zhǎng)度字都是uint32,所以可以在一個(gè)BufFileRead()調(diào)用中讀取它們,
     *   而不需要任何類型的cheating。
     */
    nread = BufFileRead(file, (void *) header, sizeof(header));//讀取文件
    if (nread == 0)             /* end of file */
    {
        //已讀取完畢,返回NULL
        ExecClearTuple(tupleSlot);
        return NULL;
    }
    if (nread != sizeof(header))//讀取的大小不等于header的大小,報(bào)錯(cuò)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    //hash值
    *hashvalue = header[0];
    //tuple,分配的內(nèi)存大小為MinimalTuple結(jié)構(gòu)體大小
    tuple = (MinimalTuple) palloc(header[1]);
    //元組大小
    tuple->t_len = header[1];
    //讀取文件
    nread = BufFileRead(file,
                        (void *) ((char *) tuple + sizeof(uint32)),
                        header[1] - sizeof(uint32));
    //讀取有誤,報(bào)錯(cuò)
    if (nread != header[1] - sizeof(uint32))
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not read from hash-join temporary file: %m")));
    //存儲(chǔ)到slot中
    ExecForceStoreMinimalTuple(tuple, tupleSlot, true);
    return tupleSlot;//返回slot
}


 /*
 * ExecHashTableInsert
 *      insert a tuple into the hash table depending on the hash value
 *      it may just go to a temp file for later batches
 *      根據(jù)哈希值向哈希表中插入一個(gè)tuple,它可能只是轉(zhuǎn)到一個(gè)臨時(shí)文件中以供以后的批處理
 *
 * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
 * tuple; the minimal case in particular is certain to happen while reloading
 * tuples from batch files.  We could save some cycles in the regular-tuple
 * case by not forcing the slot contents into minimal form; not clear if it's
 * worth the messiness required.
 * 注意:傳遞的TupleTableSlot可能包含一個(gè)常規(guī)、最小或虛擬元組;
 *   在從批處理文件中重新加載元組時(shí),肯定會(huì)出現(xiàn)最小的情況。
 * 如為常規(guī)元組,可以通過(guò)不強(qiáng)制slot內(nèi)容變成最小形式來(lái)節(jié)省一些處理時(shí)間;
 *   但不清楚這樣的混亂是否值得。
 */
void
ExecHashTableInsert(HashJoinTable hashtable,
                    TupleTableSlot *slot,
                    uint32 hashvalue)
{
    bool        shouldFree;//是否釋放資源
    MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);//獲取一個(gè)MinimalTuple
    int         bucketno;//桶號(hào)
    int         batchno;//批次號(hào)

    ExecHashGetBucketAndBatch(hashtable, hashvalue,
                              &bucketno, &batchno);//獲取桶號(hào)和批次號(hào)

    /*
     * decide whether to put the tuple in the hash table or a temp file
     * 判斷是否放到hash表中還是放到臨時(shí)文件中
     */
    if (batchno == hashtable->curbatch)
    {
        //批次號(hào)==hash表的批次號(hào),放到hash表中
        /*
         * put the tuple in hash table
         * 把元組放到hash表中
         */
        HashJoinTuple hashTuple;//hash tuple
        int         hashTupleSize;//大小
        double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);//常規(guī)元組數(shù)量

        /* Create the HashJoinTuple */
        //創(chuàng)建HashJoinTuple
        hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;//大小
        hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);//分配存儲(chǔ)空間
        //hash值
        hashTuple->hashvalue = hashvalue;
        //拷貝數(shù)據(jù)
        memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);

        /*
         * We always reset the tuple-matched flag on insertion.  This is okay
         * even when reloading a tuple from a batch file, since the tuple
         * could not possibly have been matched to an outer tuple before it
         * went into the batch file.
         * 我們總是在插入時(shí)重置元組匹配的標(biāo)志。
         * 即使在從批處理文件中重新加載元組時(shí),這也是可以的,
         *   因?yàn)樵谠M進(jìn)入批處理文件之前,它不可能與外部元組匹配。
         */
        HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));

        /* Push it onto the front of the bucket's list */
        //
        hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
        hashtable->buckets.unshared[bucketno] = hashTuple;

        /*
         * Increase the (optimal) number of buckets if we just exceeded the
         * NTUP_PER_BUCKET threshold, but only when there's still a single
         * batch.
         * 如果剛剛超過(guò)了NTUP_PER_BUCKET閾值,但是只有在仍然有單個(gè)批處理時(shí),
         *  才增加桶的(優(yōu)化后)數(shù)量。
         */
        if (hashtable->nbatch == 1 &&
            ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
        {
            //只有1個(gè)批次而且元組數(shù)大于桶數(shù)*每桶的元組數(shù)
            /* Guard against integer overflow and alloc size overflow */
            //確保整數(shù)不要溢出
            if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
                hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
            {
                hashtable->nbuckets_optimal *= 2;
                hashtable->log2_nbuckets_optimal += 1;
            }
        }

        /* Account for space used, and back off if we've used too much */
        //聲明使用的存儲(chǔ)空間,如果使用太多,需要回退
        hashtable->spaceUsed += hashTupleSize;
        if (hashtable->spaceUsed > hashtable->spacePeak)
            hashtable->spacePeak = hashtable->spaceUsed;//超出峰值,則跳轉(zhuǎn)
        if (hashtable->spaceUsed +
            hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
            > hashtable->spaceAllowed)
            ExecHashIncreaseNumBatches(hashtable);//超出允許的空間,則增加批次
    }
    else
    {
        //不在這個(gè)批次中
        /*
         * put the tuple into a temp file for later batches
         * 放在臨時(shí)文件中以便后續(xù)處理(減少重復(fù)掃描)
         */
        Assert(batchno > hashtable->curbatch);
        ExecHashJoinSaveTuple(tuple,
                              hashvalue,
                              &hashtable->innerBatchFile[batchno]);//存儲(chǔ)tuple到臨時(shí)文件中
    }

    if (shouldFree)//如需要釋放空間,則處理之
        heap_free_minimal_tuple(tuple);
}

三、跟蹤分析

設(shè)置work_mem為較低的值(1MB),便于手工產(chǎn)生批次

testdb=# set work_mem='1MB';
SET
testdb=# show work_mem;
 work_mem 
----------
 1MB
(1 row)

測(cè)試腳本如下

testdb=# set enable_nestloop=false;
SET
testdb=# set enable_mergejoin=false;
SET
testdb=# explain verbose select dw.*,grjf.grbh,grjf.xm,grjf.ny,grjf.je 
testdb-# from t_dwxx dw,lateral (select gr.grbh,gr.xm,jf.ny,jf.je 
testdb(#                         from t_grxx gr inner join t_jfxx jf 
testdb(#                                        on gr.dwbh = dw.dwbh 
testdb(#                                           and gr.grbh = jf.grbh) grjf
testdb-# order by dw.dwbh;
                                          QUERY PLAN                                           
-----------------------------------------------------------------------------------------------
 Sort  (cost=14828.83..15078.46 rows=99850 width=47)
   Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
   Sort Key: dw.dwbh
   ->  Hash Join  (cost=3176.00..6537.55 rows=99850 width=47)
         Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm, jf.ny, jf.je
         Hash Cond: ((gr.grbh)::text = (jf.grbh)::text)
         ->  Hash Join  (cost=289.00..2277.61 rows=99850 width=32)
               Output: dw.dwmc, dw.dwbh, dw.dwdz, gr.grbh, gr.xm
               Inner Unique: true
               Hash Cond: ((gr.dwbh)::text = (dw.dwbh)::text)
               ->  Seq Scan on public.t_grxx gr  (cost=0.00..1726.00 rows=100000 width=16)
                     Output: gr.dwbh, gr.grbh, gr.xm, gr.xb, gr.nl
               ->  Hash  (cost=164.00..164.00 rows=10000 width=20)
                     Output: dw.dwmc, dw.dwbh, dw.dwdz
                     ->  Seq Scan on public.t_dwxx dw  (cost=0.00..164.00 rows=10000 width=20)
                           Output: dw.dwmc, dw.dwbh, dw.dwdz
         ->  Hash  (cost=1637.00..1637.00 rows=100000 width=20)
               Output: jf.ny, jf.je, jf.grbh
               ->  Seq Scan on public.t_jfxx jf  (cost=0.00..1637.00 rows=100000 width=20)
                     Output: jf.ny, jf.je, jf.grbh
(20 rows)

啟動(dòng)gdb,設(shè)置斷點(diǎn),進(jìn)入ExecHashJoinNewBatch

(gdb) b ExecHashJoinNewBatch
Breakpoint 1 at 0x7031f5: file nodeHashjoin.c, line 943.
(gdb) c
Continuing.

Breakpoint 1, ExecHashJoinNewBatch (hjstate=0x1c40738) at nodeHashjoin.c:943
943     HashJoinTable hashtable = hjstate->hj_HashTable;

獲取批次數(shù)(8個(gè)批次)和當(dāng)前批次(0,第一個(gè)批次)

(gdb) n
950     nbatch = hashtable->nbatch;
(gdb) 
951     curbatch = hashtable->curbatch;
(gdb) 
953     if (curbatch > 0)
(gdb) p nbatch
$5 = 8
(gdb) p curbatch
$6 = 0

curbatch ==0,剛剛完成了第一個(gè)批次,重置傾斜優(yōu)化的相關(guān)狀態(tài)變量

(gdb) n
971         hashtable->skewEnabled = false;
(gdb) 
972         hashtable->skewBucket = NULL;
(gdb) 
973         hashtable->skewBucketNums = NULL;
(gdb) 
974         hashtable->nSkewBuckets = 0;
(gdb) 
975         hashtable->spaceUsedSkew = 0;
(gdb) 
995     curbatch++;

外表為空或內(nèi)表為空時(shí)的優(yōu)化,本次調(diào)用均不為空

(gdb) n
996     while (curbatch < nbatch &&
(gdb) 
997            (hashtable->outerBatchFile[curbatch] == NULL ||
(gdb) p hashtable->outerBatchFile[curbatch]
$7 = (BufFile *) 0x1d85290
(gdb) p hashtable->outerBatchFile[curbatch]
$8 = (BufFile *) 0x1d85290

設(shè)置當(dāng)前批次,重建Hash表

(gdb) 
1023        if (curbatch >= nbatch)
(gdb) 
1026        hashtable->curbatch = curbatch;
(gdb) 
1031        ExecHashTableReset(hashtable);

獲取inner relation批次臨時(shí)文件

(gdb) 
1033        innerFile = hashtable->innerBatchFile[curbatch];
(gdb) 
1035        if (innerFile != NULL)
(gdb) p innerFile
$9 = (BufFile *) 0x1cc0540

臨時(shí)文件不為NULL,讀取文件

(gdb) n
1037            if (BufFileSeek(innerFile, 0, 0L, SEEK_SET))
(gdb) 
1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,

進(jìn)入函數(shù)ExecHashJoinGetSavedTuple

(gdb) step
ExecHashJoinGetSavedTuple (hjstate=0x1c40fd8, file=0x1cc0540, hashvalue=0x7ffeace60824, tupleSlot=0x1c4cc20)
    at nodeHashjoin.c:1259
1259        CHECK_FOR_INTERRUPTS();
(gdb) 

ExecHashJoinGetSavedTuple->讀取頭部8個(gè)字節(jié)(header,類型為void *,在64 bit的機(jī)器上,大小8個(gè)字節(jié))

gdb) n
1266        nread = BufFileRead(file, (void *) header, sizeof(header));
(gdb) 
1267        if (nread == 0)             /* end of file */
(gdb) p nread
$10 = 8
(gdb) n
1272        if (nread != sizeof(header))
(gdb) 

ExecHashJoinGetSavedTuple->獲取Hash值(1978434688)

(gdb) 
1276        *hashvalue = header[0];
(gdb) n
1277        tuple = (MinimalTuple) palloc(header[1]);
(gdb) p *hashvalue
$11 = 1978434688

ExecHashJoinGetSavedTuple->獲取tuple&元組長(zhǎng)度

(gdb) n
1278        tuple->t_len = header[1];
(gdb) 
1281                            header[1] - sizeof(uint32));
(gdb) p tuple->t_len
$16 = 24
(gdb) p *tuple
$17 = {t_len = 24, mt_padding = "\177\177\177\177\177\177", t_infomask2 = 32639, t_infomask = 32639, t_hoff = 127 '\177', 
  t_bits = 0x1c5202f "\177\177\177\177\177\177\177\177\177~\177\177\177\177\177\177\177"}
(gdb) 

ExecHashJoinGetSavedTuple->根據(jù)大小讀取文件獲取元組

(gdb) n
1279        nread = BufFileRead(file,
(gdb) 
1282        if (nread != header[1] - sizeof(uint32))
(gdb) p header[1]
$18 = 24
(gdb) p sizeof(uint32)
$19 = 4
(gdb) p *tuple
$20 = {t_len = 24, mt_padding = "\000\000\000\000\000", t_infomask2 = 3, t_infomask = 2, t_hoff = 24 '\030', 
  t_bits = 0x1c5202f ""}

ExecHashJoinGetSavedTuple->存儲(chǔ)到slot中,完成調(diào)用

(gdb) n
1286        return ExecStoreMinimalTuple(tuple, tupleSlot, true);
(gdb) 
1287    }
(gdb) 
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1051
1051                ExecHashTableInsert(hashtable, slot, hashvalue);

插入到Hash表中

(gdb) 
1051                ExecHashTableInsert(hashtable, slot, hashvalue);

進(jìn)入ExecHashTableInsert

(gdb) step
ExecHashTableInsert (hashtable=0x1c6e1c0, slot=0x1c4cc20, hashvalue=3757101760) at nodeHash.c:1593
1593        MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
(gdb) 

ExecHashTableInsert->獲取批次號(hào)和hash桶號(hào)

(gdb) n
1597        ExecHashGetBucketAndBatch(hashtable, hashvalue,
(gdb) 
1603        if (batchno == hashtable->curbatch)
(gdb) p batchno
$21 = 1
(gdb) p bucketno
$22 = 21184
(gdb) 
(gdb) p hashtable->curbatch
$23 = 1

ExecHashTableInsert->批次號(hào)與Hash表中的批次號(hào)一致,把元組放到Hash表中
常規(guī)元組數(shù)量=100000

(gdb) n
1610            double      ntuples = (hashtable->totalTuples - hashtable->skewTuples);
(gdb) n
1613            hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
(gdb) p ntuples
$24 = 100000

ExecHashTableInsert->創(chuàng)建HashJoinTuple,重置元組匹配標(biāo)記

(gdb) n
1614            hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
(gdb) 
1616            hashTuple->hashvalue = hashvalue;
(gdb) 
1617            memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
(gdb) 
1625            HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
(gdb) 

ExecHashTableInsert->元組放在Hash表桶鏈表的前面

(gdb) n
1628            hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
(gdb) 
1629            hashtable->buckets.unshared[bucketno] = hashTuple;
(gdb) 
1636            if (hashtable->nbatch == 1 &&
(gdb) 

ExecHashTableInsert->調(diào)整或記錄Hash表內(nèi)存使用的峰值并返回,回到ExecHashJoinNewBatch

(gdb) 
1649            hashtable->spaceUsed += hashTupleSize;
(gdb) 
...
(gdb) 
1667    }
(gdb) n
ExecHashJoinNewBatch (hjstate=0x1c40fd8) at nodeHashjoin.c:1042
1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,

循環(huán)插入到Hash表中

1042            while ((slot = ExecHashJoinGetSavedTuple(hjstate,
(gdb) n
1051                ExecHashTableInsert(hashtable, slot, hashvalue);
...

DONE!

四、參考資料

Hash Joins: Past, Present and Future/PGCon 2017
A Look at How Postgres Executes a Tiny Join - Part 1
A Look at How Postgres Executes a Tiny Join - Part 2
Assignment 2 Symmetric Hash Join

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI