溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用PostgreSQL ExecAgg函數(shù)

發(fā)布時間:2021-11-09 13:51:50 來源:億速云 閱讀:194 作者:iii 欄目:關系型數(shù)據(jù)庫

本篇內(nèi)容介紹了“怎么使用PostgreSQL ExecAgg函數(shù)”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

一、數(shù)據(jù)結構

AggState
聚合函數(shù)執(zhí)行時狀態(tài)結構體,內(nèi)含AggStatePerAgg等結構體

/* ---------------------
 *    AggState information
 *
 *    ss.ss_ScanTupleSlot refers to output of underlying plan.
 *  ss.ss_ScanTupleSlot指的是基礎計劃的輸出.
 *    (ss = ScanState,ps = PlanState)
 *
 *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *    ecxt_aggnulls arrays, which hold the computed agg values for the current
 *    input group during evaluation of an Agg node's output tuple(s).  We
 *    create a second ExprContext, tmpcontext, in which to evaluate input
 *    expressions and run the aggregate transition functions.
 *    注意:ss.ps.ps_ExprContext包含了ecxt_aggvalues和ecxt_aggnulls數(shù)組,
 *      這兩個數(shù)組保存了在計算agg節(jié)點的輸出元組時當前輸入組已計算的agg值.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
//在nodeAgg.c中私有的結構體
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
typedef struct AggState
{
    //第一個字段是NodeTag(繼承自ScanState)
    ScanState    ss;                /* its first field is NodeTag */
    //targetlist和quals中所有的Aggref
    List       *aggs;            /* all Aggref nodes in targetlist & quals */
    //鏈表的大小(可以為0)
    int            numaggs;        /* length of list (could be zero!) */
    //pertrans條目大小
    int            numtrans;        /* number of pertrans items */
    //Agg策略模式
    AggStrategy aggstrategy;    /* strategy mode */
    //agg-splitting模式,參見nodes.h
    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */
    //指向當前步驟數(shù)據(jù)的指針
    AggStatePerPhase phase;        /* pointer to current phase data */
    //步驟數(shù)(包括0)
    int            numphases;        /* number of phases (including phase 0) */
    //當前步驟
    int            current_phase;    /* current phase number */
    //per-Aggref信息
    AggStatePerAgg peragg;        /* per-Aggref information */
    //per-Trans狀態(tài)信息
    AggStatePerTrans pertrans;    /* per-Trans state information */
    //長生命周期數(shù)據(jù)的ExprContexts(hashtable)
    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */
    ////長生命周期數(shù)據(jù)的ExprContexts(每一個GS使用)
    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */
    //輸入表達式的ExprContext
    ExprContext *tmpcontext;    /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    //當前活躍的aggcontext
    ExprContext *curaggcontext; /* currently active aggcontext */
    //當前活躍的aggregate(如存在)
    AggStatePerAgg curperagg;    /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    //當前活躍的trans state
    AggStatePerTrans curpertrans;    /* currently active trans state, if any */
    //輸入結束?
    bool        input_done;        /* indicates end of input */
    //Agg掃描結束?
    bool        agg_done;        /* indicates completion of Agg scan */
    //最后一個grouping set
    int            projected_set;    /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    //將要解析的當前grouping set
    int            current_set;    /* The current grouping set being evaluated */
    //當前投影操作的分組列
    Bitmapset  *grouped_cols;    /* grouped cols in current projection */
    //倒序的分組列鏈表
    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */
    /* These fields are for grouping set phase data */
    //-------- 下面的列用于grouping set步驟數(shù)據(jù)
    //所有步驟中最大的sets大小
    int            maxsets;        /* The max number of sets in any phase */
    //所有步驟的數(shù)組
    AggStatePerPhase phases;    /* array of all phases */
    //對于phases > 1,已排序的輸入信息
    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */
    //對于下一個步驟,輸入已拷貝
    Tuplesortstate *sort_out;    /* input is copied here for next phase */
    //排序結果的slot
    TupleTableSlot *sort_slot;    /* slot for sort results */
    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
    //------- 下面的列用于AGG_PLAIN和AGG_SORTED模式:
    //per-group指針的grouping set編號數(shù)組
    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group
                                     * pointers */
    //當前組的第一個元組拷貝
    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */
    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */
    //--------- 下面的列用于AGG_HASHED和AGG_MIXED模式:
    //是否已填充hash表?
    bool        table_filled;    /* hash table filled yet? */
    //hash桶數(shù)?
    int            num_hashes;
    //相應的哈希表數(shù)據(jù)數(shù)組
    AggStatePerHash perhash;    /* array of per-hashtable data */
    //per-group指針的grouping set編號數(shù)組
    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of
                                         * per-group pointers */
    /* support for evaluation of agg input expressions: */
    //---------- agg輸入表達式解析支持
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    //首先是->pergroups,然后是hash_pergroup
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, than
                                         * ->hash_pergroup */
    //投影實現(xiàn)機制
    ProjectionInfo *combinedproj;    /* projection machinery */
} AggState;
/* Primitive options supported by nodeAgg.c: */
//nodeag .c支持的基本選項
#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input */
/* Supported operating modes (i.e., useful combinations of these options): */
//支持的操作模式
typedef enum AggSplit
{
    /* Basic, non-split aggregation: */
    //基本 : 非split聚合
    AGGSPLIT_SIMPLE = 0,
    /* Initial phase of partial aggregation, with serialization: */
    //部分聚合的初始步驟,序列化
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    /* Final phase of partial aggregation, with deserialization: */
    //部分聚合的最終步驟,反序列化
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;
/* Test whether an AggSplit value selects each primitive option: */
//測試AggSplit選擇了哪些基本選項
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源碼解讀

ExecAgg函數(shù),首先獲取AggState運行狀態(tài),然后根據(jù)各個階段(aggstate->phase)的策略(aggstrategy)執(zhí)行相應的邏輯.如使用Hash聚合,則只有一個節(jié)點,但有兩個策略,首先是AGG_HASHED,該策略對輸入元組按照分組列值進行Hash,同時執(zhí)行轉換函數(shù)計算中間結果值,緩存到哈希表中;然后執(zhí)行AGG_MIXED策略,從Hash表中獲取結果元組并返回結果元組(每一result為一個結果行).

/*
 * ExecAgg -
 *
 *      ExecAgg receives tuples from its outer subplan and aggregates over
 *      the appropriate attribute for each aggregate function use (Aggref
 *      node) appearing in the targetlist or qual of the node.  The number
 *      of tuples to aggregate over depends on whether grouped or plain
 *      aggregation is selected.  In grouped aggregation, we produce a result
 *      row for each group; in plain aggregation there's a single result row
 *      for the whole query.  In either case, the value of each aggregate is
 *      stored in the expression context to be used when ExecProject evaluates
 *      the result tuple.
 *       ExecAgg接收從outer子計劃返回的元組合適的屬性上為每一個聚合函數(shù)(出現(xiàn)在投影列或節(jié)點表達式)執(zhí)行聚合.
 *    需要聚合的元組數(shù)量依賴于是否已分組或者選擇普通聚合.
 *    在已分組的聚合操作宏,為每一個組產(chǎn)生結果行;普通聚合,整個查詢只有一個結果行.
 *    不管哪種情況,每一個聚合結果值都會存儲在表達式上下文中(ExecProject會解析結果元組)
 */
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
    AggState   *node = castNode(AggState, pstate);
    TupleTableSlot *result = NULL;
    CHECK_FOR_INTERRUPTS();
    if (!node->agg_done)
    {
        /* Dispatch based on strategy */
        //基于策略進行分發(fā)
        switch (node->phase->aggstrategy)
        {
            case AGG_HASHED:
                if (!node->table_filled)
                    agg_fill_hash_table(node);
                /* FALLTHROUGH */
                //填充后,執(zhí)行MIXED
            case AGG_MIXED:
                result = agg_retrieve_hash_table(node);
                break;
            case AGG_PLAIN:
            case AGG_SORTED:
                result = agg_retrieve_direct(node);
                break;
        }
        if (!TupIsNull(result))
            return result;
    }
    return NULL;
}

