新一代消息队列 Pulsar

276次阅读  |  发布于2年以前

在信息流场景,内容的请求处理、原子模块调度、结果的分发等至关重要,直接影响到内容的外显、推荐、排序等。基于消息 100% 成功的要求,我们团队对 Pulsar 进行了调研,并采用腾讯云的 TDMQ(Pulsar 版)实现消息的可靠处理。本文主要参考 Pulsar 的官方文档和技术文章,对 Pulsar 的特性、机制、原理等进行整理总结。后续我们团队计划产出多篇文章,重点聚焦分析 Pulsar 与其他消息队列(Kafka、RocketMQ 等) 的调度和写盘等,以及 Pulsar 在信息流内容链路场景的使用实践。

1. Pulsar 概述

Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

Pulsar 是一个 pub-sub (发布-订阅)模型的消息队列系统。

1.1. Pulsar 架构

Pulsar 由 Producer、Consumer、多个 Broker 、一个 BookKeeper 集群、一个 Zookeeper 集群构成,具体如下图所示。

从 Pulsar 的架构图上可以看出, Pulsar 在架构设计上采用了计算与存储分离的模式,发布/订阅相关的计算逻辑在 Broker 上完成,而数据的持久化存储交由 BookKeeper 去实现。

1.1.1. Broker 扩展

在 Pulsar 中 Broker 是无状态的,当需要支持更多的消费者或生产者时,可以简单地添加更多的 Broker 节点来满足业务需求。Pulsar 支持自动的分区负载均衡,在 Broker 节点的资源使用率达到阈值时,会将负载迁移到负载较低的 Broker 节点,这个过程中分区也将在多个 Broker 节点中做平衡迁移,一些分区的所有权会转移到新的 Broker 节点。在后面 Bundle 小节会具体介绍这部分的实现。

1.1.2. Bookie 扩展

存储层的扩容,通过增加 Bookie 节点来实现。在 BooKie 扩容的阶段,由于分片机制,整个过程不会涉及到不必要的数据搬迁,即不需要将旧数据从现有存储节点重新复制到新存储节点。在后续的 Bookkeeper 小节中会具体介绍。

1.2. Topic

和其他消息队列类似,Pulsar 中也有 Topic。Topic 即在生产者与消费者中传输消息的通道。消息可以以 Topic 为单位进行归类,生产者负责将消息发送到特定的 Topic,而消费者指定特定的 Topic 进行消费。

1.2.1. 分区 Topic(Topic-Partition)

Pulsar 的 Topic 可以分为非分区 Topic 和分区 Topic 。

普通的 Topic 仅仅被保存在单个 Broker 中,这限制了 Topic 的最大吞吐量。分区 Topic 是一种特殊类型的主题,支持被多个 Broker 处理,从而实现更高的吞吐量。

针对一个 Topic ,可以设置多个 Topic 分区来提高 Topic 的吞吐量。每个 Topic Partition 由 Pulsar 分配给某个 Broker ,该 Broker 称为该 Topic Partition 的所有者。生成者和消费者会与每个 Topic 分区的 Broker 创建链接,发送消息并消费消息。

如下图所示, Topic1 有 Partition1、 Partition2、 Partition3、 Partition4 、 Partition5 五个分区, Partition1 和 Partition4 由 Broker1 处理, Partition2 和 Partition5 由 Broker2 处理, Partition3 由 Broker3 处理。

从 Pulsar 社区版的 golang-sdk 可以看出,客户端的 Producer 和 Consumer 在初始化的时候,都会与每一个 Topic-Partition 创建链接,并且会监听是否有新的 Partition,以创建新的连接。

1.2.2. 非持久 topic

默认情况下, Pulsar 会保存所有没确认的消息到 BookKeeper 中。持久 Topic 的消息在 Broker 重启或者 Consumer 出现问题时保存下来。

除了持久 Topic , Pulsar 也支持非持久 Topic 。这些 Topic 的消息只存在于内存中,不会存储到磁盘。

