searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Postmaster创建子进程

2023-10-13 05:48:32
13
0

背景

PostgreSQL中对日志的回放是单进程实现的,为了提高日志回放的性能,可以考虑将其设计为多进程并行回放。事实上,PG社区中提出过相关的并行回放方案。但该方案的进程模型存在一些问题,因此对PG内部创建进程的流程进行了梳理。

StartChildProcess

StartChildProcess是postmaster用来启动辅助进程的函数。参数“type”决定了将创建什么类型的子进程。所有类型的子进程都转到AuxiliaryProcessMain处理。
 
StartChildProcess在postmaster进程中被调用,postmaster会fork出自己的子进程,子进程会对自身的资源和环境进行初始化,例如使用IsUnderPostmaster变量标识自己为子进程;获取子进程号;清除父进程的退出函数;初始化进程latch。完成初始化后需要关闭父进程的socket连接,并释放父进程的内存上下文。最后由AuxiliaryProcessMain函数完成辅助进程的初始化。

AuxiliaryProcessMain

AuxiliaryProcessMain接口中根据参数不同,执行对应子进程分支。
 
该函数会通过入参获取子进程初始化的基本信息,通过getopt解析接口参数,初始化系统ps命令后显示的名称,并完成一系列的初始化操作。最后进入各个辅助进程的main函数。

BaseInit

无论父子进程都要做的初始化操作。
/*
 * BaseInit -- run the basic per-process initialization steps.
 *
 * Calls, in this order: InitCommunication, DebugFileOpen, InitFileAccess,
 * InitSync, smgrinit and InitBufferPoolAccess.  The call order is kept
 * exactly as written, since later steps may rely on earlier ones.
 */
void
BaseInit(void)
{
    /*
     * Attach to shared memory and semaphores, and initialize our
     * input/output/debugging file descriptors.
     */
    InitCommunication();
    DebugFileOpen();

    /* Do local initialization of file, storage and buffer managers */
    InitFileAccess();
    InitSync();
    smgrinit();
    InitBufferPoolAccess();
}
InitCommunication:初始化共享内存、信号量。postmaster已完成初始化,子进程不进行初始化。
/*
 * InitCommunication -- create shared memory and semaphores, but only when
 * this process is not running under a postmaster.
 */
static void
InitCommunication(void)
{
    /* Under a postmaster the setup was already done there; nothing to do. */
    if (IsUnderPostmaster)
        return;

    /* Otherwise initialize shared memory and semaphores ourselves. */
    CreateSharedMemoryAndSemaphores();
}
DebugFileOpen:初始化错误输出文件。
InitFileAccess:初始化vfd。
*/
void
InitFileAccess(void)
{
    Assert(SizeVfdCache == 0);  /* call me only once */

    /* initialize cache header entry */
    VfdCache = (Vfd *) malloc(sizeof(Vfd));
    if (VfdCache == NULL)
        ereport(FATAL,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));

    MemSet((char *) &(VfdCache[0]), 0, sizeof(Vfd));
    VfdCache->fd = VFD_CLOSED;

    SizeVfdCache = 1;

    on_proc_exit(AtProcExit_Files, 0);
}
InitSync:初始化sync相关数据结构,仅在父进程、startup、checkpointer进程下初始化。
/*
 * InitSync -- build the pending-sync bookkeeping for processes that need it.
 *
 * Only a process that is not under the postmaster, the startup process, or
 * the checkpointer sets anything up; every other process returns at once.
 */
void
InitSync(void)
{
    HASHCTL     ctl;

    /* Same test as "standalone || startup || checkpointer", negated. */
    if (IsUnderPostmaster && !AmStartupProcess() && !AmCheckpointerProcess())
        return;

    /* Dedicated memory context that may be used inside critical sections. */
    pendingOpsCxt = AllocSetContextCreate(TopMemoryContext,
                                          "Pending ops context",
                                          ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(pendingOpsCxt, true);

    /* Hash table keyed by FileTag, allocated in that context. */
    ctl.hcxt = pendingOpsCxt;
    ctl.entrysize = sizeof(PendingFsyncEntry);
    ctl.keysize = sizeof(FileTag);
    pendingOps = hash_create("Pending Ops Table",
                             100L,
                             &ctl,
                             HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* No unlink requests are queued yet. */
    pendingUnlinks = NIL;
}
smgrinit:初始化smgr(存储管理器)。它依次调用smgrsw数组中各存储管理器的smgr_init回调,实际调用到的是mdinit,由mdinit为md存储管理器创建内存上下文MdCxt。
/*
 * smgrinit -- run every storage manager's init hook, then register the
 * shutdown callback.
 */
void
smgrinit(void)
{
    int         which;

    /* Walk the switch table; entries without an init hook are skipped. */
    for (which = 0; which < NSmgr; which++)
    {
        if (!smgrsw[which].smgr_init)
            continue;

        smgrsw[which].smgr_init();
    }

    /* smgrshutdown runs at process exit */
    on_proc_exit(smgrshutdown, 0);
}

/*
 * mdinit -- create the "MdSmgr" memory context.
 *
 * The context is created as a child of TopMemoryContext with the default
 * allocation-set sizes and stored in MdCxt.
 */
void
mdinit(void)
{
    MdCxt = AllocSetContextCreate(TopMemoryContext,
                                  "MdSmgr",
                                  ALLOCSET_DEFAULT_SIZES);
}
InitBufferPoolAccess:初始化访问缓冲区的数据结构。
/*
 * InitBufferPoolAccess -- set up this process's private buffer refcount
 * tracking: a zeroed PrivateRefCountArray plus the PrivateRefCountHash
 * hash table.
 */
void
InitBufferPoolAccess(void)
{
    HASHCTL     ctl;

    /* Describe the hash table: int32 keys, PrivateRefCountEntry entries. */
    ctl.entrysize = sizeof(PrivateRefCountEntry);
    ctl.keysize = sizeof(int32);

    /* Start with an all-zero refcount array. */
    memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));

    PrivateRefCountHash = hash_create("PrivateRefCount", 100, &ctl,
                                      HASH_ELEM | HASH_BLOBS);
}