agg_fill_hash_table
讀取輸入并構建哈希表.
lookup_hash_entries函數(shù)根據(jù)輸入元組構建分組列哈希表(搜索或新建條目),advance_aggregates調(diào)用轉換函數(shù)計算中間結果并緩存.

/*
 * ExecAgg for hashed case: read input and build hash table
 * 讀取輸入并構建哈希表
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
    TupleTableSlot *outerslot;
    ExprContext *tmpcontext = aggstate->tmpcontext;
    /*
     * Process each outer-plan tuple, and then fetch the next one, until we
     * exhaust the outer plan.
     * 處理每一個outer-plan返回的元組,然后繼續(xù)提取下一個,直至完成所有元組的處理.
     */
    for (;;)
    {
        //--------- 循環(huán)直至完成所有元組的處理
        //提取輸入的元組
        outerslot = fetch_input_tuple(aggstate);
        if (TupIsNull(outerslot))
            break;//已完成處理,退出循環(huán)
        /* set up for lookup_hash_entries and advance_aggregates */
        //配置lookup_hash_entries和advance_aggregates函數(shù)
        //把元組放在臨時內(nèi)存上下文中
        tmpcontext->ecxt_outertuple = outerslot;
        /* Find or build hashtable entries */
        //檢索或構建哈希表條目
        lookup_hash_entries(aggstate);
        /* Advance the aggregates (or combine functions) */
        //推動聚合(或組合函數(shù))
        advance_aggregates(aggstate);
        /*
         * Reset per-input-tuple context after each tuple, but note that the
         * hash lookups do this too
         * 重置per-input-tuple內(nèi)存上下文,但需要注意hash檢索也會做這個事情
         */
        ResetExprContext(aggstate->tmpcontext);
    }
    aggstate->table_filled = true;
    /* Initialize to walk the first hash table */
    //初始化用于遍歷第一個哈希表
    select_current_set(aggstate, 0, true);
    ResetTupleHashIterator(aggstate->perhash[0].hashtable,
                           &aggstate->perhash[0].hashiter);
}