因为 Broker 不会对消息进行持久化存储,当 Producer 将消息发送到 Broker 时, Broker 可以立即将 ack 返回给 Producer ,所以非持久 Topic 的消息传递会比持久 Topic 的消息传递更快一些。相对的,当 Broker 因为一些原因宕机、重启后,非持久 Topic 的消息都会消失,订阅者将无法收到这些消息。

1.2.3. 重试 topic

由于业务逻辑处理出现异常,消息一般需要被重新消费。Pulsar 支持生产者同时将消息发送到普通的 Topic 和重试 Topic ,并指定允许延时和最大重试次数。当配置了允许消费者自动重试时,如果消息没有被消费成功,会被保存到重试 Topic 中,并在指定延时时间后,重新被消费。

1.2.4. 死信 topic

当 Consumer 消费消息出错时,可以通过配置重试 Topic 对消息进行重试,但是,如果当消息超过了最大的重试次数仍处理失败时,该怎么办呢?Pulsar 提供了死信 Topic ,通过配置 deadLetterTopic,当消息达到最大重试次数的时候, Pulsar 会将消息推送到死信 Topic 中进行保存。

1.3. 订阅(subscription)

通过订阅的方式,我们可以指定消息如何投递给消费者。

1.3.1. 订阅类型(Subscription type)

Pulsar 支持独占(Exclusive)、灾备(Failover)、共享(Shared)、Key_Shared 这四种订阅类型。

1.3.2. 订阅模式(Subscription modes)

订阅模式有持久化和非持久化两种。订阅模式取决于游标(cursor)的类型。

创建订阅时,将创建一个相关的游标来记录最后使用的位置。当订阅的 consumer 重新启动时,它可以从它所消费的最后一条消息继续消费。

一个订阅可以有一个或多个消费者。当使用者订阅主题时,它必须指定订阅名称。持久订阅和非持久订阅可以具有相同的名称,它们彼此独立。如果使用者指定了以前不存在的订阅,则会自动创建订阅。

默认情况下,没有任何持久订阅的 Topic 的消息将被标记为已删除。如果要防止消息被标记为已删除,可以为此 Topic 创建持久订阅。在这种情况下,只有被确认的消息才会被标记为已删除。

1.3.3. 多主题订阅

当 Consumer 订阅 Topic 时,默认指定订阅一个主题。从 Pulsar 的 1.23.0-incubating 的版本开始, Pulsar 消费者可以同时订阅多个 Topic 。可以通过两种方式进行订阅:

2. Pulsar 生产者 (Producer)

Producer 是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。

2.1. 访问模式

消息生成者有多种模式访问 Topic ,可以使用以下几种方式将消息发送到 Topic 。

2.2. 路由模式

当将消息发送到分区 Topic 时,需要指定消息的路由模式,这决定了消息将会被发送到哪个分区 Topic 。Pulsar 有以下三种消息路由模式,RoundRobinPartition 为默认路由模式。

2.3. 批量处理

Pulsar 支持对消息进行批量处理。批量处理启用后, Producer 会在一次请求中累积并发送一批消息。批量处理时的消息数量取决于最大消息数(单次批量处理请求可以发送的最大消息数)和最大发布延迟(单个请求的最大发布延迟时间)决定。开启批量处理后,积压的数量是批量处理的请求总数,而不是消息总数。

2.3.1. 索引确认机制

通常情况下,只有 Consumer 确认了批量请求中的所有消息,这个批量请求才会被认定为已处理。当这批消息没有全部被确认的情况下,发生故障时,会导致一些已确认的消息被重复确认。

为了避免 Consumer 重复消费已确认的消息, Pulsar 从 Pulsar 2.6.0 开始采用批量索引确认机制。如果启用批量索引确认机制, Consumer 将筛选出已被确认的批量索引,并将批量索引确认请求发送给 Broker 。Broker 维护批量索引的确认状态并跟踪每批索引的确认状态,以避免向 Consumer 发送已确认的消息。当该批信息的所有索引都被确认后,该批信息将被删除。

默认情况下,索引确认机制处于关闭状态。开启索引确认机制将产生导致更多内存开销。