InitAuxiliaryProcess

初始化辅助进程需要的数据结构MyProc,这些结构被维护在共享内存中。由AuxiliaryProcs数组存储。
/*
 * InitAuxiliaryProcess -- claim a free AuxiliaryProcs entry as MyProc and
 * initialize its fields.
 *
 * PANICs if ProcGlobal or AuxiliaryProcs is not set up, raises ERROR if
 * MyProc is already set, and raises FATAL if every AuxiliaryProcs entry is
 * in use.  On success it registers AuxiliaryProcKill as a shmem-exit
 * callback, passing the index of the claimed entry.
 */
void
InitAuxiliaryProcess(void)
{
    PGPROC     *auxproc;
    int         proctype;

    /*
     * ProcGlobal and AuxiliaryProcs are initialized during postmaster
     * startup; they live in shared memory.
     */
    if (ProcGlobal == NULL || AuxiliaryProcs == NULL)
        elog(PANIC, "proc header uninitialized");
    
    if (MyProc != NULL)
        elog(ERROR, "you already exist");

    /* The search for, and claim of, a free entry happens under this lock. */
    SpinLockAcquire(ProcStructLock);

    set_spins_per_delay(ProcGlobal->spins_per_delay);

    /* Find the first entry whose pid is 0, i.e. not claimed by anyone. */
    for (proctype = 0; proctype < NUM_AUXILIARY_PROCS; proctype++)
    {
        auxproc = &AuxiliaryProcs[proctype];
        if (auxproc->pid == 0)
            break;
    }
    if (proctype >= NUM_AUXILIARY_PROCS)
    {
        /* No free entry: drop the lock before reporting the error. */
        SpinLockRelease(ProcStructLock);
        elog(FATAL, "all AuxiliaryProcs are in use");
    }

    /* Mark the entry as ours by storing our pid, then adopt it as MyProc */
    ((volatile PGPROC *) auxproc)->pid = MyProcPid;

    MyProc = auxproc;

    SpinLockRelease(ProcStructLock);
    
    /* Reset the remaining MyProc fields to their initial values. */
    SHMQueueElemInit(&(MyProc->links));
    MyProc->waitStatus = PROC_WAIT_STATUS_OK;
    MyProc->lxid = InvalidLocalTransactionId;
    MyProc->fpVXIDLock = false;
    MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
    MyProc->xid = InvalidTransactionId;
    MyProc->xmin = InvalidTransactionId;
    MyProc->backendId = InvalidBackendId;
    MyProc->databaseId = InvalidOid;
    MyProc->roleId = InvalidOid;
    MyProc->tempNamespaceId = InvalidOid;
    MyProc->isBackgroundWorker = IsBackgroundWorker;
    MyProc->delayChkpt = 0;
    MyProc->statusFlags = 0;
    MyProc->lwWaiting = false;
    MyProc->lwWaitMode = 0;
    MyProc->waitLock = NULL;
    MyProc->waitProcLock = NULL;
    pg_atomic_write_u64(&MyProc->waitStart, 0);
#ifdef USE_ASSERT_CHECKING
    {
        int         i;

        /* Last process should have released all locks. */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
            Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
    }
#endif

    /* Take ownership of the entry's latch and switch over to it. */
    OwnLatch(&MyProc->procLatch);
    SwitchToSharedLatch();

    /* now that we have a proc, report wait events to shared memory */
    pgstat_set_wait_event_storage(&MyProc->wait_event_info);

    /* Check that group locking fields are in a proper initial state. */
    Assert(MyProc->lockGroupLeader == NULL);
    Assert(dlist_is_empty(&MyProc->lockGroupMembers));

    PGSemaphoreReset(MyProc->sem);

    /* Register cleanup; the argument is the index of the claimed entry. */
    on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype));
}

ProcSignalInit

非postmaster进程间发送信号需用到ProcSignal,在共享内存里通过信号交互。
/*
 * ProcSignalInit -- claim ProcSignal slot number pss_idx for this process.
 *
 * pss_idx is 1-based (asserted to lie in 1..NumProcSignalSlots); the slot
 * used is psh_slot[pss_idx - 1].  A slot that still holds a nonzero pid is
 * only logged, not rejected.  Registers CleanupProcSignalState as a
 * shmem-exit callback with pss_idx as its argument.
 */
