1. 消息可靠性

image.png

1.1. 生产者消息确认

image.png

image-20210718161707992

1.1.1. 修改配置

首先,修改 publisher 服务中的 application.yml 文件,添加下面的内容:

1
2
3
4
5
6
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true

说明:

  • publish-confirm-type:开启 publisher-confirm,这里支持两种类型:
    • simple:同步等待 confirm 结果,直到超时
    • correlated异步回调,定义 ConfirmCallback,MQ 返回结果时会回调这个 ConfirmCallback
  • publish-returns:开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallback
  • template.mandatory定义消息路由失败时的策略。true,则调用 ReturnCallback;false:则直接丢弃消息

1.1.2. 定义 ReturnCallback -ApplicationContextAware

每个 RabbitTemplate 只能配置一个 ReturnCallback,因此需要在项目加载时配置:
修改 publisher 服务,添加一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}

1.1.3. 定义 ConfirmCallback

ConfirmCallback 可以在发送消息时指定,因为每个业务处理 confirm 成功或失败的逻辑不一定相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

// 休眠一会儿,等待ack回执
Thread.sleep(2000);
}

1.2. 消息持久化

生产者确认可以确保消息投递到 RabbitMQ 的队列中,但是消息发送到 RabbitMQ 以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

默认情况下,由 SpringAMQP 声明的交换机、队列、消息都是持久化的

1.3. 消费者消息确认

image.png

1.4. 消费失败重试机制⭐️🔴

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力

1.4.1. 本地重试

image.png
image.png

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring 会返回 ack,消息会被丢弃

1.4.2. 失败策略

image.png

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由 Spring 内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecovery 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。==默认就是这种方式==
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是 RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

兜底方案
image.png

1.5. 如何确保 RabbitMQ 消息的可靠性⭐️🔴

%%
▶61.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230304-2008%%
❕ ^yw0uls
%%
▶7.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230402-1954%%
❕ ^g43mqv

  • 开启==生产者确认机制==,确保生产者的消息能到达队列
  • 开启==持久化==功能,确保消息未消费前在队列中不会丢失
  • 开启==消费者确认机制为 auto==,由 spring 确认消息处理成功后完成 ack
  • 开启==消费者失败重试机制==,并设置 MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

1.5.1. 详细说明