2.3.2. key-based batching

key_shared 模式下,Broker 会根据消息的 key 来分发消息,但默认的批量处理模式,无法保证将所有的相同的 key 都打包到同一批中,而且 Consumer 在接收到批数据时,会默认把第一个消息的 key 当作这批消息的 key ,这会导致消息的错乱。因此 key_shared 模式下,不支持默认的批量处理。

key-based batching 能够确保 Producer 在打包消息时,将相同 key 的消息打包到同一批中,从而 consumer 在消费的时候,也能够消费到指定 key 的批数据。

没有指定 key 的消息在打包成批后,这一批数据也是没有 key 的, Broker 在分发这批消息时,会使用 NON_KEY 作为这批消息的 key 。

2.4. 消息分块

启用分块后,如果消息大小超过允许发送的最大消息大小时, Producer 会将原始消息分割成多个分块消息,并将分块消息与消息的元数据按顺序发送到 Broker。

在 Broker 中,分块消息会和普通消息以相同的方式存储在 Ledger 中。唯一的区别是, Consumer 需要缓存分块消息,并在接收到所有的分块消息后将其合并成真正的消息。如果 Producer 不能及时发布消息的所有分块, Consumer 不能在消息的过期时间内接收到所有的分块,那么 Consumer 已接收到的分块消息就会过期。

Consumer 会将分块的消息拼接在一起,并将它们放入接收器队列中。客户端从接收器队列中消费消息。当 Consumer 消费到原始的大消息并确认后, Consumer 就会发送与该大消息关联的所有分块消息的确认。

2.4.1. 处理一个 producer 和一个订阅 consumer 的分块消息

如下图所示,当生产者向主题发送一批大的分块消息和普通的非分块消息时。假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。这个 Broker 在其管理的 Ledger 里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。消费者将在内存缓存所有的块消息,直到收到所有的消息块。将这些消息合并成为原始的消息 M1,发送给处理进程。

2.4.2. 多个生产者和一个生产者处理块消息

当多个生产者发布块消息到单个主题,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。如下所示,生产者 1 发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。生产者 2 发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。这些特定消息的所有分块是顺序排列的,但是其在 Ledger 里面可能不是连续的。这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。

3. Pulsar 消费者 (Consumer)

Consumer 是通过订阅关系连接 Topic ,接收消息的程序。

Consumer 向 Broker 发送 flow permit request 以获取消息。在 Consumer 端有一个队列,用于接收从 Broker 推送来的消息。

3.1. 消息确认

Pulsar 提供两种确认模式:

如图,上方为累积确认模式,当消费者发送 M12 的确认消息给 Broker 后, Broker 会把 M12 之前的消息和 M12 一样都标记为已确认。下方为单条确认模式,当消费者发送 M7 的确认消息给 Broker 后, Broker 会把 M7 这条消息标记为已确认。当消费者发送 M12 的确认消息给 Broker 后, Broker 会把 M12 这条消息标记为已确认。

需要注意的是,订阅模式中的 shared 模式是不支持累积确认的。因为该订阅模式下的每个消费者都能消费数据,无法保证单个消费者的消费消息的时序和顺序。

3.1.1. AcknowledgmentsGroupingTracker

消息的单条确认和累积确认并不是直接发送确认请求给 Broker,而是把请求转交给 AcknowledgmentsGroupingTracker 处理。

为了保证消息确认的性能,并避免 Broker 接收到非常高并发的 ack 请求,Tracker 默认支持批量确认,即使是单条消息的确认,也会先进入队列,然后再一批发往 Broker。在创建 consumer 的时候,可以设置 acknowledgementGroupTimeMicros,默认情况下,每 100ms 或者堆积超过 1000 时,AcknowledgmentsGroupingTracker 会发送一批确认请求。如果设置为 0,则每次确认消息后,Consumer 都会立即发送确认请求。

3.2. 取消确认

当 Consumer 无法处理一条消息并想重新消费时, Consumer 可以发送一个取消确认的消息给 Broker , Broker 会重新将这条消息发送给 Consumer 。如果启用了批量处理,那这一批中的所有消息都会重新发送给消费者。