void
ProcSignalInit(int pss_idx)
{
    ProcSignalSlot *slot;
    uint64      barrier_generation;

    Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);

    slot = &ProcSignal->psh_slot[pss_idx - 1];

    /* sanity check */
    if (slot->pss_pid != 0)
        elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
             MyProcPid, pss_idx);

    /* Clear out any leftover signal reasons */
    MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));

    /*
     * Clear the barrier check mask and copy the current global barrier
     * generation into the slot.  The memory barrier is issued before the
     * pid is stored below.
     */
    pg_atomic_write_u32(&slot->pss_barrierCheckMask, 0);
    barrier_generation =
        pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
    pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);
    pg_memory_barrier();

    /* Mark slot with my PID */
    slot->pss_pid = MyProcPid;

    /* Remember slot location for CheckProcSignal */
    MyProcSignalSlot = slot;

    /* Set up to release the slot on process exit */
    on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
}
每一个想要接收信号的进程都要把自己的PID注册到ProcSignal的槽位数组(psh_slot)中。该数组以backend ID作为下标;如果进程没有backend ID,则使用“MaxBackends + MyAuxProcType + 1”的结果作为下标。

InitBufferPoolBackend

注册回调函数AtProcExit_Buffers。
/*
 * InitBufferPoolBackend -- register AtProcExit_Buffers as a shmem-exit
 * callback (with argument 0).
 */
void
InitBufferPoolBackend(void)
{
    on_shmem_exit(AtProcExit_Buffers, 0);
}

/*
 * During backend exit, ensure that we released all shared-buffer locks and
 * assert that we have no remaining pins.
 *
 * Exit callback; neither the exit code nor the argument is used.  Calls
 * AbortBufferIO, UnlockBuffers, CheckForBufferLeaks and
 * AtProcExit_LocalBuffers, in that order.
 */
static void
AtProcExit_Buffers(int code, Datum arg)
{
    AbortBufferIO();
    UnlockBuffers();

    CheckForBufferLeaks();

    /* localbuf.c needs a chance too */
    AtProcExit_LocalBuffers();
}

CreateAuxProcessResourceOwner

为当前进程创建空ResourceOwner对象。
/*
 * CreateAuxProcessResourceOwner -- create the "AuxiliaryProcess" resource
 * owner and make it the current one.
 *
 * Both AuxProcessResourceOwner and CurrentResourceOwner must still be NULL
 * (asserted).  Also registers ReleaseAuxProcessResourcesCallback as a
 * shmem-exit callback.
 */
void
CreateAuxProcessResourceOwner(void)
{
    Assert(AuxProcessResourceOwner == NULL);
    Assert(CurrentResourceOwner == NULL);
    AuxProcessResourceOwner = ResourceOwnerCreate(NULL, "AuxiliaryProcess");
    CurrentResourceOwner = AuxProcessResourceOwner;

    /*
     * Register a shmem-exit callback for cleanup of aux-process resource
     * owner.  (This needs to run after, e.g., ShutdownXLOG.)
     */
    on_shmem_exit(ReleaseAuxProcessResourcesCallback, 0);
}

pgstat_initialize、pgstat_beinit、pgstat_bestart

pgstat辅助进程已经在postmaster中完成初始化。

pgstat_initialize

通过将prevWalUsage赋值为pgWalUsage,可以让pgstat_send_wal函数计算出pgWalUsage计数器相对于上一次发送时增加了多少。pgstat_send_wal函数向收集器发送wal日志相关的统计信息。
/*
 * pgstat_initialize -- snapshot the WAL usage counters and register the
 * pgstat shutdown hook.
 */
void
pgstat_initialize(void)
{
    /* Remember the current pgWalUsage value as the "previous" baseline. */
    prevWalUsage = pgWalUsage;

    /* Set up a process-exit hook to clean up */
    on_shmem_exit(pgstat_shutdown_hook, 0);
}

pgstat_beinit

初始化MyBEEntry,用于跟踪进程状态。BackendStatusArray维护在共享内存中。
/*
 * pgstat_beinit -- point MyBEEntry at this process's element of
 * BackendStatusArray and register the shutdown hook.
 */
void
pgstat_beinit(void)
{
    if (MyBackendId == InvalidBackendId)
    {
        /*
         * No backend id: this must be an auxiliary process, whose entries
         * are kept after the MaxBackends regular backend entries.
         */
        Assert(MyAuxProcType != NotAnAuxProcess);

        MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
    }
    else
    {
        /* Regular backend: backend ids are 1-based indexes into the array. */
        Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);

        MyBEEntry = &BackendStatusArray[MyBackendId - 1];
    }

    /* Arrange for pgstat_beshutdown_hook to run at shmem exit. */
    on_shmem_exit(pgstat_beshutdown_hook, 0);
}

pgstat_bestart

填充MyBEEntry相关信息。
/*
 * pgstat_bestart -- fill in this process's backend status entry (MyBEEntry).
 *
 * This listing is an excerpt: lines consisting only of dots mark code that
 * was omitted (lsslstatus and lgssstatus, used below, are not declared in
 * the visible part).  The visible code copies the shared entry into a local
 * PgBackendStatus, sets the local copy's fields, and copies it back to the
 * shared entry after PGSTAT_BEGIN_WRITE_ACTIVITY.
 */
