背景
PostgreSQL中WAL日志的回放是由单个进程完成的。为了提高日志回放的性能,可以考虑将其改造为多进程并行回放。事实上,PG社区中已经有人提出过相关的并行回放方案,但该方案的进程模型存在一些问题,因此本文对PG内部创建子进程的流程进行了梳理。
StartChildProcess
StartChildProcess是postmaster用于启动辅助进程的函数。参数“type”决定了要创建的子进程类型,所有类型的子进程最终都交由AuxiliaryProcessMain处理。
StartChildProcess在postmaster进程中被调用:postmaster会fork出子进程,子进程随后对自身的资源和环境进行初始化,例如通过IsUnderPostmaster变量标识自己是postmaster的子进程、获取自身的进程号、清除从父进程继承来的退出回调函数、初始化进程的latch等。完成这些初始化后,子进程还需要关闭从父进程继承的socket连接,并释放父进程的内存上下文,最后由AuxiliaryProcessMain函数完成辅助进程的初始化。
AuxiliaryProcessMain
AuxiliaryProcessMain根据传入参数的不同,进入对应子进程类型的处理分支。
该函数通过入参获取子进程初始化所需的基本信息:使用getopt解析参数,设置进程在ps命令中显示的名称,并完成一系列初始化操作,最后进入各辅助进程各自的main函数。
BaseInit
进程的基础初始化操作,无论进程是否运行在postmaster之下都要执行。
/*
 * BaseInit -- basic per-process initialization.
 *
 * Sets up shared memory and semaphores (InitCommunication only creates them
 * when not running under the postmaster), opens the debug output file, and
 * then initializes process-local file access (VFD cache), sync-request
 * bookkeeping, the storage managers and private buffer-access state.
 *
 * NOTE(review): keep the call order as is; later steps presumably rely on
 * the earlier ones having run.
 */
void
BaseInit(void)
{
    /*
     * Attach to shared memory and semaphores, and initialize our
     * input/output/debugging file descriptors.
     */
    InitCommunication();
    DebugFileOpen();

    /* Do local initialization of file, storage and buffer managers */
    InitFileAccess();
    InitSync();
    smgrinit();
    InitBufferPoolAccess();
}
InitCommunication:创建共享内存和信号量。由于postmaster已经完成了这一步,作为postmaster子进程运行时(IsUnderPostmaster为真)不会重复创建。
/*
 * InitCommunication -- make shared memory and semaphores available.
 *
 * A child of the postmaster inherits the segments the postmaster already
 * created, so only a process that is not running under the postmaster has
 * to create them itself.
 */
static void
InitCommunication(void)
{
    if (IsUnderPostmaster)
        return;                 /* postmaster already did this */

    CreateSharedMemoryAndSemaphores();
}
DebugFileOpen:初始化错误输出文件。
InitFileAccess:初始化VFD(虚拟文件描述符)缓存,只分配其中的头部条目,并注册进程退出回调AtProcExit_Files。
/*
 * InitFileAccess -- set up the virtual file descriptor (VFD) cache.
 *
 * Must be called only once per process (asserted via SizeVfdCache == 0).
 * Allocates just the header entry VfdCache[0] with malloc, zeroes it, marks
 * its fd as VFD_CLOSED and sets SizeVfdCache to 1.  Reports FATAL
 * ("out of memory") if the allocation fails.  Registers AtProcExit_Files to
 * run at process exit.
 */
void
InitFileAccess(void)
{
    Assert(SizeVfdCache == 0);  /* call me only once */

    /* The cache begins life as a single, zeroed header entry. */
    VfdCache = (Vfd *) malloc(sizeof(Vfd));
    if (!VfdCache)
        ereport(FATAL,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));

    MemSet(VfdCache, 0, sizeof(Vfd));
    VfdCache[0].fd = VFD_CLOSED;
    SizeVfdCache = 1;

    on_proc_exit(AtProcExit_Files, 0);
}
InitSync:初始化sync相关的数据结构(pendingOps哈希表和pendingUnlinks列表),仅在不处于postmaster之下的进程(!IsUnderPostmaster)以及startup、checkpointer进程中进行。
/*
 * InitSync -- set up process-local bookkeeping for deferred sync requests.
 *
 * Only a process not running under the postmaster, the startup process and
 * the checkpointer build these structures: the "Pending ops context" memory
 * context, the "Pending Ops Table" hash (FileTag -> PendingFsyncEntry) that
 * lives in it, and an empty pendingUnlinks list.  Every other process
 * returns without allocating anything.
 */
