框架源码专题-RocketMQ-3、消息ACK机制及消费进度管理
1. 消息类型
1.1. 顺序消费 (顺序消息)
❕ ^gmi3ab
https://www.bilibili.com/video/BV1L4411y7mn?p=24&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204
顺序消息指的是,严格按照消息的发送顺序进行消费的消息 (FIFO)。
默认情况下生产者会把消息以 Round Robin 轮询方式发送到不同的 Queue 分区队列;而消费消息时会从多个 Queue 上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个 Queue 中,消费时也只从这个 Queue 上拉取消息,就严格保证了消息的顺序性。
当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个 OrderId 获取到的肯定是同一个队列。
1.1.1. 顺序消息生产
1.1.2. 顺序消息消费
1.1.3. 能否严格有序
https://dbaplus.cn/news-21-1123-1.html
1.2. 延时消息
1.2.1. 使用场景
比如电商里,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
1.2.2. 使用限制
1 |
|
现在 RocketMq 并不支持任意时间的延时,需要设置几个固定的延时等级,从 1s 到 2h 分别对应着等级 1 到 18
1.3. 事务消息
1.3.1. 流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
(1) 发送消息(half 消息)
(2) 服务端响应消息写入结果
(3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)
(4) 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)
1.3.2. 使用限制
- 事务消息不支持延时消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写AbstractTransactionCheckListener
类来修改这个行为。 - 事务消息将在 Broker 配置文件中的参数
transactionMsgTimeout
这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,该参数优先于transactionMsgTimeout
参数。 - 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ 服务器能通过它们的生产者 ID 查询到消费者。
2. 消费类型
消费者从 Broker 中获取消息的方式有两种:pull 拉取方式和 push 推动方式。
消费者组对于消息消费的模式又分为两种:集群消费 Clustering 和广播消费 Broadcasting。
2.1. 拉取式消费
2.2. 推送式消费
2.3. 对比
pull:需要应用去实现对关联 Queue 的遍历,实时性差;但便于应用控制消息的拉取
push:封装了对关联 Queue 的遍历,实时性强,但会占用较多的系统资源
3. 消费模式
3.1. 广播消费
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收同一个 Topic 的全量消息。即每条消息都会被发送到 Consumer Group 中的每个 Consumer。
3.2. 集群消费
3.2.1. 各种关系⭐️🔴
❕❕ ^r5bwjc
3.2.1.1. 所有主 Broker 分摊所有 Topic 的 MessageQueue
分布式专题-MQ-RocketMQ-1、基本原理3.2.1.2. 订阅关系一致性
由于消息队列 RocketMQ 版的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的 Consumer 实例需在以下方面均保持一致:
- 订阅的 Topic 必须一致,例如:Consumer1 订阅 TopicA 和 TopicB,Consumer2 也必须订阅 TopicA 和 TopicB,不能只订阅 TopicA、只订阅 TopicB 或订阅 TopicA 和 TopicC。
- 订阅的同一个 Topic 中的 Tag 必须一致,包括 Tag 的数量和 Tag 的顺序,例如:Consumer1 订阅 TopicB 且 Tag 为 Tag1||Tag2,Consumer2 订阅 TopicB 的 Tag 也必须是 Tag1||Tag2,不能只订阅 Tag1、只订阅 Tag2 或者订阅 Tag2||Tag1。
https://help.aliyun.com/document_detail/43523.html
3.2.1.3. 消费者组与 Topic - 多对多
- 如上图所示,一个消费者组可以消费多个 Topic,多个 Tag
但同一个消费者组中的消费者必须订阅完全相同的 Topic 和 Tag - 一个 Topic 也可以被多个消费者组消费
3.2.1.4. 消费者与 MessageQueue - 1 对多
在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。
默认的分配算法是 AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式,如下图:
需要注意的是,集群模式下,queue 都是只允许分配给一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。
通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的 queue 将分配到其他实例上继续消费。
但是如果 consumer 实例的数量比 message queue 的总数量还多的话,多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让 queue 的总数量大于等于 consumer 的数量。
集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊同一个 Topic 的消息。即每条消息只会被发送到 Consumer Group 中的某个 Consumer。
3.3. 消息进度保存
广播模式:消费进度保存在consumer 端。因为广播模式下 consumer group 中每个 consumer 都会消费所有消息,但它们的消费进度是不同。所以 consumer 各自保存各自的消费进度。
集群模式:消费进度保存在broker 中。consumer group 中的所有 consumer 共同消费同一个 Topic 中的消息,同一条消息只会被消费一次。消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的。下图是 broker 中存放的各个 Topic 的各个 Queue 的消费进度。
4. Rebalance 机制
Rebalance 即再均衡,指的是,将⼀个 Topic 下的多个 Queue 在同⼀个 Consumer Group 中的多个 Consumer 间进行重新分配的过程。
4.1. Rebalance 危害
Rebalance 的在提升消费能力的同时,也带来一些问题:
消费暂停:在只有一个 Consumer 时,其负责消费所有队列;在新增了一个 Consumer 后会触发 Rebalance 的发生。此时原 Consumer 就需要暂停部分队列的消费,等到这些队列分配给新的 Consumer 后,这些暂停消费的队列才能继续被消费。
消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前 Consumer 提交的消费进度的 offset 继续消费。然而默认情况下,offset 是异步提交的,这个异步性导致提交到 Broker 的 offset 与 Consumer 实际消费的消息并不一致。这个不一致的差值就是可能会重复消费的消息。
同步提交:consumer 提交了其消费完毕的一批消息的 offset 给 broker 后,需要等待 broker 的成功 ACK。当收到 ACK 后,consumer 才会继续获取并消费下一批消息。在等待 ACK 期间,consumer 是阻塞的。
异步提交:consumer 提交了其消费完毕的一批消息的 offset 给 broker 后,不需要等待 broker 的成功 ACK。consumer 可以直接获取并消费下一批消息。对于一次性读取消息的数量,需要根据具体业务场景选择一个相对均衡的是很有必要的。因为数量过大,系统性能提升了,但产生重复消费的消息数量可能会增加;数量过小,系统性能会下降,但被重复消费的消息数量可能会减少。
消费突刺:由于 Rebalance 可能导致重复消费,如果需要重复消费的消息过多,或者因为 Rebalance 暂停时间过长从而导致积压了部分消息。那么有可能会导致在 Rebalance 结束之后瞬间需要消费很多消息。
5. Queue 分配算法
一个 Topic 中的 Queue 只能由 Consumer Group 中的一个 Consumer 进行消费,而一个 Consumer 可以同时消费多个 Queue 中的消息。那么 Queue 与 Consumer 间的配对关系是如何确定的,即 Queue 要分配给哪个 Consumer 进行消费,也是有算法策略的。常见的有四种策略。这些策略是通过在创建 Consumer 时的构造器传进去的。
5.1. 平均分配策略
5.2. 环形平均策略
5.3. 一致性 hash 策略
5.4. 同机房策略
5.5. 对比
一致性 hash 算法存在的问题:
两种平均分配策略的分配效率较高,一致性 hash 策略的较低。因为一致性 hash 算法较复杂。另外,一致性 hash 策略分配的结果也很大可能上存在不平均的情况。
一致性 hash 算法存在的意义:
其可以有效减少由于消费者组扩容或缩容所带来的大量的 Rebalance。
一致性 hash 算法的应用场景:
Consumer 数量变化较频繁的场景。
6. 至少一次原则
RocketMQ 有一个原则:每条消息必须要被成功消费一次。
那么什么是成功消费呢?Consumer 在消费完消息后会向其消费进度记录器提交其消费消息的 offset, offset 被成功记录到记录器中,那么这条消费就被成功消费了。
什么是消费进度记录器? 对于广播消费模式来说,Consumer 本身就是消费进度记录器。对于集群消费模式来说,Broker 是消费进度记录器。
7. offset 管理
7.1. 重试队列
当 rocketMQ 对消息的消费出现异常时,会将发生异常的消息的 offset 提交到 Broker 中的重试队列。系统在发生消息消费异常时会为当前的 topic@group 创建一个重试队列,该队列以%RETRY% 开头,到达重试时间后进行消费重试。
8. 重复消费 (消费幂等)
8.1. 什么是消费幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
幂等:若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。
在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。
8.2. 消息重复的场景分析
什么情况下可能会出现消息被重复消费呢?最常见的有以下三种情况:
8.2.1. 发送时消息重复
当一条消息已被成功发送到 Broker 并完成持久化,此时出现了网络闪断,从而导致 Broker 对 Producer 应答失败。如果此时 Producer 意识到消息发送失败并尝试再次发送消息,此时 Broker 中就可能会出现两条内容相同并且 Message ID 也相同的消息,那么后续 Consumer 就一定会消费两次该消息。
8.2.2. 消费时消息重复
消息已投递到 Consumer 并完成业务处理,当 Consumer 给 Broker 反馈应答时网络闪断,Broker 没有接收到消费成功响应。为了保证消息至少被消费一次的原则,Broker 将在网络恢复后再次尝试投递之前已被处理过的消息。此时消费者就会收到与之前处理过的内容相同、Message ID 也相同的消息。
8.2.3. Rebalance 时消息重复
当 Consumer Group 中的 Consumer 数量发生变化时,或其订阅的 Topic 的 Queue 数量发生变化时,会触发 Rebalance,此时 Consumer 可能会收到曾经被消费过的消息。
8.3. 通用解决方案
8.3.1. 两要素
幂等解决方案的设计中涉及到两项要素:幂等令牌、唯一性处理。只要充分利用好这两要素,就可以设计出好的幂等解决方案。
幂等令牌:是生产者和消费者两者中的既定协议,通常指具备唯⼀业务标识的字符串。例如,订单号、流水号。一般由 Producer 随着消息一同发送来的。
唯一性处理:服务端通过采用⼀定的算法策略,保证同⼀个业务逻辑不会被重复执行成功多次。
例如,对同一笔订单的多次支付操作,只会成功一次。
8.3.2. 解决方案
对于常见的系统,幂等性操作的通用性解决方案是:
- 首先通过缓存去重。在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作;若缓存没有命中,则进入下一步。
- 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作;若不存在,则进入下一步。
- 在同一事务中完成三项操作:唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到 DB 中。
第 1 步已经判断过是否是重复性操作了,为什么第 2 步还要再次判断?能够进入第 2 步,说明已经不是重复操作了,第 2 次判断是否重复? 当然不重复。一般缓存中的数据是具有有效期的。缓存中数据的有效期一旦过期,就是发生缓存穿透,使请求直接就到达了 DBMS。
8.3.3. 解决方案举例
以支付场景为例:
- 当支付请求到达后,首先在 Redis 缓存中却获取key 为支付流水号的缓存value。若 value 不空,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;若 value 为空,则进入下一步操作
- 到 DBMS 中根据支付流水号查询是否存在相应实例。若存在,则说明本次支付是重复操作,业务系统直接返回调用侧重复支付标识;若不存在,则说明本次操作是首次操作,进入下一步完成唯一性处理
- 在分布式事务中完成三项操作:
完成支付任务;
将当前支付流水号作为 key,任意字符串作为 value,通过 set(key, value, expireTime) 将数
据写入到 Redis 缓存;
将当前支付流水号作为主键,与其它相关数据共同写入到 DBMS。
8.4. 消费幂等的实现
消费幂等的解决方案很简单:为消息指定不会重复的唯一标识。因为 Message ID 有可能出现重复的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。
以支付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下:
1 |
|
消费者收到消息时可以根据消息的 Key 即订单号来实现消费幂等:
1 |
|
9. 消息堆积与消费延迟
9.1. 概念
消息处理流程中,如果 Consumer 的消费速度跟不上 Producer 的发送速度,MQ 中未处理的消息会越来越多(进的多出的少),这部分消息就被称为堆积消息。消息出现堆积进而会造成消息的消费延迟。
以下场景需要重点关注消息堆积和消费延迟问题:
- 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
- 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。
9.2. 产生原因分析
9.2.1. 消息拉取
Consumer 通过长轮询 Pull 模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段一般不会成为消息堆积的瓶颈。
一个单线程单分区的低规格主机 (Consumer,4C8G),其可达到几万的 TPS。如果是多个分区多个线程,则可以轻松达到几十万的 TPS。
9.2.2. 消息消费
Consumer 将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消息进行处理,处理完毕后获取到一个结果。这是真正的消息消费过程。此时 Consumer 的消费能力就完全依赖于消息的消费耗时和消费并发度了。如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,此时就会导致 Consumer 本地缓冲队列达到上限,停止从服务端拉取消息。
9.2.3. 结论
消息堆积的主要瓶颈在于客户端的消费能力,而消费能力由消费耗时和消费并发度决定。注意,消费耗时的优先级要高于消费并发度。即在保证了消费耗时的合理性前提下,再考虑消费并发度问题。
9.3. 消费耗时
影响消息处理时长的主要因素是代码逻辑。而代码逻辑中可能会影响处理时长代码主要有两种类型:
CPU 内部计算型代码和外部 I/O 操作型代码。
通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部 I/O 操作来说几乎可以忽略。所以外部 IO 型代码是影响消息处理时长的主要症结所在。
外部 IO 操作型代码举例:
- 读写外部数据库,例如对远程 MySQL 的访问
- 读写外部缓存系统,例如对远程 Redis 的访问
- 下游系统调用,例如 Dubbo 的 RPC 远程调用,Spring Cloud 的对下游系统的 Http 接口调用
关于下游系统调用逻辑需要进行提前梳理,掌握每个调用操作预期的耗时,这样做是为了能够判断消费逻辑中 IO 操作的耗时是否合理。通常消息堆积是由于下游系统出现了服务异常或达到了 DBMS 容量限制,导致消费耗时增加。
服务异常,并不仅仅是系统中出现的类似 500 这样的代码错误,而可能是更加隐蔽的问题。例如,网络带宽问题。
达到了 DBMS 容量限制,其也会引发消息的消费耗时增加。
9.4. 消费并发度
9.5. 如何避免
为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。其中最重要的就是梳理消息的消费耗时和设置消息消费的并发度。
9.5.1. 梳理消息的消费耗时
通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息:
消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。
消息消费逻辑中的 I/O 操作是否是必须的,能否用本地缓存等方案规避。
消费逻辑中的复杂耗时的操作是否可以做异步化处理。如果可以,是否会造成逻辑错乱。
9.5.2. 设置消费并发度
对于消息消费并发度的计算,可以通过以下两步实施:
逐步调大单个 Consumer 节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。
根据上下游链路的流量峰值计算出需要设置的节点数
节点数 = 流量峰值 / 单个节点消息吞吐量
10. 消息刷盘
11. 保证消费成功
PushConsumer 为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ 才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
消费的时候,我们需要注入一个消费回调,具体 sample 代码如下:
1 |
|
业务实现消费回调的时候,当且仅当此回调函数返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ 才会认为这批消息(默认是 1 条)是消费完成的。(具体如何 ACK 见后面章节)
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ 就会认为这批消息消费失败了。
为了保证消息是肯定被至少消费成功一次,RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费租的 RETRY topic),在延迟的某个时间点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。
12. 启动的时候从哪里消费
当新实例启动的时候,PushConsumer 会拿到本消费组 broker 已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次 Pull 请求。
如果这个消费进度在 Broker 并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
1 |
|
所以,社区中经常有人问:“为什么我设了 CONSUME_FROM_LAST_OFFSET
,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。
对于老消费组想跳过历史消息需要自身做过滤,或者使用先修改消费进度。
13. 无法避免重复消费
RocketMQ 是以 consumer group+queue 为单位是管理消费进度的,以一个 consumer offset 标记这个消费组在这条 queue 上的消费进度。
如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。
每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到 broker,以此持久化消费进度。
但是每次记录消费进度的时候,只会把一批消息中最小的 offset 值为消费进度值,如下图:
这种方式和传统的一条 message 单独 ack 的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了 100 条消息如 2101-2200 的消息,后面 99 条都消费结束了,只有 2101 消费一直没有结束的情况。
在这种情况下,RocketMQ 为了保证消息肯定被消费成功,消费进度职能维持在 2101,直到 2101 也消费结束了,本地的消费进度才能标记 2200 消费结束了(注:consumerOffset=2201)。
在这种设计下,就有消费大量重复的风险。如 2101 在还没有消费完成的时候消费实例突然退出(机器断电,或者被 kill)。这条 queue 的消费进度还是维持在 2101,当 queue 重新分配给新的实例的时候,新的实例从 broker 上拿到的消费进度还是维持在 2101,这时候就会又从 2101 开始消费,2102-2200 这批消息实际上已经被消费过还是会投递一次。
对于这个场景,RocketMQ 暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是 RocketMQ 官方多次强调的态度。
实际上,从源码的角度上看,RocketMQ 可能是考虑过这个问题的,截止到 3.2.6 的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer
中有个配置 consumeConcurrentlyMaxSpan
1 |
|
这个值默认是 2000,当 RocketMQ 发现本地缓存的消息的最大值 - 最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。
但作用实际很有限,像刚刚这个例子,2101 的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101 后所有消息全部重复!
14. 实战经验
15. 参考与感谢
https://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/
分布式专题-MQ-RocketMQ-1、基本原理