1 问题场景
执行如下的 sql:
begin;
drop table t;
此 sql 后续的可能执行情况:
// 1. 被提交
commit;
// 2. 被回滚
rollback;
// 3. 走 xa 接口
prepare transaction 'mytest';
// 3.1 走 xa 提交
commit prepared 'mytest';
// 3.2 走 xa 回滚
rollback prepared 'mytest';
那么对于表 t 的数据文件清理,就要满足以下要求:
- 只有在 commit 提交成功之后,才能清理数据;
- 在未提交之前不能清理数据:事务一旦被回滚,要保证数据的完整性;
- 事务提交完成后,要能成功地清理数据;
- 对于走 xa 接口的情况,还要考虑 prepare 完成后数据库重启了,再执行 commit prepared 时,如何知道要清理哪些数据。
2 postgres ddl 实现事务性
- Postgres 通过 pendingDeletes 这个变量来记录事务执行过程中登记的、需要在事务结束(提交或回滚)时清理的文件;
- 通过 wal 将 pendingDeletes 日志化,来达到事务一致性效果。
/*
 * One pending request to delete a relation, to be acted on (or not)
 * depending on how the transaction ends. Requests are chained into a
 * singly linked list through `next`; pendingDeletes is the list head.
 */
typedef struct PendingRelDelete
{
RelFileNode relnode; /* relation that may need to be deleted */
BackendId backend; /* InvalidBackendId if not a temp rel */
bool atCommit; /* T=delete at commit; F=delete at abort */
int nestLevel; /* xact nesting level of request */
struct PendingRelDelete *next; /* linked-list link */
} PendingRelDelete;
static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
2.1 新建表
/* Add the relation to the list of stuff to delete at abort */
/* (Excerpt: rnode and backend are set earlier in the enclosing function.) */
/* The entry is allocated in TopMemoryContext. */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = rnode;
pending->backend = backend;
pending->atCommit = false; /* delete if abort */
pending->nestLevel = GetCurrentTransactionNestLevel();
/* Push the new entry onto the head of the pendingDeletes list. */
pending->next = pendingDeletes;
pendingDeletes = pending;
对于新建表,如果事务回滚,那么中间文件要被清理。
这一点从 pending->atCommit = false; /* delete if abort */ 可以明确看出。
2.2 删除表
/* Add the relation to the list of stuff to delete at commit */
/* (Excerpt: rel is the relation being dropped, supplied by the enclosing function.) */
/* The entry is allocated in TopMemoryContext. */
pending = (PendingRelDelete *)
MemoryContextAlloc(TopMemoryContext, sizeof(PendingRelDelete));
pending->relnode = rel->rd_node;
pending->backend = rel->rd_backend;
pending->atCommit = true; /* delete if commit */
pending->nestLevel = GetCurrentTransactionNestLevel();
/* Push the new entry onto the head of the pendingDeletes list. */
pending->next = pendingDeletes;
pendingDeletes = pending;
对于 drop 表,如果事务提交,那么数据文件要被清理。
2.3 执行清理函数
/*
 * smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
 *
 * This also runs when aborting a subxact; we want to clean up a failed
 * subxact immediately.
 *
 * isCommit: how the (sub)transaction ended. It is passed as true from
 * CommitTransaction and false from AbortTransaction.
 *
 * NOTE: this is an abbreviated excerpt; the local declarations and the
 * loop body are elided.
 */
void
smgrDoPendingDeletes(bool isCommit)
{
/* Walk the pendingDeletes list; `next` is set inside the elided body. */
for (pending = pendingDeletes; pending != NULL; pending = next)
{
......
}
}
- 该函数在事务结束时被调用(commit 完成或者 rollback 完成之后);
- 函数有一个参数 isCommit,代表当前事务的结束状态:例如传入 true 时执行 commit 清理,即清理 drop table 等操作登记的表文件。
2.3.1 commit 清理
/*
 * CommitTransaction (abbreviated excerpt)
 *
 * Only the step relevant to pending file deletion is shown: running the
 * pending deletes with isCommit = true.
 */