void
pgstat_bestart(void)
{
    volatile PgBackendStatus *vbeentry = MyBEEntry;
    PgBackendStatus lbeentry;
    ...
    /* Start from the current contents of the shared entry. */
    memcpy(&lbeentry,
           unvolatize(PgBackendStatus *, vbeentry),
           sizeof(PgBackendStatus));
    ...

    /* Identity and start-time fields; the activity timestamps start at 0. */
    lbeentry.st_procpid = MyProcPid;
    lbeentry.st_backendType = MyBackendType;
    lbeentry.st_backendIdx = MyAuxProcIdx;
    lbeentry.st_proc_start_timestamp = MyStartTimestamp;
    lbeentry.st_activity_start_timestamp = 0;
    lbeentry.st_state_start_timestamp = 0;
    lbeentry.st_xact_start_timestamp = 0;
    lbeentry.st_databaseid = MyDatabaseId;

    /* We have userid for client-backends, wal-sender and bgworker processes */
    if (lbeentry.st_backendType == B_BACKEND
        || lbeentry.st_backendType == B_WAL_SENDER
        || lbeentry.st_backendType == B_BG_WORKER)
        lbeentry.st_userid = GetSessionUserId();
    else
        lbeentry.st_userid = InvalidOid;
    
    ...
    /* Client address: copied from MyProcPort when set, otherwise zeroed. */
    if (MyProcPort)
        memcpy(&lbeentry.st_clientaddr, &MyProcPort->raddr,
               sizeof(lbeentry.st_clientaddr));
    else
        MemSet(&lbeentry.st_clientaddr, 0, sizeof(lbeentry.st_clientaddr));

    /* SSL details are gathered only for a connection with SSL in use. */
#ifdef USE_SSL
    if (MyProcPort && MyProcPort->ssl_in_use)
    {
        lbeentry.st_ssl = true;
        lsslstatus.ssl_bits = be_tls_get_cipher_bits(MyProcPort);
        strlcpy(lsslstatus.ssl_version, be_tls_get_version(MyProcPort), NAMEDATALEN);
        strlcpy(lsslstatus.ssl_cipher, be_tls_get_cipher(MyProcPort), NAMEDATALEN);
        be_tls_get_peer_subject_name(MyProcPort, lsslstatus.ssl_client_dn, NAMEDATALEN);
        be_tls_get_peer_serial(MyProcPort, lsslstatus.ssl_client_serial, NAMEDATALEN);
        be_tls_get_peer_issuer_name(MyProcPort, lsslstatus.ssl_issuer_dn, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_ssl = false;
    }
#else
    lbeentry.st_ssl = false;
#endif

    /* Likewise for GSSAPI; gss_princ is copied only if a principal exists. */
#ifdef ENABLE_GSS
    if (MyProcPort && MyProcPort->gss != NULL)
    {
        const char *princ = be_gssapi_get_princ(MyProcPort);

        lbeentry.st_gss = true;
        lgssstatus.gss_auth = be_gssapi_get_auth(MyProcPort);
        lgssstatus.gss_enc = be_gssapi_get_enc(MyProcPort);
        if (princ)
            strlcpy(lgssstatus.gss_princ, princ, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_gss = false;
    }
#else
    lbeentry.st_gss = false;
#endif

    /* No state, progress command or query id reported yet. */
    lbeentry.st_state = STATE_UNDEFINED;
    lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
    lbeentry.st_progress_command_target = InvalidOid;
    lbeentry.st_query_id = UINT64CONST(0);

    /* Mark the shared entry as being written before copying into it. */
    PGSTAT_BEGIN_WRITE_ACTIVITY(vbeentry);

    /* make sure we'll memcpy the same st_changecount back */
    lbeentry.st_changecount = vbeentry->st_changecount;

    memcpy(unvolatize(PgBackendStatus *, vbeentry),
           &lbeentry,
           sizeof(PgBackendStatus));

    /*
     * NOTE(review): these writes go through lbeentry after the copy-back
     * above.  Presumably st_appname, st_clienthostname and st_activity_raw
     * are pointers to out-of-line storage, so the writes still reach the
     * shared strings -- confirm against the PgBackendStatus definition.
     */
    lbeentry.st_appname[0] = '\0';
    if (MyProcPort && MyProcPort->remote_hostname)
        strlcpy(lbeentry.st_clienthostname, MyProcPort->remote_hostname,
                NAMEDATALEN);
    else
        lbeentry.st_clienthostname[0] = '\0';
    lbeentry.st_activity_raw[0] = '\0';
    /* Also make sure the last byte in each string area is always 0 */
    lbeentry.st_appname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_clienthostname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_activity_raw[pgstat_track_activity_query_size - 1] = '\0';

    ...
}

以上是postmaster通过StartChildProcess创建子进程的过程,实际上postmaster还可通过注册background worker的方式创建独立的后台进程。以上内容为个人对源码的理解,可能存在理解出错的地方,欢迎大家指出问题。

0条评论
0 / 1000
李****林
4文章数
0粉丝数
李****林
4 文章 | 0 粉丝
李****林
4文章数
0粉丝数
李****林
4 文章 | 0 粉丝
原创

Postmaster创建子进程

2023-10-13 05:48:32
13
0

背景

PostgreSQL中对日志的回放是单进程实现的,为了提高日志回放的性能,可以考虑将其设计为多进程并行回放。事实上,PG社区中提出过相关的并行回放方案。但该方案的进程模型存在一些问题,因此对PG内部创建进程的流程进行了梳理。

StartChildProcess

StartChildProcess是postmaster用来启动辅助进程的函数。参数“type”决定了将创建什么类型的子进程。所有类型的子进程都转到AuxiliaryProcessMain处理。
 
StartChildProcess在postmaster进程中被调用,postmaster会fork出自己的子进程,子进程会对自身的资源和环境进行初始化,例如使用IsUnderPostmaster变量标识自己为子进程;获取子进程号;清除父进程的退出函数;初始化进程latch。完成初始化后需要关闭父进程的socket连接,并释放父进程的内存上下文。最后由AuxiliaryProcessMain函数完成辅助进程的初始化。

AuxiliaryProcessMain

AuxiliaryProcessMain接口中根据参数不同,执行对应子进程分支。
 
该函数会通过入参获取子进程初始化的基本信息,通过getopt解析接口参数,初始化系统ps命令后显示的名称,并完成一系列的初始化操作。最后进入各个辅助进程的main函数。

BaseInit

无论父子进程都要做的初始化操作。
/*
 * BaseInit -- run the basic per-process initialization steps.
 *
 * Calls, in this order: InitCommunication, DebugFileOpen, InitFileAccess,
 * InitSync, smgrinit and InitBufferPoolAccess.  The call order is kept
 * exactly as written, since later steps may rely on earlier ones.
 */
void
BaseInit(void)
{
    /*
     * Attach to shared memory and semaphores, and initialize our
     * input/output/debugging file descriptors.
     */
    InitCommunication();
    DebugFileOpen();

    /* Do local initialization of file, storage and buffer managers */
    InitFileAccess();
    InitSync();
    smgrinit();
    InitBufferPoolAccess();
}
InitCommunication:初始化共享内存、信号量。postmaster已完成初始化,子进程不进行初始化。
/*
 * InitCommunication -- create shared memory and semaphores, but only when
 * this process is not running under a postmaster.
 */
static void
InitCommunication(void)
{
    /* Under a postmaster the setup was already done there; nothing to do. */
    if (IsUnderPostmaster)
        return;

    /* Otherwise initialize shared memory and semaphores ourselves. */
    CreateSharedMemoryAndSemaphores();
}
DebugFileOpen:初始化错误输出文件。
InitFileAccess:初始化vfd。
*/
void
InitFileAccess(void)
{
    Assert(SizeVfdCache == 0);  /* call me only once */

    /* initialize cache header entry */
    VfdCache = (Vfd *) malloc(sizeof(Vfd));
    if (VfdCache == NULL)
        ereport(FATAL,
                (errcode(ERRCODE_OUT_OF_MEMORY),
                 errmsg("out of memory")));

    MemSet((char *) &(VfdCache[0]), 0, sizeof(Vfd));
    VfdCache->fd = VFD_CLOSED;

    SizeVfdCache = 1;

    on_proc_exit(AtProcExit_Files, 0);
}
InitSync:初始化sync相关数据结构,仅在父进程、startup、checkpointer进程下初始化。
/*
 * InitSync -- build the pending-sync bookkeeping for processes that need it.
 *
 * Only a process that is not under the postmaster, the startup process, or
 * the checkpointer sets anything up; every other process returns at once.
 */
void
InitSync(void)
{
    HASHCTL     ctl;

    /* Same test as "standalone || startup || checkpointer", negated. */
    if (IsUnderPostmaster && !AmStartupProcess() && !AmCheckpointerProcess())
        return;

    /* Dedicated memory context that may be used inside critical sections. */
    pendingOpsCxt = AllocSetContextCreate(TopMemoryContext,
                                          "Pending ops context",
                                          ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(pendingOpsCxt, true);

    /* Hash table keyed by FileTag, allocated in that context. */
    ctl.hcxt = pendingOpsCxt;
    ctl.entrysize = sizeof(PendingFsyncEntry);
    ctl.keysize = sizeof(FileTag);
    pendingOps = hash_create("Pending Ops Table",
                             100L,
                             &ctl,
                             HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

    /* No unlink requests are queued yet. */
    pendingUnlinks = NIL;
}
smgrinit:初始化smgr(存储管理器)。它依次调用smgrsw数组中各存储管理器的smgr_init回调,实际调用到的是mdinit,由mdinit为md存储管理器创建内存上下文MdCxt。
/*
 * smgrinit -- run every storage manager's init hook, then register the
 * shutdown callback.
 */
void
smgrinit(void)
{
    int         which;

    /* Walk the switch table; entries without an init hook are skipped. */
    for (which = 0; which < NSmgr; which++)
    {
        if (!smgrsw[which].smgr_init)
            continue;

        smgrsw[which].smgr_init();
    }

    /* smgrshutdown runs at process exit */
    on_proc_exit(smgrshutdown, 0);
}

/*
 * mdinit -- create the "MdSmgr" memory context.
 *
 * The context is created as a child of TopMemoryContext with the default
 * allocation-set sizes and stored in MdCxt.
 */
void
mdinit(void)
{
    MdCxt = AllocSetContextCreate(TopMemoryContext,
                                  "MdSmgr",
                                  ALLOCSET_DEFAULT_SIZES);
}
InitBufferPoolAccess:初始化访问缓冲区的数据结构。
/*
 * InitBufferPoolAccess -- set up this process's private buffer refcount
 * tracking: a zeroed PrivateRefCountArray plus the PrivateRefCountHash
 * hash table.
 */
void
InitBufferPoolAccess(void)
{
    HASHCTL     ctl;

    /* Describe the hash table: int32 keys, PrivateRefCountEntry entries. */
    ctl.entrysize = sizeof(PrivateRefCountEntry);
    ctl.keysize = sizeof(int32);

    /* Start with an all-zero refcount array. */
    memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray));

    PrivateRefCountHash = hash_create("PrivateRefCount", 100, &ctl,
                                      HASH_ELEM | HASH_BLOBS);
}

InitAuxiliaryProcess

初始化辅助进程需要的数据结构MyProc,这些结构被维护在共享内存中。由AuxiliaryProcs数组存储。
/*
 * InitAuxiliaryProcess -- claim a free AuxiliaryProcs entry as MyProc and
 * initialize its fields.
 *
 * PANICs if ProcGlobal or AuxiliaryProcs is not set up, raises ERROR if
 * MyProc is already set, and raises FATAL if every AuxiliaryProcs entry is
 * in use.  On success it registers AuxiliaryProcKill as a shmem-exit
 * callback, passing the index of the claimed entry.
 */
void
InitAuxiliaryProcess(void)
{
    PGPROC     *auxproc;
    int         proctype;

    /*
     * ProcGlobal and AuxiliaryProcs are initialized during postmaster
     * startup; they live in shared memory.
     */
    if (ProcGlobal == NULL || AuxiliaryProcs == NULL)
        elog(PANIC, "proc header uninitialized");
    
    if (MyProc != NULL)
        elog(ERROR, "you already exist");

    /* The search for, and claim of, a free entry happens under this lock. */
    SpinLockAcquire(ProcStructLock);

    set_spins_per_delay(ProcGlobal->spins_per_delay);

    /* Find the first entry whose pid is 0, i.e. not claimed by anyone. */
    for (proctype = 0; proctype < NUM_AUXILIARY_PROCS; proctype++)
    {
        auxproc = &AuxiliaryProcs[proctype];
        if (auxproc->pid == 0)
            break;
    }
    if (proctype >= NUM_AUXILIARY_PROCS)
    {
        /* No free entry: drop the lock before reporting the error. */
        SpinLockRelease(ProcStructLock);
        elog(FATAL, "all AuxiliaryProcs are in use");
    }

    /* Mark the entry as ours by storing our pid, then adopt it as MyProc */
    ((volatile PGPROC *) auxproc)->pid = MyProcPid;

    MyProc = auxproc;

    SpinLockRelease(ProcStructLock);
    
    /* Reset the remaining MyProc fields to their initial values. */
    SHMQueueElemInit(&(MyProc->links));
    MyProc->waitStatus = PROC_WAIT_STATUS_OK;
    MyProc->lxid = InvalidLocalTransactionId;
    MyProc->fpVXIDLock = false;
    MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
    MyProc->xid = InvalidTransactionId;
    MyProc->xmin = InvalidTransactionId;
    MyProc->backendId = InvalidBackendId;
    MyProc->databaseId = InvalidOid;
    MyProc->roleId = InvalidOid;
    MyProc->tempNamespaceId = InvalidOid;
    MyProc->isBackgroundWorker = IsBackgroundWorker;
    MyProc->delayChkpt = 0;
    MyProc->statusFlags = 0;
    MyProc->lwWaiting = false;
    MyProc->lwWaitMode = 0;
    MyProc->waitLock = NULL;
    MyProc->waitProcLock = NULL;
    pg_atomic_write_u64(&MyProc->waitStart, 0);
#ifdef USE_ASSERT_CHECKING
    {
        int         i;

        /* Last process should have released all locks. */
        for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
            Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i])));
    }
