searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

MongoDB 中的观察者模式

2024-10-17 09:34:43
1
0

基本概念

观察者模式是一种行为型设计模式,它定义了一种一对多的依赖关系,当一个对象的状态发生改变时,其所有依赖者都会收到通知并自动更新。

当对象间存在一对多关系时,则使用观察者模式。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

在 MongoDB 中,Observer 模式使用在对请求的监听上。当有一条变更操作发生时,除了对存储引擎进行数据写入外,还可能执行以下额外的操作:

  1. 如果运行在副本集模式下,还要写 oplog。
  2. 如果修改了 admin 库下的 system.users、system.roles 系统表,还需要通知 authManager 进行权限更新。
  3. 如果修改了 cache.chunks.xxx 路由缓存表,还需要进行本地路由刷新。
  4. ...

以上这些操作都可以按功能模块划分出具体的 observer,当有变更操作发生时,会通知这些 observer 执行对应的操作。

实现方式

类型定义

observer.png

OpObServer 是一个抽象类,其主要的实现包括:

  • OpObserverImpl: 实现了常见的 DDL、DML 的处理方式,包括如何调用接口写 oplog。
  • OpObserverShardingImpl: 继承自 OpObserverImpl,并实现了 shardSever 之间迁移数据时,如何处理插入、更新、删除的数据处于正在迁移的 chunk 中的情况。
  • AuthOpObserver: 实现了对 system.users、system.roles 等系统表进行变更时的处理流程。
  • ConfigServerOpObserver: 实现了在 configServer 节点上对 config 库执行变更时的处理流程。
  • ShardServerOpObserver: 实现了 shardServer 节点上对 cache.chunks.xxx 路由缓存表执行变更时的处理流程。
  • OpObserverRegistry: OpObserver 注册器,内部使用数组存储,当有监听的动作发生时,会循环调用数组中每个 OpObserver 的方法。

调用流程

在 mongod 节点(副本集、单节点、shardServer、configServer)启动时,会根据运行配置注册多个 OpObserver 到 OpObserverRegistry 中,而 OpObserverRegistry 存放在全局唯一的 ServerContext 结构中。参考 _initAndListen 的实现:

auto serviceContext = getGlobalServiceContext();

    serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10)));
    // 分配 opObserverRegistry
    auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>();
    // 注册 OpObserver
    opObserverRegistry->addObserver(stdx::make_unique<OpObserverShardingImpl>());
    opObserverRegistry->addObserver(stdx::make_unique<AuthOpObserver>());

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
        // shardServer 节点上,注册 ShardServerOpObserver
        opObserverRegistry->addObserver(stdx::make_unique<ShardServerOpObserver>());
    } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        // configServer 节点上,注册 ConfigServerOpObserver
        opObserverRegistry->addObserver(stdx::make_unique<ConfigServerOpObserver>());
    }
    setupFreeMonitoringOpObserver(opObserverRegistry.get());

    serviceContext->setOpObserver(std::move(opObserverRegistry));

当有监听的动作发生时,会调用 ServerContext 结构中的 OpObserverRegistry 对应的方法。以 CollectionImpl::insertDocuments 插入动作为例:

getGlobalServiceContext()->getOpObserver()->onInserts(
        opCtx, ns(), uuid(), begin, end, fromMigrate);

Observer 操作的一致性

以 insert 操作为例,对应的 observer 的动作需要与其保持一致。即同时提交或者回滚。

有些 observer 动作是可以利用存储引擎的事务原子性来保证的,比如 oplog 的提交。

有些 observer 动作不会修改存储引擎数据,只是会修改内存状态,比如 AuthOpObserver 的权限更新动作。对于这些可以利用 WiredTigerRecoveryUnit 的 Change 机制,在 WiredTigerRecoveryUnit 进行事务提交或者回滚之后,调用 Change 中传入的 rollback 和 commit 回调函数完成相应的操作。

不过 Change 机制不和对应的事务一起保证原子性。可能在事务提交之后马上断电,没有执行对应的 Change 回调函数。以 AuthOpObserver 为例,可能修改了 admin.system.users 表并记录了 oplog,但是没有来得及更新内存结构就宕机了。
不过这种情况问题不大,因为节点重启之后,会根据磁盘上最新的一致性数据重新构建内存数据。

0条评论
0 / 1000
彭振翼
6文章数
1粉丝数
彭振翼
6 文章 | 1 粉丝
原创