void
InitSync(void)
{
    HASHCTL     ctl;

    if (IsUnderPostmaster && !AmStartupProcess() && !AmCheckpointerProcess())
        return;

    pendingOpsCxt = AllocSetContextCreate(TopMemoryContext,
                                          "Pending ops context",
                                          ALLOCSET_DEFAULT_SIZES);

    /*
     * Permit allocations in this context inside critical sections.
     * NOTE(review): presumably because sync requests may be recorded while
     * inside one -- confirm.
     */
    MemoryContextAllowInCriticalSection(pendingOpsCxt, true);

    ctl.keysize = sizeof(FileTag);
    ctl.entrysize = sizeof(PendingFsyncEntry);
    ctl.hcxt = pendingOpsCxt;
    pendingOps = hash_create("Pending Ops Table",
                             100L,
                             &ctl,
                             HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    pendingUnlinks = NIL;
}
smgrinit:初始化存储管理器(smgr)。它遍历smgrsw数组,调用每个实现的初始化函数(实际调用的是mdinit,它会创建名为“MdSmgr”的内存上下文),并注册进程退出回调smgrshutdown。
void
smgrinit(void)
{
int i;
for (i = 0; i < NSmgr; i++)
{
if (smgrsw[i].smgr_init)
smgrsw[i].smgr_init();
}
/* register the shutdown proc */
on_proc_exit(smgrshutdown, 0);
}
/*
 * mdinit -- initialization callback of the "md" storage manager.
 *
 * Creates the "MdSmgr" memory context as a child of TopMemoryContext and
 * stores it in MdCxt.
 * NOTE(review): presumably reached through smgrsw[].smgr_init from
 * smgrinit() -- confirm against the smgrsw table.
 */
void
mdinit(void)
{
    MdCxt = AllocSetContextCreate(TopMemoryContext,
                                  "MdSmgr",
                                  ALLOCSET_DEFAULT_SIZES);
}
InitBufferPoolAccess:初始化本进程访问缓冲区所需的私有数据结构,即清零PrivateRefCountArray数组并创建PrivateRefCountHash哈希表。
/*
 * InitBufferPoolAccess -- set up this process's private buffer refcount
 * tracking.
 *
 * Zeroes the fixed-size PrivateRefCountArray and creates the
 * "PrivateRefCount" hash table (int32 keys, PrivateRefCountEntry entries,
 * initial size hint 100).
 * NOTE(review): the int32 key is presumably the buffer number -- confirm.
 */
void
InitBufferPoolAccess(void)
{
    HASHCTL     refcount_ctl;

    memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));

    refcount_ctl.keysize = sizeof(int32);
    refcount_ctl.entrysize = sizeof(PrivateRefCountEntry);
    PrivateRefCountHash = hash_create("PrivateRefCount",
                                      100,
                                      &refcount_ctl,
                                      HASH_ELEM | HASH_BLOBS);
}
InitAuxiliaryProcess
为辅助进程初始化其PGPROC结构(即MyProc)。这些结构保存在共享内存中,由AuxiliaryProcs数组统一存放;函数会在数组中查找一个空闲槽位(pid为0)分配给当前进程,并注册退出回调AuxiliaryProcKill。
/*
 * InitAuxiliaryProcess -- claim a PGPROC slot for this auxiliary process.
 *
 * While holding ProcStructLock, scans the shared AuxiliaryProcs array for
 * the first slot whose pid is 0, stores our PID in it and publishes it as
 * MyProc.  It then resets that PGPROC's per-process fields, takes ownership
 * of its latch and semaphore state, and registers AuxiliaryProcKill as a
 * shared-memory exit callback (passing the slot index).
 *
 * Raises PANIC if the proc header was never initialized, ERROR if this
 * process already has a PGPROC, and FATAL if every auxiliary slot is in use.
 */
