searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Greenplum查询源码之旅(2)

2024-08-23 09:39:36
6
0

查询执行源码解析

Master监听端口

入口函数为PostmasterMain().主要是监听端口以及初始化.

PostmasterMain

src/backend/main/main.c
    |--Main()
    |--|--PostmasterMain()
    {
        /\*
        \* CDB: gpdb auxilary process like fts probe, dtx recovery process is
        \* essential, we need to load them ahead of custom shared preload libraries
        \* to avoid exceeding max\_worker\_processes.
        \*/
        load\_auxiliary\_libraries();

        ......
        /\*
        \* Establish input sockets.
        \*/
        status = StreamServerPort(AF\_UNSPEC, NULL,
        (unsigned short) PostPortNumber,
        UnixSocketDir,
        ListenSocket, MAXLISTEN);
        .......

        /\*
        \* Set up shared memory and semaphores.
        \*/
        reset\_shared(PostPortNumber);

        ......

        /\* main idle loop of master \*/
        status = ServerLoop();
        .......

    }

需要关注一下函数load_auxiliary_libraries.这个函数完成了辅助进程的初始化。涉及到的数据结构为BackgroundWorker。从下面代码中也可以了解到Greenplum后台有哪些独有的辅助进程,以及各个辅助进程的入口函数,从而了解辅助进程的执行逻辑,此处不详述:

static BackgroundWorker PMAuxProcList[MaxPMAuxProc] =
    {
        {"ftsprobe process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
            0, /\* restart immediately if ftsprobe exits with non-zero code \*/
            FtsProbeMain, {0}, {0}, 0, 0,
            FtsProbeStartRule},

        {"global deadlock detector process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if gdd exits with non-zero code \*/
            GlobalDeadLockDetectorMain, {0}, {0}, 0, 0,
            GlobalDeadLockDetectorStartRule},

        {"dtx recovery process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
            0, /\* restart immediately if dtx recovery process exits with non-zero code \*/
            DtxRecoveryMain, {0}, {0}, 0, 0,
            DtxRecoveryStartRule},

        {"stats sender process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if stats sender exits with non-zero code \*/
            SegmentInfoSenderMain, {0}, {0}, 0, 0,
            SegmentInfoSenderStartRule},

        {"sweeper process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if sweeper process exits with non-zero code \*/
            BackoffSweeperMain, {0}, {0}, 0, 0,
            BackoffSweeperStartRule},

        {"perfmon process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if perfmon process exits with non-zero code \*/
            PerfmonMain, {0}, {0}, 0, 0,
            PerfmonStartRule},

        #ifdef ENABLE\_IC\_PROXY
        {"ic proxy process",
            #ifdef FAULT\_INJECTOR
            BGWORKER\_SHMEM\_ACCESS,
            #else
            0,
            #endif
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if ic proxy process exits with non-zero code \*/
            ICProxyMain, {0}, {0}, 0, 0,
            ICProxyStartRule},
        #endif  /\* ENABLE\_IC\_PROXY \*/
    };

master的主要逻辑在函数*ServerLoop()*里面.
当新的连接到来的时候,会启动一个新的子进程.
同时也会对辅助进程进行存在性检查,如果子进程不存在,会重新启动一个.逻辑如下代码所示:

ServerLoop函数

src/backend/postmaster/postmaster.c
    /\* Postmaster main entry point\*/
    |--PostmasterMain()
    |--|--ServerLoop() /\*Main idle loop of postmaster\*/
    {
        for (;;)
        {
            .........
            /\*
            \* New connection pending on any of our sockets? If so, fork a child
            \* process to deal with it.
            \*/

            for (i = 0; i < MAXLISTEN; i++)
            {
                ......

                Port       \*port;

                port = ConnCreate(ListenSocket[i]);
                if (port)
                {
                    BackendStartup(port);
                    ......
                }
            }

            /\* If we have lost the log collector, try to start a new one \*/
            SysLogger\_Start();

            /\*
            \* If no background writer process is running, and we are not in a
            \* state that prevents it, start one.  It doesn't matter if this
            \* fails, we'll just try again later.  Likewise for the checkpointer.
            \*/
            StartCheckpointer();
            StartBackgroundWriter();
            StartWalWriter();
            StartAutoVacLauncher();

            /\* If we have lost the stats collector, try to start a new one \*/
            pgstat\_start();

            /\* Get other worker processes running, if needed \*/
            maybe\_start\_bgworker();

        }
    }

启动Backend-QD进程

Greenplum基于PostgreSQL,采用多进程模型。当一个客户端连接进来的时候,master调用BackendStartup,
在其中fork出子进程来完成连接处理工作。这个fork出来的子进程通常称为Backend。

src/backend/postmaster/postmaster.c
    |--BackendStartup()
    {
        pid = fork\_process();
        if (pid == 0)
        {
            /\* Perform additional initialization and client authentication \*/
            BackendInitialize(port);
            /\* And run the backend \*/
            BackendRun(port);
            |--|--PostgresMain();
        }
    }

客户端发送查询到QD

Fork出来的Backend子进程,主要逻辑在函数PostgresMain中。ReadCommand()读取协议命令,然后调用exec_simple_query()进行处理。具体的命令可以参考PG协议:frontend/backend
protocol。我们主要跟踪Q命令的执行逻辑:

PostgresMain

src/backend/tcop/postgres.c
    /\* postgres main loop
    \* all backends, interactive or otherwise start here
    \*/
    |--PostgresMain()
    {
        for (;;)
        {
            /\*
            \* (3) read a command (loop blocks here)
            \*/
            firstchar = ReadCommand(&input\_message);

            /\*
            \* (6) process the command.  But ignore it if we're skipping till
            \* Sync.
            \*/
            switch (firstchar)
            {
                case 'Q':               /\* simple query \*/
                case 'M':               /\* Greenplum Database dispatched statement from QD \*/
                case 'T':               /\* Greenplum Database dispatched transaction protocol from QD \*/
                case 'F':               /\* fastpath function call \*/
                case 'X':               /\* terminate \*/
                case 'B':               /\* bind \*/
                case 'C':               /\* close \*/
                case 'D':               /\* describe \*/
                case 'E':               /\* execute \*/
                case 'H':               /\* flush \*/
                case 'P':               /\* parse \*/
                case 'S':               /\* sync \*/
                case 'd':               /\* copy data \*/
                case 'c':               /\* copy done \*/
                case 'f':               /\* copy fail \*/
            }
        }
    }

Greenplum在协议中增加了两个自定义命令,M命令主要用来反序列化plan信息,然后执行相应的查询计划,T主要和分布式事务相关:

case 'M':    /\* MPP dispatched stmt from QD \*/
    case 'T':    /\* MPP dispatched dtx protocol command from QD \*/

QD处理查询

QD的处理逻辑主要在函数exec_simple_query.
对接收到的SQL语句,Backend对其进行parse, analyze, rewrite, 以及plan,
然后进行plan的分发,最后把结果返回给客户端.

exec_simple_query

src/backend/tcop/postgres.c
    |--exec\_simple\_query()
    {
        /\*
        \* Start up a transaction command.
        \*/
        start\_xact\_command();

        /\*
        \* Do basic parsing of the query or queries
        \*/
        parsetree\_list = pg\_parse\_query(query\_string);

        /\*
        \* OK to analyze, rewrite, and plan this query.
        \*/
        querytree\_list = pg\_analyze\_and\_rewrite(parsetree, query\_string,NULL, 0);

        plantree\_list = pg\_plan\_queries(querytree\_list, NULL, false);

        /\*
        \* Create unnamed portal to run the query or queries in.
        \*/
        portal = CreatePortal("", true, true);

        /\*
        \* Start the portal.
        \*/
        PortalStart();

        /\*
        \* Now we can create the destination receiver object.
        \*/
        receiver = CreateDestReceiver(dest, portal);

        /\*
        \* Run the portal to completion, and then drop it (and the receiver).
        \*/
        (void) PortalRun();

        (\*receiver->rDestroy) (receiver);

        PortalDrop(portal, false);

        /\*
        \* Close down transaction statement, if one is open.
        \*/
        finish\_xact\_command();

    }

QD parse、analyze 以及 rewrite

Parse的逻辑在函数pg_parse_query()中:parsetree_list = pg_parse_query(query_string);
主要是进行词法与语法解析(此阶段不做语义分析),然后返回解析树。
解析树的数据结构是一个List,里面包含了各个节点。具体的可以参见gram.y。

List       \*raw\_parsetree\_list;

    /\*
    \* Do raw parsing (only).
    \*
    \* A list of parsetrees is returned, since there might be multiple
    \* commands in the given string.
    \*/
    pg\_parse\_query(const char \*query\_string)
    {
        raw\_parsetree\_list = raw\_parser(query\_string);

        return raw\_parsetree\_list;
    }

    /\*
    \* raw\_parser
    \*       Given a query in string form, do lexical and grammatical analysis.
    \*
    \* Returns a list of raw (un-analyzed) parse trees.
    \*/
    List \*
    raw\_parser(const char \*str)
    {
        int         yyresult;

        parsetree = NIL;            /\* in case grammar forgets to set it \*/
        have\_lookahead = false;

        scanner\_init(str);
        parser\_init();

        yyresult = base\_yyparse();

        scanner\_finish();

        if (yyresult)               /\* error \*/
        return NIL;

        return parsetree;
    }
Postgres使用Flex/Bison解析SQL语句。针对Select语句,会返回一个SelectStmt,里面包含了Select各个部分的解析结果。

gpdb/src/backend/parser/scan.l
    ....
    whitespace      ({space}+|{comment})

    digit       [0-9]
    ident\_start     [A-Za-z\\200-\\377\_]
    ident\_cont      [A-Za-z\\200-\\377\_0-9\\\$]
    ....


    SELECT \*                 /\* target\_list \*/
    FROM user                /\* from\_clause \*/
    WHERE username= 'JOHN'   /\* where\_clause \*/

    src/backend/parser/gram.y
    simple\_select:
    SELECT opt\_distinct target\_list
    into\_clause from\_clause where\_clause
    group\_clause having\_clause window\_clause
    {
        SelectStmt \*n = makeNode(SelectStmt);
        n->distinctClause = \$2;
        n->targetList = \$3;
        n->intoClause = \$4;
        n->fromClause = \$5;
        n->whereClause = \$6;
        n->groupClause = \$7;
        n->havingClause = \$8;
        n->windowClause = \$9;
        \$\$ = (Node \*)n;
    }

*SelectStmt*数据结构成员如下所示:

SelectStmt

