searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Postgresql连接Foundationdb

2023-09-25 08:33:30
9
0

1. 查询执行过程源码解析

根据堆栈整理的查询执行过程中源码的整体流程 (DML为例)

1.1. 核心数据结构

1.1.1. Portal

Portal,也称为策略选择模块,这也代表了Portal最核心的功能。通常,Portal会根据SQL语句的类型,选择不同的执行模块(ProcessUtility、Executor等)。

一个Protal对象的生命周期如下:

  1. PortalCreate创建:创建一个干净的Portal,其中内存上下文、资源跟踪器、清理函数等都是默认的,但并没有包含执行树的信息;
  2. PortalDefineQuery初始化1:添加原始的命令语句与计划树信息; (如portal->stmts = plantree_list)
  3. PortalStart初始化:根据计划树信息选择执行策略,并且调用ExecutorStart初始化执行器;
  4. PortalRun执行:按照执行策略选择调用Executor或ProcessUtility执行命令;
  5. PortalDrop清理:对Portal占用的资源进行释放,比如用于缓存输出的内存。

1.1.2. QueryDesc

在执行DML语句时,关键函数入口有三个:ExecutorStart,ExecutorRun与ExecutorEnd,分别进行初始化,执行以及资源回收的工作。比如执行查询select时,这些函数的调用关系如下:

这三个函数中使用到的关键数据结构是QueryDesc,它描述一个查询的执行过程,并在执行时跟踪查询的状态,包含了有关查询的元数据和执行信息(PlanState节点,后面递归执行)。

1.1.3. Plan、Planstate、Estate

PG使用Executor执行数据操作语句的计划树,而真正对每个计划树中每个结点进行处理的则是ExecInitNode,ExecProcNode以及ExecEndNode。这三个函数分别实现了对结点进行初始化,执行以及清理的工作。这三个函数的递归调用则实现了对整棵查询计划树的处理。这三个函数的调用流程主要如下:

 

每种任务结点都含有带有PlanState结构的字段,比如说哈希连接结点与嵌套连接结点等含有JoinState:

与之相似的还有扫描节点ScanState,由ScanState拓展开来的还有序列扫描SeqScanState,采样扫描SampleScanState,索引扫描IndexScanState以及仅索引扫描IndexOnlyScanState等。各种执行状态都是由Executor通过调用ExecInitNode中根据不同type初始化得到。

1.1.4. TupleTableSlot

这是 PostgreSQL 行处理的基本结构,它表示一个可以容纳一行数据的槽,可包含数据行本身以及行的描述信息。在查询执行过程中,数据行的读取、处理和返回都通过 TupleTableSlot 来进行。

1.2. 核心函数打印Tuple

ExecutorRun中完成任务的是standard_ExecutorRun,其会调用ExecutePlan真正执行计划。ExecutePlan中有一个无限循环,用于不断执行PlanState中的ExecProcNodeMtd函数,并获取结果数据。依然以顺序扫描为例,其工作流程如下:

2. C-API 连接Foundationdb

FoundationDB 是一个分布式键值存储系统,在TeleADB中用于提供元数据统一存储管理服务:

  1. 弹性和可扩展性:当需要扩展存储容量或增加吞吐量时,只需简单地增加节点即可实现横向扩展,而不需要停机或迁移数据。
  2. 高可用性:FoundationDB 的元数据存储采用了多副本复制的方式,即将元数据在不同的节点上进行冗余存储,在节点故障的情况下可以快速进行故障转移。
  3. 事务支持:FoundationDB 的元数据存储是具有原子性和一致性的,它采用了基于版本的存储模型。在进行元数据更新时,会使用事务将多个操作组合在一起,要么全部成功,要么全部失败。
  4. 支持异步API接口,节省线程方面开销。