void
InitAuxiliaryProcess(void)
{
    PGPROC     *auxproc;
    int         proctype;

    /*
     * ProcGlobal and AuxiliaryProcs are set up in shared memory during
     * postmaster startup; they must exist before a slot can be claimed.
     */
    if (ProcGlobal == NULL || AuxiliaryProcs == NULL)
        elog(PANIC, "proc header uninitialized");

    if (MyProc != NULL)
        elog(ERROR, "you already exist");

    /* The slot search and claim below are done under ProcStructLock. */
    SpinLockAcquire(ProcStructLock);

    set_spins_per_delay(ProcGlobal->spins_per_delay);

    /* Find the first unused auxiliary slot (pid == 0 means unused). */
    for (proctype = 0; proctype < NUM_AUXILIARY_PROCS; proctype++)
    {
        auxproc = &AuxiliaryProcs[proctype];
        if (auxproc->pid == 0)
            break;
    }
    if (proctype >= NUM_AUXILIARY_PROCS)
    {
        /* Drop the spinlock before raising the error. */
        SpinLockRelease(ProcStructLock);
        elog(FATAL, "all AuxiliaryProcs are in use");
    }

    /*
     * Claim the slot by storing our PID and make it MyProc.  The store goes
     * through a volatile pointer, presumably so it stays inside the
     * spinlock-protected section.
     */
    ((volatile PGPROC *) auxproc)->pid = MyProcPid;

    MyProc = auxproc;

    SpinLockRelease(ProcStructLock);

    /* Reset per-process state that a previous user of the slot may have left. */
    SHMQueueElemInit(&(MyProc->links));
    MyProc->waitStatus = PROC_WAIT_STATUS_OK;
    MyProc->lxid = InvalidLocalTransactionId;
    MyProc->fpVXIDLock = false;
    MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
    MyProc->xid = InvalidTransactionId;
    MyProc->xmin = InvalidTransactionId;
    /* Auxiliary processes have no BackendId, database or role. */
    MyProc->backendId = InvalidBackendId;
    MyProc->databaseId = InvalidOid;
    MyProc->roleId = InvalidOid;
    MyProc->tempNamespaceId = InvalidOid;
    MyProc->isBackgroundWorker = IsBackgroundWorker;
    MyProc->delayChkpt = 0;
    MyProc->statusFlags = 0;
    MyProc->lwWaiting = false;
    MyProc->lwWaitMode = 0;
    MyProc->waitLock = NULL;
    MyProc->waitProcLock = NULL;
    pg_atomic_write_u64(&MyProc->waitStart, 0);
#ifdef USE_ASSERT_CHECKING
    {
        int         i;

        /* Last process should have released all locks. */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
            Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
    }
#endif

    /* Take ownership of the slot's latch and switch to using it. */
    OwnLatch(&MyProc->procLatch);
    SwitchToSharedLatch();

    /* now that we have a proc, report wait events to shared memory */
    pgstat_set_wait_event_storage(&MyProc->wait_event_info);

    /* Check that group locking fields are in a proper initial state. */
    Assert(MyProc->lockGroupLeader == NULL);
    Assert(dlist_is_empty(&MyProc->lockGroupMembers));

    PGSemaphoreReset(MyProc->sem);

    /* Hand the slot index to the exit callback so it can find this slot. */
    on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype));
}
ProcSignalInit
postmaster之外的进程之间发送信号时需要用到ProcSignal机制:信号的具体原因保存在共享内存中各进程对应的ProcSignal槽位(pss_signalFlags)中。
/*
 * ProcSignalInit -- register this process in the shared ProcSignal array.
 *
 * pss_idx is a 1-based slot number (asserted to lie in 1..NumProcSignalSlots);
 * ProcSignal->psh_slot[pss_idx - 1] is used.  Leftover signal flags are
 * cleared and the slot's barrier state is reset to the current global
 * barrier generation before the slot is marked with our PID.
 * CleanupProcSignalState is registered to release the slot at shared-memory
 * exit.
 */