#endif

    /* Take ownership of the entry's latch and switch over to it. */
    OwnLatch(&MyProc->procLatch);
    SwitchToSharedLatch();

    /* now that we have a proc, report wait events to shared memory */
    pgstat_set_wait_event_storage(&MyProc->wait_event_info);

    /* Check that group locking fields are in a proper initial state. */
    Assert(MyProc->lockGroupLeader == NULL);
    Assert(dlist_is_empty(&MyProc->lockGroupMembers));

    PGSemaphoreReset(MyProc->sem);

    /* Register cleanup; the argument is the index of the claimed entry. */
    on_shmem_exit(AuxiliaryProcKill, Int32GetDatum(proctype));
}

ProcSignalInit

非postmaster进程间发送信号需用到ProcSignal,在共享内存里通过信号交互。
/*
 * ProcSignalInit -- claim ProcSignal slot number pss_idx for this process.
 *
 * pss_idx is 1-based (asserted to lie in 1..NumProcSignalSlots); the slot
 * used is psh_slot[pss_idx - 1].  A slot that still holds a nonzero pid is
 * only logged, not rejected.  Registers CleanupProcSignalState as a
 * shmem-exit callback with pss_idx as its argument.
 */
void
ProcSignalInit(int pss_idx)
{
    ProcSignalSlot *slot;
    uint64      barrier_generation;

    Assert(pss_idx >= 1 && pss_idx <= NumProcSignalSlots);

    slot = &ProcSignal->psh_slot[pss_idx - 1];

    /* sanity check */
    if (slot->pss_pid != 0)
        elog(LOG, "process %d taking over ProcSignal slot %d, but it's not empty",
             MyProcPid, pss_idx);

    /* Clear out any leftover signal reasons */
    MemSet(slot->pss_signalFlags, 0, NUM_PROCSIGNALS * sizeof(sig_atomic_t));

    /*
     * Clear the barrier check mask and copy the current global barrier
     * generation into the slot.  The memory barrier is issued before the
     * pid is stored below.
     */
    pg_atomic_write_u32(&slot->pss_barrierCheckMask, 0);
    barrier_generation =
        pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration);
    pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation);
    pg_memory_barrier();

    /* Mark slot with my PID */
    slot->pss_pid = MyProcPid;

    /* Remember slot location for CheckProcSignal */
    MyProcSignalSlot = slot;

    /* Set up to release the slot on process exit */
    on_shmem_exit(CleanupProcSignalState, Int32GetDatum(pss_idx));
}
每一个想要接收信号的进程都要把自己的PID注册到ProcSignal的槽位数组(psh_slot)中。该数组以backend ID作为下标;如果进程没有backend ID,则使用“MaxBackends + MyAuxProcType + 1”的结果作为下标。