消息取消确认也有单条取消模式和累积取消模式 ,取决于消费者使用的订阅模式。

在 Exclusive 模式和 Failover 订阅模式中,消费者仅仅只能对收到的最后一条消息进行取消确认。

在 Shared 和 Key_Shared 的订阅类型中,消费者可以单独否定确认消息。

如果启用了批量处理,那这一批中的所有消息都会重新发送给消费者。

3.2.1. NegativeAcksTracker

取消确认和其他消息确认一样,不会立即请求 Broker,而是把请求转交给 NegativeAcksTracker 进行处理。Tracker 中记录着每条消息以及需要延迟的时间。Tracker 默认是 33ms 左右一个时间刻度进行检查,默认延迟时间是 1 分钟,抽取出已经到期的消息并触发重新投递。Tracker 存在的意义是为了合并请求。另外如果延迟时间还没到,消息会暂存在内存,如果业务侧有大量的消息需要延迟消费,还是建议使用 reconsumeLater 接口。NegativeAck 唯一的好处是不需要每条消息都指定时间,可以全局设置延迟时间。

3.3. redelivery backoff 机制

通常情况下可以使用取消确认来达到处理消息失败后重新处理消息的目的,但通过 redelivery backoff 可以更好的实现这种目的。可以通过指定消息重试的次数、消息重发的延迟来重新消费处理失败的消息。

3.4. 确认超时

除了取消确认和 redelivery backoff 机制外,还可以通过开启自动重传递机制来处理未确认的消息。启用自动重传递后,client 会在 ackTimeout 时间内跟踪未确认的消息,并在消息确认超时后自动向代理重新发送未确认的消息请求。

3.5. 消息预拉取

Consumer 客户端 SDK 会默认预先拉取消息到 Consumer 本地,Broker 侧会把这些已经推送到 Consumer 本地的消息记录为 pendingAck,这些消息既不会再投递给别的消费者,也不会 ack 超时,除非当前 Consumer 被关闭,消息才会被重新投递。Broker 侧有一个 RedeliveryTracker 接口,这个 Tracker 会记录消息到底被重新投递了多少次,每条消息推送给消费者时,会先从 Tracker 的哈希表中查询一下重投递的次数,和消息一并推送给消费者。

3.6. 未确认的消息处理

如果消息被消费者消费后一直没有确认怎么办?

unAckedMessageTracker 中维护了一个时间轮,时间轮的刻度根据 ackTimeout 、tickDurationInMs 这两个参数生成,每个刻度时间= ackTimeout / tickDurationInMs。新追踪的消息会放入最后一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保证刻度总数不变。每次调度,队列头刻度里的消息将会被清理,unAckedMessageTracker 会自动把这些消息做重投递。

重投递就是客户端发送一个 redeliverUnacknowledgedMessages 命令给 Broker。每一条推送给消费者但是未 ack 的消息,在 Broker 侧都会有一个集合来记录(pengdingAck),这是用来避免重复投递的。触发重投递后,Broker 会把对应的消息从这个集合里移除,然后这些消息就可以再次被消费了。

4. Pulsar 服务端

Broker 是 Pulsar 的一个无状态组件,主要负责运行以下两个组件:

4.1. 消息确认与留存

Pulsar Broker 会默认删除已经被所有 Consumer 确认的消息,并以 backlog 的方式持久化存储所有未被确认的内消息。

Pulsar 的 message retention(消息留存) 和 message expiry (消息过期)这两个特性可以调整 Broker 的默认设置。

4.2. 消息去重

实现消息去重的一种方式是确保消息仅生成一次,即生产者幂等。这种方式的缺点是把消息去重的工作交由应用去做。

在 Pulsar 中, Broker 支持配置开启消息去重,用户不需要为了消息去重去调整 Producer 的代码。启用消息去重后,即使一条消息被多次发送到 Topic 上,这条消息也只会被持久化到磁盘一次。