MongoDB 中的观察者模式

2024-10-17 09:34:43
1
0

基本概念

观察者模式是一种行为型设计模式,它定义了一种一对多的依赖关系,当一个对象的状态发生改变时,其所有依赖者都会收到通知并自动更新。

当对象间存在一对多关系时,则使用观察者模式。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

在 MongoDB 中,Observer 模式使用在对请求的监听上。当有一条变更操作发生时,除了对存储引擎进行数据写入外,还可能执行以下额外的操作:

  1. 如果运行在副本集模式下,还要写 oplog。
  2. 如果修改了 admin 库下的 system.users、system.roles 系统表,还需要通知 authManager 进行权限更新。
  3. 如果修改了 cache.chunks.xxx 路由缓存表,还需要进行本地路由刷新。
  4. ...

以上这些操作都可以按功能模块划分出具体的 observer,当有变更操作发生时,会通知这些 observer 执行对应的操作。

实现方式

类型定义

observer.png

OpObServer 是一个抽象类,其主要的实现包括:

  • OpObserverImpl: 实现了常见的 DDL、DML 的处理方式,包括如何调用接口写 oplog。
  • OpObserverShardingImpl: 继承自 OpObserverImpl,并实现了 shardSever 之间迁移数据时,如何处理插入、更新、删除的数据处于正在迁移的 chunk 中的情况。
  • AuthOpObserver: 实现了对 system.users、system.roles 等系统表进行变更时的处理流程。
  • ConfigServerOpObserver: 实现了在 configServer 节点上对 config 库执行变更时的处理流程。
  • ShardServerOpObserver: 实现了 shardServer 节点上对 cache.chunks.xxx 路由缓存表执行变更时的处理流程。
  • OpObserverRegistry: OpObserver 注册器,内部使用数组存储,当有监听的动作发生时,会循环调用数组中每个 OpObserver 的方法。

调用流程

在 mongod 节点(副本集、单节点、shardServer、configServer)启动时,会根据运行配置注册多个 OpObserver 到 OpObserverRegistry 中,而 OpObserverRegistry 存放在全局唯一的 ServerContext 结构中。参考 _initAndListen 的实现:

auto serviceContext = getGlobalServiceContext();

    serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10)));
    // 分配 opObserverRegistry
    auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>();
    // 注册 OpObserver
    opObserverRegistry->addObserver(stdx::make_unique<OpObserverShardingImpl>());
    opObserverRegistry->addObserver(stdx::make_unique<AuthOpObserver>());

    if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
        // shardServer 节点上,注册 ShardServerOpObserver
        opObserverRegistry->addObserver(stdx::make_unique<ShardServerOpObserver>());
    } else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
        // configServer 节点上,注册 ConfigServerOpObserver
        opObserverRegistry->addObserver(stdx::make_unique<ConfigServerOpObserver>());
    }
    setupFreeMonitoringOpObserver(opObserverRegistry.get());

    serviceContext->setOpObserver(std::move(opObserverRegistry));

当有监听的动作发生时,会调用 ServerContext 结构中的 OpObserverRegistry 对应的方法。以 CollectionImpl::insertDocuments 插入动作为例:

getGlobalServiceContext()->getOpObserver()->onInserts(
        opCtx, ns(), uuid(), begin, end, fromMigrate);

Observer 操作的一致性

以 insert 操作为例,对应的 observer 的动作需要与其保持一致。即同时提交或者回滚。

有些 observer 动作是可以利用存储引擎的事务原子性来保证的,比如 oplog 的提交。

有些 observer 动作不会修改存储引擎数据,只是会修改内存状态,比如 AuthOpObserver 的权限更新动作。对于这些可以利用 WiredTigerRecoveryUnit 的 Change 机制,在 WiredTigerRecoveryUnit 进行事务提交或者回滚之后,调用 Change 中传入的 rollback 和 commit 回调函数完成相应的操作。

不过 Change 机制不和对应的事务一起保证原子性。可能在事务提交之后马上断电,没有执行对应的 Change 回调函数。以 AuthOpObserver 为例,可能修改了 admin.system.users 表并记录了 oplog,但是没有来得及更新内存结构就宕机了。
不过这种情况问题不大,因为节点重启之后,会根据磁盘上最新的一致性数据重新构建内存数据。

文章来自个人专栏
MongoDB
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0