基本概念
观察者模式是一种行为型设计模式,它定义了一种一对多的依赖关系,当一个对象的状态发生改变时,其所有依赖者都会收到通知并自动更新。
当对象间存在一对多关系时,则使用观察者模式。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。
在 MongoDB 中,Observer 模式使用在对请求的监听上。当有一条变更操作发生时,除了对存储引擎进行数据写入外,还可能执行以下额外的操作:
- 如果运行在副本集模式下,还要写 oplog。
- 如果修改了 admin 库下的 system.users、system.roles 系统表,还需要通知 authManager 进行权限更新。
- 如果修改了 cache.chunks.xxx 路由缓存表,还需要进行本地路由刷新。
- ...
以上这些操作都可以按功能模块划分出具体的 observer,当有变更操作发生时,会通知这些 observer 执行对应的操作。
实现方式
类型定义
OpObServer 是一个抽象类,其主要的实现包括:
- OpObserverImpl: 实现了常见的 DDL、DML 的处理方式,包括如何调用接口写 oplog。
- OpObserverShardingImpl: 继承自 OpObserverImpl,并实现了 shardSever 之间迁移数据时,如何处理插入、更新、删除的数据处于正在迁移的 chunk 中的情况。
- AuthOpObserver: 实现了对 system.users、system.roles 等系统表进行变更时的处理流程。
- ConfigServerOpObserver: 实现了在 configServer 节点上对 config 库执行变更时的处理流程。
- ShardServerOpObserver: 实现了 shardServer 节点上对 cache.chunks.xxx 路由缓存表执行变更时的处理流程。
- OpObserverRegistry: OpObserver 注册器,内部使用数组存储,当有监听的动作发生时,会循环调用数组中每个 OpObserver 的方法。
调用流程
在 mongod 节点(副本集、单节点、shardServer、configServer)启动时,会根据运行配置注册多个 OpObserver 到 OpObserverRegistry 中,而 OpObserverRegistry 存放在全局唯一的 ServerContext 结构中。参考 _initAndListen 的实现:
auto serviceContext = getGlobalServiceContext();
serviceContext->setFastClockSource(FastClockSourceFactory::create(Milliseconds(10)));
// 分配 opObserverRegistry
auto opObserverRegistry = stdx::make_unique<OpObserverRegistry>();
// 注册 OpObserver
opObserverRegistry->addObserver(stdx::make_unique<OpObserverShardingImpl>());
opObserverRegistry->addObserver(stdx::make_unique<AuthOpObserver>());
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
// shardServer 节点上,注册 ShardServerOpObserver
opObserverRegistry->addObserver(stdx::make_unique<ShardServerOpObserver>());
} else if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
// configServer 节点上,注册 ConfigServerOpObserver
opObserverRegistry->addObserver(stdx::make_unique<ConfigServerOpObserver>());
}
setupFreeMonitoringOpObserver(opObserverRegistry.get());
serviceContext->setOpObserver(std::move(opObserverRegistry));
当有监听的动作发生时,会调用 ServerContext 结构中的 OpObserverRegistry 对应的方法。以 CollectionImpl::insertDocuments 插入动作为例:
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), begin, end, fromMigrate);
Observer 操作的一致性
以 insert 操作为例,对应的 observer 的动作需要与其保持一致。即同时提交或者回滚。
有些 observer 动作是可以利用存储引擎的事务原子性来保证的,比如 oplog 的提交。
有些 observer 动作不会修改存储引擎数据,只是会修改内存状态,比如 AuthOpObserver 的权限更新动作。对于这些可以利用 WiredTigerRecoveryUnit 的 Change 机制,在 WiredTigerRecoveryUnit 进行事务提交或者回滚之后,调用 Change 中传入的 rollback 和 commit 回调函数完成相应的操作。
不过 Change 机制不和对应的事务一起保证原子性。可能在事务提交之后马上断电,没有执行对应的 Change 回调函数。以 AuthOpObserver 为例,可能修改了 admin.system.users 表并记录了 oplog,但是没有来得及更新内存结构就宕机了。
不过这种情况问题不大,因为节点重启之后,会根据磁盘上最新的一致性数据重新构建内存数据。