如下图,未开启消息去重时, Producer 发送消息 1 到 Topic 后, Broker 会把消息 1 持久化到 BookKeeper ,当 Producer 又发送消息 1 时, Broker 会把消息 1 再一次持久化到 BookKeeper 。开启消息去重后,当 Producer 再次发送消息 1 时, Broker 不会把消息 1 再一次持久化到磁盘。

4.2.1. 去重原理

Producer 对每一个发送的消息,都会采用递增的方式生成一个唯一的 sequenceID,这个消息会放在 message 的元数据中传递给 Broker 。同时, Broker 也会维护一个 PendingMessage 队列,当 Broker 返回发送成功 ack 后, Producer 会将 PendingMessage 队列中的对于的 Sequence ID 删除,表示 Producer 任务这个消息生产成功。Broker 会记录针对每个 Producer 接收到的最大 Sequence ID 和已经处理完的最大 Sequence ID。

当 Broker 开启消息去重后, Broker 会对每个消息请求进行是否去重的判断。收到的最新的 Sequence ID 是否大于 Broker 端记录的两个维度的最大 Sequence ID,如果大于则不重复,如果小于或等于则消息重复。消息重复时, Broker 端会直接返回 ack,不会继续走后续的存储处理流程。

4.3. 消息延迟传递

延时消息功能允许 Consumer 能够在消息发送到 Topic 后过一段时间才能消费到这条消息。在这种机制中,消息在发布到 Broker 后,会被存储在 BookKeeper 中,当到消息特定的延迟时间时,消息就会传递给 Consumer 。

下图为消息延迟传递的机制。Broker 在存储延迟消息的时候不会进行特殊的处理。当 Consumer 消费消息的时候,如果这条消息设置了延迟时间,则会把这条消息加入 DelayedDeliveryTracker 中,当到了指定的发送时间时,DelayedDeliveryTracker 才会把这条消息推送给消费者。

4.3.1. 延迟投递原理

在 Pulsar 中,可以通过两种方式实现延迟投递。分别为 deliverAfter 和 deliverAt。

deliverAfter 可以指定具体的延迟时间戳,deliverAt 可以指定消息在多长时间后消费。两种方式本质时一样的,deliverAt 方式下,客户端会计算出具体的延迟时间戳发送给 Broker 。

DelayedDeliveryTracker 会记录所有需要延迟投递的消息的 index 。index 由 Timestamp、 Ledger ID、 Entry ID 三部分组成,其中 Ledger ID 和 Entry ID 用于定位该消息,Timestamp 除了记录需要投递的时间,还用于延迟优先级队列排序。DelayedDeliveryTracker 会根据延迟时间对消息进行排序,延迟时间最短的放在前面。当 Consumer 在消费时,如果有到期的消息需要消费,则根据 DelayedDeliveryTracker index 的 Ledger ID、 Entry ID 找到对应的消息进行消费。如下图, Producer 依次投递 m1、m2、m3、m4、m5 这五条消息,m2 没有设置延迟时间,所以会被 Consumer 直接消费。m1、m3、m4、m5 在 DelayedDeliveryTracker 会根据延迟时间进行排序,并在到达延迟时间时,依次被 Consumer 进行消费。

4.4. Bundle

我们知道, Topic 分区会散落在不同的 Broker 中,那 Topic 分区和 Broker 的关系是如何维护的呢?当某个 Broker 负载过高时, Pulsar 怎么处理呢?

Topic 分区与 Broker 的关联是通过 Bundle 机制进行管理的。

每个 namespace 存在一个 Bundle 列表,在 namesapce 创建时可以指定 Bundle 的数量。Bundle 其实是一个分片机制,每个 Bundle 拥有 namespace 整个 hash 范围的一部分。每个 Topic (分区) 通过 hash 运算落到相应的 Bundle 区间,进而找到当前区间关联的 Broker 。每个 Bundle 绑定唯一的一个 Broker ,但一个 Broker 可以有多个 Bundle 。

如下图,T1、T2 这两个 Topic 的 hash 结果落在[0x0000000L——0x4000000L]中,这个 hash 范围的 Bundle 对应 Broker 2, Broker 2 会对 T1、T2 进行处理。

