查询执行源码解析
Master监听端口
入口函数为PostmasterMain(),主要是监听端口以及初始化。
PostmasterMain
src/backend/main/main.c
|--Main()
|--|--PostmasterMain()
{
/\*
\* CDB: gpdb auxilary process like fts probe, dtx recovery process is
\* essential, we need to load them ahead of custom shared preload libraries
\* to avoid exceeding max\_worker\_processes.
\*/
load\_auxiliary\_libraries();
......
/\*
\* Establish input sockets.
\*/
status = StreamServerPort(AF\_UNSPEC, NULL,
(unsigned short) PostPortNumber,
UnixSocketDir,
ListenSocket, MAXLISTEN);
.......
/\*
\* Set up shared memory and semaphores.
\*/
reset\_shared(PostPortNumber);
......
/\* main idle loop of master \*/
status = ServerLoop();
.......
}
需要关注一下函数load_auxiliary_libraries.这个函数完成了辅助进程的初始化。涉及到的数据结构为BackgroundWorker。从下面代码中也可以了解到Greenplum后台有哪些独有的辅助进程,以及各个辅助进程的入口函数,从而了解辅助进程的执行逻辑,此处不详述:
static BackgroundWorker PMAuxProcList[MaxPMAuxProc] =
{
{"ftsprobe process",
BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
0, /\* restart immediately if ftsprobe exits with non-zero code \*/
FtsProbeMain, {0}, {0}, 0, 0,
FtsProbeStartRule},
{"global deadlock detector process",
BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
BgWorkerStart\_RecoveryFinished,
0, /\* restart immediately if gdd exits with non-zero code \*/
GlobalDeadLockDetectorMain, {0}, {0}, 0, 0,
GlobalDeadLockDetectorStartRule},
{"dtx recovery process",
BGWORKER\_SHMEM\_ACCESS | BGWORKER\_BACKEND\_DATABASE\_CONNECTION,
BgWorkerStart\_DtxRecovering, /\* no need to wait dtx recovery \*/
0, /\* restart immediately if dtx recovery process exits with non-zero code \*/
DtxRecoveryMain, {0}, {0}, 0, 0,
DtxRecoveryStartRule},
{"stats sender process",
BGWORKER\_SHMEM\_ACCESS,
BgWorkerStart\_RecoveryFinished,
0, /\* restart immediately if stats sender exits with non-zero code \*/
SegmentInfoSenderMain, {0}, {0}, 0, 0,
SegmentInfoSenderStartRule},
{"sweeper process",
BGWORKER\_SHMEM\_ACCESS,
BgWorkerStart\_RecoveryFinished,
0, /\* restart immediately if sweeper process exits with non-zero code \*/
BackoffSweeperMain, {0}, {0}, 0, 0,
BackoffSweeperStartRule},
{"perfmon process",
BGWORKER\_SHMEM\_ACCESS,
BgWorkerStart\_RecoveryFinished,
0, /\* restart immediately if perfmon process exits with non-zero code \*/
PerfmonMain, {0}, {0}, 0, 0,
PerfmonStartRule},
#ifdef ENABLE\_IC\_PROXY
{"ic proxy process",
#ifdef FAULT\_INJECTOR
BGWORKER\_SHMEM\_ACCESS,
#else
0,
#endif
BgWorkerStart\_RecoveryFinished,
0, /\* restart immediately if ic proxy process exits with non-zero code \*/
ICProxyMain, {0}, {0}, 0, 0,
ICProxyStartRule},
#endif /\* ENABLE\_IC\_PROXY \*/
};
master的主要逻辑在函数*ServerLoop()*里面.
当新的连接到来的时候,会启动一个新的子进程.
同时也会对辅助进程进行存在性检查,如果子进程不存在,会重新启动一个.逻辑如下代码所示:
ServerLoop函数
src/backend/postmaster/postmaster.c
/\* Postmaster main entry point\*/
|--PostmasterMain()
|--|--ServerLoop() /\*Main idle loop of postmaster\*/
{
for (;;)
{
.........
/\*
\* New connection pending on any of our sockets? If so, fork a child
\* process to deal with it.
\*/
for (i = 0; i < MAXLISTEN; i++)
{
......
Port \*port;
port = ConnCreate(ListenSocket[i]);
if (port)
{
BackendStartup(port);
......
}
}
/\* If we have lost the log collector, try to start a new one \*/
SysLogger\_Start();
/\*
\* If no background writer process is running, and we are not in a
\* state that prevents it, start one. It doesn't matter if this
\* fails, we'll just try again later. Likewise for the checkpointer.
\*/
StartCheckpointer();
StartBackgroundWriter();
StartWalWriter();
StartAutoVacLauncher();
/\* If we have lost the stats collector, try to start a new one \*/
pgstat\_start();
/\* Get other worker processes running, if needed \*/
maybe\_start\_bgworker();
}
}
启动Backend-QD进程
Greenplum基于postgresql,是进程模型. 当一个客户端连接进来的时候,master调用BackendStartup,在其中fork出子进程进行连接处理工作。这时候fork出来的子进程通常称为Backend。
src/backend/postmaster/postmaster.c
|--BackendStartup()
{
pid = fork\_process();
if (pid == 0)
{
/\* Perform additional initialization and client authentication \*/
BackendInitialize(port);
/\* And run the backend \*/
BackendRun(port);
|--|--PostgresMain();
}
}
客户端发送查询到QD
Fork出来的Backend子进程,主要逻辑在函数PostgresMain中。ReadCommand()读取协议命令,然后调用exec_simple_query()进行处理。具体的命令可以参考PG协议:frontend/backend
protocol。我们主要跟踪Q命令的执行逻辑:
PostgresMain
src/backend/tcop/postgres.c
/\* postgres main loop
\* all backends, interactive or otherwise start here
\*/
|--PostgresMain()
{
for (;;)
{
/\*
\* (3) read a command (loop blocks here)
\*/
firstchar = ReadCommand(&input\_message);
/\*
\* (6) process the command. But ignore it if we're skipping till
\* Sync.
\*/
switch (firstchar)
{
case 'Q': /\* simple query \*/
case 'M': /\* Greenplum Database dispatched statement from QD \*/
case 'T': /\* Greenplum Database dispatched transaction protocol from QD \*/
case 'F': /\* fastpath function call \*/
case 'X': /\* terminate \*/
case 'B': /\* bind \*/
case 'C': /\* close \*/
case 'D': /\* describe \*/
case 'E': /\* execute \*/
case 'H': /\* flush \*/
case 'P': /\* parse \*/
case 'S': /\* sync \*/
case 'd': /\* copy data \*/
case 'c': /\* copy done \*/
case 'f': /\* copy fail \*/
}
}
}
Greenplum在协议中增加了两个自定义命令,M命令主要用来反序列化plan信息,然后执行相应的查询计划,T主要和分布式事务相关:
case 'M': /\* MPP dispatched stmt from QD \*/
case 'T': /\* MPP dispatched dtx protocol command from QD \*/
QD处理查询
QD的处理逻辑主要在函数exec_simple_query.
对接收到的SQL语句,Backend对其进行parse, analyze, rewrite, 以及plan,
然后进行plan的分发,最后把结果返回给客户端.
exec_simple_query
src/backend/tcop/postgres.c
|--exec\_simple\_query()
{
/\*
\* Start up a transaction command.
\*/
start\_xact\_command();
/\*
\* Do basic parsing of the query or queries
\*/
parsetree\_list = pg\_parse\_query(query\_string);
/\*
\* OK to analyze, rewrite, and plan this query.
\*/
querytree\_list = pg\_analyze\_and\_rewrite(parsetree, query\_string,NULL, 0);
plantree\_list = pg\_plan\_queries(querytree\_list, NULL, false);
/\*
\* Create unnamed portal to run the query or queries in.
\*/
portal = CreatePortal("", true, true);
/\*
\* Start the portal.
\*/
PortalStart();
/\*
\* Now we can create the destination receiver object.
\*/
receiver = CreateDestReceiver(dest, portal);
/\*
\* Run the portal to completion, and then drop it (and the receiver).
\*/
(void) PortalRun();
(\*receiver->rDestroy) (receiver);
PortalDrop(portal, false);
/\*
\* Close down transaction statement, if one is open.
\*/
finish\_xact\_command();
}
QD 的 parse、analyze 以及 rewrite
Parse的逻辑在函数 parsetree_list = pg_parse_query(query_string);
主要是进行语法与语义解析,然后返回一个解析树.
解析树的数据结构是一个List,里面包含了各个节点。具体的可以参见gram.y。
List \*raw\_parsetree\_list;
/\*
\* Do raw parsing (only).
\*
\* A list of parsetrees is returned, since there might be multiple
\* commands in the given string.
\*/
pg\_parse\_query(const char \*query\_string)
{
raw\_parsetree\_list = raw\_parser(query\_string);
return raw\_parsetree\_list;
}
/\*
\* raw\_parser
\* Given a query in string form, do lexical and grammatical analysis.
\*
\* Returns a list of raw (un-analyzed) parse trees.
\*/
List \*
raw\_parser(const char \*str)
{
int yyresult;
parsetree = NIL; /\* in case grammar forgets to set it \*/
have\_lookahead = false;
scanner\_init(str);
parser\_init();
yyresult = base\_yyparse();
scanner\_finish();
if (yyresult) /\* error \*/
return NIL;
return parsetree;
}
Postgres
使用Flex/Bison解析SQL语句.针对Select语句,会返回一个SelectStmt,里面包含了Select各个部分的解析结果。
gpdb/src/backend/parser/scan.l
....
whitespace ({space}+|{comment})
digit [0-9]
ident\_start [A-Za-z\\200-\\377\_]
ident\_cont [A-Za-z\\200-\\377\_0-9\\\$]
....
SELECT \* /\* target\_list \*/
FROM user /\* from\_clause \*/
WHERE username = 'JOHN' /\* where\_clause \*/
src/backend/parser/gram.y
simple\_select:
SELECT opt\_distinct target\_list
into\_clause from\_clause where\_clause
group\_clause having\_clause window\_clause
{
SelectStmt \*n = makeNode(SelectStmt);
n->distinctClause = \$2;
n->targetList = \$3;
n->intoClause = \$4;
n->fromClause = \$5;
n->whereClause = \$6;
n->groupClause = \$7;
n->havingClause = \$8;
n->windowClause = \$9;
\$\$ = (Node \*)n;
}
*SelectStmt*数据结构成员如下所示:
SelectStmt
src/include/nodes/ parsenodes.h
typedef struct SelectStmt
{
List \*distinctClause;
IntoClause \*intoClause; /\* target for SELECT INTO / CREATE TABLE AS \*/
List \*targetList; /\* the target list (of ResTarget) \*/
List \*fromClause; /\* the FROM clause \*/
Node \*whereClause; /\* WHERE qualification \*/
List \*groupClause; /\* GROUP BY clauses \*/
Node \*havingClause; /\* HAVING conditional-expression \*/
List \*windowClause; /\* window specification clauses \*/
List \*scatterClause; /\* GPDB: TableValueExpr data distribution \*/
WithClause \*withClause; /\* with clause \*/
........
/\* This field used by: SELECT INTO, CTAS \*/
List \*distributedBy; /\* GPDB: columns to distribute the data on. \*/
} SelectStmt;
Analyze and Rewrite
当查询parse完毕之后,系统会对其进行Analyze,会进行以下转换工作,比如把表名转换为内部使用的OID,\*的扩展等。
|--pg\_analyze\_and\_rewrite()
{
/\*(1) Perform parse analysis.\*/
|--querytree\_list = parse\_analyze(parsetree, query\_string, paramTypes, numParams);
|--|--transformTopLevelStmt(pstate, parseTree);
|--|--|--query = transformStmt(pstate, parseTree);
|--|--|--|--result = transformSelectStmt(pstate, n);
/\* (2) Rewrite the queries, as necessary\*/
|--querytree\_list = pg\_rewrite\_query(query);
|--|--QueryRewrite(query)
|--|--|-- /\*Apply all non-SELECT rules possibly getting 0 or many queries\*/
|--|--|--querylist = RewriteQuery(parsetree, NIL);
|--|--|--|-- product\_queries = fireRules(parsetree,
|--|--|-- /\*Apply all the RIR rules on each query \*/
|--|--|-- query = fireRIRrules(query, NIL, false);;
}
\*的替换
在函数transformStmt()中,
\*会被替换为真实的列。可以留意函数里面的numnames分支情况.其中的2,3,4对应了不同的情况,详见注释:
src/backend/parser/analyze.c
|--transformStmt
|--|--transformSelectStmt(pstate, n);
|--|--|--transformTargetList(pstate, stmt->targetList,
|--|--|--|--ExpandColumnRefStar(pstate,
|--|--|--|--|--refnameRangeTblEntry(pstate, nspname, relname,
src\\backend\\parser\\parse\_target.c
ExpandColumnRefStar:
switch (numnames)
{
case 2: //relation.colname
relname = strVal(linitial(fields));
rte = refnameRangeTblEntry(pstate, nspname, relname,
cref->location,
&levels\_up);
break;
case 3: //nspname.relation.colname
nspname = strVal(linitial(fields));
relname = strVal(lsecond(fields));
rte = refnameRangeTblEntry(pstate, nspname, relname,
cref->location,
&levels\_up);
break;
case 4: //catalogname.nspname.relation.colname
{
char \*catname = strVal(linitial(fields));
/\*
\* We check the catalog name and then ignore it.
\*/
if (strcmp(catname, get\_database\_name(MyDatabaseId)) != 0)
{
crserr = CRSERR\_WRONG\_DB;
break;
}
nspname = strVal(lsecond(fields));
relname = strVal(lthird(fields));
rte = refnameRangeTblEntry(pstate, nspname, relname,
cref->location,
&levels\_up);
break;
}
src\\backend\\parser\\parse\_relation.c
RangeTblEntry \*
refnameRangeTblEntry(ParseState \*pstate,
const char \*schemaname,
const char \*refname,
int location,
int \*sublevels\_up)
{
Oid relId = InvalidOid;
if (schemaname != NULL)
{
Oid namespaceId;
namespaceId = LookupNamespaceNoError(schemaname);
if (!OidIsValid(namespaceId))
return NULL;
relId = get\_relname\_relid(refname, namespaceId);
if (!OidIsValid(relId))
return NULL;
}
while (pstate != NULL)
{
RangeTblEntry \*result;
if (OidIsValid(relId))
result = scanNameSpaceForRelid(pstate, relId, location);
else
result = scanNameSpaceForRefname(pstate, refname, location);
if (result)
return result;
if (sublevels\_up)
(\*sublevels\_up)++;
else
break;
pstate = pstate->parentParseState;
}
return NULL;
}
OID的查找
src/backend/parser/analyze.c
|--transformStmt
|--|--transformFromClause(pstate, stmt->fromClause);
|--|--|--transformFromClauseItem(pstate,
|--|--|--|--transformTableEntry(pstate, rangeVar);
|--|--|--|--|--addRangeTableEntry(pstate, r, r->alias
{
RangeTblEntry \*rte = makeNode(RangeTblEntry);
rel = parserOpenTable(pstate, relation, lockmode, NULL);
rte->relid = RelationGetRelid(rel);
heap\_close(rel, NoLock);
......
}
数据结构:Query
/\*
\* Query -
\* Parse analysis turns all statements into a Query tree
\* for further processing by the rewriter and planner.
\*
\* Utility statements (i.e. non-optimizable statements) have the
\* utilityStmt field set, and the Query itself is mostly dummy.
\* DECLARE CURSOR is a special case: it is represented like a SELECT,
\* but the original DeclareCursorStmt is stored in utilityStmt.
\*
\* Planning converts a Query tree into a Plan tree headed by a PlannedStmt
\* node --- the Query structure is not used by the executor.
\*/
typedef struct Query
{
NodeTag type;
CmdType commandType; /\* select|insert|update|delete|utility \*/
QuerySource querySource; /\* where did I come from? \*/
uint32 queryId; /\* query identifier (can be set by plugins) \*/
bool canSetTag; /\* do I set the command result tag? \*/
Node \*utilityStmt; /\* non-null if this is DECLARE CURSOR or a
\* non-optimizable statement \*/
int resultRelation; /\* rtable index of target relation for
\* INSERT/UPDATE/DELETE; 0 for SELECT \*/
bool hasAggs; /\* has aggregates in tlist or havingQual \*/
bool hasWindowFuncs; /\* has window functions in tlist \*/
bool hasSubLinks; /\* has subquery SubLink \*/
bool hasDynamicFunctions; /\* has functions with unstable return types \*/
bool hasFuncsWithExecRestrictions; /\* has functions with EXECUTE ON MASTER or ALL SEGMENTS \*/
bool hasDistinctOn; /\* distinctClause is from DISTINCT ON \*/
bool hasRecursive; /\* WITH RECURSIVE was specified \*/
bool hasModifyingCTE; /\* has INSERT/UPDATE/DELETE in WITH \*/
bool hasForUpdate; /\* FOR [KEY] UPDATE/SHARE was specified \*/
List \*cteList; /\* WITH list (of CommonTableExpr's) \*/
List \*rtable; /\* list of range table entries \*/
FromExpr \*jointree; /\* table join tree (FROM and WHERE clauses) \*/
List \*targetList; /\* target list (of TargetEntry) \*/
List \*withCheckOptions; /\* a list of WithCheckOption's \*/
List \*returningList; /\* return-values list (of TargetEntry) \*/
/\*
\* A list of GroupClauses or GroupingClauses. The order of GroupClauses
\* or GroupingClauses are based on input queries. However, in each
\* grouping set, all GroupClauses will appear in front of GroupingClauses.
\* See the following GROUP BY clause:
\*
\* GROUP BY ROLLUP(b,c),a, CUBE(e,d)
\*
\* the result list can be roughly represented as follows.
\*
\* GroupClause(a) --> GroupingClause( ROLLUP, groupsets (GroupClause(b)
\* --> GroupClause(c) ) ) --> GroupingClause( CUBE, groupsets
\* (GroupClause(e) --> GroupClause(d) ) )
\*/
List \*groupClause; /\* a list of SortGroupClause's \*/
Node \*havingQual; /\* qualifications applied to groups \*/
List \*windowClause; /\* defined window specifications \*/
List \*distinctClause; /\* a list of SortGroupClause's \*/
List \*sortClause; /\* a list of SortGroupClause's \*/
List \*scatterClause; /\* a list of tle's \*/
bool isTableValueSelect; /\* GPDB: Is this a TABLE (...) subquery argument? \*/
Node \*limitOffset; /\* # of result tuples to skip (int8 expr) \*/
Node \*limitCount; /\* # of result tuples to return (int8 expr) \*/
List \*rowMarks; /\* a list of RowMarkClause's \*/
Node \*setOperations; /\* set-operation tree if this is top level of
\* a UNION/INTERSECT/EXCEPT query \*/
List \*constraintDeps; /\* a list of pg\_constraint OIDs that the query
\* depends on to be semantically valid \*/
/\*
\* MPP: Used only on QD. Don't serialize. Holds the result distribution
\* policy for SELECT ... INTO and set operations.
\*/
struct GpPolicy \*intoPolicy;
/\*
\* GPDB: Used to indicate this query is part of CTAS or COPY so that its plan
\* would always be dispatched in parallel.
\*/
ParentStmtType parentStmtType;
bool expandMatViews; /\* force expansion of materialized views during rewrite to treat as views \*/
} Query;
QD产生优化后的查询计划
optimizer
|--src\\backend\\tcop\\postgres.c
|--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
|--|--pg\_plan\_query(query, cursorOptions, boundParams);
|--|--|--plan = planner(querytree, cursorOptions, boundParams);
|--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
|--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
|--|--|--|--|--result = optimize\_query(parse, boundParams); //
|--|--|--|--|--src\\backend\\optimizer\\plan\\orca.c
|--|--|--|--|--|--result = GPOPTOptimizedPlan(pqueryCopy, &fUnexpectedFailure);
Planner
|--src\\backend\\tcop\\postgres.c
|--plantree\_list = pg\_plan\_queries(querytree\_list, 0, NULL);
|--|--pg\_plan\_query(query, cursorOptions, boundParams);
|--|--|--plan = planner(querytree, cursorOptions, boundParams);
|--|--|--|--result = standard\_planner(parse, cursorOptions, boundParams);
|--|--|--|--|--src\\backend\\optimizer\\plan\\planner.c
|--|--|--|--|--top\_plan = subquery\_planner(glob, parse, NULL,
|--|--|--|--|--plan = grouping\_planner(root, tuple\_fraction);
|--|--|--|--|--|--final\_rel = query\_planner(root, sub\_tlist,
|--grouping\_planner()
{
/\*
\* Generate the best unsorted and presorted paths for this Query (but
\* note there may not be any presorted paths). We also generate (in
\* standard\_qp\_callback) pathkey representations of the query's sort
\* clause, distinct clause, etc.
\*/
final\_rel = query\_planner(root, sub\_tlist,standard\_qp\_callback, &qp\_extra);
|--query\_planner()
|--|--make\_one\_rel
/\*Finds all possible access paths for executing a query\*/
{
/\* find seqscan and all index paths for each base relation \*/
set\_base\_rel\_pathlists(root);
/\*
\* Generate access paths for the entire join tree.
\*/
rel = make\_rel\_from\_joinlist(root, joinlist);
}
if (parse->groupClause){
......
}
else if (parse->distinctClause)
{
......
}
cost\_sort(&sort\_path, root, root->query\_pathkeys, cheapest\_path->total\_cost,
path\_rows, path\_width,
0.0, work\_mem, root->limit\_tuples);
choose\_hashed\_grouping(root,
tuple\_fraction, limit\_tuples, ......
choose\_hashed\_distinct(root,
if (Gp\_role == GP\_ROLE\_DISPATCH && result\_plan == NULL)
{
result\_plan = cdb\_grouping\_planner(root,....
}
......
if (Gp\_role == GP\_ROLE\_DISPATCH && CdbPathLocus\_IsPartitioned(current\_locus))
{
bool needMotion;
needMotion = !cdbpathlocus\_collocates\_pathkeys(root, current\_locus,
distinct\_dist\_pathkeys, false /\* exact\_match \*/ );
}
/\* Create a plan according to query\_planner's results. \*/
result\_plan = create\_plan(root, best\_path);
/\* Insert AGG or GROUP node if needed \*/
make\_agg()
/\*If there is a DISTINCT clause, add the UNIQUE node.\*/
make\_unique()
/\*Finally, if there is a LIMIT/OFFSET clause, add the LIMIT node.\*/
make\_limit()
}
Relation Access Paths
set_base_rel_pathlists()
对每个基础表找出具体的扫描路径,比如seqscan或者index scan或者bitmap
scans等.
|--set\_base\_rel\_pathlists()
{
for (rti = 1; rti < root->simple\_rel\_array\_size; rti++)
{
|--set\_rel\_pathlist(root, rel, rti);
|--|--set\_plain\_rel\_pathlist(root, rel, rte);
{
/\* Consider sequential scan. \*/
switch (rel->relstorage)
{
case RELSTORAGE\_EXTERNAL:
case RELSTORAGE\_AOROWS:
case RELSTORAGE\_AOCOLS:
case RELSTORAGE\_HEAP:
seqpath = create\_seqscan\_path(root, rel);
}
if (plain\_allow\_indexpath\_under\_subplan(root, rel))
{
/\* Consider index and bitmap scans \*/
create\_index\_paths(root, rel);
if (rel->relstorage == RELSTORAGE\_HEAP)
create\_tidscan\_paths(root, rel);
}
......
/\* Now find the cheapest of the paths for this rel \*/
set\_cheapest(root, rel);
}
}
}
Seqscan Path
生成T_SeqScan类型的Path Node, 同时计算顺序扫描花费的代价
*cost_seqscan(pathnode, root, rel)*
|--create\_seqscan\_path(PlannerInfo \*root, RelOptInfo \*rel, Relids required\_outer)
{
Path \*pathnode = makeNode(Path);
pathnode->pathtype = T\_SeqScan;
pathnode->locus = cdbpathlocus\_from\_baserel(root, rel);
cost\_seqscan(pathnode, root, rel);
return pathnode;
}
Join处理
|--make\_rel\_from\_joinlist()
|--|--make\_rel\_from\_joinlist
|--|--|--standard\_join\_search
{
/\*
\* We employ a simple "dynamic programming" algorithm: we first find all
\* ways to build joins of two jointree items, then all ways to build joins
\* of three items (from two-item joins and single items), then four-item
\* joins, and so on until we have considered all ways to join all the
\* items into one rel.
\*
\* root->join\_rel\_level[j] is a list of all the j-item rels. Initially we
\* set root->join\_rel\_level[1] to represent all the single-jointree-item
\* relations.
\*/
for (lev = 2; lev <= levels\_needed; lev++)
{
|--join\_search\_one\_level
{
/\*
\* First, consider left-sided and right-sided plans, in which rels of
\* exactly level-1 member relations are joined against initial relations.
\* We prefer to join using join clauses, but if we find a rel of level-1
\* members that has no join clauses, we will generate Cartesian-product
\* joins against all initial rels not already contained in it.
\*/
|--make\_rels\_by\_clause\_joins
|--|--make\_join\_rel
switch (sjinfo->jointype)
{
case JOIN\_INNER:
add\_paths\_to\_joinrel
case JOIN\_LEFT:
case JOIN\_FULL:
case JOIN\_SEMI:
case JOIN\_ANTI:
case JOIN\_LASJ\_NOTIN:
}
|--|--add\_paths\_to\_joinrel
{
/\*
\* Find potential mergejoin clauses. We can skip this if we are not
\* interested in doing a mergejoin. However, mergejoin may be our only
\* way of implementing a full outer join, so override enable\_mergejoin if
\* it's a full join.
\*
\* CDB: Always build mergeclause\_list. We need it for motion planning.
\*/
redistribution\_clauses = select\_cdb\_redistribute\_clauses(root,
/\*
\* 1. Consider mergejoin paths where both relations must be explicitly
\* sorted. Skip this if we can't mergejoin.
\*/
mergeclause\_list = select\_mergejoin\_clauses(root,
sort\_inner\_and\_outer(root, joinrel, outerrel, innerrel,
|--sort\_inner\_and\_outer();
|--|--try\_mergejoin\_path()
|--|--|--create\_mergejoin\_path
|--|--|--|--cdbpath\_motion\_for\_join
|--|--|--|--turn\_volatile\_seggen\_to\_singleqe
|--|--|--|--|--cdbpath\_create\_motion\_path
{
/\* Create CdbMotionPath node. \*/
pathnode = makeNode(CdbMotionPath);
pathnode->path.pathtype = T\_Motion;
pathnode->path.parent = subpath->parent;
pathnode->path.locus = locus;
pathnode->path.rows = subpath->rows;
pathnode->path.pathkeys = pathkeys;
pathnode->subpath = subpath;
}
/\*
\* 2. Consider paths where the outer relation need not be explicitly
\* sorted. This includes both nestloops and mergejoins where the outer
\* path is already ordered. Again, skip this if we can't mergejoin.
\* (That's okay because we know that nestloop can't handle right/full
\* joins at all, so it wouldn't work in the prohibited cases either.)
\*/
match\_unsorted\_outer(root, joinrel, outerrel, innerrel,
/\*
\* 4. Consider paths where both outer and inner relations must be hashed
\* before being joined. As above, disregard enable\_hashjoin for full
\* joins, because there may be no other alternative.
\*
\* We consider both the cheapest-total-cost and cheapest-startup-cost
\* outer paths. There's no need to consider any but the
\* cheapest-total-cost inner path, however.
\*/
hash\_inner\_and\_outer(root, joinrel, outerrel, innerrel,
}
/\* Now, consider "bushy plans" \*/
}
}
create_mergejoin_path
在merge join阶段,会生成需要的Motion.
\* Creates a pathnode corresponding to a mergejoin join between
\* two relations
|--create\_mergejoin\_path()
{
|--cdbpath\_motion\_for\_join()
{
/\*
\* Add motion nodes above subpaths and decide where to join.
\*/
|--cdbpath\_create\_motion\_path()
|--|--cdbpath\_cost\_motion(root, pathnode);
pathnode = makeNode(MergePath);
cost\_mergejoin(pathnode, root);
}
}
/\*
\* cdbpath\_cost\_motion
\* Fills in the cost estimate fields in a MotionPath node.
\*/
void
cdbpath\_cost\_motion(PlannerInfo \*root, CdbMotionPath \*motionpath)
{
cost\_per\_row = (gp\_motion\_cost\_per\_row > 0.0)
? gp\_motion\_cost\_per\_row
: 2.0 \* cpu\_tuple\_cost;
sendrows = cdbpath\_rows(root, subpath);
recvrows = cdbpath\_rows(root, (Path \*)motionpath);
motioncost = cost\_per\_row \* 0.5 \* (sendrows + recvrows);
motionpath->path.total\_cost = motioncost + subpath->total\_cost;
motionpath->path.startup\_cost = subpath->startup\_cost;
motionpath->path.memory = subpath->memory;
}
QD准备执行plan
(Gangs, QEs and interconnects)
初始化逻辑:
注意cdbdisp_dispatchX实现了Plan的序列化逻辑。
|--PortalStart
|--|--ExecutorStart(queryDesc, eflags);
|--|--|--standard_ExecutorStart
{
estate = CreateExecutorState();
/*
* Assign a Motion Node to every Plan Node. This makes it
* easy to identify which slice any Node belongs to
*/
AssignParentMotionToPlanNodes(queryDesc->plannedstmt);
/*
* Initialize the plan state tree
*/
InitPlan(queryDesc, eflags);
if (shouldDispatch){
....
CdbDispatchPlan(queryDesc, needDtx, true);
|--cdbdisp_dispatchX
{....
ds = cdbdisp_makeDispatcherState(queryDesc->extended_query);
/*
* Since we intend to execute the plan, inventory the slice tree,
* allocate gangs, and associate them with slices.
*
* On return, gangs have been allocated and CDBProcess lists have
* been filled in the slice table.)
*
* Notice: This must be done before cdbdisp_buildPlanQueryParms
*/
AssignGangs(ds, queryDesc);
}
/*
* if in dispatch mode, time to serialize plan and query
* trees, and fire off cdb_exec command to each of the qexecs
*/
|--CdbDispatchPlan();
|--|--cdbdisp_dispatchX()
pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
queryText = buildGpQueryString(pQueryParms, &queryTextLength);
|--|--|--buildGpQueryString
total_query_len = 1 /* 'M' */ +
sizeof(len) /* message length */ +
sizeof(gp_command_count) +
sizeof(sessionUserId) /* sessionUserIsSuper */ +
sizeof(outerUserId) /* outerUserIsSuper */ +
sizeof(currentUserId) +
sizeof(n32) * 2 /* currentStatementStartTimestamp */ +
sizeof(command_len) +
sizeof(querytree_len) +
sizeof(plantree_len) +
sizeof(params_len) +
sizeof(sddesc_len) +
sizeof(dtxContextInfo_len) +
dtxContextInfo_len +
command_len +
querytree_len +
plantree_len +
params_len +
sddesc_len +
sizeof(numsegments) +
sizeof(resgroupInfo.len) +
resgroupInfo.len +
sizeof(tempNamespaceId) +
sizeof(tempToastNamespaceId) +
0;
//plan serialization
pos = shared_query;
*pos++ = 'M';
pos += 4; /* placeholder for message length */
tmp = htonl(gp_command_count);
memcpy(pos, &tmp, sizeof(gp_command_count));
pos += sizeof(gp_command_count);
tmp = htonl(sessionUserId);
memcpy(pos, &tmp, sizeof(sessionUserId));
pos += sizeof(sessionUserId);
tmp = htonl(outerUserId);
memcpy(pos, &tmp, sizeof(outerUserId));
pos += sizeof(outerUserId);
tmp = htonl(currentUserId);
memcpy(pos, &tmp, sizeof(currentUserId));
pos += sizeof(currentUserId);
/*
* High order half first, since we're doing MSB-first
*/
n32 = (uint32) (currentStatementStartTimestamp >> 32);
n32 = htonl(n32);
memcpy(pos, &n32, sizeof(n32));
pos += sizeof(n32);
/*
* Now the low order half
*/
n32 = (uint32) currentStatementStartTimestamp;
n32 = htonl(n32);
memcpy(pos, &n32, sizeof(n32));
pos += sizeof(n32);
tmp = htonl(command_len);
memcpy(pos, &tmp, sizeof(command_len));
pos += sizeof(command_len);
tmp = htonl(querytree_len);
memcpy(pos, &tmp, sizeof(querytree_len));
pos += sizeof(querytree_len);
tmp = htonl(plantree_len);
memcpy(pos, &tmp, sizeof(plantree_len));
pos += sizeof(plantree_len);
tmp = htonl(params_len);
memcpy(pos, &tmp, sizeof(params_len));
pos += sizeof(params_len);
tmp = htonl(sddesc_len);
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
tmp = htonl(dtxContextInfo_len);
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
if (dtxContextInfo_len > 0)
{
memcpy(pos, dtxContextInfo, dtxContextInfo_len);
pos += dtxContextInfo_len;
}
memcpy(pos, command, command_len);
/* If command is truncated we need to set the terminating '\0' manually */
pos[command_len - 1] = '\0';
pos += command_len;
if (querytree_len > 0)
{
memcpy(pos, querytree, querytree_len);
pos += querytree_len;
}
if (plantree_len > 0)
{
memcpy(pos, plantree, plantree_len);
pos += plantree_len;
}
if (params_len > 0)
{
memcpy(pos, params, params_len);
pos += params_len;
}
if (sddesc_len > 0)
{
memcpy(pos, sddesc, sddesc_len);
pos += sddesc_len;
}
tmp = htonl(numsegments);
memcpy(pos, &tmp, sizeof(numsegments));
pos += sizeof(numsegments);
tmp = htonl(resgroupInfo.len);
memcpy(pos, &tmp, sizeof(resgroupInfo.len));
pos += sizeof(resgroupInfo.len);
if (resgroupInfo.len > 0)
{
memcpy(pos, resgroupInfo.data, resgroupInfo.len);
pos += resgroupInfo.len;
}
/* pass process local variables to QEs */
GetTempNamespaceState(&tempNamespaceId, &tempToastNamespaceId);
tempNamespaceId = htonl(tempNamespaceId);
tempToastNamespaceId = htonl(tempToastNamespaceId);
memcpy(pos, &tempNamespaceId, sizeof(tempNamespaceId));
pos += sizeof(tempNamespaceId);
memcpy(pos, &tempToastNamespaceId, sizeof(tempToastNamespaceId));
pos += sizeof(tempToastNamespaceId);
/*
* fill in length placeholder
*/
len = pos - shared_query - 1;
tmp = htonl(len);
memcpy(shared_query + 1, &tmp, sizeof(len));
Assert(len + 1 == total_query_len);
if (finalLen)
*finalLen = len + 1;
|--|--|--cdbdisp_dispatchToGang();
|--|--|--|--cdbdisp_dispatchToGang_async
for (i = 0; i < gp->size; i++)
{
dispatchCommand(qeResult, pParms->query_text, pParms->query_text_len);
}
|--|--|--|--|--dispatchCommand
|--|--|--|--|--|--PQsendGpQuery_shared
}
}
DispatcherInternalFuncs DispatcherAsyncFuncs =
{
cdbdisp_checkForCancel_async,
cdbdisp_getWaitSocketFd_async,
cdbdisp_makeDispatchParams_async,
cdbdisp_checkAckMessage_async,
cdbdisp_checkDispatchResult_async,
cdbdisp_dispatchToGang_async,
cdbdisp_waitDispatchFinish_async
};
Gang的分配逻辑:
/*
* Function AssignGangs runs on the QD and finishes construction of the
* global slice table for a plan by assigning gangs allocated by the
* executor factory to the slices of the slice table.
*
* On entry, the slice table (at queryDesc->estate->es_sliceTable) has
* the correct structure (established by InitSliceTable) and has correct
* gang types (established by function FillSliceTable).
*
* Gang assignment involves taking an inventory of the requirements of
* each slice tree in the slice table, asking the executor factory to
* allocate a minimal set of gangs that can satisfy any of the slice trees,
* and associating the allocated gangs with slices in the slice table.
*
* On successful exit, the CDBProcess lists (primaryProcesses, mirrorProcesses)
* and the Gang pointers (primaryGang, mirrorGang) are set correctly in each
* slice in the slice table.
*/
|--AssignGangs(queryDesc, gp_singleton_segindex);
|--|--AssignWriterGangFirst
|--|--|--AllocateGang()
|--|--|--|--cdbgang_createGang()
|--|--|--|--cdbgang_createGang_async
|--|--|--|--|--PQconnectPoll
void
AssignGangs(QueryDesc *queryDesc, int utility_segment_index)
{
/*
* Get the gangs we'll use.
*
* As a general rule the first gang is a writer and the rest are readers.
* If this happens to be an extended query protocol then all gangs are readers.
*/
if (inv.numNgangs > 0)
{
inv.vecNgangs = (Gang **) palloc(sizeof(Gang *) * inv.numNgangs);
for (i = 0; i < inv.numNgangs; i++)
{
if (i == 0 && !queryDesc->extended_query)
{
inv.vecNgangs[i] = allocateWriterGang();
}
else
{
inv.vecNgangs[i] = allocateGang(GANGTYPE_PRIMARY_READER, getgpsegmentCount(), 0,);
}
}
}
}
segments in Gang:
|--standard_ExecutorStart
//only master does dispatch
if (Gp_role == GP_ROLE_DISPATCH &&
(queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL ||
queryDesc->plannedstmt->nMotionNodes > 0))
{
//nMotionNodes get initialized when orca optimizer is called.
/* Set up blank slice table to be filled in during InitPlan. */
//Here we get the SliceTable
InitSliceTable(estate, queryDesc->plannedstmt->nMotionNodes, queryDesc->plannedstmt->nInitPlans);
|--|--InitSliceTable
{
n = 1 + nMotions + nSubplans;
table = makeNode(SliceTable);
table->nMotions = nMotions;
table->nInitPlans = nSubplans;
table->slices = NIL;
table->instrument_options = INSTRUMENT_NONE;
for (i = 0; i < n; i++)
{
slice = makeNode(Slice);
slice->sliceIndex = i;
slice->rootIndex = (i > 0 && i <= nMotions) ? -1 : i;
slice->gangType = GANGTYPE_UNALLOCATED;
slice->gangSize = 0;
slice->segments = NIL;
slice->directDispatch.isDirectDispatch = false;
slice->directDispatch.contentIds = NIL;
slice->primaryGang = NULL;
slice->parentIndex = -1;
slice->children = NIL;
slice->primaryProcesses = NIL;
table->slices = lappend(table->slices, slice);
}
estate->es_sliceTable = table;
}
}
//set up interconnect
if (queryDesc->plannedstmt->nMotionNodes > 0)
{
SetupInterconnect(estate);
UpdateMotionExpectedReceivers(estate->motionlayer_context, estate->es_sliceTable);
}
/*
* Initialize the slice table.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
FillSliceTable(estate, plannedstmt);
|--FillSliceTable_walker
|--|--FillSliceGangInfo
{
switch (slice->gangType)
{
case GANGTYPE_UNALLOCATED:
case GANGTYPE_PRIMARY_WRITER:
case GANGTYPE_PRIMARY_READER:
int i;
slice->gangSize = numsegments;
slice->segments = NIL;
for (i = 0; i < numsegments; i++)
slice->segments = lappend_int(slice->segments, i % getgpsegmentCount());
case GANGTYPE_ENTRYDB_READER:
slice->gangSize = 1;
slice->segments = list_make1_int(-1);
break;
case GANGTYPE_SINGLETON_READER:
slice->gangSize = 1;
slice->segments = list_make1_int(gp_session_id % numsegments);
}
}
}
QE接收Plan,反序列化并执行
|--PostgresMain
{
for (;;)
{
switch (firstchar)
{
case 'M': /* MPP dispatched stmt from QD */
{
exec_mpp_query();
}
.......
}
}
}
接下来的章节会针对源码进行梳理,敬请关注。
未完待续