InitBufferPoolBackend

注册回调函数AtProcExit_Buffers。
/*
 * InitBufferPoolBackend -- register AtProcExit_Buffers as a shmem-exit
 * callback (with argument 0).
 */
void
InitBufferPoolBackend(void)
{
    on_shmem_exit(AtProcExit_Buffers, 0);
}

/*
 * During backend exit, ensure that we released all shared-buffer locks and
 * assert that we have no remaining pins.
 *
 * Exit callback; neither the exit code nor the argument is used.  Calls
 * AbortBufferIO, UnlockBuffers, CheckForBufferLeaks and
 * AtProcExit_LocalBuffers, in that order.
 */
static void
AtProcExit_Buffers(int code, Datum arg)
{
    AbortBufferIO();
    UnlockBuffers();

    CheckForBufferLeaks();

    /* localbuf.c needs a chance too */
    AtProcExit_LocalBuffers();
}

CreateAuxProcessResourceOwner

为当前进程创建空ResourceOwner对象。
/*
 * CreateAuxProcessResourceOwner -- create the "AuxiliaryProcess" resource
 * owner and make it the current one.
 *
 * Both AuxProcessResourceOwner and CurrentResourceOwner must still be NULL
 * (asserted).  Also registers ReleaseAuxProcessResourcesCallback as a
 * shmem-exit callback.
 */
