1. 查询执行过程源码解析
根据堆栈整理的查询执行过程中源码的整体流程 (DML为例)
1.1. 核心数据结构
1.1.1. Portal
Portal,也称为策略选择模块,这也代表了Portal最核心的功能。通常,Portal会根据SQL语句的类型,选择不同的执行模块(ProcessUtility、Executor等)。
一个Protal对象的生命周期如下:
- PortalCreate创建:创建一个干净的Portal,其中内存上下文、资源跟踪器、清理函数等都是默认的,但并没有包含执行树的信息;
- PortalDefineQuery初始化1:添加原始的命令语句与计划树信息; (如portal->stmts = plantree_list)
- PortalStart初始化:根据计划树信息选择执行策略,并且调用ExecutorStart初始化执行器;
- PortalRun执行:按照执行策略选择调用Executor或ProcessUtility执行命令;
- PortalDrop清理:对Portal占用的资源进行释放,比如用于缓存输出的内存。
1.1.2. QueryDesc
在执行DML语句时,关键函数入口有三个:ExecutorStart,ExecutorRun与ExecutorEnd,分别进行初始化,执行以及资源回收的工作。比如执行查询select时,这些函数的调用关系如下:
这三个函数中使用到的关键数据结构是QueryDesc,它描述一个查询的执行过程,并在执行时跟踪查询的状态,包含了有关查询的元数据和执行信息(PlanState节点,后面递归执行)。
1.1.3. Plan、Planstate、Estate
PG使用Executor执行数据操作语句的计划树,而真正对每个计划树中每个结点进行处理的则是ExecInitNode,ExecProcNode以及ExecEndNode。这三个函数分别实现了对结点进行初始化,执行以及清理的工作。这三个函数的递归调用则实现了对整棵查询计划树的处理。这三个函数的调用流程主要如下:
每种任务结点都含有带有PlanState结构的字段,比如说哈希连接结点与嵌套连接结点等含有JoinState:
与之相似的还有扫描节点ScanState,由ScanState拓展开来的还有序列扫描SeqScanState,采样扫描SampleScanState,索引扫描IndexScanState以及仅索引扫描IndexOnlyScanState等。各种执行状态都是由Executor通过调用ExecInitNode中根据不同type初始化得到。
1.1.4. TupleTableSlot
这是 PostgreSQL 行处理的基本结构,它表示一个可以容纳一行数据的槽,可包含数据行本身以及行的描述信息。在查询执行过程中,数据行的读取、处理和返回都通过 TupleTableSlot 来进行。
1.2. 核心函数打印Tuple
ExecutorRun中完成任务的是standard_ExecutorRun,其会调用ExecutePlan真正执行计划。ExecutePlan中有一个无限循环,用于不断执行PlanState中的ExecProcNodeMtd函数,并获取结果数据。依然以顺序扫描为例,其工作流程如下:
2. C-API 连接Foundationdb
FoundationDB 是一个分布式键值存储系统,在TeleADB中用于提供元数据统一存储管理服务:
- 弹性和可扩展性:当需要扩展存储容量或增加吞吐量时,只需简单地增加节点即可实现横向扩展,而不需要停机或迁移数据。
- 高可用性:FoundationDB 的元数据存储采用了多副本复制的方式,即将元数据在不同的节点上进行冗余存储,在节点故障的情况下可以快速进行故障转移。
- 事务支持:FoundationDB 的元数据存储是具有原子性和一致性的,它采用了基于版本的存储模型。在进行元数据更新时,会使用事务将多个操作组合在一起,要么全部成功,要么全部失败。
- 支持异步API接口,节省线程方面开销。
#define FDB_API_VERSION 710
#include <pthread.h>
#include <stdlib.h>
#include <foundationdb/fdb_c.h>
#include <assert.h>
FDBDatabase* db;
FDBTransaction *tr = NULL;
pthread_t netThread;
void die(const char* msg)
{
printf("%s\n", msg);
exit(1);
}
static void runNetwork() { fdb_run_network(); }
void insert_data()
{
int committed = 0;
/* 创建一个事务 */
fdb_database_create_transaction(db, &tr);
char *key1 = "y03";
char *val1 = "888";
// 调用API,插入数据
fdb_transaction_set(tr, key1, (int)strlen(key1), val1, (int)strlen(val1));
FDBFuture *commitFuture = fdb_transaction_commit(tr);
// 阻塞当前线程,直到 Future 对象被标记为完成
fdb_future_block_until_ready(commitFuture);
fdb_future_destroy(commitFuture);
/* 销毁事务,释放资源*/
fdb_transaction_destroy(tr);
}
void get_data() {
FDBTransaction *tr = NULL;
char *value = NULL;
fdb_bool_t valuePresent;
int valueLength;
char *key = "y03";
fdb_database_create_transaction(db, &tr);
FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
//这一句顺序一定得在get_value前
fdb_future_block_until_ready(getFuture);
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
int result = atoi(value);
printf("result: %d",result);
fdb_transaction_destroy(tr);
fdb_future_destroy(getFuture);
}
int main(int argc, char **argv)
{
fdb_error_t err;
char *cluster_file = "******"; //描述一下clusterfile
err = fdb_select_api_version(FDB_API_VERSION);
if(err) die("Unable to select API version");
err = fdb_setup_network();
if(err) die("Unable to setup network");
//创建一个新的 POSIX 线程,并将 runNetwork() 函数指定为线程的入口点
pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
fdb_error_t e = fdb_create_database(cluster_file, &db);
if(e) die("Unable to create database");
insert_data();
get_data();
return 0;
}
3. 整合连接FDB代码至PG源码
- 代码整合
static void runNetwork() { fdb_run_network(); }
int TestFdb()
{
FDBDatabase* db;
FDBTransaction *tr = NULL;
pthread_t netThread;
fdb_error_t err;
char *cluster_file = "****";
err = fdb_select_api_version(FDB_API_VERSION);
if(err) elog(DEBUG5,"Unable to select API version");
err = fdb_setup_network();
if(err) elog(DEBUG5,"Unable to setup network");
pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
fdb_error_t e = fdb_create_database(cluster_file, &db);
if(e) elog(DEBUG5,"Unable to create database");
// get data from fdb
char *value = NULL;
fdb_bool_t valuePresent;
int valueLength;
char *key = "a";
fdb_database_create_transaction(db, &tr);
FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
//这一句顺序一定得在get_value前
fdb_future_block_until_ready(getFuture);
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
int result = atoi(value);
fdb_transaction_destroy(tr);
fdb_future_destroy(getFuture);
return result;
}
/* ----------------
* printtup --- send a tuple to the client
此处在printtup这里修改slot返回的数据
* ----------------
*/
static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
{
/*
* send the attributes of this tuple
*/
for (i = 0; i < natts; ++i)
{
PrinttupAttrInfo *thisState = myState->myinfo + i;
Datum attr = slot->tts_values[i];
Datum result = (Datum)TestFdb();
attr = result;
if (thisState->format == 0)
{
/* Text output */
char *outputstr;
// 这里用于处理内存地址与数据类型的转换
outputstr = OutputFunctionCall(&thisState->finfo, attr);
pq_sendcountedtext(buf, outputstr, strlen(outputstr), false);
}
else
{
/* Binary output */
bytea *outputbytes;
outputbytes = SendFunctionCall(&thisState->finfo, attr);
pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
pq_sendbytes(buf, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
}
return true;
}
- 脚本运行
#!/bin/sh
su postgres -c "pg_ctl stop"
rm -rf pgsql
mkdir pgsql
cd postgresql-15.3
./configure --enable-debug --enable-cassert
--prefix=/usr/local/pgsql CFLAGS=-O0 LDFLAGS=-lfdb_c #连接fdb_c库参数
make world
make install -world
chown postgres -R ../pgsql
su postgres -c "pg_ctl start -D /usr/local/pgdata/data/ -l /usr/local/pgdata/log/logfile"
- 测试