void
ProcSignalInit(int pss_idx)
{
    ProcSignalSlot *slot;
    uint64      barrier_generation;

    Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);

    slot = &ProcSignal->psh_slot[pss_idx - 1];

    /* sanity check: a non-zero PID means the slot was not released; only logged */
    if (slot->pss_pid != 0)
        elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
             MyProcPid, pss_idx);

    /* Clear out any leftover signal reasons */
    MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));

    /*
     * Start with no pending barrier checks and adopt the current global
     * barrier generation -- presumably so barriers emitted before this
     * process joined are not treated as pending for it.
     */
    pg_atomic_write_u32(&slot->pss_barrierCheckMask, 0);
    barrier_generation =
        pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
    pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);

    /* Order the stores above before the PID store that marks the slot as ours. */
    pg_memory_barrier();

    /* Mark slot with my PID */
    slot->pss_pid = MyProcPid;

    /* Remember slot location for CheckProcSignal */
    MyProcSignalSlot = slot;

    /* Set up to release the slot on process exit */
    on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
}
每一个需要接收信号的进程都要把自己的PID注册到ProcSignalSlots数组中。该数组以BackendId作为下标(从1开始);如果进程没有BackendId(如辅助进程),则使用“MaxBackends + MyAuxProcType + 1”作为下标。
InitBufferPoolBackend
注册进程退出回调函数AtProcExit_Buffers,用于在进程退出时确保已释放所有共享缓冲区锁,并检查是否还有残留的pin。
/*
 * InitBufferPoolBackend -- register buffer-manager cleanup for this process.
 *
 * Only registers AtProcExit_Buffers as a shared-memory exit callback; no
 * other state is touched here.
 */
void
InitBufferPoolBackend(void)
{
    on_shmem_exit(AtProcExit_Buffers, 0);
}
/*
 * During backend exit, ensure that we released all shared-buffer locks and
 * assert that we have no remaining pins.
 *
 * Registered with on_shmem_exit by InitBufferPoolBackend; the code and arg
 * parameters are not used here.
 */
static void
AtProcExit_Buffers(int code, Datum arg)
{
    /* NOTE(review): presumably aborts any buffer I/O still in progress -- confirm. */
    AbortBufferIO();
    UnlockBuffers();

    /* Complain about any shared-buffer pins we still hold. */
    CheckForBufferLeaks();

    /* localbuf.c needs a chance too */
    AtProcExit_LocalBuffers();
}
CreateAuxProcessResourceOwner
为当前辅助进程创建一个ResourceOwner对象(AuxProcessResourceOwner),将其设置为CurrentResourceOwner,并注册进程退出时的清理回调。
/*
 * CreateAuxProcessResourceOwner -- create the resource owner used by an
 * auxiliary process.
 *
 * Creates a top-level ResourceOwner named "AuxiliaryProcess", stores it in
 * AuxProcessResourceOwner and makes it CurrentResourceOwner.  Both must
 * still be NULL on entry (asserted), so this runs at most once per process.
 * Also registers ReleaseAuxProcessResourcesCallback as a shared-memory exit
 * callback.
 *
 * The explicit "void" return type is required: implicit int was removed in
 * C99, and a definition without it would not match a void prototype
 * (presumably declared in a header -- confirm).
 */
void
CreateAuxProcessResourceOwner(void)
{
    Assert(AuxProcessResourceOwner == NULL);
    Assert(CurrentResourceOwner == NULL);
    AuxProcessResourceOwner = ResourceOwnerCreate(NULL, "AuxiliaryProcess");
    CurrentResourceOwner = AuxProcessResourceOwner;

    /*
     * Register a shmem-exit callback for cleanup of aux-process resource
     * owner.  (This needs to run after, e.g., ShutdownXLOG.)
     */
    on_shmem_exit(ReleaseAuxProcessResourcesCallback, 0);
}
pgstat_initialize、pgstat_beinit、pgstat_bestart
pgstat辅助进程已经在postmaster中完成初始化。
pgstat_initialize
通过将prevWalUsage赋值为pgWalUsage,可以让pgstat_send_wal函数计算出pgWalUsage计数器相对于上一次发送时增加了多少。pgstat_send_wal函数向收集器发送wal日志相关的统计信息。
/*
 * pgstat_initialize -- per-process statistics setup.
 *
 * Snapshots pgWalUsage into prevWalUsage, so that later WAL statistics can
 * be reported as the increase since this point.  Also registers
 * pgstat_shutdown_hook to run at shared-memory exit.
 */