void
CreateAuxProcessResourceOwner(void)
{
    Assert(AuxProcessResourceOwner == NULL);
    Assert(CurrentResourceOwner == NULL);
    AuxProcessResourceOwner = ResourceOwnerCreate(NULL, "AuxiliaryProcess");
    CurrentResourceOwner = AuxProcessResourceOwner;

    /*
     * Register a shmem-exit callback for cleanup of aux-process resource
     * owner.  (This needs to run after, e.g., ShutdownXLOG.)
     */
    on_shmem_exit(ReleaseAuxProcessResourcesCallback, 0);
}

pgstat_initialize、pgstat_beinit、pgstat_bestart

pgstat辅助进程已经在postmaster中完成初始化。

pgstat_initialize

通过将prevWalUsage赋值为pgWalUsage,可以让pgstat_send_wal函数计算出pgWalUsage计数器相对于上一次发送时增加了多少。pgstat_send_wal函数向收集器发送wal日志相关的统计信息。
/*
 * pgstat_initialize -- snapshot the WAL usage counters and register the
 * pgstat shutdown hook.
 */
void
pgstat_initialize(void)
{
    /* Remember the current pgWalUsage value as the "previous" baseline. */
    prevWalUsage = pgWalUsage;

    /* Set up a process-exit hook to clean up */
    on_shmem_exit(pgstat_shutdown_hook, 0);
}

pgstat_beinit

初始化MyBEEntry,用于跟踪进程状态。BackendStatusArray维护在共享内存中。
/*
 * pgstat_beinit -- point MyBEEntry at this process's element of
 * BackendStatusArray and register the shutdown hook.
 */
void
pgstat_beinit(void)
{
    if (MyBackendId == InvalidBackendId)
    {
        /*
         * No backend id: this must be an auxiliary process, whose entries
         * are kept after the MaxBackends regular backend entries.
         */
        Assert(MyAuxProcType != NotAnAuxProcess);

        MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
    }
    else
    {
        /* Regular backend: backend ids are 1-based indexes into the array. */
        Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);

        MyBEEntry = &BackendStatusArray[MyBackendId - 1];
    }

    /* Arrange for pgstat_beshutdown_hook to run at shmem exit. */
    on_shmem_exit(pgstat_beshutdown_hook, 0);
}

pgstat_bestart

填充MyBEEntry相关信息。
/*
 * pgstat_bestart -- fill in this process's backend status entry (MyBEEntry).
 *
 * This listing is an excerpt: lines consisting only of dots mark code that
 * was omitted (lsslstatus and lgssstatus, used below, are not declared in
 * the visible part).  The visible code copies the shared entry into a local
 * PgBackendStatus, sets the local copy's fields, and copies it back to the
 * shared entry after PGSTAT_BEGIN_WRITE_ACTIVITY.
 */