[[pages/002-schdule/001-Arch/001-Subject/005-分布式专题/黑马-面试题/讲义/微服务常见面试题#^fzknz2]]

2. 死信交换机

2.1. 什么是死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

2.2. 死信交换机

如果这个包含死信的队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为 死信交换机(Dead Letter Exchange,检查 DLX)。
如图,一个消息被消费者拒绝了,变成了死信:

image-20210718174328383

因为 simple.queue 绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:

image-20210718174416160

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:

image-20210718174506856

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称: dead-letter-exchange
  • 死信交换机与死信队列绑定的 RoutingKey: dead-letter-routing-key

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

image-20210821073801398

2.3. 默认兜底方案与死信队列区别

在失败重试策略中,默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后,发送 reject 给 RabbitMQ,消息变成死信,被丢弃。

image.png

2.4. 总结

什么样的消息会成为死信?
%%
▶2.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230307-0809%%
❕ ^auuhua

  • 消息被消费者 reject 或者返回 nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?
在失败重试策略中,默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后,发送 reject 给 RabbitMQ,消息变成死信,被丢弃。

我们可以给 simple.queue 添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

3. 延时队列

3.1. TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间

image.png

3.2. 延迟队列⭐️🔴

利用 TTL 结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在 15 分钟内未支付,则自动取消
  • 预约工作会议,20 分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以 RabbitMQ 的官方也推出了一个插件,原生支持延迟队列效果。这个插件就是 DelayExchange 插件。参考 RabbitMQ 的插件列表页面:https://www.rabbitmq.com/community-plugins.html

image-20210718192529342

4. 惰性队列

4.1. 消息堆积问题

%%
▶4.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230307-0852%%
❕ ^4y1a0a

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的 work queue 模式
  • 扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的。

4.2. 惰性队列

%%
▶7.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230307-0909%%
❕ ^k34tgi

从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 使用惰性队列,可以在 mq 中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的 page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的 IO

5. 高可用方案

%%
▶4.🏡⭐️◼️【🌈费曼无敌🌈⭐️第一步⭐️】◼️⭐️-point-20230402-1811%%
❕ ^g8fwqy

5.1. 普通集群

RabbitMQ 提供了两种集群模式: 普通集群模式 镜像集群模式 先来看普通集群 这种集群模式下,各个节点只同步元数据,不同步队列中的消息。 其中元数据包含队列的名称、交换机名称及属性、交换机与队列的绑定关系等。 当我们发送消息和消费消息的时候,不管请求发送到 RabbitMQ 集群的哪个节点。 最终都会通过元数据定位到队列所在的节点去存储以及拉取数据。 很显然,这种集群方式并不能保证 Queue 的高可用,因为一旦 Queue 所在的节 点挂了,那么这个 Queue 的消息就没办法访问了。 它的好处是通过多个节点分担了流量的压力,提升了消息的吞吐能力。
image.png

5.2. 镜像集群

它和普通集群的区别在于,镜像集群中 Queue 的数据会在 RabbitMQ 集群的每个节点存储一份。 一旦任意一个节点发生故障,其他节点仍然可以继续提供服务。 所以这种集群模式实现了真正意义上的高可用。
image.png

最后,在镜像集群的模式下,我们可以通过 Keepalived+HAProxy 来实现 RabbitMQ 集群的负载均衡。其中: HAProxy 是一个能支持四层和七层的负载均衡器,可以实现对 RabbitMQ 集群的负载均衡同时为了避免 HAProxy 的单点故障,可以再增加 Keepalived 实现 HAProxy 的主备,如果 HAProxy 主节点出现故障那么备份节点就会接管主节点提供服务。 Keepalived 提供了一个虚拟 IP,业务只需要连接到虚拟 IP 即可。

image.png

6. 面试题

6.1. RabbitMQ 实现高可用

https://www.bilibili.com/video/BV16d4y1P7ZF/?spm_id_from=333.788&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204

6.2. RabbitMQ 实现可靠消息传递

https://www.bilibili.com/video/BV14N4y1g7pV/?spm_id_from=333.788&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204

分布式专题-MQ-RabbitMQ-1、基本原理

6.3. RabbitMQ 的消息如何实现路由

https://www.bilibili.com/video/BV1uG411x7r4/?spm_id_from=333.788&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204
RabbitMQ 是一个基于高级消息队列协议 AMQP(Advanced Message Queuing Protocol) 协议实现的分布式消息中间件。AMQP 的具体工作机制是,生产者把消息发送到 RabbitMQ Broker 上的 Exchange 交换机上。 Exchange 交换机把收到的消息根据路由规则发给绑定的队列(Queue)。最后再把消息投递给订阅了这个队列的消费者,从而完成消息的异步通讯。
image.png

其中,Exchange 是一个消息交换机,它里面定义了消息路由的规则,也就是这个消息路由到那个队列。 然后 Queue 表示消息的载体,每个消息可以根据路由规则路由到一个或者多个队列里面。 而关于消息的路由机制,核心的组件是 Exchange。 它负责接收生产者的消息然后把消息路由到消息队列,而消息的路由规则由 ExchangeType 和 Binding 决定。 Binding 表示建立 Queue 和 Exchange 之间的绑定关系,每一个绑定关系会存在一个 BindingKey。 通过这种方式相当于在 Exchange 中建立了一个路由关系表
image.png
生产者发送消息的时候,需要声明一个 routingKey(路由键),Exchange 拿到 routingKey 之后,根据 RoutingKey 和路由表里面的 BindingKey 进行匹配,而匹配的规则是通过 ExchangeType 来决定的。

image.png

在 RabbitMQ 中,有三种类型的 Exchange:direct ,fanout 和 topic。 direct: 完整匹配方式,也就是 Routing key 和 Binding Key 完全一致,相当于点对点的发送。 fanout: 广播机制,这种方式不会基于 Routing key 来匹配,而是把消息广播给绑定到当前 Exchange 上的所有队列上。 topic: 正则表达式匹配,根据 Routing Key 使用正则表达式进行匹配,符合匹配规则的 Queue 都会收到这个消息

7. 实战经验

8. 参考与感谢

8.1. 视频

https://www.bilibili.com/video/BV1LQ4y127n4/?p=158&spm_id_from=pageDriver&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204

8.2. 资料

/Users/taylor/Nutstore Files/Obsidian_data/pages/002-schdule/001-Arch/001-Subject/002- 框架源码专题/001-MQ/黑马 MQ 讲义/day05-MQ 高级/讲义/RabbitMQ- 高级篇.md