千家信息网

怎么使用PostgreSQL ExecAgg函数

发表于:2025-11-08 作者:千家信息网编辑
千家信息网最后更新 2025年11月08日,本篇内容介绍了"怎么使用PostgreSQL ExecAgg函数"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
千家信息网最后更新 2025年11月08日怎么使用PostgreSQL ExecAgg函数

本篇内容介绍了"怎么使用PostgreSQL ExecAgg函数"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、数据结构

AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体

/* ---------------------
 *    AggState information
 *
 *    Execution-time state of an aggregation node.
 *
 *    ss.ss_ScanTupleSlot refers to output of underlying plan.
 *    (ss = ScanState, ps = PlanState)
 *
 *    Note: ss.ps.ps_ExprContext contains ecxt_aggvalues and
 *    ecxt_aggnulls arrays, which hold the computed agg values for the current
 *    input group during evaluation of an Agg node's output tuple(s).  We
 *    create a second ExprContext, tmpcontext, in which to evaluate input
 *    expressions and run the aggregate transition functions.
 * ---------------------
 */
/* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;

/*
 * Each FIELDNO_AGGSTATE_* macro below equals the zero-based position of the
 * field declared immediately after it, so fields must not be added, removed
 * or reordered without updating those constants.
 */
typedef struct AggState
{
    ScanState    ss;                /* its first field is NodeTag */
    List       *aggs;            /* all Aggref nodes in targetlist & quals */
    int            numaggs;        /* length of list (could be zero!) */
    int            numtrans;        /* number of pertrans items */
    AggStrategy aggstrategy;    /* strategy mode */
    AggSplit    aggsplit;        /* agg-splitting mode, see nodes.h */
    AggStatePerPhase phase;        /* pointer to current phase data */
    int            numphases;        /* number of phases (including phase 0) */
    int            current_phase;    /* current phase number */
    AggStatePerAgg peragg;        /* per-Aggref information */
    AggStatePerTrans pertrans;    /* per-Trans state information */
    ExprContext *hashcontext;    /* econtexts for long-lived data (hashtable) */
    ExprContext **aggcontexts;    /* econtexts for long-lived data (per GS) */
    ExprContext *tmpcontext;    /* econtext for input expressions */
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
    ExprContext *curaggcontext; /* currently active aggcontext */
    AggStatePerAgg curperagg;    /* currently active aggregate, if any */
#define FIELDNO_AGGSTATE_CURPERTRANS 16
    AggStatePerTrans curpertrans;    /* currently active trans state, if any */
    bool        input_done;        /* indicates end of input */
    bool        agg_done;        /* indicates completion of Agg scan */
    int            projected_set;    /* The last projected grouping set */
#define FIELDNO_AGGSTATE_CURRENT_SET 20
    int            current_set;    /* The current grouping set being evaluated */
    Bitmapset  *grouped_cols;    /* grouped cols in current projection */
    List       *all_grouped_cols;    /* list of all grouped cols in DESC order */
    /* These fields are for grouping set phase data */
    int            maxsets;        /* The max number of sets in any phase */
    AggStatePerPhase phases;    /* array of all phases */
    Tuplesortstate *sort_in;    /* sorted input to phases > 1 */
    Tuplesortstate *sort_out;    /* input is copied here for next phase */
    TupleTableSlot *sort_slot;    /* slot for sort results */
    /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
    AggStatePerGroup *pergroups;    /* grouping set indexed array of per-group
                                     * pointers */
    HeapTuple    grp_firstTuple; /* copy of first tuple of current group */
    /* these fields are used in AGG_HASHED and AGG_MIXED modes: */
    bool        table_filled;    /* hash table filled yet? */
    /*
     * Number of entries in perhash: agg_retrieve_hash_table() advances
     * current_set through perhash[] until it reaches this bound.
     */
    int            num_hashes;
    AggStatePerHash perhash;    /* array of per-hashtable data */
    AggStatePerGroup *hash_pergroup;    /* grouping set indexed array of
                                         * per-group pointers */
    /* support for evaluation of agg input expressions: */
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
    AggStatePerGroup *all_pergroups;    /* array of first ->pergroups, then
                                         * ->hash_pergroup */
    ProjectionInfo *combinedproj;    /* projection machinery */
} AggState;

/* Primitive options supported by nodeAgg.c: */
#define AGGSPLITOP_COMBINE        0x01    /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL    0x02    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE    0x04    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE    0x08    /* apply deserializefn to input */

/*
 * Supported operating modes (i.e., useful combinations of these options).
 * Each enumerator is a bitwise OR of the AGGSPLITOP_* flags above.
 */
typedef enum AggSplit
{
    /* Basic, non-split aggregation: */
    AGGSPLIT_SIMPLE = 0,
    /* Initial phase of partial aggregation, with serialization: */
    AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
    /* Final phase of partial aggregation, with deserialization: */
    AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE
} AggSplit;

/* Test whether an AggSplit value selects each primitive option: */
#define DO_AGGSPLIT_COMBINE(as)        (((as) & AGGSPLITOP_COMBINE) != 0)
#define DO_AGGSPLIT_SKIPFINAL(as)    (((as) & AGGSPLITOP_SKIPFINAL) != 0)
#define DO_AGGSPLIT_SERIALIZE(as)    (((as) & AGGSPLITOP_SERIALIZE) != 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) != 0)

二、源码解读

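ExecAgg函数首先获取AggState运行状态,然后根据当前阶段(node->phase)的聚合策略(aggstrategy)分发到相应的处理逻辑.以Hash聚合为例:策略为AGG_HASHED时,如果哈希表尚未填充(table_filled为false),先调用agg_fill_hash_table读取全部输入元组,按分组列值进行Hash,同时执行转换函数计算中间结果值,缓存到哈希表中;随后落入(fall through)与AGG_MIXED共用的分支,调用agg_retrieve_hash_table从Hash表中获取结果元组并返回(每次调用返回一个结果行).注意这里并不是"先后执行两种策略",而是switch语句的两个case共用同一段检索代码.策略为AGG_PLAIN或AGG_SORTED时,则调用agg_retrieve_direct.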

/*
 * ExecAgg -
 *
 *      Entry point of the aggregation node.  Tuples are pulled from the
 *      outer subplan and aggregated for every aggregate function use
 *      (Aggref node) found in the node's targetlist or qual.  With grouped
 *      aggregation one result row is produced per group; with plain
 *      aggregation a single result row covers the whole query.  Either way
 *      the aggregate values are kept in the expression context, where
 *      ExecProject picks them up when it builds the result tuple.
 *
 *      Returns the next result slot, or NULL once the scan is complete or
 *      the strategy-specific routine hands back a null/empty slot.
 */
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
    AggState   *aggstate = castNode(AggState, pstate);
    TupleTableSlot *slot = NULL;

    CHECK_FOR_INTERRUPTS();

    /* Nothing left to produce once the scan has been marked complete. */
    if (aggstate->agg_done)
        return NULL;

    /* Dispatch on the strategy of the current phase. */
    switch (aggstate->phase->aggstrategy)
    {
        case AGG_PLAIN:
        case AGG_SORTED:
            slot = agg_retrieve_direct(aggstate);
            break;

        case AGG_HASHED:
            /* The hash table is built on first use only. */
            if (!aggstate->table_filled)
                agg_fill_hash_table(aggstate);
            /* FALLTHROUGH */
            /* hashed and mixed strategies share the retrieval code */
        case AGG_MIXED:
            slot = agg_retrieve_hash_table(aggstate);
            break;
    }

    return TupIsNull(slot) ? NULL : slot;
}

agg_fill_hash_table
读取输入并构建哈希表.
lookup_hash_entries函数根据输入元组构建分组列哈希表(搜索或新建条目),advance_aggregates调用转换函数计算中间结果并缓存.

/*
 * ExecAgg for hashed case: read input and build hash table
 *
 * Drains the input completely, feeding every tuple through the hash lookup
 * and the aggregate advance step, then marks the table as filled and
 * positions the iterator on the first hash table.
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
    ExprContext *inputcxt = aggstate->tmpcontext;
    TupleTableSlot *slot;
    AggStatePerHash firsthash;

    /*
     * One iteration per input tuple, until the input is exhausted.  The
     * fetch is kept out of the TupIsNull() argument in case that macro
     * evaluates its argument more than once.
     */
    for (slot = fetch_input_tuple(aggstate);
         !TupIsNull(slot);
         slot = fetch_input_tuple(aggstate))
    {
        /* Expose the tuple to lookup_hash_entries and advance_aggregates. */
        inputcxt->ecxt_outertuple = slot;

        /* Locate the matching hashtable entries, creating them if needed. */
        lookup_hash_entries(aggstate);

        /* Run the transition (or combine) functions for this tuple. */
        advance_aggregates(aggstate);

        /*
         * Release per-input-tuple data before fetching the next tuple; the
         * hash lookups reset this context as well.
         */
        ResetExprContext(aggstate->tmpcontext);
    }

    aggstate->table_filled = true;

    /* Get ready to walk the first hash table. */
    select_current_set(aggstate, 0, true);
    firsthash = &aggstate->perhash[0];
    ResetTupleHashIterator(firsthash->hashtable, &firsthash->hashiter);
}

agg_retrieve_hash_table
agg_retrieve_hash_table函数在hash表中检索结果,执行投影等相关操作.

/*
 * ExecAgg for hashed case: retrieving groups from hash table
 *
 * Walks the hash tables in perhash[] one entry at a time, starting from the
 * table selected by aggstate->current_set.  For each entry the aggregates
 * are finalized and projected; the first non-NULL projection result is
 * returned.  When every hash table has been exhausted, agg_done is set and
 * NULL is returned.
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
    ExprContext *econtext;
    AggStatePerAgg peragg;
    AggStatePerGroup pergroup;
    TupleHashEntryData *entry;
    TupleTableSlot *firstSlot;
    TupleTableSlot *result;
    AggStatePerHash perhash;

    /*
     * get state info from node.
     *
     * econtext is the per-output-tuple expression context.
     */
    econtext = aggstate->ss.ps.ps_ExprContext;
    peragg = aggstate->peragg;
    firstSlot = aggstate->ss.ss_ScanTupleSlot;

    /*
     * Note that perhash (and therefore anything accessed through it) can
     * change inside the loop, as we change between grouping sets.
     */
    perhash = &aggstate->perhash[aggstate->current_set];

    /*
     * We loop retrieving groups until we find one satisfying
     * aggstate->ss.ps.qual
     */
    while (!aggstate->agg_done)
    {
        /* Re-read on every iteration because perhash may have changed. */
        TupleTableSlot *hashslot = perhash->hashslot;
        int            i;

        CHECK_FOR_INTERRUPTS();

        /*
         * Find the next entry in the hash table
         */
        entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
        if (entry == NULL)
        {
            /* Current hash table is exhausted; try the next grouping set. */
            int            nextset = aggstate->current_set + 1;

            if (nextset < aggstate->num_hashes)
            {
                /*
                 * Switch to next grouping set, reinitialize, and restart the
                 * loop.
                 */
                select_current_set(aggstate, nextset, true);
                perhash = &aggstate->perhash[aggstate->current_set];
                ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);
                continue;
            }
            else
            {
                /* No more hashtables, so done */
                aggstate->agg_done = true;
                return NULL;
            }
        }

        /*
         * Clear the per-output-tuple context for each group
         *
         * We intentionally don't use ReScanExprContext here; if any aggs have
         * registered shutdown callbacks, they mustn't be called yet, since we
         * might not be done with that agg.
         */
        ResetExprContext(econtext);

        /*
         * Transform representative tuple back into one with the right
         * columns.
         */
        ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
        slot_getallattrs(hashslot);

        /* Start from an empty firstSlot with every column marked NULL. */
        ExecClearTuple(firstSlot);
        memset(firstSlot->tts_isnull, true,
               firstSlot->tts_tupleDescriptor->natts * sizeof(bool));

        /*
         * Copy each hashed grouping column from hashslot into its position
         * in firstSlot.  The "- 1" suggests hashGrpColIdxInput holds 1-based
         * column numbers -- TODO confirm against its definition.
         */
        for (i = 0; i < perhash->numhashGrpCols; i++)
        {
            int            varNumber = perhash->hashGrpColIdxInput[i] - 1;

            firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
            firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
        }
        ExecStoreVirtualTuple(firstSlot);

        /* The per-group aggregate state is carried by the hash entry. */
        pergroup = (AggStatePerGroup) entry->additional;

        /*
         * Use the representative input tuple for any references to
         * non-aggregated input columns in the qual and tlist.
         */
        econtext->ecxt_outertuple = firstSlot;

        /* Set up the projection slot for the current grouping set. */
        prepare_projection_slot(aggstate,
                                econtext->ecxt_outertuple,
                                aggstate->current_set);

        /* Compute the final aggregate values for this group. */
        finalize_aggregates(aggstate, peragg, pergroup);

        /*
         * Project the result row.  A NULL result presumably means the group
         * did not pass the qual, in which case we move on to the next entry.
         */
        result = project_aggregates(aggstate);
        if (result)
            return result;
    }

    /* No more groups */
    return NULL;
}

"怎么使用PostgreSQL ExecAgg函数"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0