#define FDB_API_VERSION 710
#include <pthread.h>
#include <stdlib.h>
#include <foundationdb/fdb_c.h>
#include <assert.h>
FDBDatabase* db;
FDBTransaction *tr = NULL;
pthread_t netThread;
void die(const char* msg)
{
    printf("%s\n", msg);
    exit(1);
}
static void runNetwork() { fdb_run_network(); }
void insert_data()
{
     int committed = 0;
  /*  创建一个事务 */
    fdb_database_create_transaction(db, &tr);
    char *key1 = "y03";
    char *val1 = "888";
    // 调用API,插入数据
    fdb_transaction_set(tr, key1, (int)strlen(key1), val1, (int)strlen(val1));
    FDBFuture *commitFuture = fdb_transaction_commit(tr);
    // 阻塞当前线程,直到 Future 对象被标记为完成
    fdb_future_block_until_ready(commitFuture);
    fdb_future_destroy(commitFuture);
  /* 销毁事务,释放资源*/
  fdb_transaction_destroy(tr);
}
void get_data() {
  FDBTransaction *tr = NULL;
  char *value = NULL;
  fdb_bool_t valuePresent;
  int valueLength;
  char *key = "y03";
  fdb_database_create_transaction(db, &tr);
  FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
  //这一句顺序一定得在get_value前
  fdb_future_block_until_ready(getFuture);
  fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
  printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
  int result = atoi(value);
  printf("result: %d",result);
  fdb_transaction_destroy(tr);
  fdb_future_destroy(getFuture);
}
int main(int argc, char **argv)
{
    fdb_error_t err;
    char *cluster_file = "******";  //描述一下clusterfile
    err = fdb_select_api_version(FDB_API_VERSION);
    if(err) die("Unable to select API version");
    err = fdb_setup_network();
    if(err) die("Unable to setup network");
    //创建一个新的 POSIX 线程,并将 runNetwork() 函数指定为线程的入口点
    pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
    fdb_error_t e = fdb_create_database(cluster_file, &db);
    if(e) die("Unable to create database");
    insert_data();
	get_data();
    return 0;
}

3. 整合连接FDB代码至PG源码

  • 代码整合

static void runNetwork() { fdb_run_network(); }
int TestFdb()
{
	FDBDatabase* db;
	FDBTransaction *tr = NULL;
 	pthread_t netThread;
	fdb_error_t err;
    char *cluster_file = "****";

    err = fdb_select_api_version(FDB_API_VERSION);
    if(err) elog(DEBUG5,"Unable to select API version");

    err = fdb_setup_network();
    if(err) elog(DEBUG5,"Unable to setup network");
    pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
    fdb_error_t e = fdb_create_database(cluster_file, &db);
	if(e) elog(DEBUG5,"Unable to create database");
	// get data from fdb
	char *value = NULL;
	fdb_bool_t valuePresent;
	int valueLength;
	char *key = "a";
	fdb_database_create_transaction(db, &tr);
	FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
	//这一句顺序一定得在get_value前
	fdb_future_block_until_ready(getFuture);
	fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
	printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
	int result = atoi(value);
	fdb_transaction_destroy(tr);
	fdb_future_destroy(getFuture);
	return result;
}

/* ----------------
 *		printtup --- send a tuple to the client
   此处在printtup这里修改slot返回的数据
 * ----------------
 */
static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
{

	/*
	 * send the attributes of this tuple
	 */
	for (i = 0; i < natts; ++i)
	{
		PrinttupAttrInfo *thisState = myState->myinfo + i;
		Datum		attr = slot->tts_values[i];
		Datum result = (Datum)TestFdb();
		attr = result;
	
		if (thisState->format == 0)
		{
			/* Text output */
			char	   *outputstr;
            // 这里用于处理内存地址与数据类型的转换
			outputstr = OutputFunctionCall(&thisState->finfo, attr);
			pq_sendcountedtext(buf, outputstr, strlen(outputstr), false);
		}
		else
		{
			/* Binary output */
			bytea	   *outputbytes;

			outputbytes = SendFunctionCall(&thisState->finfo, attr);
			pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
			pq_sendbytes(buf, VARDATA(outputbytes),
						 VARSIZE(outputbytes) - VARHDRSZ);
		}
	}
	return true;
}
  • 脚本运行
#!/bin/sh
su postgres -c "pg_ctl stop"
rm -rf  pgsql
mkdir pgsql
cd postgresql-15.3
./configure --enable-debug --enable-cassert 
--prefix=/usr/local/pgsql CFLAGS=-O0 LDFLAGS=-lfdb_c  #连接fdb_c库参数
make world
make install -world
chown postgres -R ../pgsql
su postgres -c "pg_ctl start -D /usr/local/pgdata/data/ -l /usr/local/pgdata/log/logfile"
  • 测试

0条评论
0 / 1000
w****n
3文章数
1粉丝数
w****n
3 文章 | 1 粉丝
w****n
3文章数
1粉丝数
w****n
3 文章 | 1 粉丝
原创

Postgresql连接Foundationdb

2023-09-25 08:33:30
9
0

1. 查询执行过程源码解析

根据堆栈整理的查询执行过程中源码的整体流程 (DML为例)

1.1. 核心数据结构

1.1.1. Portal

Portal,也称为策略选择模块,这也代表了Portal最核心的功能。通常,Portal会根据SQL语句的类型,选择不同的执行模块(ProcessUtility、Executor等)。

一个Protal对象的生命周期如下:

  1. PortalCreate创建:创建一个干净的Portal,其中内存上下文、资源跟踪器、清理函数等都是默认的,但并没有包含执行树的信息;
  2. PortalDefineQuery初始化1:添加原始的命令语句与计划树信息; (如portal->stmts = plantree_list)
  3. PortalStart初始化:根据计划树信息选择执行策略,并且调用ExecutorStart初始化执行器;
  4. PortalRun执行:按照执行策略选择调用Executor或ProcessUtility执行命令;
  5. PortalDrop清理:对Portal占用的资源进行释放,比如用于缓存输出的内存。

1.1.2. QueryDesc

在执行DML语句时,关键函数入口有三个:ExecutorStart,ExecutorRun与ExecutorEnd,分别进行初始化,执行以及资源回收的工作。比如执行查询select时,这些函数的调用关系如下:

这三个函数中使用到的关键数据结构是QueryDesc,它描述一个查询的执行过程,并在执行时跟踪查询的状态,包含了有关查询的元数据和执行信息(PlanState节点,后面递归执行)。

1.1.3. Plan、Planstate、Estate

PG使用Executor执行数据操作语句的计划树,而真正对每个计划树中每个结点进行处理的则是ExecInitNode,ExecProcNode以及ExecEndNode。这三个函数分别实现了对结点进行初始化,执行以及清理的工作。这三个函数的递归调用则实现了对整棵查询计划树的处理。这三个函数的调用流程主要如下:

 

每种任务结点都含有带有PlanState结构的字段,比如说哈希连接结点与嵌套连接结点等含有JoinState:

与之相似的还有扫描节点ScanState,由ScanState拓展开来的还有序列扫描SeqScanState,采样扫描SampleScanState,索引扫描IndexScanState以及仅索引扫描IndexOnlyScanState等。各种执行状态都是由Executor通过调用ExecInitNode中根据不同type初始化得到。

1.1.4. TupleTableSlot

这是 PostgreSQL 行处理的基本结构,它表示一个可以容纳一行数据的槽,可包含数据行本身以及行的描述信息。在查询执行过程中,数据行的读取、处理和返回都通过 TupleTableSlot 来进行。

1.2. 核心函数打印Tuple

ExecutorRun中完成任务的是standard_ExecutorRun,其会调用ExecutePlan真正执行计划。ExecutePlan中有一个无限循环,用于不断执行PlanState中的ExecProcNodeMtd函数,并获取结果数据。依然以顺序扫描为例,其工作流程如下:

2. C-API 连接Foundationdb

FoundationDB 是一个分布式键值存储系统,在TeleADB中用于提供元数据统一存储管理服务:

  1. 弹性和可扩展性:当需要扩展存储容量或增加吞吐量时,只需简单地增加节点即可实现横向扩展,而不需要停机或迁移数据。
  2. 高可用性:FoundationDB 的元数据存储采用了多副本复制的方式,即将元数据在不同的节点上进行冗余存储,在节点故障的情况下可以快速进行故障转移。
  3. 事务支持:FoundationDB 的元数据存储是具有原子性和一致性的,它采用了基于版本的存储模型。在进行元数据更新时,会使用事务将多个操作组合在一起,要么全部成功,要么全部失败。
  4. 支持异步API接口,节省线程方面开销。

#define FDB_API_VERSION 710
#include <pthread.h>
#include <stdlib.h>
#include <foundationdb/fdb_c.h>
#include <assert.h>
FDBDatabase* db;
FDBTransaction *tr = NULL;
pthread_t netThread;
void die(const char* msg)
{
    printf("%s\n", msg);
    exit(1);
}
static void runNetwork() { fdb_run_network(); }
void insert_data()
{
     int committed = 0;
  /*  创建一个事务 */
    fdb_database_create_transaction(db, &tr);
    char *key1 = "y03";
    char *val1 = "888";
    // 调用API,插入数据
    fdb_transaction_set(tr, key1, (int)strlen(key1), val1, (int)strlen(val1));
    FDBFuture *commitFuture = fdb_transaction_commit(tr);
    // 阻塞当前线程,直到 Future 对象被标记为完成
    fdb_future_block_until_ready(commitFuture);
    fdb_future_destroy(commitFuture);
  /* 销毁事务,释放资源*/
  fdb_transaction_destroy(tr);
}
void get_data() {
  FDBTransaction *tr = NULL;
  char *value = NULL;
  fdb_bool_t valuePresent;
  int valueLength;
  char *key = "y03";
  fdb_database_create_transaction(db, &tr);
  FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
  //这一句顺序一定得在get_value前
  fdb_future_block_until_ready(getFuture);
  fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
  printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
  int result = atoi(value);
  printf("result: %d",result);
  fdb_transaction_destroy(tr);
  fdb_future_destroy(getFuture);
}
int main(int argc, char **argv)
{
    fdb_error_t err;
    char *cluster_file = "******";  //描述一下clusterfile
    err = fdb_select_api_version(FDB_API_VERSION);
    if(err) die("Unable to select API version");
    err = fdb_setup_network();
    if(err) die("Unable to setup network");
    //创建一个新的 POSIX 线程,并将 runNetwork() 函数指定为线程的入口点
    pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
    fdb_error_t e = fdb_create_database(cluster_file, &db);
    if(e) die("Unable to create database");
    insert_data();
	get_data();
    return 0;
}

3. 整合连接FDB代码至PG源码

  • 代码整合

static void runNetwork() { fdb_run_network(); }
int TestFdb()
{
	FDBDatabase* db;
	FDBTransaction *tr = NULL;
 	pthread_t netThread;
	fdb_error_t err;
    char *cluster_file = "****";

    err = fdb_select_api_version(FDB_API_VERSION);
    if(err) elog(DEBUG5,"Unable to select API version");

    err = fdb_setup_network();
    if(err) elog(DEBUG5,"Unable to setup network");
    pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
    fdb_error_t e = fdb_create_database(cluster_file, &db);
	if(e) elog(DEBUG5,"Unable to create database");
	// get data from fdb
	char *value = NULL;
	fdb_bool_t valuePresent;
	int valueLength;
	char *key = "a";
	fdb_database_create_transaction(db, &tr);
	FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
	//这一句顺序一定得在get_value前
	fdb_future_block_until_ready(getFuture);
	fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength);
	printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
	int result = atoi(value);
	fdb_transaction_destroy(tr);
	fdb_future_destroy(getFuture);
	return result;
}

/* ----------------
 *		printtup --- send a tuple to the client
   此处在printtup这里修改slot返回的数据
 * ----------------
 */
static bool
printtup(TupleTableSlot *slot, DestReceiver *self)
{

	/*
	 * send the attributes of this tuple
	 */
	for (i = 0; i < natts; ++i)
	{
		PrinttupAttrInfo *thisState = myState->myinfo + i;
		Datum		attr = slot->tts_values[i];
		Datum result = (Datum)TestFdb();
		attr = result;
	
		if (thisState->format == 0)
		{
			/* Text output */
			char	   *outputstr;
            // 这里用于处理内存地址与数据类型的转换
			outputstr = OutputFunctionCall(&thisState->finfo, attr);
			pq_sendcountedtext(buf, outputstr, strlen(outputstr), false);
		}
		else
		{
			/* Binary output */
			bytea	   *outputbytes;

			outputbytes = SendFunctionCall(&thisState->finfo, attr);
			pq_sendint32(buf, VARSIZE(outputbytes) - VARHDRSZ);
			pq_sendbytes(buf, VARDATA(outputbytes),
						 VARSIZE(outputbytes) - VARHDRSZ);
		}
	}
	return true;
}
  • 脚本运行
#!/bin/sh
su postgres -c "pg_ctl stop"
rm -rf  pgsql
mkdir pgsql
cd postgresql-15.3
./configure --enable-debug --enable-cassert 
--prefix=/usr/local/pgsql CFLAGS=-O0 LDFLAGS=-lfdb_c  #连接fdb_c库参数
make world
make install -world
chown postgres -R ../pgsql
su postgres -c "pg_ctl start -D /usr/local/pgdata/data/ -l /usr/local/pgdata/log/logfile"
  • 测试

文章来自个人专栏
Postgresql数据库内核技术笔记
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
1