void
pgstat_bestart(void)
{
    volatile PgBackendStatus *vbeentry = MyBEEntry;
    PgBackendStatus lbeentry;
    ...
    /* Start from the current contents of the shared entry. */
    memcpy(&lbeentry,
           unvolatize(PgBackendStatus *, vbeentry),
           sizeof(PgBackendStatus));
    ...

    /* Identity and start-time fields; the activity timestamps start at 0. */
    lbeentry.st_procpid = MyProcPid;
    lbeentry.st_backendType = MyBackendType;
    lbeentry.st_backendIdx = MyAuxProcIdx;
    lbeentry.st_proc_start_timestamp = MyStartTimestamp;
    lbeentry.st_activity_start_timestamp = 0;
    lbeentry.st_state_start_timestamp = 0;
    lbeentry.st_xact_start_timestamp = 0;
    lbeentry.st_databaseid = MyDatabaseId;

    /* We have userid for client-backends, wal-sender and bgworker processes */
    if (lbeentry.st_backendType == B_BACKEND
        || lbeentry.st_backendType == B_WAL_SENDER
        || lbeentry.st_backendType == B_BG_WORKER)
        lbeentry.st_userid = GetSessionUserId();
    else
        lbeentry.st_userid = InvalidOid;
    
    ...
    /* Client address: copied from MyProcPort when set, otherwise zeroed. */
    if (MyProcPort)
        memcpy(&lbeentry.st_clientaddr, &MyProcPort->raddr,
               sizeof(lbeentry.st_clientaddr));
    else
        MemSet(&lbeentry.st_clientaddr, 0, sizeof(lbeentry.st_clientaddr));

    /* SSL details are gathered only for a connection with SSL in use. */
#ifdef USE_SSL
    if (MyProcPort && MyProcPort->ssl_in_use)
    {
        lbeentry.st_ssl = true;
        lsslstatus.ssl_bits = be_tls_get_cipher_bits(MyProcPort);
        strlcpy(lsslstatus.ssl_version, be_tls_get_version(MyProcPort), NAMEDATALEN);
        strlcpy(lsslstatus.ssl_cipher, be_tls_get_cipher(MyProcPort), NAMEDATALEN);
        be_tls_get_peer_subject_name(MyProcPort, lsslstatus.ssl_client_dn, NAMEDATALEN);
        be_tls_get_peer_serial(MyProcPort, lsslstatus.ssl_client_serial, NAMEDATALEN);
        be_tls_get_peer_issuer_name(MyProcPort, lsslstatus.ssl_issuer_dn, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_ssl = false;
    }
#else
    lbeentry.st_ssl = false;
#endif

    /* Likewise for GSSAPI; gss_princ is copied only if a principal exists. */
#ifdef ENABLE_GSS
    if (MyProcPort && MyProcPort->gss != NULL)
    {
        const char *princ = be_gssapi_get_princ(MyProcPort);

        lbeentry.st_gss = true;
        lgssstatus.gss_auth = be_gssapi_get_auth(MyProcPort);
        lgssstatus.gss_enc = be_gssapi_get_enc(MyProcPort);
        if (princ)
            strlcpy(lgssstatus.gss_princ, princ, NAMEDATALEN);
    }
    else
    {
        lbeentry.st_gss = false;
    }
#else
    lbeentry.st_gss = false;
#endif

    /* No state, progress command or query id reported yet. */
    lbeentry.st_state = STATE_UNDEFINED;
    lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
    lbeentry.st_progress_command_target = InvalidOid;
    lbeentry.st_query_id = UINT64CONST(0);

    /* Mark the shared entry as being written before copying into it. */
    PGSTAT_BEGIN_WRITE_ACTIVITY(vbeentry);

    /* make sure we'll memcpy the same st_changecount back */
    lbeentry.st_changecount = vbeentry->st_changecount;

    memcpy(unvolatize(PgBackendStatus *, vbeentry),
           &lbeentry,
           sizeof(PgBackendStatus));

    /*
     * NOTE(review): these writes go through lbeentry after the copy-back
     * above.  Presumably st_appname, st_clienthostname and st_activity_raw
     * are pointers to out-of-line storage, so the writes still reach the
     * shared strings -- confirm against the PgBackendStatus definition.
     */
    lbeentry.st_appname[0] = '\0';
    if (MyProcPort && MyProcPort->remote_hostname)
        strlcpy(lbeentry.st_clienthostname, MyProcPort->remote_hostname,
                NAMEDATALEN);
    else
        lbeentry.st_clienthostname[0] = '\0';
    lbeentry.st_activity_raw[0] = '\0';
    /* Also make sure the last byte in each string area is always 0 */
    lbeentry.st_appname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_clienthostname[NAMEDATALEN - 1] = '\0';
    lbeentry.st_activity_raw[pgstat_track_activity_query_size - 1] = '\0';

    ...
}

以上是postmaster通过StartChildProcess创建子进程的过程,实际上postmaster还可通过注册background worker的方式创建独立的后台进程。以上内容为个人对源码的理解,可能存在理解出错的地方,欢迎大家指出问题。

文章来自个人专栏
PG学习
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0