static void
CommitTransaction(void)
{
// At this point the transaction (its metadata changes) has already been committed.
/*
* Likewise, dropping of files deleted during the transaction is best done
* after releasing relcache and buffer pins. (This is not strictly
* necessary during commit, since such pins should have been released
* already, but this ordering is definitely critical during abort.) Since
* this may take many seconds, also delay until after releasing locks.
* Other backends will observe the attendant catalog changes and not
* attempt to access affected files.
*/
smgrDoPendingDeletes(true);
}
2.3.2 rollback 清理
/*
* AbortTransaction (abbreviated excerpt)
*
* Only the step relevant to pending file deletion is shown: running the
* pending deletes with isCommit = false.
*/
static void
AbortTransaction(void)
{
// At this point the transaction (its metadata changes) has already been rolled back.
smgrDoPendingDeletes(false);
}
3 pendingDeletes 持久化
/*
 * smgrGetPendingDeletes() -- Get a list of non-temp relations to be deleted.
 *
 * forCommit: selects entries whose atCommit flag equals this value.
 * ptr: output list of relations.
 * Returns the number of matching entries.
 *
 * NOTE(review): this excerpt shows only the counting loop. The local
 * declarations (nestLevel, nrels, pending) and the code that fills *ptr
 * are elided -- confirm against the full source.
 */
int
smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
{
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
/*
 * Count entries at or above nestLevel whose atCommit matches forCommit
 * and whose backend is InvalidBackendId (i.e. not a temp rel).
 */
if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
&& pending->backend == InvalidBackendId)
nrels++;
}
return nrels;
}
- smgrGetPendingDeletes 用来过滤 commit/rollback 时,分别要清理的表文件;
- forCommit 参数就代表过滤的类型。
3.1 xa 接口的持久化
3.1.1 prepare transaction
/*
 * StartPrepare (abbreviated excerpt)
 *
 * Shows only how the pending-delete lists are captured at PREPARE time:
 * both the delete-at-commit and the delete-at-abort lists are fetched,
 * their counts are recorded in hdr, and each non-empty array is handed to
 * save_state_data() and then freed.
 */
void
StartPrepare(GlobalTransaction gxact)
{
/* Both outcomes are captured because the final outcome is not yet known. */
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
/* The commit-time array is saved first, then the abort-time array. */
if (hdr.ncommitrels > 0)
{
save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileNode));
pfree(commitrels);
}
if (hdr.nabortrels > 0)
{
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
}
在执行 prepare transaction 时,将 commit 与 rollback 各自需要清理的文件列表(commitrels、abortrels)写入 2pc phase 文件。
3.1.2 崩溃恢复
/*
 * RecoverPreparedTransactions (abbreviated excerpt)
 *
 * Shows only how the read pointer is advanced past the two RelFileNode
 * arrays (delete-at-commit, then delete-at-abort), each padded with
 * MAXALIGN. The code that obtains hdr and bufptr is elided.
 */
void
RecoverPreparedTransactions(void)
{
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
}
崩溃恢复通过 RecoverPreparedTransactions 将其从 2pc phase 文件中读出来。
3.1.3 commit/rollback prepared
/*
* FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
*
* Abbreviated excerpt: shows only how the two saved RelFileNode arrays are
* located in the buffer. commitrels points at the first array and
* abortrels at the second; bufptr is advanced past each with MAXALIGN
* padding. The code that obtains hdr and bufptr, and the code that uses
* gid and isCommit, is elided.
*/
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
commitrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
}
执行 commit/rollback prepared 时,再从 2pc phase 文件中把 commitrels 和 abortrels 两个列表读取出来。
3.2 commit 持久化
/*
 * RecordTransactionCommit (abbreviated excerpt)
 *
 * Shows only how the delete-at-commit relation list is fetched and passed
 * (as nrels/rels) to XactLogCommitRecord. Local declarations and the
 * return statement are elided.
 */
static TransactionId
RecordTransactionCommit(void)
{
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
XactLogCommitRecord(global_committs,
xactStopTimestamp + GTMdeltaTimestamp,
nchildren, children, nrels, rels,
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
MyXactFlags,
InvalidTransactionId /* plain commit */ );
}
将要清理的文件信息作为 commit 的一部分写入 wal。
3.3 rollback 持久化
/*
 * RecordTransactionAbort (abbreviated excerpt)
 *
 * Shows only how the delete-at-abort relation list is fetched and passed
 * (as nrels/rels) to XactLogAbortRecord. Local declarations, the use of
 * isSubXact, and the return statement are elided.
 */
static TransactionId
RecordTransactionAbort(bool isSubXact)
{
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
XactLogAbortRecord(global_timestamp,
xact_time,
nchildren, children,
nrels, rels,
MyXactFlags, InvalidTransactionId);
}
将要清理的文件信息作为 rollback 的一部分写入 wal。