void
pgstat_initialize(void)
{
    /* Baseline for computing WAL-usage deltas later on. */
    prevWalUsage = pgWalUsage;

    /* Set up a process-exit hook to clean up */
    on_shmem_exit(pgstat_shutdown_hook, 0);
}
pgstat_beinit
初始化MyBEEntry,用于跟踪进程状态。BackendStatusArray维护在共享内存中:普通后端进程使用MyBackendId - 1作为下标,没有BackendId的辅助进程则使用MaxBackends + MyAuxProcType作为下标,排在所有普通后端条目之后。
/*
 * pgstat_beinit -- point MyBEEntry at this process's BackendStatusArray slot.
 *
 * Regular backends use their 1-based BackendId (slot MyBackendId - 1).
 * Auxiliary processes have no BackendId, so their slots follow the
 * MaxBackends regular entries and are indexed by MyAuxProcType.
 * Registers pgstat_beshutdown_hook to run at shared-memory exit.
 */
void
pgstat_beinit(void)
{
    if (MyBackendId == InvalidBackendId)
    {
        /* Auxiliary process: slot lives after all regular-backend slots. */
        Assert(MyAuxProcType != NotAnAuxProcess);
        MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
    }
    else
    {
        /* Regular backend: BackendIds are 1-based, the array is 0-based. */
        Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
        MyBEEntry = &BackendStatusArray[MyBackendId - 1];
    }

    on_shmem_exit(pgstat_beshutdown_hook, 0);
}
pgstat_bestart
填充MyBEEntry的相关信息:先在本地副本lbeentry中设置PID、进程类型、启动时间、客户端地址以及SSL/GSS状态等字段,再在PGSTAT_BEGIN_WRITE_ACTIVITY的保护下整体拷贝回共享内存中的条目。
/*
 * pgstat_bestart -- fill in this process's entry in BackendStatusArray.
 *
 * Works on a local copy (lbeentry) of the shared entry MyBEEntry.  The copy
 * is taken with memcpy, and its fields are then set: PID, backend type,
 * start timestamp, database and user, client address, SSL/GSS state and
 * command progress.  The whole struct is copied back after
 * PGSTAT_BEGIN_WRITE_ACTIVITY.
 *
 * Lines shown as "..." are elided in this excerpt; among other things they
 * declare lsslstatus/lgssstatus and presumably contain the matching
 * PGSTAT_END_WRITE_ACTIVITY -- not shown here.
 */
void
pgstat_bestart(void)
{
    volatile PgBackendStatus *vbeentry = MyBEEntry;
    PgBackendStatus lbeentry;
    ...
    /* Start from the current contents of the shared entry. */
    memcpy(&lbeentry,
           unvolatize(PgBackendStatus *, vbeentry),
           sizeof(PgBackendStatus));
    ...
    lbeentry.st_procpid = MyProcPid;
    lbeentry.st_backendType = MyBackendType;
    /*
     * NOTE(review): st_backendIdx and MyAuxProcIdx are not defined anywhere
     * in this excerpt; presumably added by a local patch -- confirm against
     * the source tree being described.
     */
    lbeentry.st_backendIdx = MyAuxProcIdx;
    lbeentry.st_proc_start_timestamp = MyStartTimestamp;
    /* No activity, state change or transaction has been recorded yet. */
    lbeentry.st_activity_start_timestamp = 0;
    lbeentry.st_state_start_timestamp = 0;
    lbeentry.st_xact_start_timestamp = 0;
    lbeentry.st_databaseid = MyDatabaseId;

    /* We have userid for client-backends, wal-sender and bgworker processes */
    if (lbeentry.st_backendType == B_BACKEND
        || lbeentry.st_backendType == B_WAL_SENDER
        || lbeentry.st_backendType == B_BG_WORKER)
        lbeentry.st_userid = GetSessionUserId();
    else
        lbeentry.st_userid = InvalidOid;
    ...
    /* Processes without a client connection (MyProcPort == NULL) get a zeroed address. */
    if (MyProcPort)
        memcpy(&lbeentry.st_clientaddr, &MyProcPort->raddr,
               sizeof(lbeentry.st_clientaddr));
    else
        MemSet(&lbeentry.st_clientaddr, 0, sizeof(lbeentry.st_clientaddr));

#ifdef USE_SSL
    /* Record TLS details only for connections that actually use SSL. */
    if (MyProcPort && MyProcPort->ssl_in_use)
    {
        lbeentry.st_ssl = true;
        lsslstatus.ssl_bits = be_tls_get_cipher_bits(MyProcPort);
        strlcpy(lsslstatus.ssl_version, be_tls_get_version(MyProcPort), NAMEDATALEN);
        strlcpy(lsslstatus.ssl_cipher, be_tls_get_cipher(MyProcPort), NAMEDATALEN);
        be_tls_get_peer_subject_name(MyProcPort, lsslstatus.ssl_client_dn, NAMEDATALEN);
        be_tls_get_peer_serial(MyProcPort, lsslstatus.ssl_client_serial, NAMEDATALEN);
        be_tls_get_peer_issuer_name(MyProcPort, lsslstatus.ssl_issuer_dn, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_ssl = false;
    }
#else
    lbeentry.st_ssl = false;
#endif

#ifdef ENABLE_GSS
    /* Record GSSAPI details only when the connection has GSS state. */
    if (MyProcPort && MyProcPort->gss != NULL)
    {
        const char *princ = be_gssapi_get_princ(MyProcPort);

        lbeentry.st_gss = true;
        lgssstatus.gss_auth = be_gssapi_get_auth(MyProcPort);
        lgssstatus.gss_enc = be_gssapi_get_enc(MyProcPort);
        if (princ)
            strlcpy(lgssstatus.gss_princ, princ, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_gss = false;
    }
#else
    lbeentry.st_gss = false;
#endif

    /* No command is running and no progress is being reported yet. */
    lbeentry.st_state = STATE_UNDEFINED;
    lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
    lbeentry.st_progress_command_target = InvalidOid;
    lbeentry.st_query_id = UINT64CONST(0);

    /* Publish the local copy into the shared entry. */
    PGSTAT_BEGIN_WRITE_ACTIVITY(vbeentry);

    /* make sure we'll memcpy the same st_changecount back */
    lbeentry.st_changecount = vbeentry->st_changecount;

    memcpy(unvolatize(PgBackendStatus *, vbeentry),
           &lbeentry,
           sizeof(PgBackendStatus));

    /*
     * NOTE(review): the writes below happen after lbeentry was copied back.
     * They only reach the shared entry if st_appname, st_clienthostname and
     * st_activity_raw are pointers into shared buffers (as in upstream
     * PostgreSQL) rather than inline arrays -- confirm against the
     * PgBackendStatus definition.
     */
    lbeentry.st_appname[0] = '\0';
    if (MyProcPort && MyProcPort->remote_hostname)
        strlcpy(lbeentry.st_clienthostname, MyProcPort->remote_hostname,
                NAMEDATALEN);
    else
        lbeentry.st_clienthostname[0] = '\0';
    lbeentry.st_activity_raw[0] = '\0';
    /* Also make sure the last byte in each string area is always 0 */
    lbeentry.st_appname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_clienthostname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_activity_raw[pgstat_track_activity_query_size - 1] = '\0';
    ...
}
以上是postmaster通过StartChildProcess创建子进程的过程,实际上postmaster还可通过注册background worker的方式创建独立的后台进程。以上内容为个人对源码的理解,可能存在理解出错的地方,欢迎大家指出问题。