searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Pulsar 消息存储管理

2023-06-29 06:21:59
68
0

1.背景介绍

Apache Pulsar,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、跨区域复制、具有强一致性、高吞吐、低延迟及高可扩展性等数据存储特性。

 

2.存储管理

Pulsar 采用 Apache BookKeeper 作为持久化存储,为了适配Broker层Topic的数据模型,Pulsar 并没有简单直接地调用BookKeeper 客户端,而是抽象了一层存储层ManagedLedger,将存储的相关逻辑统一封装起来,其中包括消息的存储、消费游标的管理,Ledger的管理、缓存的管理。

2.1. 存储模型

Pulsar的消息存储模型如下:

跟kafka类似,Pulsar 中的 Topic 也可以分为 分区主题(PartitionedTopic) 和非分区主题(NonPartitionedTopic),其中 PartitionedTopic 下面存在多个 Partition, 每个 Partition 本质上就是一个 NonPartitionedTopic,而 PartitionedTopic 只是一个对外的逻辑名称而已。如果 Topic 本身就是 NonPartitionedTopic,那么 Topic 名就是它本身。

每个 NonPartitionedTopic 都会对应一个或多个 Ledger,之所以存在多个Ledger,是因为通常客户端向一个Topic发送消息,实际上就是向该Topic的一个Ledger写入数据,当这个Ledger写满并且达到了切换的时间时,managedLedger会关闭当前的Ledger,创建新的Ledger继续写入数据。这么做的原因是,Pulsar的数据清理机制不是消息为单位的,而是以Ledger为单位的,只有这个Ledger中的所有消息被标记为删除,才能把整个Ledger删除。

每个Ledger中会包含不同数量的Entry。但是Entry数并不一定等于消息数,如果客户端没有开启批量发送,那么每个Entry就只会存放一个消息,如果开启批量发送,那么一个Entry中可能包含多个消息。

每条消息中都会存在一个MessageId,由LedgerId、EntryId、PartitionId、BatchIndex组成,PartitionId表示这条消息属于哪个Partition,BatchIndex只会出现在批量消息中,标识该消息是这一批量消息中的第几条。

 

2.2. ManagedLedger

ManagedLedger封装了上述的存储细节,Pulsar中会为每一个Topic创建一个ManagedLedger,统一管理消息的存储和清理流程。一个ManagedLedger的创建流程如下:

1. 创建Ledger时,首先会创建Ledger的配置文件ManagedLedgerConfig,主要是包含PersistencePolicies、RetentionPolicies、OffloadPolicies、读写Entry的超时时间、每个Ledger中最大Entry数等配置项。其中:

  1. PersistencePolicies主要是包含EnsembleSize、WriteQuorum、AckQuorum等。EnsembleSize表示该集群至少需要有多少个Bookie节点才能正常工作,WriteQuorum表示消息将要并行写入的Bookie数量,AckQuorum表示数据并行写入Bookie后,Broker要等待返回的相应数量。
  2. RetentionPolicie主要包含Ledger的回收策略,回收策略包括时间和大小两个维度。
  3. OffloadPolicies主要包含将Topic中的冷数据转移到更加廉价的第三方存储上。

所有的ManagedLedgerConfig中的配置项,都可以在broker.conf中配置。

2. 创建配置文件对象后,Broker会通过工厂类(ManagedLedgerFactory)创建一个ManagedLedger,ManagedLedger是一个接口,Pulsar中只有一种是实现,ManagedLedgerImpl。

3. ManagedLedger创建完成后,Broker会继续调用它的initialize方法开始初始化ManagedLedger。在初始化过程中,首先ManagedLedger会通过MetadataStore获取locakZookeeper上对应Topic的所有Ledger元数据。由于之前可能已经存储过Ledger,但最后一个Ledger在Zookeeper中的元数据可能并不全(可能Broker突然宕机导致元数据没来得及更新),因此会从BookKeeper中再读取一次最后的Ledger的信息并更新。由于Pulsar中的Ledger只要关闭了就只能读而不能写入数据,因此ManagedLedger会再创建一个新的Ledger用于后续的写入数据。

4. 初始化Cursor(游标)。ManagedLedger中会包含一个Cursor,用于记录消费者消费的位置信息,初始化Cursor时ManagedLedger会通过MetadataStore获取localZooKeeper上该ManagedLedger下的所有Cursor,如果存在已有的Cursor,则生成对应的CursorInfo对象并缓存到ManagedLedger中。

2.3.存储流程

Pulsar中消息的存储流程只要是涉及ManagedLedger的addEntry方法,只需要调用该方法即可存储信息。一条消息在Pulsar中的存储流程如下:

0条评论
0 / 1000
f****n
3文章数
0粉丝数
f****n
3 文章 | 0 粉丝
f****n
3文章数
0粉丝数
f****n
3 文章 | 0 粉丝
原创

Pulsar 消息存储管理

2023-06-29 06:21:59
68
0

1.背景介绍

Apache Pulsar,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、跨区域复制、具有强一致性、高吞吐、低延迟及高可扩展性等数据存储特性。

 

2.存储管理

Pulsar 采用 Apache BookKeeper 作为持久化存储,为了适配Broker层Topic的数据模型,Pulsar 并没有简单直接地调用BookKeeper 客户端,而是抽象了一层存储层ManagedLedger,将存储的相关逻辑统一封装起来,其中包括消息的存储、消费游标的管理,Ledger的管理、缓存的管理。

2.1. 存储模型

Pulsar的消息存储模型如下:

跟kafka类似,Pulsar 中的 Topic 也可以分为 分区主题(PartitionedTopic) 和非分区主题(NonPartitionedTopic),其中 PartitionedTopic 下面存在多个 Partition, 每个 Partition 本质上就是一个 NonPartitionedTopic,而 PartitionedTopic 只是一个对外的逻辑名称而已。如果 Topic 本身就是 NonPartitionedTopic,那么 Topic 名就是它本身。

每个 NonPartitionedTopic 都会对应一个或多个 Ledger,之所以存在多个Ledger,是因为通常客户端向一个Topic发送消息,实际上就是向该Topic的一个Ledger写入数据,当这个Ledger写满并且达到了切换的时间时,managedLedger会关闭当前的Ledger,创建新的Ledger继续写入数据。这么做的原因是,Pulsar的数据清理机制不是消息为单位的,而是以Ledger为单位的,只有这个Ledger中的所有消息被标记为删除,才能把整个Ledger删除。

每个Ledger中会包含不同数量的Entry。但是Entry数并不一定等于消息数,如果客户端没有开启批量发送,那么每个Entry就只会存放一个消息,如果开启批量发送,那么一个Entry中可能包含多个消息。

每条消息中都会存在一个MessageId,由LedgerId、EntryId、PartitionId、BatchIndex组成,PartitionId表示这条消息属于哪个Partition,BatchIndex只会出现在批量消息中,标识该消息是这一批量消息中的第几条。

 

2.2. ManagedLedger

ManagedLedger封装了上述的存储细节,Pulsar中会为每一个Topic创建一个ManagedLedger,统一管理消息的存储和清理流程。一个ManagedLedger的创建流程如下:

1. 创建Ledger时,首先会创建Ledger的配置文件ManagedLedgerConfig,主要是包含PersistencePolicies、RetentionPolicies、OffloadPolicies、读写Entry的超时时间、每个Ledger中最大Entry数等配置项。其中:

  1. PersistencePolicies主要是包含EnsembleSize、WriteQuorum、AckQuorum等。EnsembleSize表示该集群至少需要有多少个Bookie节点才能正常工作,WriteQuorum表示消息将要并行写入的Bookie数量,AckQuorum表示数据并行写入Bookie后,Broker要等待返回的相应数量。
  2. RetentionPolicie主要包含Ledger的回收策略,回收策略包括时间和大小两个维度。
  3. OffloadPolicies主要包含将Topic中的冷数据转移到更加廉价的第三方存储上。

所有的ManagedLedgerConfig中的配置项,都可以在broker.conf中配置。

2. 创建配置文件对象后,Broker会通过工厂类(ManagedLedgerFactory)创建一个ManagedLedger,ManagedLedger是一个接口,Pulsar中只有一种是实现,ManagedLedgerImpl。

3. ManagedLedger创建完成后,Broker会继续调用它的initialize方法开始初始化ManagedLedger。在初始化过程中,首先ManagedLedger会通过MetadataStore获取locakZookeeper上对应Topic的所有Ledger元数据。由于之前可能已经存储过Ledger,但最后一个Ledger在Zookeeper中的元数据可能并不全(可能Broker突然宕机导致元数据没来得及更新),因此会从BookKeeper中再读取一次最后的Ledger的信息并更新。由于Pulsar中的Ledger只要关闭了就只能读而不能写入数据,因此ManagedLedger会再创建一个新的Ledger用于后续的写入数据。

4. 初始化Cursor(游标)。ManagedLedger中会包含一个Cursor,用于记录消费者消费的位置信息,初始化Cursor时ManagedLedger会通过MetadataStore获取localZooKeeper上该ManagedLedger下的所有Cursor,如果存在已有的Cursor,则生成对应的CursorInfo对象并缓存到ManagedLedger中。

2.3.存储流程

Pulsar中消息的存储流程只要是涉及ManagedLedger的addEntry方法,只需要调用该方法即可存储信息。一条消息在Pulsar中的存储流程如下:

文章来自个人专栏
云原生消息
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0