同理,T4 的 hash 结果落在[0x4000000L——0x8000000L]中,这个 hash 范围的 Bundle 对应 Broker 1, Broker 1 会对 T4 进行处理;

T5 的 hash 结果落在[0x8000000L——0xC000000L]中,这个 hash 范围的 Bundle 对应 Broker 3, Broker 3 会对 T5 进行处理;

T3 的 hash 结果落在[0xC000000L——0x0000000L]中,这个 hash 范围的 Bundle 对应 Broker 3, Broker 3 会对 T3 进行处理。

Bundle 可以根据绑定的 Broker 的负载进行动态的调整、绑定。当 Bundle 绑定的 Broker 的 Topic 数过多、负载过高时,都会触发 Bundle 拆分,将原有的 Bundle 拆分成 2 个 Bundle ,并将其中一个 Bundle 重新分配给不同的 Broker ,以降低原 Broker 的 Topic 数或负载。

5. Pulsar 存储层(Bookkeeper)

BookKeeper 是 Pulsar 的存储组件。

对于 Pulsar 的每个 Topic(分区),其数据并不会固定的分配在某个 Bookie 上,具体的逻辑实现我们在 Bundle 一节已经讨论过,而 Topic 的物理存储,实际上是通过 BookKeeper 组件来实现的。

5.1. 分片存储

概念:

Pulsar 在物理上采用分片存储的模式,存储粒度比分区更细化、存储负载更均衡。如图,一个分区 Topic-Partition 2 的数据由多个分片组成。每个分片作为 BookKeeper 中的一个 Ledger ,均匀的分布并存储在 BookKeeper 的多个 Bookie 节点中。

基于分配存储的机制,使得 Bookie 的扩容可以即时完成,无需任何数据复制或者迁移。当 Bookie 扩容时,Broker 可以立刻发现并感知新的 Bookie ,并尝试将新的分片 Segment 写入新增加的 Bookie 中。

如上图,在 Broker 中,消息以 Entry 的形式追加的形式写入 Ledger 中,每个 Topic 分区都有多个非连续 ID 的 Ledger,Topic 分区的 Ledger 同一时刻只有一个处于可写状态。

Topic 分区在存储消息时,会先找到当前使用的 Ledger ,生成 Entry ID(每个 Entry ID 在同一个 Ledger 内是递增的)。当 Ledger 的长度或 Entry 个数超过阈值时,新消息会存储到新 Ledger 中。每个 messageID 由[Ledger ID, Entry ID, Partition 编号,batch-index]组成。( Partition :消息所属的 Topic 分区,batch-index:是否为批量消息)

一个 Ledger 会根据 Topic 指定的副本数量存储到多个 Bookie 中。一个 Bookie 可以存放多个不连续的 Ledger。

5.2. 读写数据的流程

5.2.1. 消息的写入

1.将 Entry 追加写入 Ledger 中。

将这次 Entry 的更新操作写入 Journal 日志中,当由多个数据写入时,可以批量提交,将数据刷到 Journal 磁盘中。

将 Entry 数据写入写缓存中。

返回写入成功响应。

到这里,消息写入的同步流程已经完成。

3-A. 内存中的 Entry 数据会根据 Ledger 和写入 Ledger 的时间顺序进行排序,批量写入 Entry Log 中。

3-B. Entry 在 Entry log 中的偏移量以 Index Page 的方式写入 Ledger Cache 中,即 iIdex Files。

Entry Log 和 Ledger Cache 中的 Index File 会 Flush 到磁盘中。

5.2.2. 消息的读取

A.先从写缓存中以尾部读的方式读取。

B.如果写缓存未命中,则从读缓存中读取。

C.如果读缓存未命中,则从磁盘中读取。磁盘读取有三步:

C-1.读取 Index Disk,获取 Entry 的偏移量。

C-2.根据 Entry 的偏移量,在 Entry Disk 中快速找到 Entry 。

C-3.将 Entry 数据写入读缓存中。

6. 参考文献

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8