src/include/nodes/parsenodes.h
    typedef struct SelectStmt
    {
        List       \*distinctClause;
        IntoClause \*intoClause;     /\* target for SELECT INTO / CREATE TABLE AS \*/
        List       \*targetList;     /\* the target list (of ResTarget) \*/
        List       \*fromClause;     /\* the FROM clause \*/
        Node       \*whereClause;    /\* WHERE qualification \*/
        List       \*groupClause;    /\* GROUP BY clauses \*/
        Node       \*havingClause;   /\* HAVING conditional-expression \*/
        List       \*windowClause;   /\* window specification clauses \*/
        List       \*scatterClause;  /\* GPDB: TableValueExpr data distribution \*/
        WithClause \*withClause;     /\* with clause \*/

        ........

        /\* This field used by: SELECT INTO, CTAS \*/
        List       \*distributedBy;  /\* GPDB: columns to distribute the data on. \*/

    } SelectStmt;

Analyze and Rewrite

当查询parse完毕之后,系统会对其进行Analyze,完成一些转换工作,比如把表名转换为内部使用的OID、\*的扩展等。

|--pg\_analyze\_and\_rewrite()
    {
        /\*(1) Perform parse analysis.\*/
        |--querytree\_list = parse\_analyze(parsetree, query\_string, paramTypes, numParams);
        |--|--transformTopLevelStmt(pstate, parseTree);
        |--|--|--query = transformStmt(pstate, parseTree);
        |--|--|--|--result = transformSelectStmt(pstate, n);

        /\* (2) Rewrite the queries, as necessary\*/
        |--querytree\_list = pg\_rewrite\_query(query);
        |--|--QueryRewrite(query)
        |--|--|-- /\*Apply all non-SELECT rules possibly getting 0 or many queries\*/
        |--|--|--querylist = RewriteQuery(parsetree, NIL);
        |--|--|--|-- product\_queries = fireRules(parsetree,
        |--|--|-- /\*Apply all the RIR rules on each query \*/
        |--|--|-- query = fireRIRrules(query, NIL, false);;

    }
\*的替换

在函数transformStmt()的调用链中,
\*会被替换为真实的列。可以留意函数ExpandColumnRefStar()里面的numnames分支情况。其中的2、3、4对应了不同的情况,详见注释:

src/backend/parser/analyze.c
    |--transformStmt
    |--|--transformSelectStmt(pstate, n);
    |--|--|--transformTargetList(pstate, stmt->targetList,
    |--|--|--|--ExpandColumnRefStar(pstate,
    |--|--|--|--|--refnameRangeTblEntry(pstate, nspname, relname,

    src\\backend\\parser\\parse\_target.c
    ExpandColumnRefStar:

    switch (numnames)
    {
        case 2: //relation.colname
        relname = strVal(linitial(fields));
        rte = refnameRangeTblEntry(pstate, nspname, relname,
        cref->location,
        &levels\_up);
        break;
        case 3: //nspname.relation.colname
        nspname = strVal(linitial(fields));
        relname = strVal(lsecond(fields));
        rte = refnameRangeTblEntry(pstate, nspname, relname,
        cref->location,
        &levels\_up);
        break;
        case 4: //db.nspname.relation.colname
        {
            char       \*catname = strVal(linitial(fields));

            /\*
            \* We check the catalog name and then ignore it.
            \*/
            if (strcmp(catname, get\_database\_name(MyDatabaseId)) != 0)
            {
                crserr = CRSERR\_WRONG\_DB;
                break;
            }
            nspname = strVal(lsecond(fields));
            relname = strVal(lthird(fields));
            rte = refnameRangeTblEntry(pstate, nspname, relname,
            cref->location,
            &levels\_up);
            break;
        }

        src\\backend\\parser\\parse\_relation.c

        RangeTblEntry \*
        refnameRangeTblEntry(ParseState \*pstate,
        const char \*schemaname,
        const char \*refname,
        int location,
        int \*sublevels\_up)
        {
            Oid         relId = InvalidOid;

            if (schemaname != NULL)
            {
                Oid         namespaceId;
                namespaceId = LookupNamespaceNoError(schemaname);
                if (!OidIsValid(namespaceId))
                return NULL;
                relId = get\_relname\_relid(refname, namespaceId);
                if (!OidIsValid(relId))
                return NULL;
            }

            while (pstate != NULL)
            {
                RangeTblEntry \*result;

                if (OidIsValid(relId))
                result = scanNameSpaceForRelid(pstate, relId, location);
                else
                result = scanNameSpaceForRefname(pstate, refname, location);

                if (result)
                return result;

                if (sublevels\_up)
                (\*sublevels\_up)++;
                else
                break;

                pstate = pstate->parentParseState;
            }
            return NULL;
        }
OID的查找
src/backend/parser/analyze.c
        |--transformStmt
        |--|--transformFromClause(pstate, stmt->fromClause);
        |--|--|--transformFromClauseItem(pstate,
        |--|--|--|--transformTableEntry(pstate, rangeVar);
        |--|--|--|--|--addRangeTableEntry(pstate, r, r->alias
        {
            RangeTblEntry \*rte = makeNode(RangeTblEntry);
            rel = parserOpenTable(pstate, relation, lockmode, NULL);
            rte->relid = RelationGetRelid(rel);
            heap\_close(rel, NoLock);
            ......
        }

数据结构:Query

/\*
        \* Query -
        \*     Parse analysis turns all statements into a Query tree
        \*     for further processing by the rewriter and planner.
        \*
        \*     Utility statements (i.e. non-optimizable statements) have the
        \*     utilityStmt field set, and the Query itself is mostly dummy.
        \*     DECLARE CURSOR is a special case: it is represented like a SELECT,
        \*     but the original DeclareCursorStmt is stored in utilityStmt.
        \*
        \*     Planning converts a Query tree into a Plan tree headed by a PlannedStmt
        \*     node --- the Query structure is not used by the executor.
        \*/
        typedef struct Query
        {
            NodeTag     type;
            CmdType     commandType;    /\* select|insert|update|delete|utility \*/
            QuerySource querySource;    /\* where did I come from? \*/
            uint32      queryId;        /\* query identifier (can be set by plugins) \*/
            bool        canSetTag;      /\* do I set the command result tag? \*/
            Node       \*utilityStmt;    /\* non-null if this is DECLARE CURSOR or a
            \* non-optimizable statement \*/
            int         resultRelation; /\* rtable index of target relation for
            \* INSERT/UPDATE/DELETE; 0 for SELECT \*/
            bool        hasAggs;        /\* has aggregates in tlist or havingQual \*/
            bool        hasWindowFuncs; /\* has window functions in tlist \*/
            bool        hasSubLinks;    /\* has subquery SubLink \*/
            bool        hasDynamicFunctions; /\* has functions with unstable return types \*/
            bool        hasFuncsWithExecRestrictions; /\* has functions with EXECUTE ON MASTER or ALL SEGMENTS \*/
            bool        hasDistinctOn;  /\* distinctClause is from DISTINCT ON \*/
            bool        hasRecursive;   /\* WITH RECURSIVE was specified \*/
            bool        hasModifyingCTE;    /\* has INSERT/UPDATE/DELETE in WITH \*/
            bool        hasForUpdate;   /\* FOR [KEY] UPDATE/SHARE was specified \*/
            List       \*cteList;        /\* WITH list (of CommonTableExpr's) \*/
            List       \*rtable;         /\* list of range table entries \*/
            FromExpr   \*jointree;       /\* table join tree (FROM and WHERE clauses) \*/
            List       \*targetList;     /\* target list (of TargetEntry) \*/
            List       \*withCheckOptions;       /\* a list of WithCheckOption's \*/
            List       \*returningList;  /\* return-values list (of TargetEntry) \*/

            /\*
            \* A list of GroupClauses or GroupingClauses.  The order of GroupClauses
            \* or GroupingClauses are based on input queries. However, in each
            \* grouping set, all GroupClauses will appear in front of GroupingClauses.
            \* See the following GROUP BY clause:
            \*
            \* GROUP BY ROLLUP(b,c),a, CUBE(e,d)
            \*
            \* the result list can be roughly represented as follows.
            \*
            \* GroupClause(a) --> GroupingClause( ROLLUP, groupsets (GroupClause(b)
            \* --> GroupClause(c) ) ) --> GroupingClause( CUBE, groupsets
            \* (GroupClause(e) --> GroupClause(d) ) )
            \*/
            List       \*groupClause;    /\* a list of SortGroupClause's \*/

            Node       \*havingQual;     /\* qualifications applied to groups \*/

            List       \*windowClause;   /\* defined window specifications \*/

            List       \*distinctClause; /\* a list of SortGroupClause's \*/

            List       \*sortClause;     /\* a list of SortGroupClause's \*/

            List       \*scatterClause;  /\* a list of tle's \*/
            bool        isTableValueSelect; /\* GPDB: Is this a TABLE (...) subquery argument? \*/

            Node       \*limitOffset;    /\* # of result tuples to skip (int8 expr) \*/
            Node       \*limitCount;     /\* # of result tuples to return (int8 expr) \*/

            List       \*rowMarks;       /\* a list of RowMarkClause's \*/

            Node       \*setOperations;  /\* set-operation tree if this is top level of
            \* a UNION/INTERSECT/EXCEPT query \*/
            List       \*constraintDeps; /\* a list of pg\_constraint OIDs that the query
            \* depends on to be semantically valid \*/

            /\*
            \* MPP: Used only on QD. Don't serialize. Holds the result distribution
            \* policy for SELECT ... INTO and set operations.
            \*/
            struct GpPolicy \*intoPolicy;

            /\*
            \* GPDB: Used to indicate this query is part of CTAS or COPY so that its plan
            \* would always be dispatched in parallel.
            \*/
            ParentStmtType  parentStmtType;
            bool        expandMatViews; /\* force expansion of materialized views during rewrite to treat as views \*/
        } Query;

QD产生优化后的查询计划

optimizer

|--src\\backend\\tcop\\postgres.c
        |--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
        |--|--pg\_plan\_query(query, cursorOptions, boundParams);
        |--|--|--plan = planner(querytree, cursorOptions, boundParams);
        |--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
        |--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
        |--|--|--|--|--result = optimize\_query(parse, boundParams); //
        |--|--|--|--|--src\\backend\\optimizer\\plan\\orca.c
        |--|--|--|--|--|--result = GPOPTOptimizedPlan(pqueryCopy, &fUnexpectedFailure);

Planner

|--src\\backend\\tcop\\postgres.c
|--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
|--|--pg\_plan\_query(query, cursorOptions, boundParams);
|--|--|--plan = planner(querytree, cursorOptions, boundParams);
|--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
|--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
|--|--|--|--|--top\_plan = subquery\_planner(glob, parse, NULL,
|--|--|--|--|--plan = grouping\_planner(root, tuple\_fraction);
|--|--|--|--|--|--final\_rel = query\_planner(root, sub\_tlist,

|--grouping\_planner()
{
    /\*
    \* Generate the best unsorted and presorted paths for this Query (but
    \* note there may not be any presorted paths).  We also generate (in
    \* standard\_qp\_callback) pathkey representations of the query's sort
    \* clause, distinct clause, etc.
    \*/
    final\_rel = query\_planner(root, sub\_tlist,standard\_qp\_callback, &qp\_extra);


    |--query\_planner()
    |--|--make\_one\_rel
    /\*Finds all possible access paths for executing a query\*/
    {
        /\* find seqscan and all index paths for each base relation \*/
        set\_base\_rel\_pathlists(root);

        /\*
        \* Generate access paths for the entire join tree.
        \*/
        rel = make\_rel\_from\_joinlist(root, joinlist);
    }

    if (parse->groupClause){
        ......
    }
    else if (parse->distinctClause)
    {
        ......
    }

    cost\_sort(&sort\_path, root, root->query\_pathkeys, cheapest\_path->total\_cost,
    path\_rows, path\_width,
    0.0, work\_mem, root->limit\_tuples);

    choose\_hashed\_grouping(root,
    tuple\_fraction, limit\_tuples, ......
    choose\_hashed\_distinct(root,

    if (Gp\_role == GP\_ROLE\_DISPATCH && result\_plan == NULL)
    {
        result\_plan = cdb\_grouping\_planner(root,....
    }

    ......

    if (Gp\_role == GP\_ROLE\_DISPATCH && CdbPathLocus\_IsPartitioned(current\_locus))
    {
        bool        needMotion;

        needMotion = !cdbpathlocus\_collocates\_pathkeys(root, current\_locus,
        distinct\_dist\_pathkeys, false /\* exact\_match \*/ );
    }

    /\* Create a plan according to query\_planner's results. \*/
    result\_plan = create\_plan(root, best\_path);
    /\* Insert AGG or GROUP node if needed \*/
    make\_agg()
    /\*If there is a DISTINCT clause, add the UNIQUE node.\*/
    make\_unique()
    /\*Finally, if there is a LIMIT/OFFSET clause, add the LIMIT node.\*/
    make\_limit()
}

Relation Access Paths

set_base_rel_pathlists()
对每个基础表找出具体的扫描路径,比如seqscan或者index scan或者bitmap
scans等.

|--set\_base\_rel\_pathlists()
{
    for (rti = 1; rti < root->simple\_rel\_array\_size; rti++)
    {
        |--set\_rel\_pathlist(root, rel, rti);
        |--|--set\_plain\_rel\_pathlist(root, rel, rte);
        {
            /\* Consider sequential scan. \*/
            switch (rel->relstorage)
            {
                case RELSTORAGE\_EXTERNAL:
                case RELSTORAGE\_AOROWS:
                case RELSTORAGE\_AOCOLS:
                case RELSTORAGE\_HEAP:
                seqpath = create\_seqscan\_path(root, rel);
            }

            if (plain\_allow\_indexpath\_under\_subplan(root, rel))
            {
                /\* Consider index and bitmap scans \*/
                create\_index\_paths(root, rel);

                if (rel->relstorage == RELSTORAGE\_HEAP)
                create\_tidscan\_paths(root, rel);
            }
            ......
            /\* Now find the cheapest of the paths for this rel \*/
            set\_cheapest(root, rel);
        }
    }
}

Seqscan Path

生成T_SeqScan类型的Path Node, 同时计算顺序扫描花费的代价
*cost_seqscan(pathnode, root, rel)*

|--create\_seqscan\_path(PlannerInfo \*root, RelOptInfo \*rel, Relids required\_outer)
{
    Path       \*pathnode = makeNode(Path);

    pathnode->pathtype = T\_SeqScan;

    pathnode->locus = cdbpathlocus\_from\_baserel(root, rel);

    cost\_seqscan(pathnode, root, rel);

    return pathnode;
}

Join处理

|--make\_rel\_from\_joinlist()

|--|--make\_rel\_from\_joinlist
|--|--|--standard\_join\_search
{
    /\*
    \* We employ a simple "dynamic programming" algorithm: we first find all
    \* ways to build joins of two jointree items, then all ways to build joins
    \* of three items (from two-item joins and single items), then four-item
    \* joins, and so on until we have considered all ways to join all the
    \* items into one rel.
    \*
    \* root->join\_rel\_level[j] is a list of all the j-item rels.  Initially we
    \* set root->join\_rel\_level[1] to represent all the single-jointree-item
    \* relations.
    \*/
    for (lev = 2; lev <= levels\_needed; lev++)
    {
        |--join\_search\_one\_level
        {
            /\*
            \* First, consider left-sided and right-sided plans, in which rels of
            \* exactly level-1 member relations are joined against initial relations.
            \* We prefer to join using join clauses, but if we find a rel of level-1
            \* members that has no join clauses, we will generate Cartesian-product
            \* joins against all initial rels not already contained in it.
            \*/
            |--make\_rels\_by\_clause\_joins
            |--|--make\_join\_rel
            switch (sjinfo->jointype)
            {
                case JOIN\_INNER:
                add\_paths\_to\_joinrel
                case JOIN\_LEFT:
                case JOIN\_FULL:
                case JOIN\_SEMI:
                case JOIN\_ANTI:
                case JOIN\_LASJ\_NOTIN:
            }
            |--|--add\_paths\_to\_joinrel
            {
                /\*
                \* Find potential mergejoin clauses.  We can skip this if we are not
                \* interested in doing a mergejoin.  However, mergejoin may be our only
                \* way of implementing a full outer join, so override enable\_mergejoin if
                \* it's a full join.
                \*
                \* CDB: Always build mergeclause\_list.  We need it for motion planning.
                \*/
                redistribution\_clauses = select\_cdb\_redistribute\_clauses(root,
                /\*
                \* 1. Consider mergejoin paths where both relations must be explicitly
                \* sorted.  Skip this if we can't mergejoin.
                \*/
                mergeclause\_list = select\_mergejoin\_clauses(root,
                sort\_inner\_and\_outer(root, joinrel, outerrel, innerrel,

                |--sort\_inner\_and\_outer();
                |--|--try\_mergejoin\_path()
                |--|--|--create\_mergejoin\_path
                |--|--|--|--cdbpath\_motion\_for\_join
                |--|--|--|--turn\_volatile\_seggen\_to\_singleqe
                |--|--|--|--|--cdbpath\_create\_motion\_path
                {
                    /\* Create CdbMotionPath node. \*/
                    pathnode = makeNode(CdbMotionPath);
                    pathnode->path.pathtype = T\_Motion;
                    pathnode->path.parent = subpath->parent;
                    pathnode->path.locus = locus;
                    pathnode->path.rows = subpath->rows;
                    pathnode->path.pathkeys = pathkeys;
                    pathnode->subpath = subpath;
                }


                /\*
                \* 2. Consider paths where the outer relation need not be explicitly
                \* sorted. This includes both nestloops and mergejoins where the outer
                \* path is already ordered.  Again, skip this if we can't mergejoin.
                \* (That's okay because we know that nestloop can't handle right/full
                \* joins at all, so it wouldn't work in the prohibited cases either.)
                \*/
                match\_unsorted\_outer(root, joinrel, outerrel, innerrel,

                /\*
                \* 4. Consider paths where both outer and inner relations must be hashed
                \* before being joined.  As above, disregard enable\_hashjoin for full
                \* joins, because there may be no other alternative.
                \*
                \* We consider both the cheapest-total-cost and cheapest-startup-cost
                \* outer paths.  There's no need to consider any but the
                \* cheapest-total-cost inner path, however.
                \*/
                hash\_inner\_and\_outer(root, joinrel, outerrel, innerrel,
            }

            /\* Now, consider "bushy plans" \*/
        }
    }

create_mergejoin_path

在merge join阶段,会生成需要的Motion。源码中对create_mergejoin_path的注释为:
Creates a pathnode corresponding to a mergejoin join between two relations.

|--create\_mergejoin\_path()
{
    |--cdbpath\_motion\_for\_join()
    {
        /\*
        \* Add motion nodes above subpaths and decide where to join.
        \*/
        |--cdbpath\_create\_motion\_path()
        |--|--cdbpath\_cost\_motion(root, pathnode);

        pathnode = makeNode(MergePath);

        cost\_mergejoin(pathnode, root);
    }
}

/\*
\* cdbpath\_cost\_motion
\*    Fills in the cost estimate fields in a MotionPath node.
\*/
void
cdbpath\_cost\_motion(PlannerInfo \*root, CdbMotionPath \*motionpath)
{

    cost\_per\_row = (gp\_motion\_cost\_per\_row > 0.0)
    ? gp\_motion\_cost\_per\_row
    : 2.0 \* cpu\_tuple\_cost;
    sendrows = cdbpath\_rows(root, subpath);
    recvrows = cdbpath\_rows(root, (Path \*)motionpath);
    motioncost = cost\_per\_row \* 0.5 \* (sendrows + recvrows);

    motionpath->path.total\_cost = motioncost + subpath->total\_cost;
    motionpath->path.startup\_cost = subpath->startup\_cost;
    motionpath->path.memory = subpath->memory;
}

QD准备执行plan

(Gangs, QEs and interconnects)
初始化逻辑:

注意cdbdisp_dispatchX实现了Plan的序列化逻辑。

|--PortalStart
|--|--ExecutorStart(queryDesc, eflags);
|--|--|--standard_ExecutorStart
{
    estate = CreateExecutorState();

    /*
    * Assign a Motion Node to every Plan Node. This makes it
    * easy to identify which slice any Node belongs to
    */
    AssignParentMotionToPlanNodes(queryDesc->plannedstmt);

    /*
    * Initialize the plan state tree
    */
    InitPlan(queryDesc, eflags);

    if (shouldDispatch){
        ....
        CdbDispatchPlan(queryDesc, needDtx, true);
        |--cdbdisp_dispatchX
        {....
            ds = cdbdisp_makeDispatcherState(queryDesc->extended_query);

            /*
            * Since we intend to execute the plan, inventory the slice tree,
            * allocate gangs, and associate them with slices.
            *
            * On return, gangs have been allocated and CDBProcess lists have
            * been filled in the slice table.)
            *
            * Notice: This must be done before cdbdisp_buildPlanQueryParms
            */
            AssignGangs(ds, queryDesc);
        }

        /*
        * if in dispatch mode, time to serialize plan and query
        * trees, and fire off cdb_exec command to each of the qexecs
        */
        |--CdbDispatchPlan();
        |--|--cdbdisp_dispatchX()
        pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
        queryText = buildGpQueryString(pQueryParms, &queryTextLength);

        |--|--|--buildGpQueryString
        total_query_len = 1 /* 'M' */ +
            sizeof(len) /* message length */ +
            sizeof(gp_command_count) +
            sizeof(sessionUserId) /* sessionUserIsSuper */ +
            sizeof(outerUserId) /* outerUserIsSuper */ +
            sizeof(currentUserId) +
            sizeof(n32) * 2 /* currentStatementStartTimestamp */ +
            sizeof(command_len) +
            sizeof(querytree_len) +
            sizeof(plantree_len) +
            sizeof(params_len) +
            sizeof(sddesc_len) +
            sizeof(dtxContextInfo_len) +
            dtxContextInfo_len +
            command_len +
            querytree_len +
            plantree_len +
            params_len +
            sddesc_len +
            sizeof(numsegments) +
            sizeof(resgroupInfo.len) +
            resgroupInfo.len +
            sizeof(tempNamespaceId) +
            sizeof(tempToastNamespaceId) +
            0;

        //plan serialization
        pos = shared_query;
        *pos++ = 'M';

        pos += 4;                   /* placeholder for message length */

        tmp = htonl(gp_command_count);
        memcpy(pos, &tmp, sizeof(gp_command_count));
        pos += sizeof(gp_command_count);

        tmp = htonl(sessionUserId);
        memcpy(pos, &tmp, sizeof(sessionUserId));
        pos += sizeof(sessionUserId);

        tmp = htonl(outerUserId);
        memcpy(pos, &tmp, sizeof(outerUserId));
        pos += sizeof(outerUserId);

        tmp = htonl(currentUserId);
        memcpy(pos, &tmp, sizeof(currentUserId));
        pos += sizeof(currentUserId);

        /*
        * High order half first, since we're doing MSB-first
        */
        n32 = (uint32) (currentStatementStartTimestamp >> 32);
        n32 = htonl(n32);
        memcpy(pos, &n32, sizeof(n32));
        pos += sizeof(n32);

        /*
        * Now the low order half
        */
        n32 = (uint32) currentStatementStartTimestamp;
        n32 = htonl(n32);
        memcpy(pos, &n32, sizeof(n32));
        pos += sizeof(n32);

        tmp = htonl(command_len);
        memcpy(pos, &tmp, sizeof(command_len));
        pos += sizeof(command_len);

        tmp = htonl(querytree_len);
        memcpy(pos, &tmp, sizeof(querytree_len));
        pos += sizeof(querytree_len);

        tmp = htonl(plantree_len);
        memcpy(pos, &tmp, sizeof(plantree_len));
        pos += sizeof(plantree_len);

        tmp = htonl(params_len);
        memcpy(pos, &tmp, sizeof(params_len));
        pos += sizeof(params_len);

        tmp = htonl(sddesc_len);
        memcpy(pos, &tmp, sizeof(tmp));
        pos += sizeof(tmp);

        tmp = htonl(dtxContextInfo_len);
        memcpy(pos, &tmp, sizeof(tmp));
        pos += sizeof(tmp);

        if (dtxContextInfo_len > 0)
        {
            memcpy(pos, dtxContextInfo, dtxContextInfo_len);
            pos += dtxContextInfo_len;
        }

        memcpy(pos, command, command_len);
        /* If command is truncated we need to set the terminating '\0' manually */
        pos[command_len - 1] = '\0';
        pos += command_len;

        if (querytree_len > 0)
        {
            memcpy(pos, querytree, querytree_len);
            pos += querytree_len;
        }

        if (plantree_len > 0)
        {
            memcpy(pos, plantree, plantree_len);
            pos += plantree_len;
        }

        if (params_len > 0)
        {
            memcpy(pos, params, params_len);
            pos += params_len;
        }

        if (sddesc_len > 0)
        {
            memcpy(pos, sddesc, sddesc_len);
            pos += sddesc_len;
        }

        tmp = htonl(numsegments);
        memcpy(pos, &tmp, sizeof(numsegments));
        pos += sizeof(numsegments);

        tmp = htonl(resgroupInfo.len);
        memcpy(pos, &tmp, sizeof(resgroupInfo.len));
        pos += sizeof(resgroupInfo.len);

        if (resgroupInfo.len > 0)
        {
            memcpy(pos, resgroupInfo.data, resgroupInfo.len);
            pos += resgroupInfo.len;
        }

        /* pass process local variables to QEs */
        GetTempNamespaceState(&tempNamespaceId, &tempToastNamespaceId);
        tempNamespaceId = htonl(tempNamespaceId);
        tempToastNamespaceId = htonl(tempToastNamespaceId);

        memcpy(pos, &tempNamespaceId, sizeof(tempNamespaceId));
        pos += sizeof(tempNamespaceId);
        memcpy(pos, &tempToastNamespaceId, sizeof(tempToastNamespaceId));
        pos += sizeof(tempToastNamespaceId);

        /*
        * fill in length placeholder
        */
        len = pos - shared_query - 1;
        tmp = htonl(len);
        memcpy(shared_query + 1, &tmp, sizeof(len));

        Assert(len + 1 == total_query_len);

        if (finalLen)
        *finalLen = len + 1;

        |--|--|--cdbdisp_dispatchToGang();
        |--|--|--|--cdbdisp_dispatchToGang_async

        for (i = 0; i < gp->size; i++)
        {
            dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
        }
        |--|--|--|--|--dispatchCommand
        |--|--|--|--|--|--PQsendGpQuery_shared
    }
}

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
    cdbdisp_checkForCancel_async,
    cdbdisp_getWaitSocketFd_async,
    cdbdisp_makeDispatchParams_async,
    cdbdisp_checkAckMessage_async,
    cdbdisp_checkDispatchResult_async,
    cdbdisp_dispatchToGang_async,
    cdbdisp_waitDispatchFinish_async
};
Gang的分配逻辑:
/*
 * Function AssignGangs runs on the QD and finishes construction of the
 * global slice table for a plan by assigning gangs allocated by the
 * executor factory to the slices of the slice table.
 *
 * On entry, the slice table (at queryDesc->estate->es_sliceTable) has
 * the correct structure (established by InitSliceTable) and has correct
 * gang types (established by function FillSliceTable).
 *
 * Gang assignment involves taking an inventory of the requirements of
 * each slice tree in the slice table, asking the executor factory to
 * allocate a minimal set of gangs that can satisfy any of the slice trees,
 * and associating the allocated gangs with slices in the slice table.
 *
 * On successful exit, the CDBProcess lists (primaryProcesses, mirrorProcesses)
 * and the Gang pointers (primaryGang, mirrorGang) are set correctly in each
 * slice in the slice table.
 */
 |--AssignGangs(queryDesc, gp_singleton_segindex);
 |--|--AssignWriterGangFirst
 |--|--|--AllocateGang()
 |--|--|--|--cdbgang_createGang()
 |--|--|--|--cdbgang_createGang_async
 |--|--|--|--|--PQconnectPoll
 void
 AssignGangs(QueryDesc *queryDesc, int utility_segment_index)
 {
     /*
     * Get the gangs we'll use.
     *
     * As a general rule the first gang is a writer and the rest are readers.
     * If this happens to be an extended query protocol then all gangs are readers.
     */
     if (inv.numNgangs > 0)
     {
         inv.vecNgangs = (Gang **) palloc(sizeof(Gang *) * inv.numNgangs);
         for (i = 0; i < inv.numNgangs; i++)
         {
             if (i == 0 && !queryDesc->extended_query)
             {
                 inv.vecNgangs[i] = allocateWriterGang();
             }
             else
             {
                 inv.vecNgangs[i] = allocateGang(GANGTYPE_PRIMARY_READER, getgpsegmentCount(), 0,);
             }
         }
     }
 }

 segments in Gang:
 |--standard_ExecutorStart
 //only master does dispatch
 if (Gp_role == GP_ROLE_DISPATCH &&
 (queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL ||
 queryDesc->plannedstmt->nMotionNodes > 0))
 {
     //nMotionNodes get initialized when orca optimizer is called.
     /* Set up blank slice table to be filled in during InitPlan. */
     //Here we get the SliceTable
     InitSliceTable(estate, queryDesc->plannedstmt->nMotionNodes, queryDesc->plannedstmt->nInitPlans);
     |--|--InitSliceTable
     {
         n = 1 + nMotions + nSubplans;

         table = makeNode(SliceTable);
         table->nMotions = nMotions;
         table->nInitPlans = nSubplans;
         table->slices = NIL;
         table->instrument_options = INSTRUMENT_NONE;

         for (i = 0; i < n; i++)
         {
             slice = makeNode(Slice);

             slice->sliceIndex = i;
             slice->rootIndex = (i > 0 && i <= nMotions) ? -1 : i;
             slice->gangType = GANGTYPE_UNALLOCATED;
             slice->gangSize = 0;
             slice->segments = NIL;
             slice->directDispatch.isDirectDispatch = false;
             slice->directDispatch.contentIds = NIL;
             slice->primaryGang = NULL;
             slice->parentIndex = -1;
             slice->children = NIL;
             slice->primaryProcesses = NIL;

             table->slices = lappend(table->slices, slice);
         }

         estate->es_sliceTable = table;
     }
 }

 //set up interconnect
 if (queryDesc->plannedstmt->nMotionNodes > 0)
 {
     SetupInterconnect(estate);
     UpdateMotionExpectedReceivers(estate->motionlayer_context, estate->es_sliceTable);
 }

 /*
 * Initialize the slice table.
 */
 if (Gp_role == GP_ROLE_DISPATCH)
 {
     FillSliceTable(estate, plannedstmt);
     |--FillSliceTable_walker
     |--|--FillSliceGangInfo
     {
         switch (slice->gangType)
         {
             case GANGTYPE_UNALLOCATED:
             case GANGTYPE_PRIMARY_WRITER:
             case GANGTYPE_PRIMARY_READER:
             int i;
             slice->gangSize = numsegments;
             slice->segments = NIL;
             for (i = 0; i < numsegments; i++)
             slice->segments = lappend_int(slice->segments, i % getgpsegmentCount());

             case GANGTYPE_ENTRYDB_READER:
             slice->gangSize = 1;
             slice->segments = list_make1_int(-1);
             break;
             case GANGTYPE_SINGLETON_READER:
             slice->gangSize = 1;
             slice->segments = list_make1_int(gp_session_id % numsegments);
         }
     }
 }

QE接收Plan,反序列化并执行

|--PostgresMain
{
    for (;;)
    {
        switch (firstchar)
        {
            case 'M':           /* MPP dispatched stmt from QD */
            {
                exec_mpp_query();
            }
            .......
        }
    }
}

接下来的章节会针对源码进行梳理,敬请关注。

未完待续

0条评论
作者已关闭评论
张****生
2文章数
0粉丝数
张****生
2 文章 | 0 粉丝
张****生
2文章数
0粉丝数
张****生
2 文章 | 0 粉丝
原创

Greenplum查询源码之旅(2)

2024-08-23 09:39:36
6
0

查询执行源码解析

Master监听端口

入口函数为PostmasterMain().主要是监听端口以及初始化.

PostmasterMain

src/backend/main/main.c
    |--Main()
    |--|--PostmasterMain()
    {
        /\*
        \* CDB: gpdb auxilary process like fts probe, dtx recovery process is
        \* essential, we need to load them ahead of custom shared preload libraries
        \* to avoid exceeding max\_worker\_processes.
        \*/
        load\_auxiliary\_libraries();

        ......
        /\*
        \* Establish input sockets.
        \*/
        status = StreamServerPort(AF\_UNSPEC, NULL,
        (unsigned short) PostPortNumber,
        UnixSocketDir,
        ListenSocket, MAXLISTEN);
        .......

        /\*
        \* Set up shared memory and semaphores.
        \*/
        reset\_shared(PostPortNumber);

        ......

        /\* main idle loop of master \*/
        status = ServerLoop();
        .......

    }

需要关注一下函数load_auxiliary_libraries.这个函数完成了辅助进程的初始化。涉及到的数据结构为BackgroundWorker。从下面代码中也可以了解到Greenplum后台有哪些独有的辅助进程,以及各个辅助进程的入口函数,从而了解辅助进程的执行逻辑,此处不详述:

static BackgroundWorker PMAuxProcList[MaxPMAuxProc] =
    {
        {"ftsprobe process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
            0, /\* restart immediately if ftsprobe exits with non-zero code \*/
            FtsProbeMain, {0}, {0}, 0, 0,
            FtsProbeStartRule},

        {"global deadlock detector process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if gdd exits with non-zero code \*/
            GlobalDeadLockDetectorMain, {0}, {0}, 0, 0,
            GlobalDeadLockDetectorStartRule},

        {"dtx recovery process",
            BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
            BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
            0, /\* restart immediately if dtx recovery process exits with non-zero code \*/
            DtxRecoveryMain, {0}, {0}, 0, 0,
            DtxRecoveryStartRule},

        {"stats sender process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if stats sender exits with non-zero code \*/
            SegmentInfoSenderMain, {0}, {0}, 0, 0,
            SegmentInfoSenderStartRule},

        {"sweeper process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if sweeper process exits with non-zero code \*/
            BackoffSweeperMain, {0}, {0}, 0, 0,
            BackoffSweeperStartRule},

        {"perfmon process",
            BGWORKER\_SHMEM\_ACCESS,
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if perfmon process exits with non-zero code \*/
            PerfmonMain, {0}, {0}, 0, 0,
            PerfmonStartRule},

        #ifdef ENABLE\_IC\_PROXY
        {"ic proxy process",
            #ifdef FAULT\_INJECTOR
            BGWORKER\_SHMEM\_ACCESS,
            #else
            0,
            #endif
            BgWorkerStart\_RecoveryFinished,
            0, /\* restart immediately if ic proxy process exits with non-zero code \*/
            ICProxyMain, {0}, {0}, 0, 0,
            ICProxyStartRule},
        #endif  /\* ENABLE\_IC\_PROXY \*/
    };

master的主要逻辑在函数*ServerLoop()*里面.
当新的连接到来的时候,会启动一个新的子进程.
同时也会对辅助进程进行存在性检查,如果某个辅助进程不存在,会重新启动一个.逻辑如下代码所示:

ServerLoop函数

src/backend/postmaster/postmaster.c
    /\* Postmaster main entry point\*/
    |--PostmasterMain()
    |--|--ServerLoop() /\*Main idle loop of postmaster\*/
    {
        for (;;)
        {
            .........
            /\*
            \* New connection pending on any of our sockets? If so, fork a child
            \* process to deal with it.
            \*/

            for (i = 0; i < MAXLISTEN; i++)
            {
                ......

                Port       \*port;

                port = ConnCreate(ListenSocket[i]);
                if (port)
                {
                    BackendStartup(port);
                    ......
                }
            }

            /\* If we have lost the log collector, try to start a new one \*/
            SysLogger\_Start();

            /\*
            \* If no background writer process is running, and we are not in a
            \* state that prevents it, start one.  It doesn't matter if this
            \* fails, we'll just try again later.  Likewise for the checkpointer.
            \*/
            StartCheckpointer();
            StartBackgroundWriter();
            StartWalWriter();
            StartAutoVacLauncher();

            /\* If we have lost the stats collector, try to start a new one \*/
            pgstat\_start();

            /\* Get other worker processes running, if needed \*/
            maybe\_start\_bgworker();

        }
    }

启动Backend-QD进程

Greenplum基于PostgreSQL,采用多进程模型。当一个客户端连接进来的时候,master调用BackendStartup,在其中fork出一个子进程来完成连接处理工作。这个fork出来的子进程通常称为Backend。

src/backend/postmaster/postmaster.c
    |--BackendStartup()
    {
        pid = fork\_process();
        if (pid == 0)
        {
            /\* Perform additional initialization and client authentication \*/
            BackendInitialize(port);
            /\* And run the backend \*/
            BackendRun(port);
            |--|--PostgresMain();
        }
    }

客户端发送查询到QD

Fork出来的Backend子进程,主要逻辑在函数PostgresMain中。ReadCommand()读取协议命令,然后根据命令类型分别进行处理,例如Q命令由exec_simple_query()处理。具体的命令可以参考PG协议:frontend/backend protocol。我们主要跟踪Q命令的执行逻辑:

PostgresMain

src/backend/tcop/postgres.c
    /\* postgres main loop
    \* all backends, interactive or otherwise start here
    \*/
    |--PostgresMain()
    {
        for (;;)
        {
            /\*
            \* (3) read a command (loop blocks here)
            \*/
            firstchar = ReadCommand(&input\_message);

            /\*
            \* (6) process the command.  But ignore it if we're skipping till
            \* Sync.
            \*/
            switch (firstchar)
            {
                case 'Q':               /\* simple query \*/
                case 'M':               /\* Greenplum Database dispatched statement from QD \*/
                case 'T':               /\* Greenplum Database dispatched transaction protocol from QD \*/
                case 'F':               /\* fastpath function call \*/
                case 'X':               /\* terminate \*/
                case 'B':               /\* bind \*/
                case 'C':               /\* close \*/
                case 'D':               /\* describe \*/
                case 'E':               /\* execute \*/
                case 'H':               /\* flush \*/
                case 'P':               /\* parse \*/
                case 'S':               /\* sync \*/
                case 'd':               /\* copy data \*/
                case 'c':               /\* copy done \*/
                case 'f':               /\* copy fail \*/
            }
        }
    }

Greenplum在协议中增加了两个自定义命令,M命令主要用来反序列化plan信息,然后执行相应的查询计划,T主要和分布式事务相关:

case 'M':    /\* MPP dispatched stmt from QD \*/
    case 'T':    /\* MPP dispatched dtx protocol command from QD \*/

QD处理查询

QD的处理逻辑主要在函数exec_simple_query.
对接收到的SQL语句,Backend对其进行parse, analyze, rewrite, 以及plan,
然后进行plan的分发,最后把结果返回给客户端.

exec_simple_query

src/backend/tcop/postgres.c
    |--exec\_simple\_query()
    {
        /\*
        \* Start up a transaction command.
        \*/
        start\_xact\_command();

        /\*
        \* Do basic parsing of the query or queries
        \*/
        parsetree\_list = pg\_parse\_query(query\_string);

        /\*
        \* OK to analyze, rewrite, and plan this query.
        \*/
        querytree\_list = pg\_analyze\_and\_rewrite(parsetree, query\_string,NULL, 0);

        plantree\_list = pg\_plan\_queries(querytree\_list, NULL, false);

        /\*
        \* Create unnamed portal to run the query or queries in.
        \*/
        portal = CreatePortal("", true, true);

        /\*
        \* Start the portal.
        \*/
        PortalStart();

        /\*
        \* Now we can create the destination receiver object.
        \*/
        receiver = CreateDestReceiver(dest, portal);

        /\*
        \* Run the portal to completion, and then drop it (and the receiver).
        \*/
        (void) PortalRun();

        (\*receiver->rDestroy) (receiver);

        PortalDrop(portal, false);

        /\*
        \* Close down transaction statement, if one is open.
        \*/
        finish\_xact\_command();

    }

QD parse、analyze 以及 rewrite

Parse的逻辑在函数pg_parse_query中: parsetree_list = pg_parse_query(query_string);
这一步只进行词法与语法解析(不做语义分析),然后返回解析树.
解析树的数据结构是一个List,里面包含了各个节点(一个查询字符串可能包含多条命令)。具体的语法规则可以参见gram.y。

List       \*raw\_parsetree\_list;

    /\*
    \* Do raw parsing (only).
    \*
    \* A list of parsetrees is returned, since there might be multiple
    \* commands in the given string.
    \*/
    pg\_parse\_query(const char \*query\_string)
    {
        raw\_parsetree\_list = raw\_parser(query\_string);

        return raw\_parsetree\_list;
    }

    /\*
    \* raw\_parser
    \*       Given a query in string form, do lexical and grammatical analysis.
    \*
    \* Returns a list of raw (un-analyzed) parse trees.
    \*/
    List \*
    raw\_parser(const char \*str)
    {
        int         yyresult;

        parsetree = NIL;            /\* in case grammar forgets to set it \*/
        have\_lookahead = false;

        scanner\_init(str);
        parser\_init();

        yyresult = base\_yyparse();

        scanner\_finish();

        if (yyresult)               /\* error \*/
        return NIL;

        return parsetree;
    }
Postgres使用Flex/Bison解析SQL语句。针对Select语句,会返回一个SelectStmt,里面包含了Select各个部分的解析结果。

gpdb/src/backend/parser/scan.l
    ....
    whitespace      ({space}+|{comment})

    digit       [0-9]
    ident\_start     [A-Za-z\\200-\\377\_]
    ident\_cont      [A-Za-z\\200-\\377\_0-9\\\$]
    ....


    SELECT \*                 /\* target\_list \*/
    FROM user                /\* from\_clause \*/
    WHERE username = 'JOHN'  /\* where\_clause \*/

    src/backend/parser/gram.y
    simple\_select:
    SELECT opt\_distinct target\_list
    into\_clause from\_clause where\_clause
    group\_clause having\_clause window\_clause
    {
        SelectStmt \*n = makeNode(SelectStmt);
        n->distinctClause = \$2;
        n->targetList = \$3;
        n->intoClause = \$4;
        n->fromClause = \$5;
        n->whereClause = \$6;
        n->groupClause = \$7;
        n->havingClause = \$8;
        n->windowClause = \$9;
        \$\$ = (Node \*)n;
    }

*SelectStmt*数据结构成员如下所示:

SelectStmt

src/include/nodes/parsenodes.h
    typedef struct SelectStmt
    {
        List       \*distinctClause;
        IntoClause \*intoClause;     /\* target for SELECT INTO / CREATE TABLE AS \*/
        List       \*targetList;     /\* the target list (of ResTarget) \*/
        List       \*fromClause;     /\* the FROM clause \*/
        Node       \*whereClause;    /\* WHERE qualification \*/
        List       \*groupClause;    /\* GROUP BY clauses \*/
        Node       \*havingClause;   /\* HAVING conditional-expression \*/
        List       \*windowClause;   /\* window specification clauses \*/
        List       \*scatterClause;  /\* GPDB: TableValueExpr data distribution \*/
        WithClause \*withClause;     /\* with clause \*/

        ........

        /\* This field used by: SELECT INTO, CTAS \*/
        List       \*distributedBy;  /\* GPDB: columns to distribute the data on. \*/

    } SelectStmt;

Analyze and Rewrite

当查询parse完毕之后,系统会对其进行Analyze,完成一些转换工作,比如把表名转换为内部使用的OID、\*的展开等。

|--pg\_analyze\_and\_rewrite()
    {
        /\*(1) Perform parse analysis.\*/
        |--querytree\_list = parse\_analyze(parsetree, query\_string, paramTypes, numParams);
        |--|--transformTopLevelStmt(pstate, parseTree);
        |--|--|--query = transformStmt(pstate, parseTree);
        |--|--|--|--result = transformSelectStmt(pstate, n);

        /\* (2) Rewrite the queries, as necessary\*/
        |--querytree\_list = pg\_rewrite\_query(query);
        |--|--QueryRewrite(query)
        |--|--|-- /\*Apply all non-SELECT rules possibly getting 0 or many queries\*/
        |--|--|--querylist = RewriteQuery(parsetree, NIL);
        |--|--|--|-- product\_queries = fireRules(parsetree,
        |--|--|-- /\*Apply all the RIR rules on each query \*/
        |--|--|-- query = fireRIRrules(query, NIL, false);;

    }
\*的替换

在函数transformStmt()的调用链中,\*会被替换为真实的列。可以留意函数ExpandColumnRefStar里面的numnames分支情况,其中的2、3、4对应了不同的限定名形式,详见注释:

src/backend/parser/analyze.c
    |--transformStmt
    |--|--transformSelectStmt(pstate, n);
    |--|--|--transformTargetList(pstate, stmt->targetList,
    |--|--|--|--ExpandColumnRefStar(pstate,
    |--|--|--|--|--refnameRangeTblEntry(pstate, nspname, relname,

    src\\backend\\parser\\parse\_target.c
    ExpandColumnRefStar:

    switch (numnames)
    {
        case 2: //relation.\*
        relname = strVal(linitial(fields));
        rte = refnameRangeTblEntry(pstate, nspname, relname,
        cref->location,
        &levels\_up);
        break;
        case 3: //schema.relation.\* (nspname is the schema name)
        nspname = strVal(linitial(fields));
        relname = strVal(lsecond(fields));
        rte = refnameRangeTblEntry(pstate, nspname, relname,
        cref->location,
        &levels\_up);
        break;
        case 4: //db.schema.relation.\* (catalog name is checked, then ignored)
        {
            char       \*catname = strVal(linitial(fields));

            /\*
            \* We check the catalog name and then ignore it.
            \*/
            if (strcmp(catname, get\_database\_name(MyDatabaseId)) != 0)
            {
                crserr = CRSERR\_WRONG\_DB;
                break;
            }
            nspname = strVal(lsecond(fields));
            relname = strVal(lthird(fields));
            rte = refnameRangeTblEntry(pstate, nspname, relname,
            cref->location,
            &levels\_up);
            break;
        }

        src\\backend\\parser\\parse\_relation.c

        RangeTblEntry \*
        refnameRangeTblEntry(ParseState \*pstate,
        const char \*schemaname,
        const char \*refname,
        int location,
        int \*sublevels\_up)
        {
            Oid         relId = InvalidOid;

            if (schemaname != NULL)
            {
                Oid         namespaceId;
                namespaceId = LookupNamespaceNoError(schemaname);
                if (!OidIsValid(namespaceId))
                return NULL;
                relId = get\_relname\_relid(refname, namespaceId);
                if (!OidIsValid(relId))
                return NULL;
            }

            while (pstate != NULL)
            {
                RangeTblEntry \*result;

                if (OidIsValid(relId))
                result = scanNameSpaceForRelid(pstate, relId, location);
                else
                result = scanNameSpaceForRefname(pstate, refname, location);

                if (result)
                return result;

                if (sublevels\_up)
                (\*sublevels\_up)++;
                else
                break;

                pstate = pstate->parentParseState;
            }
            return NULL;
        }
OID的查找
src/backend/parser/analyze.c
        |--transformStmt
        |--|--transformFromClause(pstate, stmt->fromClause);
        |--|--|--transformFromClauseItem(pstate,
        |--|--|--|--transformTableEntry(pstate, rangeVar);
        |--|--|--|--|--addRangeTableEntry(pstate, r, r->alias
        {
            RangeTblEntry \*rte = makeNode(RangeTblEntry);
            rel = parserOpenTable(pstate, relation, lockmode, NULL);
            rte->relid = RelationGetRelid(rel);
            heap\_close(rel, NoLock);
            ......
        }

数据结构:Query

/\*
        \* Query -
        \*     Parse analysis turns all statements into a Query tree
        \*     for further processing by the rewriter and planner.
        \*
        \*     Utility statements (i.e. non-optimizable statements) have the
        \*     utilityStmt field set, and the Query itself is mostly dummy.
        \*     DECLARE CURSOR is a special case: it is represented like a SELECT,
        \*     but the original DeclareCursorStmt is stored in utilityStmt.
        \*
        \*     Planning converts a Query tree into a Plan tree headed by a PlannedStmt
        \*     node --- the Query structure is not used by the executor.
        \*/
        typedef struct Query
        {
            NodeTag     type;
            CmdType     commandType;    /\* select|insert|update|delete|utility \*/
            QuerySource querySource;    /\* where did I come from? \*/
            uint32      queryId;        /\* query identifier (can be set by plugins) \*/
            bool        canSetTag;      /\* do I set the command result tag? \*/
            Node       \*utilityStmt;    /\* non-null if this is DECLARE CURSOR or a
            \* non-optimizable statement \*/
            int         resultRelation; /\* rtable index of target relation for
            \* INSERT/UPDATE/DELETE; 0 for SELECT \*/
            bool        hasAggs;        /\* has aggregates in tlist or havingQual \*/
            bool        hasWindowFuncs; /\* has window functions in tlist \*/
            bool        hasSubLinks;    /\* has subquery SubLink \*/
            bool        hasDynamicFunctions; /\* has functions with unstable return types \*/
            bool        hasFuncsWithExecRestrictions; /\* has functions with EXECUTE ON MASTER or ALL SEGMENTS \*/
            bool        hasDistinctOn;  /\* distinctClause is from DISTINCT ON \*/
            bool        hasRecursive;   /\* WITH RECURSIVE was specified \*/
            bool        hasModifyingCTE;    /\* has INSERT/UPDATE/DELETE in WITH \*/
            bool        hasForUpdate;   /\* FOR [KEY] UPDATE/SHARE was specified \*/
            List       \*cteList;        /\* WITH list (of CommonTableExpr's) \*/
            List       \*rtable;         /\* list of range table entries \*/
            FromExpr   \*jointree;       /\* table join tree (FROM and WHERE clauses) \*/
            List       \*targetList;     /\* target list (of TargetEntry) \*/
            List       \*withCheckOptions;       /\* a list of WithCheckOption's \*/
            List       \*returningList;  /\* return-values list (of TargetEntry) \*/

            /\*
            \* A list of GroupClauses or GroupingClauses.  The order of GroupClauses
            \* or GroupingClauses are based on input queries. However, in each
            \* grouping set, all GroupClauses will appear in front of GroupingClauses.
            \* See the following GROUP BY clause:
            \*
            \* GROUP BY ROLLUP(b,c),a, CUBE(e,d)
            \*
            \* the result list can be roughly represented as follows.
            \*
            \* GroupClause(a) --> GroupingClause( ROLLUP, groupsets (GroupClause(b)
            \* --> GroupClause(c) ) ) --> GroupingClause( CUBE, groupsets
            \* (GroupClause(e) --> GroupClause(d) ) )
            \*/
            List       \*groupClause;    /\* a list of SortGroupClause's \*/

            Node       \*havingQual;     /\* qualifications applied to groups \*/

            List       \*windowClause;   /\* defined window specifications \*/

            List       \*distinctClause; /\* a list of SortGroupClause's \*/

            List       \*sortClause;     /\* a list of SortGroupClause's \*/

            List       \*scatterClause;  /\* a list of tle's \*/
            bool        isTableValueSelect; /\* GPDB: Is this a TABLE (...) subquery argument? \*/

            Node       \*limitOffset;    /\* # of result tuples to skip (int8 expr) \*/
            Node       \*limitCount;     /\* # of result tuples to return (int8 expr) \*/

            List       \*rowMarks;       /\* a list of RowMarkClause's \*/

            Node       \*setOperations;  /\* set-operation tree if this is top level of
            \* a UNION/INTERSECT/EXCEPT query \*/
            List       \*constraintDeps; /\* a list of pg\_constraint OIDs that the query
            \* depends on to be semantically valid \*/

            /\*
            \* MPP: Used only on QD. Don't serialize. Holds the result distribution
            \* policy for SELECT ... INTO and set operations.
            \*/
            struct GpPolicy \*intoPolicy;

            /\*
            \* GPDB: Used to indicate this query is part of CTAS or COPY so that its plan
            \* would always be dispatched in parallel.
            \*/
            ParentStmtType  parentStmtType;
            bool        expandMatViews; /\* force expansion of materialized views during rewrite to treat as views \*/
        } Query;

QD产生优化后的查询计划

optimizer

|--src\\backend\\tcop\\postgres.c
        |--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
        |--|--pg\_plan\_query(query, cursorOptions, boundParams);
        |--|--|--plan = planner(querytree, cursorOptions, boundParams);
        |--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
        |--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
        |--|--|--|--|--result = optimize\_query(parse, boundParams); //
        |--|--|--|--|--src\\backend\\optimizer\\plan\\orca.c
        |--|--|--|--|--|--result = GPOPTOptimizedPlan(pqueryCopy, &fUnexpectedFailure);

Planner

|--src\\backend\\tcop\\postgres.c
|--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
|--|--pg\_plan\_query(query, cursorOptions, boundParams);
|--|--|--plan = planner(querytree, cursorOptions, boundParams);
|--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
|--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
|--|--|--|--|--top\_plan = subquery\_planner(glob, parse, NULL,
|--|--|--|--|--plan = grouping\_planner(root, tuple\_fraction);
|--|--|--|--|--|--final\_rel = query\_planner(root, sub\_tlist,

|--grouping\_planner()
{
    /\*
    \* Generate the best unsorted and presorted paths for this Query (but
    \* note there may not be any presorted paths).  We also generate (in
    \* standard\_qp\_callback) pathkey representations of the query's sort
    \* clause, distinct clause, etc.
    \*/
    final\_rel = query\_planner(root, sub\_tlist,standard\_qp\_callback, &qp\_extra);


    |--query\_planner()
    |--|--make\_one\_rel
    /\*Finds all possible access paths for executing a query\*/
    {
        /\* find seqscan and all index paths for each base relation \*/
        set\_base\_rel\_pathlists(root);

        /\*
        \* Generate access paths for the entire join tree.
        \*/
        rel = make\_rel\_from\_joinlist(root, joinlist);
    }

    if (parse->groupClause){
        ......
    }
    else if (parse->distinctClause)
    {
        ......
    }

    cost\_sort(&sort\_path, root, root->query\_pathkeys, cheapest\_path->total\_cost,
    path\_rows, path\_width,
    0.0, work\_mem, root->limit\_tuples);

    choose\_hashed\_grouping(root,
    tuple\_fraction, limit\_tuples, ......
    choose\_hashed\_distinct(root,

    if (Gp\_role == GP\_ROLE\_DISPATCH && result\_plan == NULL)
    {
        result\_plan = cdb\_grouping\_planner(root,....
    }

    ......

    if (Gp\_role == GP\_ROLE\_DISPATCH && CdbPathLocus\_IsPartitioned(current\_locus))
    {
        bool        needMotion;

        needMotion = !cdbpathlocus\_collocates\_pathkeys(root, current\_locus,
        distinct\_dist\_pathkeys, false /\* exact\_match \*/ );
    }

    /\* Create a plan according to query\_planner's results. \*/
    result\_plan = create\_plan(root, best\_path);
    /\* Insert AGG or GROUP node if needed \*/
    make\_agg()
    /\*If there is a DISTINCT clause, add the UNIQUE node.\*/
    make\_unique()
    /\*Finally, if there is a LIMIT/OFFSET clause, add the LIMIT node.\*/
    make\_limit()
}

Relation Access Paths

set_base_rel_pathlists()
对每个基础表找出具体的扫描路径,比如seqscan、index scan或者bitmap scan等.

|--set\_base\_rel\_pathlists()
{
    for (rti = 1; rti < root->simple\_rel\_array\_size; rti++)
    {
        |--set\_rel\_pathlist(root, rel, rti);
        |--|--set\_plain\_rel\_pathlist(root, rel, rte);
        {
            /\* Consider sequential scan. \*/
            switch (rel->relstorage)
            {
                case RELSTORAGE\_EXTERNAL:
                case RELSTORAGE\_AOROWS:
                case RELSTORAGE\_AOCOLS:
                case RELSTORAGE\_HEAP:
                seqpath = create\_seqscan\_path(root, rel);
            }

            if (plain\_allow\_indexpath\_under\_subplan(root, rel))
            {
                /\* Consider index and bitmap scans \*/
                create\_index\_paths(root, rel);

                if (rel->relstorage == RELSTORAGE\_HEAP)
                create\_tidscan\_paths(root, rel);
            }
            ......
            /\* Now find the cheapest of the paths for this rel \*/
            set\_cheapest(root, rel);
        }
    }
}

Seqscan Path

生成T_SeqScan类型的Path Node, 同时计算顺序扫描花费的代价
*cost_seqscan(pathnode, root, rel)*

|--create\_seqscan\_path(PlannerInfo \*root, RelOptInfo \*rel, Relids required\_outer)
{
    Path       \*pathnode = makeNode(Path);

    pathnode->pathtype = T\_SeqScan;

    pathnode->locus = cdbpathlocus\_from\_baserel(root, rel);

    cost\_seqscan(pathnode, root, rel);

    return pathnode;
}

Join处理

|--make\_rel\_from\_joinlist()

|--|--make\_rel\_from\_joinlist
|--|--|--standard\_join\_search
{
    /\*
    \* We employ a simple "dynamic programming" algorithm: we first find all
    \* ways to build joins of two jointree items, then all ways to build joins
    \* of three items (from two-item joins and single items), then four-item
    \* joins, and so on until we have considered all ways to join all the
    \* items into one rel.
    \*
    \* root->join\_rel\_level[j] is a list of all the j-item rels.  Initially we
    \* set root->join\_rel\_level[1] to represent all the single-jointree-item
    \* relations.
    \*/
    for (lev = 2; lev <= levels\_needed; lev++)
    {
        |--join\_search\_one\_level
        {
            /\*
            \* First, consider left-sided and right-sided plans, in which rels of
            \* exactly level-1 member relations are joined against initial relations.
            \* We prefer to join using join clauses, but if we find a rel of level-1
            \* members that has no join clauses, we will generate Cartesian-product
            \* joins against all initial rels not already contained in it.
            \*/
            |--make\_rels\_by\_clause\_joins
            |--|--make\_join\_rel
            switch (sjinfo->jointype)
            {
                case JOIN\_INNER:
                add\_paths\_to\_joinrel
                case JOIN\_LEFT:
                case JOIN\_FULL:
                case JOIN\_SEMI:
                case JOIN\_ANTI:
                case JOIN\_LASJ\_NOTIN:
            }
            |--|--add\_paths\_to\_joinrel
            {
                /\*
                \* Find potential mergejoin clauses.  We can skip this if we are not
                \* interested in doing a mergejoin.  However, mergejoin may be our only
                \* way of implementing a full outer join, so override enable\_mergejoin if
                \* it's a full join.
                \*
                \* CDB: Always build mergeclause\_list.  We need it for motion planning.
                \*/
                redistribution\_clauses = select\_cdb\_redistribute\_clauses(root,
                /\*
                \* 1. Consider mergejoin paths where both relations must be explicitly
                \* sorted.  Skip this if we can't mergejoin.
                \*/
                mergeclause\_list = select\_mergejoin\_clauses(root,
                sort\_inner\_and\_outer(root, joinrel, outerrel, innerrel,

                |--sort\_inner\_and\_outer();
                |--|--try\_mergejoin\_path()
                |--|--|--create\_mergejoin\_path
                |--|--|--|--cdbpath\_motion\_for\_join
                |--|--|--|--turn\_volatile\_seggen\_to\_singleqe
                |--|--|--|--|--cdbpath\_create\_motion\_path
                {
                    /\* Create CdbMotionPath node. \*/
                    pathnode = makeNode(CdbMotionPath);
                    pathnode->path.pathtype = T\_Motion;
                    pathnode->path.parent = subpath->parent;
                    pathnode->path.locus = locus;
                    pathnode->path.rows = subpath->rows;
                    pathnode->path.pathkeys = pathkeys;
                    pathnode->subpath = subpath;
                }


                /\*
                \* 2. Consider paths where the outer relation need not be explicitly
                \* sorted. This includes both nestloops and mergejoins where the outer
                \* path is already ordered.  Again, skip this if we can't mergejoin.
                \* (That's okay because we know that nestloop can't handle right/full
                \* joins at all, so it wouldn't work in the prohibited cases either.)
                \*/
                match\_unsorted\_outer(root, joinrel, outerrel, innerrel,

                /\*
                \* 4. Consider paths where both outer and inner relations must be hashed
                \* before being joined.  As above, disregard enable\_hashjoin for full
                \* joins, because there may be no other alternative.
                \*
                \* We consider both the cheapest-total-cost and cheapest-startup-cost
                \* outer paths.  There's no need to consider any but the
                \* cheapest-total-cost inner path, however.
                \*/
                hash\_inner\_and\_outer(root, joinrel, outerrel, innerrel,
            }

            /\* Now, consider "bushy plans" \*/
        }
    }

create_mergejoin_path

在merge join阶段,会生成需要的Motion.
\*     Creates a pathnode corresponding to a mergejoin join between
\*     two relations

|--create\_mergejoin\_path()
{
    |--cdbpath\_motion\_for\_join()
    {
        /\*
        \* Add motion nodes above subpaths and decide where to join.
        \*/
        |--cdbpath\_create\_motion\_path()
        |--|--cdbpath\_cost\_motion(root, pathnode);

        pathnode = makeNode(MergePath);

        cost\_mergejoin(pathnode, root);
    }
}

/\*
\* cdbpath\_cost\_motion
\*    Fills in the cost estimate fields in a MotionPath node.
\*/
void
cdbpath\_cost\_motion(PlannerInfo \*root, CdbMotionPath \*motionpath)
{

    cost\_per\_row = (gp\_motion\_cost\_per\_row > 0.0)
    ? gp\_motion\_cost\_per\_row
    : 2.0 \* cpu\_tuple\_cost;
    sendrows = cdbpath\_rows(root, subpath);
    recvrows = cdbpath\_rows(root, (Path \*)motionpath);
    motioncost = cost\_per\_row \* 0.5 \* (sendrows + recvrows);

    motionpath->path.total\_cost = motioncost + subpath->total\_cost;
    motionpath->path.startup\_cost = subpath->startup\_cost;
    motionpath->path.memory = subpath->memory;
}

QD准备执行plan

(Gangs, QEs and interconnects)
初始化逻辑:

注意cdbdisp_dispatchX实现了Plan的序列化逻辑。

|--PortalStart
|--|--ExecutorStart(queryDesc, eflags);
|--|--|--standard_ExecutorStart
{
    estate = CreateExecutorState();

    /*
    * Assign a Motion Node to every Plan Node. This makes it
    * easy to identify which slice any Node belongs to
    */
    AssignParentMotionToPlanNodes(queryDesc->plannedstmt);

    /*
    * Initialize the plan state tree
    */
    InitPlan(queryDesc, eflags);

    if (shouldDispatch){
        ....
        CdbDispatchPlan(queryDesc, needDtx, true);
        |--cdbdisp_dispatchX
        {....
            ds = cdbdisp_makeDispatcherState(queryDesc->extended_query);

            /*
            * Since we intend to execute the plan, inventory the slice tree,
            * allocate gangs, and associate them with slices.
            *
            * On return, gangs have been allocated and CDBProcess lists have
            * been filled in the slice table.)
            *
            * Notice: This must be done before cdbdisp_buildPlanQueryParms
            */
            AssignGangs(ds, queryDesc);
        }

        /*
        * if in dispatch mode, time to serialize plan and query
        * trees, and fire off cdb_exec command to each of the qexecs
        */
        |--CdbDispatchPlan();
        |--|--cdbdisp_dispatchX()
        pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
        queryText = buildGpQueryString(pQueryParms, &queryTextLength);

        |--|--|--buildGpQueryString
        total_query_len = 1 /* 'M' */ +
            sizeof(len) /* message length */ +
            sizeof(gp_command_count) +
            sizeof(sessionUserId) /* sessionUserIsSuper */ +
            sizeof(outerUserId) /* outerUserIsSuper */ +
            sizeof(currentUserId) +
            sizeof(n32) * 2 /* currentStatementStartTimestamp */ +
            sizeof(command_len) +
            sizeof(querytree_len) +
            sizeof(plantree_len) +
            sizeof(params_len) +
            sizeof(sddesc_len) +
            sizeof(dtxContextInfo_len) +
            dtxContextInfo_len +
            command_len +
            querytree_len +
            plantree_len +
            params_len +
            sddesc_len +
            sizeof(numsegments) +
            sizeof(resgroupInfo.len) +
            resgroupInfo.len +
            sizeof(tempNamespaceId) +
            sizeof(tempToastNamespaceId) +
            0;

        //plan serialization
        pos = shared_query;
        *pos++ = 'M';

        pos += 4;                   /* placeholder for message length */

        tmp = htonl(gp_command_count);
        memcpy(pos, &tmp, sizeof(gp_command_count));
        pos += sizeof(gp_command_count);

        tmp = htonl(sessionUserId);
        memcpy(pos, &tmp, sizeof(sessionUserId));
        pos += sizeof(sessionUserId);

        tmp = htonl(outerUserId);
        memcpy(pos, &tmp, sizeof(outerUserId));
        pos += sizeof(outerUserId);

        tmp = htonl(currentUserId);
        memcpy(pos, &tmp, sizeof(currentUserId));
        pos += sizeof(currentUserId);

        /*
        * High order half first, since we're doing MSB-first
        */
        n32 = (uint32) (currentStatementStartTimestamp >> 32);
        n32 = htonl(n32);
        memcpy(pos, &n32, sizeof(n32));
        pos += sizeof(n32);

        /*
        * Now the low order half
        */
        n32 = (uint32) currentStatementStartTimestamp;
        n32 = htonl(n32);
        memcpy(pos, &n32, sizeof(n32));
        pos += sizeof(n32);

        tmp = htonl(command_len);
        memcpy(pos, &tmp, sizeof(command_len));
        pos += sizeof(command_len);

        tmp = htonl(querytree_len);
        memcpy(pos, &tmp, sizeof(querytree_len));
        pos += sizeof(querytree_len);

        tmp = htonl(plantree_len);
        memcpy(pos, &tmp, sizeof(plantree_len));
        pos += sizeof(plantree_len);

        tmp = htonl(params_len);
        memcpy(pos, &tmp, sizeof(params_len));
        pos += sizeof(params_len);

        tmp = htonl(sddesc_len);
        memcpy(pos, &tmp, sizeof(tmp));
        pos += sizeof(tmp);

        tmp = htonl(dtxContextInfo_len);
        memcpy(pos, &tmp, sizeof(tmp));
        pos += sizeof(tmp);

        if (dtxContextInfo_len > 0)
        {
            memcpy(pos, dtxContextInfo, dtxContextInfo_len);
            pos += dtxContextInfo_len;
        }

        memcpy(pos, command, command_len);
        /* If command is truncated we need to set the terminating '\0' manually */
        pos[command_len - 1] = '\0';
        pos += command_len;

        if (querytree_len > 0)
        {
            memcpy(pos, querytree, querytree_len);
            pos += querytree_len;
        }

        if (plantree_len > 0)
        {
            memcpy(pos, plantree, plantree_len);
            pos += plantree_len;
        }

        if (params_len > 0)
        {
            memcpy(pos, params, params_len);
            pos += params_len;
        }

        if (sddesc_len > 0)
        {
            memcpy(pos, sddesc, sddesc_len);
            pos += sddesc_len;
        }

        tmp = htonl(numsegments);
        memcpy(pos, &tmp, sizeof(numsegments));
        pos += sizeof(numsegments);

        tmp = htonl(resgroupInfo.len);
        memcpy(pos, &tmp, sizeof(resgroupInfo.len));
        pos += sizeof(resgroupInfo.len);

        if (resgroupInfo.len > 0)
        {
            memcpy(pos, resgroupInfo.data, resgroupInfo.len);
            pos += resgroupInfo.len;
        }

        /* pass process local variables to QEs */
        GetTempNamespaceState(&tempNamespaceId, &tempToastNamespaceId);
        tempNamespaceId = htonl(tempNamespaceId);
        tempToastNamespaceId = htonl(tempToastNamespaceId);

        memcpy(pos, &tempNamespaceId, sizeof(tempNamespaceId));
        pos += sizeof(tempNamespaceId);
        memcpy(pos, &tempToastNamespaceId, sizeof(tempToastNamespaceId));
        pos += sizeof(tempToastNamespaceId);

        /*
        * fill in length placeholder
        */
        len = pos - shared_query - 1;
        tmp = htonl(len);
        memcpy(shared_query + 1, &tmp, sizeof(len));

        Assert(len + 1 == total_query_len);

        if (finalLen)
        *finalLen = len + 1;

        |--|--|--cdbdisp_dispatchToGang();
        |--|--|--|--cdbdisp_dispatchToGang_async

        for (i = 0; i < gp->size; i++)
        {
            dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
        }
        |--|--|--|--|--dispatchCommand
        |--|--|--|--|--|--PQsendGpQuery_shared
    }
}

DispatcherInternalFuncs DispatcherAsyncFuncs =
{
    cdbdisp_checkForCancel_async,
    cdbdisp_getWaitSocketFd_async,
    cdbdisp_makeDispatchParams_async,
    cdbdisp_checkAckMessage_async,
    cdbdisp_checkDispatchResult_async,
    cdbdisp_dispatchToGang_async,
    cdbdisp_waitDispatchFinish_async
};
Gang的分配逻辑:
/*
 * Function AssignGangs runs on the QD and finishes construction of the
 * global slice table for a plan by assigning gangs allocated by the
 * executor factory to the slices of the slice table.
 *
 * On entry, the slice table (at queryDesc->estate->es_sliceTable) has
 * the correct structure (established by InitSliceTable) and has correct
 * gang types (established by function FillSliceTable).
 *
 * Gang assignment involves taking an inventory of the requirements of
 * each slice tree in the slice table, asking the executor factory to
 * allocate a minimal set of gangs that can satisfy any of the slice trees,
 * and associating the allocated gangs with slices in the slice table.
 *
 * On successful exit, the CDBProcess lists (primaryProcesses, mirrorProcesses)
 * and the Gang pointers (primaryGang, mirrorGang) are set correctly in each
 * slice in the slice table.
 */
 |--AssignGangs(queryDesc, gp_singleton_segindex);
 |--|--AssignWriterGangFirst
 |--|--|--AllocateGang()
 |--|--|--|--cdbgang_createGang()
 |--|--|--|--cdbgang_createGang_async
 |--|--|--|--|--PQconnectPoll
 void
 AssignGangs(QueryDesc *queryDesc, int utility_segment_index)
 {
     /*
     * Get the gangs we'll use.
     *
     * As a general rule the first gang is a writer and the rest are readers.
     * If this happens to be an extended query protocol then all gangs are readers.
     */
     if (inv.numNgangs > 0)
     {
         inv.vecNgangs = (Gang **) palloc(sizeof(Gang *) * inv.numNgangs);
         for (i = 0; i < inv.numNgangs; i++)
         {
             if (i == 0 && !queryDesc->extended_query)
             {
                 inv.vecNgangs[i] = allocateWriterGang();
             }
             else
             {
                 inv.vecNgangs[i] = allocateGang(GANGTYPE_PRIMARY_READER, getgpsegmentCount(), 0,);
             }
         }
     }
 }

 segments in Gang:
 |--standard_ExecutorStart
 //only master does dispatch
 if (Gp_role == GP_ROLE_DISPATCH &&
 (queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL ||
 queryDesc->plannedstmt->nMotionNodes > 0))
 {
     //nMotionNodes get initialized when orca optimizer is called.
     /* Set up blank slice table to be filled in during InitPlan. */
     //Here we get the SliceTable
     InitSliceTable(estate, queryDesc->plannedstmt->nMotionNodes, queryDesc->plannedstmt->nInitPlans);
     |--|--InitSliceTable
     {
         n = 1 + nMotions + nSubplans;

         table = makeNode(SliceTable);
         table->nMotions = nMotions;
         table->nInitPlans = nSubplans;
         table->slices = NIL;
         table->instrument_options = INSTRUMENT_NONE;

         for (i = 0; i < n; i++)
         {
             slice = makeNode(Slice);

             slice->sliceIndex = i;
             slice->rootIndex = (i > 0 && i <= nMotions) ? -1 : i;
             slice->gangType = GANGTYPE_UNALLOCATED;
             slice->gangSize = 0;
             slice->segments = NIL;
             slice->directDispatch.isDirectDispatch = false;
             slice->directDispatch.contentIds = NIL;
             slice->primaryGang = NULL;
             slice->parentIndex = -1;
             slice->children = NIL;
             slice->primaryProcesses = NIL;

             table->slices = lappend(table->slices, slice);
         }

         estate->es_sliceTable = table;
     }
 }

 //set up interconnect
 if (queryDesc->plannedstmt->nMotionNodes > 0)
 {
     SetupInterconnect(estate);
     UpdateMotionExpectedReceivers(estate->motionlayer_context, estate->es_sliceTable);
 }

 /*
 * Initialize the slice table.
 */
 if (Gp_role == GP_ROLE_DISPATCH)
 {
     FillSliceTable(estate, plannedstmt);
     |--FillSliceTable_walker
     |--|--FillSliceGangInfo
     {
         switch (slice->gangType)
         {
             case GANGTYPE_UNALLOCATED:
             case GANGTYPE_PRIMARY_WRITER:
             case GANGTYPE_PRIMARY_READER:
             int i;
             slice->gangSize = numsegments;
             slice->segments = NIL;
             for (i = 0; i < numsegments; i++)
             slice->segments = lappend_int(slice->segments, i % getgpsegmentCount());

             case GANGTYPE_ENTRYDB_READER:
             slice->gangSize = 1;
             slice->segments = list_make1_int(-1);
             break;
             case GANGTYPE_SINGLETON_READER:
             slice->gangSize = 1;
             slice->segments = list_make1_int(gp_session_id % numsegments);
         }
     }
 }

QE接收Plan,反序列化并执行

|--PostgresMain
{
    for (;;)
    {
        switch (firstchar)
        {
            case 'M':           /* MPP dispatched stmt from QD */
            {
                exec_mpp_query();
            }
            .......
        }
    }
}

接下来的章节会针对源码进行梳理,敬请关注。

未完待续

文章来自个人专栏
greenplum
2 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0