agg_retrieve_hash_table
agg_retrieve_hash_table函數(shù)在hash表中檢索結果,執(zhí)行投影等相關操作.

/*
 * ExecAgg for hashed case: retrieving groups from hash table
 * ExecAgg(Hash實現(xiàn)版本):在hash表中檢索組
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
    ExprContext *econtext;
    AggStatePerAgg peragg;
    AggStatePerGroup pergroup;
    TupleHashEntryData *entry;
    TupleTableSlot *firstSlot;
    TupleTableSlot *result;
    AggStatePerHash perhash;
    /*
     * get state info from node.
     * 從node節(jié)點中獲取狀態(tài)信息.
     *
     * econtext is the per-output-tuple expression context.
     * econtext是per-output-tuple表達式上下文.
     */
    econtext = aggstate->ss.ps.ps_ExprContext;
    peragg = aggstate->peragg;
    firstSlot = aggstate->ss.ss_ScanTupleSlot;
    /*
     * Note that perhash (and therefore anything accessed through it) can
     * change inside the loop, as we change between grouping sets.
     * 注意,在分組之間切換時,perhash在循環(huán)中可能會改變
     */
    perhash = &aggstate->perhash[aggstate->current_set];
    /*
     * We loop retrieving groups until we find one satisfying
     * aggstate->ss.ps.qual
     * 循環(huán)檢索groups,直至檢索到一個符合aggstate->ss.ps.qual條件的組.
     */
    while (!aggstate->agg_done)
    {
        //------------- 選好
        //獲取Slot
        TupleTableSlot *hashslot = perhash->hashslot;
        int            i;
        //檢查中斷
        CHECK_FOR_INTERRUPTS();
        /*
         * Find the next entry in the hash table
         * 檢索hash表的下一個條目
         */
        entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
        if (entry == NULL)
        {
            //條目為NULL,切換到下一個set
            int            nextset = aggstate->current_set + 1;
            if (nextset < aggstate->num_hashes)
            {
                /*
                 * Switch to next grouping set, reinitialize, and restart the
                 * loop.
                 * 切換至下一個grouping set,重新初始化并重啟循環(huán)
                 */
                select_current_set(aggstate, nextset, true);
                perhash = &aggstate->perhash[aggstate->current_set];
                ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
                continue;
            }
            else
            {
                /* No more hashtables, so done */
                //已完成檢索,設置標記,退出
                aggstate->agg_done = true;
                return NULL;
            }
        }
        /*
         * Clear the per-output-tuple context for each group
         * 為每一個group清除per-output-tuple上下文
         *
         * We intentionally don't use ReScanExprContext here; if any aggs have
         * registered shutdown callbacks, they mustn't be called yet, since we
         * might not be done with that agg.
         * 在這里不會用到ReScanExprContext,如果存在aggs注冊了shutdown回調(diào),
         *   那應該還沒有調(diào)用,因為我們可能還沒有完成該agg的處理.
         */
        ResetExprContext(econtext);
        /*
         * Transform representative tuple back into one with the right
         * columns.
         * 將典型元組轉回具有正確列的元組.
         */
        ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
        slot_getallattrs(hashslot);
        //清理元組
        //重置firstSlot
        ExecClearTuple(firstSlot);
        memset(firstSlot->tts_isnull, true,
               firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
        for (i = 0; i < perhash->numhashGrpCols; i++)
        {
            //重置firstSlot
            int            varNumber = perhash->hashGrpColIdxInput[i] - 1;
            firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
            firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
        }
        ExecStoreVirtualTuple(firstSlot);
        pergroup = (AggStatePerGroup) entry->additional;
        /*
         * Use the representative input tuple for any references to
         * non-aggregated input columns in the qual and tlist.
         * 為qual和tlist中的非聚合輸入列依賴使用典型輸入元組
         */
        econtext->ecxt_outertuple = firstSlot;
        //準備投影slot
        prepare_projection_slot(aggstate,
                                econtext->ecxt_outertuple,
                                aggstate->current_set);
        //最終的聚合操作
        finalize_aggregates(aggstate, peragg, pergroup);
        //投影
        result = project_aggregates(aggstate);
        if (result)
            return result;
    }
    /* No more groups */
    //沒有更多的groups了,返回NULL
    return NULL;
}

“怎么使用PostgreSQL ExecAgg函數(shù)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI