searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

pgtrashcan

2023-10-16 01:19:03
42
0

背景

PostgreSQL本身不支持类似Oracle recycle bin这样的回收站机制。

本文将介绍使用PostgreSQL动态模块,以及_PG_init函数来创建Hook程序。

(_PG_init函数在加载动态模块时立即执行, 动态模块在会话建立时被加载, 所以很好被利用)

在Hook程序中写一些逻辑, 将删除表的操作, 转移到其他SCHEMA。

pgtrashcan就是利用这种机制来实现类似Oracle recycle bin的功能的。

测试

本次使用的是telepg12,telepg12中有pgtrashcan.so文件,不需要下载开源包适配编译,通过激活模块,

将以下设置添加到:postgresql.conf中:

shared_preload_libraries = 'pgtrashcan'

然后重新启动telepg12服务器,通过命令pg_ctl restart -D yourdatadir

1. 创建表test:

postgres=# create table test(id int);
CREATE TABLE
postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
public | test | table | jianghao
(1 row)

2. 删除表test

postgres=# drop table test;
DROP TABLE;
postgres=# \dt
       List of relations
Schema | Name | Type | Owner
--------+------+------+-------
       |      |      |
(0 rows)

postgres=# select * from trash.test;
id
----
(0 rows)

test表已经没了 但是能在trash模式中看到

3. 查看已有模式

postgres=# \dn
 List of schemas
Name  | Owner
--------+----------
public | jianghao
trash  | jianghao
(2 rows)

发现多出了trash模式

4. 进入trash模式

postgres=# select current_schema;
current_schema
----------------
public
(1 row)

postgres=# SET search_path TO trash;
SET
postgres=# select current_schema;
current_schema
----------------
trash
(1 row)

postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
trash  | test | table | jianghao
(1 row)

查看trash模式下的多出了test的表

5. 当需要test的表时,可以从trash模式中移到public模式中

postrges=# alter table trash.test set schema public;
ALTER TABLE
postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
public | test | table | jianghao
public | t    | table | jianghao
(2 rows)

postrges=# select * from trash.test;
2023-10-12 17:23:11.574 CST [91740] ERROR: relation "trash.test" does not exist at character 15
2023-10-12 17:23:11.574 CST [91740] STATEMENT:  select * from trash.test;
ERROR:  42P01: relation "trash.test" does not exist
LINE 1: select * from trash.test;
                      ^

注:pgtrashcan插件的使用限制

1. 目前pgtrashcan不支持drop cascade用法(例如删除主表, 或者删除连带的FK关系)。

postrges=# drop table t cascade;
2023-10-12 17:57:06.094 CST [91740] ERROR: trash can does not support DROP CASCADE
2023-10-12 17:57:06.094 CST [91740] STATEMENT:  drop table t cascade;
ERROR:  0A000: trash can does not support DROP CASCADE
LOCATION: pgtrashcan_ProcessUtility, pgtrashcan.c:119

2. 代码中, 并没有同名表的判断, 所以当Trash schema中存在同名的表时, 也会报错。

postrges=# drop table trash;
2023-10-12 17:06:23.133 CST [91740] ERROR: relation "test" already exists in schema "trash"
2023-10-12 17:06:23.133 CST [91740] STATEMENT:  drop table test;
ERROR:  42P07: relation "test" already exists in schema "trash"
LOCATION: AlterRelationNamespaceInternal, tablecmds.c:15093

3. drop table 的用户需要创建schema的权限, 以及写Trash schema的权限. 如果没有创建schema 的权限也会报错。

postgres=# drop table t;
2023-10-12 18:02:34.374 CST [91740] ERROR: permission denied for schema Trash
2023-10-12 18:02:34.374 CST [91740] STATEMENT: drop table t;
ERROR: 42501: permission denied for schema Trash  
LOCATION: aclcheck_error, aclchk.c:3371  

源码分析

pgtrashcan.c中有4个函数,分别为_PG_init()、makeRangeVarFromAnyName()、create_trashcan_schema()、pgtrashcan_ProcessUtility()。

_PG_init()

_PG_init() 是 PostgreSQL 扩展模块中的一个特定函数,用于在加载扩展模块时进行初始化。该函数是扩展模块必须实现的一个入口函数,用于执行在加载扩展时需要进行的初始化工作。

在 PostgreSQL 中,扩展模块是一种动态加载的、可插拔的代码单元,用于扩展 PostgreSQL 的功能。扩展可以包含新的函数、数据类型、运算符等,以及与 PostgreSQL 系统集成的其他功能。

void
_PG_init(void)
{
prev_ProcessUtility = ProcessUtility_hook;
if (!prev_ProcessUtility)
prev_ProcessUtility = standard_ProcessUtility;
ProcessUtility_hook = pgtrashcan_ProcessUtility;
}

用到了hook函数,hook工作原理:每一个hook是由一个全局性的函数指针构成的。服务端进行运行初始化其为NULL,当数据库必须调用的时候,首先会检测是否为NULL,不是则优先调用函数,否则执行标准函数。

设置函数指针:当数据库载入共享库时,首先会将其载入到内存中,然后执行一个函数调用_PG_init。这个函数存在大多数共享库中是有效的。所以我们可以通过这个函数来加载我们自己的hook。

取消函数指针设置:当数据库需要卸载其共享库时,会调用函数 _PG_fini() 。我们可以再此进行设置函数指针为NULL,这样就取消设置了

hook可以修改和中断用户的操作。

下面是常用hook列表,可以根据列表进行对数据库相关过程进行修改,不需要直接在PG源码下修改,仅需要加一个扩展组件即可。

Hook 初始版本 说明
check_password_hook 9.0 处理用户密码时调用的hook,可以对用户的密码进行限制,增加密码的规范。
ClientAuthentication_hook 9.1 处理连接时调用的hook,可以对连接进行管理。
ExecutorStart_hook 8.4 处理查询执行开始时调用的hook。
ExecutorRun_hook 8.4 处理查询执行时调用的hook。
ExecutorFinish_hook 8.4 处理查询结束时调用的hook。
ExecutorEnd_hook 8.4 处理查询完成后调用的hook。
ExecutorCheckPerms_hook 9.1 处理访问权限时调用的hook。
ProcessUtility_hook 9.0 通用hook,可以处理很多的过程。

注解: standard_ProcessUtility 标准hook函数,如果自定义hook等于NULL,默认执行standard_ProcessUtility(),该函数在:src/backend/tcop/utility.c

在C文件中,先声明ProcessUtility_hook,并赋值为NULL:

static ProcessUtility_hook_type prev_ProcessUtility = NULL;

然后调用_PG_init()安装hooks:

void
_PG_init(void)
{
prev_ProcessUtility = ProcessUtility_hook;
if (!prev_ProcessUtility)
prev_ProcessUtility = standard_ProcessUtility;
ProcessUtility_hook = pgtrashcan_ProcessUtility;
}

调用的时候,首先会检测是否为NULL,不是则优先调用函数,否则执行标准函数。

makeRangeVarFromAnyName()

static RangeVar *
makeRangeVarFromAnyName(List *names)
{
RangeVar *r = makeNode(RangeVar);

switch (list_length(names))
{
case 1:
r->catalogname = NULL;
r->schemaname = NULL;
r->relname = strVal(linitial(names));
break;
case 2:
r->catalogname = NULL;
r->schemaname = strVal(linitial(names));
r->relname = strVal(lsecond(names));
break;
case 3:
r->catalogname = strVal(linitial(names));;
r->schemaname = strVal(lsecond(names));
r->relname = strVal(lthird(names));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("improper qualified name (too many dotted names): %s",
NameListToString(names))));
break;
}

#if PG_VERSION_NUM >= 90100
r->relpersistence = RELPERSISTENCE_PERMANENT;
#endif
r->location = -1;

return r;
}

RangeVar结构体:

RangeVar 是 PostgreSQL 中的一个结构体,用于表示一个关系变量(relation variable),通常用于标识表、视图、函数等数据库对象的名称。

catalogname:数据库名

schemaname:模式名

relname:对象的名称,例如表名、视图名、函数名等

inh:(bool)是否通过继承扩展关系 ?是否递归作用于子节点?

relpersistence:参考pg_class.h文件 有三种类型:RELPERSISTENCE_PERMANENT(常规表)、RELPERSISTENCE_UNLOGGED(未记录的表)、RELPERSISTENCE_TEMP(临时表)

alias:表别名和可选的列别名

location:对象的位置信息(通常是源代码中的位置),用于错误报告,未知为-1

typedef struct RangeVar
{
NodeTag type;
char   *catalogname; /* the catalog (database) name, or NULL */
char   *schemaname; /* the schema name, or NULL */
char   *relname; /* the relation/sequence name */
bool inh; /* expand rel by inheritance? recursively act
* on children? */
char relpersistence; /* see RELPERSISTENCE_* in pg_class.h */
Alias   *alias; /* table alias & optional column aliases */
int location; /* token location, or -1 if unknown */
#ifdef __TBASE__
/* used for interval partition */
bool     intervalparent;    /* is interval partition */
PartitionForExpr *partitionvalue;/* partition for (...) */
#endif
#ifdef __STORAGE_SCALABLE__
char       *pubname;        /* relation published by publication */
#endif
#ifdef _PG_ORCL_
   char       *dblinkname;
#endif
} RangeVar;

RangeVar 结构体允许表示带模式的对象名称,例如 schemaname.relname。这个结构通常用于在 SQL 解析阶段识别操作的对象。在数据库操作中,我们常常需要知道要操作的表、视图或函数的名称及所属模式,RangeVar 结构体用于表示这些信息。

根据list的长度,进入switch分支

通过strVal()函数把其他类型转化为字符串类型

调用linitial()函数获取list第一个值、lsecond()函数获取第二个值、lthird()函数获得第三个值

然后赋值给RangeVar结构体中的成员

create_trashcan_schema()

static void
create_trashcan_schema(void)
{
HeapTuple   tuple;
Oid datdba;

if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(trashcan_nspname)))
return;

tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database with OID %u does not exist", MyDatabaseId)));

datdba = ((Form_pg_database) GETSTRUCT(tuple))->datdba;
ReleaseSysCache(tuple);

NamespaceCreate(trashcan_nspname, datdba, false);

CommandCounterIncrement();
}

该函数是用来创建Trash模式回收站,当表删除后会进入Trash模式中。

在PostgreSQL中,heaptuple(堆元组)是数据库中表的行的一种内部表示形式。它表示表中的一行数据,通常在磁盘上以堆存储的方式存储。堆是一种基本的数据存储结构,用于存储表中的数据行。

在堆上,每一行数据被组织成一个heaptuple。heaptuple通常包含表中的列的实际数据值以及一些元数据,例如行的可见性信息、行的长度等。这些元数据对于PostgreSQL引擎来说是重要的,以确保事务的一致性和可靠性。

在 PostgreSQL 中,Oid 表示对象标识符(Object Identifier)。它是一个整数数据类型,通常用于唯一标识数据库中的对象,如表、索引、函数等。Oid 类型通常用于系统表中,以标识和关联不同数据库对象的唯一标识。

在CatCache中查找元组有两种方式:精确匹配SearchCatCache和部分匹配SearchCatcacheList。前者用于给定CatCache所需的所有键值,并返回CatCache中能完全匹配这个键值的元组;而后者只需要给出部分键值,并将部分匹配的元组以一个CatCList的方式返回。

SysCache提供了SearchSysCache、SearchSysCacheList、ReleaseSysCache、SearchSysCacheCopy、SearchSysCacheExists、GetSysCacheOid、SearchSysCacheAttName、SearchSysCacheCopyAttName、SearchSysCacheExistsAttName,主要就是调用catcache.c提供的查找元组的函数,以及释放计数函数。

SearchSysCache是SearchCatCache的一层封装,为调用者执行初始化和进行查找键设置。如果找到元组,则返回该元组的缓存副本;如果未找到,则返回NULL。元组是“cache”副本,不能修改。调用方使用完元组后,调用ReleaseSysCache()释放SearchSysCache()获取的引用计数。如果不这样做,元组将一直锁定在缓存中,直到事务结束,这是可以容忍的,但不可取的。注意:调用方不能释放返回的元组!

HeapTuple SearchSysCache(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
    if (cacheId < 0 || cacheId >= SysCacheSize ||
        !PointerIsValid(SysCache[cacheId]))
        elog(ERROR, "invalid cache id: %d", cacheId);
    return SearchCatCache(SysCache[cacheId], key1, key2, key3, key4);
}

这个函数会搜索指定的系统缓存,使用提供的关键字进行匹配。如果找到匹配的缓存项,它将返回表示匹配项的 HeapTuple,该元组包含了缓存项的数据。如果没有找到匹配的缓存项,则返回空指针。

在使用完返回的 HeapTuple 后,您需要使用 ReleaseSysCache 函数释放资源。

SearchSysCacheList同理

struct catclist *SearchSysCacheList(int cacheId, int nkeys,
                   Datum key1, Datum key2, Datum key3, Datum key4){
    if (cacheId < 0 || cacheId >= SysCacheSize ||
        !PointerIsValid(SysCache[cacheId]))
        elog(ERROR, "invalid cache id: %d", cacheId);

    return SearchCatCacheList(SysCache[cacheId], nkeys,
                              key1, key2, key3, key4);
}

ReleaseSysCache是RelesaseCatCache的房子,用于释放tuple的引用计数

void ReleaseSysCache(HeapTuple tuple) {
    ReleaseCatCache(tuple);
}

SearchSysCacheCopy调用SearchSysCache并返回syscache条目的一份拷贝,并释放原条目

HeapTuple SearchSysCacheCopy(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
   HeapTuple    tuple, newtuple;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
       return tuple;
   newtuple = heap_copytuple(tuple);
   ReleaseSysCache(tuple);
   return newtuple;
}

SearchSysCacheExists查看某个tuple释放存在

bool SearchSysCacheExists(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
   HeapTuple    tuple;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
       return false;
   ReleaseSysCache(tuple);
   return true;
}

这个函数会搜索指定的系统缓存,使用提供的关键字进行匹配。如果找到匹配的缓存项,则返回 true,表示缓存项存在;否则返回 false,表示缓存项不存在。

GetSysCacheOid调用SearchSysCache,返回获取到的tuple的OID

Oid GetSysCacheOid(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4){
   HeapTuple    tuple;
   Oid            result;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
        return InvalidOid;
   result = HeapTupleGetOid(tuple);
   ReleaseSysCache(tuple);
   return result;
}

SearchSysCacheAttName调用SearchSysCache查找ATTNAME,返回获取到的tuple

HeapTuple SearchSysCacheAttName(Oid relid, const char *attname){
    HeapTuple    tuple;
    tuple = SearchSysCache(ATTNAME,
                           ObjectIdGetDatum(relid),
                           CStringGetDatum(attname),
                           0, 0);
    if (!HeapTupleIsValid(tuple))
        return NULL;
    if (((Form_pg_attribute) GETSTRUCT(tuple))->attisdropped){
        ReleaseSysCache(tuple);
        return NULL;
    }
    return tuple;
}
 HeapTuple
SearchSysCacheCopyAttName(Oid relid, const char *attname)
{
    HeapTuple    tuple,
                newtuple;

    tuple = SearchSysCacheAttName(relid, attname);
    if (!HeapTupleIsValid(tuple))
        return tuple;
    newtuple = heap_copytuple(tuple);
    ReleaseSysCache(tuple);
    return newtuple;
}

SysCacheAttr给定先前由SearchSysCache()获取的元组,提取特定属性。这相当于对从非缓存关系获取的元组使用heap_ghtattr()。通常,这只用于可能为NULL或可变长度的属性;通过将元组映射到include/catalog/中的C struct声明,可以访问系统表中的固定大小属性。与heap_getattr()一样,如果属性是按引用传递的类型,则返回一个指向元组数据区域的指针---调用方不能修改或释放数据!注意:使用带有cacheId的SysCacheGetAttr()是合法的,它引用了从中获取元组的同一目录的不同缓存。

Datum
SysCacheGetAttr(int cacheId, HeapTuple tup,
AttrNumber attributeNumber,
bool *isNull)
{
/*
* We just need to get the TupleDesc out of the cache entry, and then we
* can apply heap_getattr(). Normally the cache control data is already
* valid (because the caller recently fetched the tuple via this same
* cache), but there are cases where we have to initialize the cache here.
*/
if (cacheId < 0 || cacheId >= SysCacheSize ||
!PointerIsValid(SysCache[cacheId]))
elog(ERROR, "invalid cache ID: %d", cacheId);
if (!PointerIsValid(SysCache[cacheId]->cc_tupdesc))
{
InitCatCachePhase2(SysCache[cacheId], false);
Assert(PointerIsValid(SysCache[cacheId]->cc_tupdesc));
}

return heap_getattr(tup, attributeNumber,
SysCache[cacheId]->cc_tupdesc,
isNull);
}

在该函数中运用到的 SearchSysCacheExists1、SearchSysCache1是根据SearchSysCacheExists、SearchSysCache重定义的

#define SearchSysCacheExists1(cacheId, key1) \
SearchSysCacheExists(cacheId, key1, 0, 0, 0)
#define SearchSysCacheExists2(cacheId, key1, key2) \
SearchSysCacheExists(cacheId, key1, key2, 0, 0)
#define SearchSysCacheExists3(cacheId, key1, key2, key3) \
SearchSysCacheExists(cacheId, key1, key2, key3, 0)
#define SearchSysCacheExists4(cacheId, key1, key2, key3, key4) \
SearchSysCacheExists(cacheId, key1, key2, key3, key4)
#define SearchSysCache1(cacheId, key1) \
SearchSysCache(cacheId, key1, 0, 0, 0)
#define SearchSysCache2(cacheId, key1, key2) \
SearchSysCache(cacheId, key1, key2, 0, 0)
#define SearchSysCache3(cacheId, key1, key2, key3) \
SearchSysCache(cacheId, key1, key2, key3, 0)
#define SearchSysCache4(cacheId, key1, key2, key3, key4) \
SearchSysCache(cacheId, key1, key2, key3, key4)
  • cacheId 是要搜索的系统缓存的标识符。

  • key1, key2, key3, key4 是用于匹配缓存项的关键字。这些关键字的具体含义和用法取决于要查询的系统缓存类型。

/*
* PointerGetDatum
* Returns datum representation for a pointer.
*/

#define PointerGetDatum(X) ((Datum) (X))

返回指针的数据表示形式。

/*
* ObjectIdGetDatum
* Returns datum representation for an object identifier.
*/

#define ObjectIdGetDatum(X) ((Datum) SET_4_BYTES(X))

返回对象标识符的数据表示形式。

/*
* Accessor macros to be used with HeapTuple pointers.
*/
#define HeapTupleIsValid(tuple) PointerIsValid(tuple)

/*
* PointerIsValid
* True iff pointer is valid.
*/
#define PointerIsValid(pointer) ((const void*)(pointer) != NULL)

与HeapTuple指针一起使用的访问器宏。

/*
* GETSTRUCT - given a HeapTuple pointer, return address of the user data
*/
#define GETSTRUCT(TUP) ((char *) ((TUP)->t_data) + (TUP)->t_data->t_hoff)

给定一个HeapTuple指针,返回用户数据的地址。

用于创建一个新的命名空间(也称为模式)。

/* ----------------
* NamespaceCreate
*
* Create a namespace (schema) with the given name and owner OID.
*
* If isTemp is true, this schema is a per-backend schema for holding
* temporary tables. Currently, it is used to prevent it from being
* linked as a member of any active extension. (If someone does CREATE
* TEMP TABLE in an extension script, we don't want the temp schema to
* become part of the extension). And to avoid checking for default ACL
* for temp namespace (as it is not necessary).
* ---------------
*/
//创建具有给定名称和所有者OID的名称空间(模式)。
//如果isTemp为true,则此模式是用于保存临时表的每个后端模式。目前,它用于防止将其链接为任何活动扩展的成员。(如果有人在扩展脚本中创建了TEMP TABLE,我们不希望TEMP模式成为扩展的一部分)。并避免检查临时名称空间的默认acl(因为这是不必要的)。
Oid
NamespaceCreate(const char *nspName, Oid ownerId, bool isTemp)
{
Relation nspdesc;
HeapTuple tup;
Oid nspoid;
bool nulls[Natts_pg_namespace];
Datum values[Natts_pg_namespace];
NameData nname;
TupleDesc tupDesc;
ObjectAddress myself;
int i;
Acl   *nspacl;

/* sanity checks 合理性检查*/
if (!nspName)
elog(ERROR, "no namespace name supplied");

/* make sure there is no existing namespace of same name 确保没有同名的现有名称空间*/
if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(nspName)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_SCHEMA),
errmsg("schema \"%s\" already exists", nspName)));

if (!isTemp)
nspacl = get_user_default_acl(ACL_OBJECT_NAMESPACE, ownerId,
 InvalidOid);
   //get_user_default_acl用于在创建新对象时确定指定用户对指定命名空间的默认访问控制列表(ACL)。
else
nspacl = NULL;

/* initialize nulls and values 初始化空值和值*/
for (i = 0; i < Natts_pg_namespace; i++)
{
nulls[i] = false;
values[i] = (Datum) NULL;
}
namestrcpy(&nname, nspName);
values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&nname);
values[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(ownerId);
if (nspacl != NULL)
values[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(nspacl);
else
nulls[Anum_pg_namespace_nspacl - 1] = true;

nspdesc = heap_open(NamespaceRelationId, RowExclusiveLock);
tupDesc = nspdesc->rd_att;

tup = heap_form_tuple(tupDesc, values, nulls);

nspoid = CatalogTupleInsert(nspdesc, tup);
Assert(OidIsValid(nspoid));

heap_close(nspdesc, RowExclusiveLock);

/* Record dependencies 记录依赖项*/
myself.classId = NamespaceRelationId;
myself.objectId = nspoid;
myself.objectSubId = 0;

/* dependency on owner */
recordDependencyOnOwner(NamespaceRelationId, nspoid, ownerId);

/* dependency on extension ... but not for magic temp schemas */
if (!isTemp)
recordDependencyOnCurrentExtension(&myself, false);

/* Post creation hook for new schema */
InvokeObjectPostCreateHook(NamespaceRelationId, nspoid, 0);

return nspoid;
}
void
CommandCounterIncrement(void)
{
/*
* If the current value of the command counter hasn't been "used" to mark
* tuples, we need not increment it, since there's no need to distinguish
* a read-only command from others. This helps postpone command counter
* overflow, and keeps no-op CommandCounterIncrement operations cheap.
*/
if (currentCommandIdUsed)
{
/*
* Workers synchronize transaction state at the beginning of each
* parallel operation, so we can't account for new commands after that
* point.
*/
if (IsInParallelMode() || IsParallelWorker())
elog(ERROR, "cannot start commands during a parallel operation");

currentCommandId += 1;
if (currentCommandId == InvalidCommandId)
{
currentCommandId -= 1;
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot have more than 2^32-2 commands in a transaction")));
}
currentCommandIdUsed = false;

/* Propagate new command ID into static snapshots */
SnapshotSetCommandId(currentCommandId);

#ifdef PGXC
/*
* Remote node should report local command id changes only if
* required by the Coordinator. The requirement of the
* Coordinator is inferred from the fact that Coordinator
* has itself sent the command id to the remote nodes.
*/
if (IsConnFromCoord() && IsSendCommandId())
ReportCommandIdChange(currentCommandId);
#endif

/*
* Make any catalog changes done by the just-completed command visible
* in the local syscache. We obviously don't need to do this after a
* read-only command. (But see hacks in inval.c to make real sure we
* don't think a command that queued inval messages was read-only.)
*/
AtCCI_LocalCache();
}
}

用于增加事务命令计数器(Command Counter)。这个计数器用于跟踪事务和命令的执行次数,以确保系统状态的正确更新。该函数没有参数,调用它会增加事务命令计数器。

通常,CommandCounterIncrement() 会在事务提交时自动调用,以确保所有对数据库状态的修改都能在后续的查询中生效。也可以在适当的时候手动调用该函数,以确保所做的更改被其他会话或查询所看到。

检查系统缓存中是否存在Trashcan模式,如果存在就直接返回不需要重新创建。

找到系统缓存中的tuple,检查tuple的合理性,找到tuple的地址

创建Trash模式,然后增势事务命令计数器。

pgtrashcan_ProcessUtility()

#if PG_VERSION_NUM >= 90300
static void
pgtrashcan_ProcessUtility(Node *parsetree,
 const char *queryString,
 ProcessUtilityContext context,
 ParamListInfo params,
 DestReceiver *dest,
 char *completionTag)
#else
static void
pgtrashcan_ProcessUtility(Node *parsetree,
 const char *queryString,
 ParamListInfo params,
 bool isTopLevel,
 DestReceiver *dest,
 char *completionTag)
#endif
{
if (nodeTag(parsetree) == T_DropStmt)
{
DropStmt *stmt = (DropStmt *) parsetree;

if (stmt->removeType == OBJECT_TABLE)
{
RangeVar *r;
AlterObjectSchemaStmt *newstmt = makeNode(AlterObjectSchemaStmt);
newstmt->objectType = stmt->removeType;
newstmt->newschema = pstrdup(trashcan_nspname);
#if PG_VERSION_NUM >= 90200
newstmt->missing_ok = stmt->missing_ok;
#endif
if (stmt->behavior != DROP_RESTRICT)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("trash can does not support DROP CASCADE")));

r = makeRangeVarFromAnyName(linitial(stmt->objects));
r->inhOpt = INH_YES;
r->alias = NULL;
newstmt->relation = r;

if (!r->schemaname || strcmp(r->schemaname, trashcan_nspname) != 0)
{
parsetree = (Node *) newstmt;
create_trashcan_schema();
}
}
}

#if PG_VERSION_NUM >= 90300
(*prev_ProcessUtility) (parsetree, queryString, context, params, dest, completionTag);
#else
(*prev_ProcessUtility) (parsetree, queryString, params, isTopLevel, dest, completionTag);
#endif
}

将任何节点的类型强制转换为Node

#define nodeTag(nodeptr)        (((const Node*)(nodeptr))->type)
/* ----------------------
* Drop Table|Sequence|View|Index|Type|Domain|Conversion|Schema Statement
* ----------------------
*/

typedef struct DropStmt
{
NodeTag type;
List   *objects; /* list of names */
ObjectType removeType; /* object type */
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
bool missing_ok; /* skip error if object is missing? */
bool concurrent; /* drop index concurrently? */
} DropStmt;
  • removeType: 要删除的对象类型,如表、视图、索引等。

  • objects: 要删除的对象列表,可能包含多个对象名。

  • behavior: 删除行为,如 RESTRICT、CASCADE 等。

判断SQL语句是否为drop table,是的话底层将删除表的语法改为将表移到新的模式Trash中,判断是否允许删除对象时的级联行为,通过DROP_RESTRICT 选项,系统将确保只有在没有依赖项的情况下才能删除该对象。如果存在依赖项,系统将拒绝删除操作并显示错误消息。

创建RangeVar 结构,用于表示表名或关系变量的名称信息,最后判断是否存在Trash模式,存在就跳过,没有调用create_trashcan_schema()函数创建。

(*prev_ProcessUtility) (parsetree, queryString, context, params, dest, completionTag); 是一个函数指针的声明,它声明了一个指向函数的指针,该函数的参数和返回值类型符合特定的形式。

0条评论
作者已关闭评论
j****n
4文章数
0粉丝数
j****n
4 文章 | 0 粉丝
j****n
4文章数
0粉丝数
j****n
4 文章 | 0 粉丝
原创

pgtrashcan

2023-10-16 01:19:03
42
0

背景

PostgreSQL本身不支持类似Oracle recycle bin这样的回收站机制。

本文将介绍使用PostgreSQL动态模块,以及_PG_init函数来创建Hook程序。

(_PG_init函数在加载动态模块时立即执行, 动态模块在会话建立时被加载, 所以很好被利用)

在Hook程序中写一些逻辑, 将删除表的操作, 转移到其他SCHEMA。

pgtrashcan就是利用这种机制来实现类似Oracle recycle bin的功能的。

测试

本次使用的是telepg12,telepg12中有pgtrashcan.so文件,不需要下载开源包适配编译,通过激活模块,

将以下设置添加到:postgresql.conf中:

shared_preload_libraries = 'pgtrashcan'

然后重新启动telepg12服务器,通过命令pg_ctl restart -D yourdatadir

1. 创建表test:

postgres=# create table test(id int);
CREATE TABLE
postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
public | test | table | jianghao
(1 row)

2. 删除表test

postgres=# drop table test;
DROP TABLE;
postgres=# \dt
       List of relations
Schema | Name | Type | Owner
--------+------+------+-------
       |      |      |
(0 rows)

postgres=# select * from trash.test;
id
----
(0 rows)

test表已经没了 但是能在trash模式中看到

3. 查看已有模式

postgres=# \dn
 List of schemas
Name  | Owner
--------+----------
public | jianghao
trash  | jianghao
(2 rows)

发现多出了trash模式

4. 进入trash模式

postgres=# select current_schema;
current_schema
----------------
public
(1 row)

postgres=# SET search_path TO trash;
SET
postgres=# select current_schema;
current_schema
----------------
trash
(1 row)

postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
trash  | test | table | jianghao
(1 row)

查看trash模式下的多出了test的表

5. 当需要test的表时,可以从trash模式中移到public模式中

postrges=# alter table trash.test set schema public;
ALTER TABLE
postrges=# \dt
       List of relations
Schema | Name | Type  | Owner
--------+------+-------+----------
public | test | table | jianghao
public | t    | table | jianghao
(2 rows)

postrges=# select * from trash.test;
2023-10-12 17:23:11.574 CST [91740] ERROR: relation "trash.test" does not exist at character 15
2023-10-12 17:23:11.574 CST [91740] STATEMENT:  select * from trash.test;
ERROR:  42P01: relation "trash.test" does not exist
LINE 1: select * from trash.test;
                      ^

注:pgtrashcan插件的使用限制

1. 目前pgtrashcan不支持drop cascade用法(例如删除主表, 或者删除连带的FK关系)。

postrges=# drop table t cascade;
2023-10-12 17:57:06.094 CST [91740] ERROR: trash can does not support DROP CASCADE
2023-10-12 17:57:06.094 CST [91740] STATEMENT:  drop table t cascade;
ERROR:  0A000: trash can does not support DROP CASCADE
LOCATION: pgtrashcan_ProcessUtility, pgtrashcan.c:119

2. 代码中, 并没有同名表的判断, 所以当Trash schema中存在同名的表时, 也会报错。

postrges=# drop table trash;
2023-10-12 17:06:23.133 CST [91740] ERROR: relation "test" already exists in schema "trash"
2023-10-12 17:06:23.133 CST [91740] STATEMENT:  drop table test;
ERROR:  42P07: relation "test" already exists in schema "trash"
LOCATION: AlterRelationNamespaceInternal, tablecmds.c:15093

3. drop table 的用户需要创建schema的权限, 以及写Trash schema的权限. 如果没有创建schema 的权限也会报错。

postgres=# drop table t;
2023-10-12 18:02:34.374 CST [91740] ERROR: permission denied for schema Trash
2023-10-12 18:02:34.374 CST [91740] STATEMENT: drop table t;
ERROR: 42501: permission denied for schema Trash  
LOCATION: aclcheck_error, aclchk.c:3371  

源码分析

pgtrashcan.c中有4个函数,分别为_PG_init()、makeRangeVarFromAnyName()、create_trashcan_schema()、pgtrashcan_ProcessUtility()。

_PG_init()

_PG_init() 是 PostgreSQL 扩展模块中的一个特定函数,用于在加载扩展模块时进行初始化。该函数是扩展模块必须实现的一个入口函数,用于执行在加载扩展时需要进行的初始化工作。

在 PostgreSQL 中,扩展模块是一种动态加载的、可插拔的代码单元,用于扩展 PostgreSQL 的功能。扩展可以包含新的函数、数据类型、运算符等,以及与 PostgreSQL 系统集成的其他功能。

void
_PG_init(void)
{
prev_ProcessUtility = ProcessUtility_hook;
if (!prev_ProcessUtility)
prev_ProcessUtility = standard_ProcessUtility;
ProcessUtility_hook = pgtrashcan_ProcessUtility;
}

用到了hook函数,hook工作原理:每一个hook是由一个全局性的函数指针构成的。服务端进行运行初始化其为NULL,当数据库必须调用的时候,首先会检测是否为NULL,不是则优先调用函数,否则执行标准函数。

设置函数指针:当数据库载入共享库时,首先会将其载入到内存中,然后执行一个函数调用_PG_init。这个函数存在大多数共享库中是有效的。所以我们可以通过这个函数来加载我们自己的hook。

取消函数指针设置:当数据库需要卸载其共享库时,会调用函数 _PG_fini() 。我们可以再此进行设置函数指针为NULL,这样就取消设置了

hook可以修改和中断用户的操作。

下面是常用hook列表,可以根据列表进行对数据库相关过程进行修改,不需要直接在PG源码下修改,仅需要加一个扩展组件即可。

Hook 初始版本 说明
check_password_hook 9.0 处理用户密码时调用的hook,可以对用户的密码进行限制,增加密码的规范。
ClientAuthentication_hook 9.1 处理连接时调用的hook,可以对连接进行管理。
ExecutorStart_hook 8.4 处理查询执行开始时调用的hook。
ExecutorRun_hook 8.4 处理查询执行时调用的hook。
ExecutorFinish_hook 8.4 处理查询结束时调用的hook。
ExecutorEnd_hook 8.4 处理查询完成后调用的hook。
ExecutorCheckPerms_hook 9.1 处理访问权限时调用的hook。
ProcessUtility_hook 9.0 通用hook,可以处理很多的过程。

注解: standard_ProcessUtility 标准hook函数,如果自定义hook等于NULL,默认执行standard_ProcessUtility(),该函数在:src/backend/tcop/utility.c

在C文件中,先声明ProcessUtility_hook,并赋值为NULL:

static ProcessUtility_hook_type prev_ProcessUtility = NULL;

然后调用_PG_init()安装hooks:

void
_PG_init(void)
{
prev_ProcessUtility = ProcessUtility_hook;
if (!prev_ProcessUtility)
prev_ProcessUtility = standard_ProcessUtility;
ProcessUtility_hook = pgtrashcan_ProcessUtility;
}

调用的时候,首先会检测是否为NULL,不是则优先调用函数,否则执行标准函数。

makeRangeVarFromAnyName()

static RangeVar *
makeRangeVarFromAnyName(List *names)
{
RangeVar *r = makeNode(RangeVar);

switch (list_length(names))
{
case 1:
r->catalogname = NULL;
r->schemaname = NULL;
r->relname = strVal(linitial(names));
break;
case 2:
r->catalogname = NULL;
r->schemaname = strVal(linitial(names));
r->relname = strVal(lsecond(names));
break;
case 3:
r->catalogname = strVal(linitial(names));;
r->schemaname = strVal(lsecond(names));
r->relname = strVal(lthird(names));
break;
default:
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("improper qualified name (too many dotted names): %s",
NameListToString(names))));
break;
}

#if PG_VERSION_NUM >= 90100
r->relpersistence = RELPERSISTENCE_PERMANENT;
#endif
r->location = -1;

return r;
}

RangeVar结构体:

RangeVar 是 PostgreSQL 中的一个结构体,用于表示一个关系变量(relation variable),通常用于标识表、视图、函数等数据库对象的名称。

catalogname:数据库名

schemaname:模式名

relname:对象的名称,例如表名、视图名、函数名等

inh:(bool)是否通过继承扩展关系 ?是否递归作用于子节点?

relpersistence:参考pg_class.h文件 有三种类型:RELPERSISTENCE_PERMANENT(常规表)、RELPERSISTENCE_UNLOGGED(未记录的表)、RELPERSISTENCE_TEMP(临时表)

alias:表别名和可选的列别名

location:对象的位置信息(通常是源代码中的位置),用于错误报告,未知为-1

typedef struct RangeVar
{
NodeTag type;
char   *catalogname; /* the catalog (database) name, or NULL */
char   *schemaname; /* the schema name, or NULL */
char   *relname; /* the relation/sequence name */
bool inh; /* expand rel by inheritance? recursively act
* on children? */
char relpersistence; /* see RELPERSISTENCE_* in pg_class.h */
Alias   *alias; /* table alias & optional column aliases */
int location; /* token location, or -1 if unknown */
#ifdef __TBASE__
/* used for interval partition */
bool     intervalparent;    /* is interval partition */
PartitionForExpr *partitionvalue;/* partition for (...) */
#endif
#ifdef __STORAGE_SCALABLE__
char       *pubname;        /* relation published by publication */
#endif
#ifdef _PG_ORCL_
   char       *dblinkname;
#endif
} RangeVar;

RangeVar 结构体允许表示带模式的对象名称,例如 schemaname.relname。这个结构通常用于在 SQL 解析阶段识别操作的对象。在数据库操作中,我们常常需要知道要操作的表、视图或函数的名称及所属模式,RangeVar 结构体用于表示这些信息。

根据list的长度,进入switch分支

通过strVal()函数把其他类型转化为字符串类型

调用linitial()函数获取list第一个值、lsecond()函数获取第二个值、lthird()函数获得第三个值

然后赋值给RangeVar结构体中的成员

create_trashcan_schema()

static void
create_trashcan_schema(void)
{
HeapTuple   tuple;
Oid datdba;

if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(trashcan_nspname)))
return;

tuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(MyDatabaseId));
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_DATABASE),
errmsg("database with OID %u does not exist", MyDatabaseId)));

datdba = ((Form_pg_database) GETSTRUCT(tuple))->datdba;
ReleaseSysCache(tuple);

NamespaceCreate(trashcan_nspname, datdba, false);

CommandCounterIncrement();
}

该函数是用来创建Trash模式回收站,当表删除后会进入Trash模式中。

在PostgreSQL中,heaptuple(堆元组)是数据库中表的行的一种内部表示形式。它表示表中的一行数据,通常在磁盘上以堆存储的方式存储。堆是一种基本的数据存储结构,用于存储表中的数据行。

在堆上,每一行数据被组织成一个heaptuple。heaptuple通常包含表中的列的实际数据值以及一些元数据,例如行的可见性信息、行的长度等。这些元数据对于PostgreSQL引擎来说是重要的,以确保事务的一致性和可靠性。

在 PostgreSQL 中,Oid 表示对象标识符(Object Identifier)。它是一个整数数据类型,通常用于唯一标识数据库中的对象,如表、索引、函数等。Oid 类型通常用于系统表中,以标识和关联不同数据库对象的唯一标识。

在CatCache中查找元组有两种方式:精确匹配SearchCatCache和部分匹配SearchCatcacheList。前者用于给定CatCache所需的所有键值,并返回CatCache中能完全匹配这个键值的元组;而后者只需要给出部分键值,并将部分匹配的元组以一个CatCList的方式返回。

SysCache提供了SearchSysCache、SearchSysCacheList、ReleaseSysCache、SearchSysCacheCopy、SearchSysCacheExists、GetSysCacheOid、SearchSysCacheAttName、SearchSysCacheCopyAttName、SearchSysCacheExistsAttName,主要就是调用catcache.c提供的查找元组的函数,以及释放计数函数。

SearchSysCache是SearchCatCache的一层封装,为调用者执行初始化和进行查找键设置。如果找到元组,则返回该元组的缓存副本;如果未找到,则返回NULL。元组是“cache”副本,不能修改。调用方使用完元组后,调用ReleaseSysCache()释放SearchSysCache()获取的引用计数。如果不这样做,元组将一直锁定在缓存中,直到事务结束,这是可以容忍的,但不可取的。注意:调用方不能释放返回的元组!

HeapTuple SearchSysCache(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
    if (cacheId < 0 || cacheId >= SysCacheSize ||
        !PointerIsValid(SysCache[cacheId]))
        elog(ERROR, "invalid cache id: %d", cacheId);
    return SearchCatCache(SysCache[cacheId], key1, key2, key3, key4);
}

这个函数会搜索指定的系统缓存,使用提供的关键字进行匹配。如果找到匹配的缓存项,它将返回表示匹配项的 HeapTuple,该元组包含了缓存项的数据。如果没有找到匹配的缓存项,则返回空指针。

在使用完返回的 HeapTuple 后,您需要使用 ReleaseSysCache 函数释放资源。

SearchSysCacheList同理

struct catclist *SearchSysCacheList(int cacheId, int nkeys,
                   Datum key1, Datum key2, Datum key3, Datum key4){
    if (cacheId < 0 || cacheId >= SysCacheSize ||
        !PointerIsValid(SysCache[cacheId]))
        elog(ERROR, "invalid cache id: %d", cacheId);

    return SearchCatCacheList(SysCache[cacheId], nkeys,
                              key1, key2, key3, key4);
}

ReleaseSysCache是RelesaseCatCache的房子,用于释放tuple的引用计数

void ReleaseSysCache(HeapTuple tuple) {
    ReleaseCatCache(tuple);
}

SearchSysCacheCopy调用SearchSysCache并返回syscache条目的一份拷贝,并释放原条目

HeapTuple SearchSysCacheCopy(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
   HeapTuple    tuple, newtuple;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
       return tuple;
   newtuple = heap_copytuple(tuple);
   ReleaseSysCache(tuple);
   return newtuple;
}

SearchSysCacheExists查看某个tuple释放存在

bool SearchSysCacheExists(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4) {
   HeapTuple    tuple;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
       return false;
   ReleaseSysCache(tuple);
   return true;
}

这个函数会搜索指定的系统缓存,使用提供的关键字进行匹配。如果找到匹配的缓存项,则返回 true,表示缓存项存在;否则返回 false,表示缓存项不存在。

GetSysCacheOid调用SearchSysCache,返回获取到的tuple的OID

Oid GetSysCacheOid(int cacheId, Datum key1, Datum key2, Datum key3, Datum key4){
   HeapTuple    tuple;
   Oid            result;
   tuple = SearchSysCache(cacheId, key1, key2, key3, key4);
   if (!HeapTupleIsValid(tuple))
        return InvalidOid;
   result = HeapTupleGetOid(tuple);
   ReleaseSysCache(tuple);
   return result;
}

SearchSysCacheAttName调用SearchSysCache查找ATTNAME,返回获取到的tuple

HeapTuple SearchSysCacheAttName(Oid relid, const char *attname){
    HeapTuple    tuple;
    tuple = SearchSysCache(ATTNAME,
                           ObjectIdGetDatum(relid),
                           CStringGetDatum(attname),
                           0, 0);
    if (!HeapTupleIsValid(tuple))
        return NULL;
    if (((Form_pg_attribute) GETSTRUCT(tuple))->attisdropped){
        ReleaseSysCache(tuple);
        return NULL;
    }
    return tuple;
}
 HeapTuple
SearchSysCacheCopyAttName(Oid relid, const char *attname)
{
    HeapTuple    tuple,
                newtuple;

    tuple = SearchSysCacheAttName(relid, attname);
    if (!HeapTupleIsValid(tuple))
        return tuple;
    newtuple = heap_copytuple(tuple);
    ReleaseSysCache(tuple);
    return newtuple;
}

SysCacheAttr给定先前由SearchSysCache()获取的元组,提取特定属性。这相当于对从非缓存关系获取的元组使用heap_ghtattr()。通常,这只用于可能为NULL或可变长度的属性;通过将元组映射到include/catalog/中的C struct声明,可以访问系统表中的固定大小属性。与heap_getattr()一样,如果属性是按引用传递的类型,则返回一个指向元组数据区域的指针---调用方不能修改或释放数据!注意:使用带有cacheId的SysCacheGetAttr()是合法的,它引用了从中获取元组的同一目录的不同缓存。

Datum
SysCacheGetAttr(int cacheId, HeapTuple tup,
AttrNumber attributeNumber,
bool *isNull)
{
/*
* We just need to get the TupleDesc out of the cache entry, and then we
* can apply heap_getattr(). Normally the cache control data is already
* valid (because the caller recently fetched the tuple via this same
* cache), but there are cases where we have to initialize the cache here.
*/
if (cacheId < 0 || cacheId >= SysCacheSize ||
!PointerIsValid(SysCache[cacheId]))
elog(ERROR, "invalid cache ID: %d", cacheId);
if (!PointerIsValid(SysCache[cacheId]->cc_tupdesc))
{
InitCatCachePhase2(SysCache[cacheId], false);
Assert(PointerIsValid(SysCache[cacheId]->cc_tupdesc));
}

return heap_getattr(tup, attributeNumber,
SysCache[cacheId]->cc_tupdesc,
isNull);
}

在该函数中运用到的 SearchSysCacheExists1、SearchSysCache1是根据SearchSysCacheExists、SearchSysCache重定义的

#define SearchSysCacheExists1(cacheId, key1) \
SearchSysCacheExists(cacheId, key1, 0, 0, 0)
#define SearchSysCacheExists2(cacheId, key1, key2) \
SearchSysCacheExists(cacheId, key1, key2, 0, 0)
#define SearchSysCacheExists3(cacheId, key1, key2, key3) \
SearchSysCacheExists(cacheId, key1, key2, key3, 0)
#define SearchSysCacheExists4(cacheId, key1, key2, key3, key4) \
SearchSysCacheExists(cacheId, key1, key2, key3, key4)
#define SearchSysCache1(cacheId, key1) \
SearchSysCache(cacheId, key1, 0, 0, 0)
#define SearchSysCache2(cacheId, key1, key2) \
SearchSysCache(cacheId, key1, key2, 0, 0)
#define SearchSysCache3(cacheId, key1, key2, key3) \
SearchSysCache(cacheId, key1, key2, key3, 0)
#define SearchSysCache4(cacheId, key1, key2, key3, key4) \
SearchSysCache(cacheId, key1, key2, key3, key4)
  • cacheId 是要搜索的系统缓存的标识符。

  • key1, key2, key3, key4 是用于匹配缓存项的关键字。这些关键字的具体含义和用法取决于要查询的系统缓存类型。

/*
* PointerGetDatum
* Returns datum representation for a pointer.
*/

#define PointerGetDatum(X) ((Datum) (X))

返回指针的数据表示形式。

/*
* ObjectIdGetDatum
* Returns datum representation for an object identifier.
*/

#define ObjectIdGetDatum(X) ((Datum) SET_4_BYTES(X))

返回对象标识符的数据表示形式。

/*
* Accessor macros to be used with HeapTuple pointers.
*/
#define HeapTupleIsValid(tuple) PointerIsValid(tuple)

/*
* PointerIsValid
* True iff pointer is valid.
*/
#define PointerIsValid(pointer) ((const void*)(pointer) != NULL)

与HeapTuple指针一起使用的访问器宏。

/*
* GETSTRUCT - given a HeapTuple pointer, return address of the user data
*/
#define GETSTRUCT(TUP) ((char *) ((TUP)->t_data) + (TUP)->t_data->t_hoff)

给定一个HeapTuple指针,返回用户数据的地址。

用于创建一个新的命名空间(也称为模式)。

/* ----------------
* NamespaceCreate
*
* Create a namespace (schema) with the given name and owner OID.
*
* If isTemp is true, this schema is a per-backend schema for holding
* temporary tables. Currently, it is used to prevent it from being
* linked as a member of any active extension. (If someone does CREATE
* TEMP TABLE in an extension script, we don't want the temp schema to
* become part of the extension). And to avoid checking for default ACL
* for temp namespace (as it is not necessary).
* ---------------
*/
//创建具有给定名称和所有者OID的名称空间(模式)。
//如果isTemp为true,则此模式是用于保存临时表的每个后端模式。目前,它用于防止将其链接为任何活动扩展的成员。(如果有人在扩展脚本中创建了TEMP TABLE,我们不希望TEMP模式成为扩展的一部分)。并避免检查临时名称空间的默认acl(因为这是不必要的)。
Oid
NamespaceCreate(const char *nspName, Oid ownerId, bool isTemp)
{
Relation nspdesc;
HeapTuple tup;
Oid nspoid;
bool nulls[Natts_pg_namespace];
Datum values[Natts_pg_namespace];
NameData nname;
TupleDesc tupDesc;
ObjectAddress myself;
int i;
Acl   *nspacl;

/* sanity checks 合理性检查*/
if (!nspName)
elog(ERROR, "no namespace name supplied");

/* make sure there is no existing namespace of same name 确保没有同名的现有名称空间*/
if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(nspName)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_SCHEMA),
errmsg("schema \"%s\" already exists", nspName)));

if (!isTemp)
nspacl = get_user_default_acl(ACL_OBJECT_NAMESPACE, ownerId,
 InvalidOid);
   //get_user_default_acl用于在创建新对象时确定指定用户对指定命名空间的默认访问控制列表(ACL)。
else
nspacl = NULL;

/* initialize nulls and values 初始化空值和值*/
for (i = 0; i < Natts_pg_namespace; i++)
{
nulls[i] = false;
values[i] = (Datum) NULL;
}
namestrcpy(&nname, nspName);
values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&nname);
values[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(ownerId);
if (nspacl != NULL)
values[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(nspacl);
else
nulls[Anum_pg_namespace_nspacl - 1] = true;

nspdesc = heap_open(NamespaceRelationId, RowExclusiveLock);
tupDesc = nspdesc->rd_att;

tup = heap_form_tuple(tupDesc, values, nulls);

nspoid = CatalogTupleInsert(nspdesc, tup);
Assert(OidIsValid(nspoid));

heap_close(nspdesc, RowExclusiveLock);

/* Record dependencies 记录依赖项*/
myself.classId = NamespaceRelationId;
myself.objectId = nspoid;
myself.objectSubId = 0;

/* dependency on owner */
recordDependencyOnOwner(NamespaceRelationId, nspoid, ownerId);

/* dependency on extension ... but not for magic temp schemas */
if (!isTemp)
recordDependencyOnCurrentExtension(&myself, false);

/* Post creation hook for new schema */
InvokeObjectPostCreateHook(NamespaceRelationId, nspoid, 0);

return nspoid;
}
void
CommandCounterIncrement(void)
{
/*
* If the current value of the command counter hasn't been "used" to mark
* tuples, we need not increment it, since there's no need to distinguish
* a read-only command from others. This helps postpone command counter
* overflow, and keeps no-op CommandCounterIncrement operations cheap.
*/
if (currentCommandIdUsed)
{
/*
* Workers synchronize transaction state at the beginning of each
* parallel operation, so we can't account for new commands after that
* point.
*/
if (IsInParallelMode() || IsParallelWorker())
elog(ERROR, "cannot start commands during a parallel operation");

currentCommandId += 1;
if (currentCommandId == InvalidCommandId)
{
currentCommandId -= 1;
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot have more than 2^32-2 commands in a transaction")));
}
currentCommandIdUsed = false;

/* Propagate new command ID into static snapshots */
SnapshotSetCommandId(currentCommandId);

#ifdef PGXC
/*
* Remote node should report local command id changes only if
* required by the Coordinator. The requirement of the
* Coordinator is inferred from the fact that Coordinator
* has itself sent the command id to the remote nodes.
*/
if (IsConnFromCoord() && IsSendCommandId())
ReportCommandIdChange(currentCommandId);
#endif

/*
* Make any catalog changes done by the just-completed command visible
* in the local syscache. We obviously don't need to do this after a
* read-only command. (But see hacks in inval.c to make real sure we
* don't think a command that queued inval messages was read-only.)
*/
AtCCI_LocalCache();
}
}

用于增加事务命令计数器(Command Counter)。这个计数器用于跟踪事务和命令的执行次数,以确保系统状态的正确更新。该函数没有参数,调用它会增加事务命令计数器。

通常,CommandCounterIncrement() 会在事务提交时自动调用,以确保所有对数据库状态的修改都能在后续的查询中生效。也可以在适当的时候手动调用该函数,以确保所做的更改被其他会话或查询所看到。

检查系统缓存中是否存在Trashcan模式,如果存在就直接返回不需要重新创建。

找到系统缓存中的tuple,检查tuple的合理性,找到tuple的地址

创建Trash模式,然后增势事务命令计数器。

pgtrashcan_ProcessUtility()

#if PG_VERSION_NUM >= 90300
static void
pgtrashcan_ProcessUtility(Node *parsetree,
 const char *queryString,
 ProcessUtilityContext context,
 ParamListInfo params,
 DestReceiver *dest,
 char *completionTag)
#else
static void
pgtrashcan_ProcessUtility(Node *parsetree,
 const char *queryString,
 ParamListInfo params,
 bool isTopLevel,
 DestReceiver *dest,
 char *completionTag)
#endif
{
if (nodeTag(parsetree) == T_DropStmt)
{
DropStmt *stmt = (DropStmt *) parsetree;

if (stmt->removeType == OBJECT_TABLE)
{
RangeVar *r;
AlterObjectSchemaStmt *newstmt = makeNode(AlterObjectSchemaStmt);
newstmt->objectType = stmt->removeType;
newstmt->newschema = pstrdup(trashcan_nspname);
#if PG_VERSION_NUM >= 90200
newstmt->missing_ok = stmt->missing_ok;
#endif
if (stmt->behavior != DROP_RESTRICT)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("trash can does not support DROP CASCADE")));

r = makeRangeVarFromAnyName(linitial(stmt->objects));
r->inhOpt = INH_YES;
r->alias = NULL;
newstmt->relation = r;

if (!r->schemaname || strcmp(r->schemaname, trashcan_nspname) != 0)
{
parsetree = (Node *) newstmt;
create_trashcan_schema();
}
}
}

#if PG_VERSION_NUM >= 90300
(*prev_ProcessUtility) (parsetree, queryString, context, params, dest, completionTag);
#else
(*prev_ProcessUtility) (parsetree, queryString, params, isTopLevel, dest, completionTag);
#endif
}

将任何节点的类型强制转换为Node

#define nodeTag(nodeptr)        (((const Node*)(nodeptr))->type)
/* ----------------------
* Drop Table|Sequence|View|Index|Type|Domain|Conversion|Schema Statement
* ----------------------
*/

typedef struct DropStmt
{
NodeTag type;
List   *objects; /* list of names */
ObjectType removeType; /* object type */
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
bool missing_ok; /* skip error if object is missing? */
bool concurrent; /* drop index concurrently? */
} DropStmt;
  • removeType: 要删除的对象类型,如表、视图、索引等。

  • objects: 要删除的对象列表,可能包含多个对象名。

  • behavior: 删除行为,如 RESTRICT、CASCADE 等。

判断SQL语句是否为drop table,是的话底层将删除表的语法改为将表移到新的模式Trash中,判断是否允许删除对象时的级联行为,通过DROP_RESTRICT 选项,系统将确保只有在没有依赖项的情况下才能删除该对象。如果存在依赖项,系统将拒绝删除操作并显示错误消息。

创建RangeVar 结构,用于表示表名或关系变量的名称信息,最后判断是否存在Trash模式,存在就跳过,没有调用create_trashcan_schema()函数创建。

(*prev_ProcessUtility) (parsetree, queryString, context, params, dest, completionTag); 是一个函数指针的声明,它声明了一个指向函数的指针,该函数的参数和返回值类型符合特定的形式。

文章来自个人专栏
postgresql学习
4 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0