框架源码专题-RocketMQ-1、基本原理
1. 历史
Kafka:一个 topic 中每个 Queue 都是一个单独的文件,所有消息都存在其中
RocketMQ:所有 topic 所有 Queue 的消息都放在一种文件中,即 CommitLog,默认大小为 1G。但又为每个 topic 创建一个目录,然后为每个 Queue 创建一个文件,即 ConsumeQueue,用来存储索引信息。存储使用 CommitLog,查询使用 ConsumeQueue。实现了读写分离。
2. 基本概念
2.1. 消息(Message)
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
2.2. 主题(Topic)- 消费者 - 消费者组
Topic 表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。 topic:message 1:n message : topic 1:1
一个生产者可以同时发送多种 Topic 的消息;而一个消费者只对某种特定的 Topic 感兴趣,即只可以订阅和消费一种 Topic 的消息。 producer:topic 1:n consumer:topic 1:1
2.3. 队列(Queue)- 同消费组只能 1 个消费者
存储消息的物理实体。一个 Topic 中可以包含多个 Queue,每个 Queue 中存放的就是该 Topic 的消息。一个 Topic 的 Queue 也被称为一个 Topic 中消息的分区(Partition)。
一个 Topic 的 Queue 中的消息只能被一个消费者组中的一个消费者消费。一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。
在学习参考其它相关资料时,还会看到一个概念:分片(Sharding)。分片不同于分区。在 RocketMQ 中,分片指的是存放相应 Topic 的 Broker。每个分片中会创建出相应数量的分区,即 Queue,每个 Queue 的大小都是相同的。
2.4. 消息标识(MessageId/Key)
RocketMQ 中每个消息拥有唯一的 MessageId,且可以携带具有业务标识的 Key,以方便对消息的查询。
不过需要注意的是,MessageId 有两个:在生产者 send() 消息时会自动生成一个 MessageId(msgId),当消息到达 Broker 后,Broker 也会自动生成一个 MessageId(offsetMsgId)。msgId、offsetMsgId 与 key 都称为消息标识。
msgId:由 producer 端生成,其生成规则为:
==producerIp + 进程 pid + MessageClientIDSetter 类的 ClassLoader 的 hashCode + 当前时间 + AutomicInteger 自增计数器==
offsetMsgId:由 broker 端生成,其生成规则为:==brokerIp + 物理分区的 offset(Queue 中的偏移量)==
key:由用户指定的业务相关的唯一标识
3. 系统组成
3.1. Producer
消息生产者,负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
例如,业务系统产生的日志写入到 MQ 的过程,就是消息生产的过程。再如,电商平台中用户提交的秒杀请求写入到 MQ 的过程,就是消息生产的过程
3.2. Consumer
RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类 Consumer 消费的是同一个 Topic 类型的消息。消费者组使得在消息消费方面,实现负载均衡(将一个 Topic 中的不同的 Queue 平均分配给同一个 Consumer Group 的不同的 Consumer,注意,并不是将消息负载均衡)和容错(一个 Consmer 挂了,该 Consumer Group 中的其它 Consumer 可以接着消费原 Consumer 消费的 Queue)的目标变得非常容易。
消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量。如果超出 Queue 数量,则多出的 Consumer 将不能消费消息。
不过,一个 Topic 类型的消息可以被多个消费者组同时消费。
注意
1)消费者组只能消费一个 Topic 的消息,不能同时消费多个 Topic 消息
2)一个消费者组中的消费者必须订阅完全相同的 Topic
3.3. Broker
3.3.1. 功能介绍
Broker 充当着消息中转角色,负责存储消息、转发消息。Broker 在 RocketMQ 系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker 同时也存储着消息相关的元数据,包括消费者组消费进度偏移 offset、主题、队列等。
3.3.2. 模块构成
Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。而这个 Broker 实体则由以下模块构成。
Client Manager:客户端管理器。负责接收、解析客户端 (Producer/Consumer) 请求,管理客户端。例如,维护 Consumer 的 Topic 订阅信息
Store Service:存储服务。提供方便简单的 API 接口,处理消息存储到物理硬盘和消息查询功能。
HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
Index Service:索引服务。根据特定的 Message key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。
3.3.3. 集群部署
为了增强 Broker 性能与吞吐量,Broker 一般都是以集群形式出现的。各集群节点中可能存放着相同 Topic 的不同 Queue,即所有主 Broker 分摊所有 Topic 的所有 messageQueue。
不过,这里有个问题,如果某 Broker 节点宕机,如何保证数据不丢失呢?其解决
方案是,将每个 Broker 集群节点进行横向扩展,即将 Broker 节点再建为一个 HA 集群,解决单点问题。
Broker 节点集群是一个主从集群,即集群中具有 Master 与 Slave 两种角色。Master 负责处理读写操作请求,Slave 负责对 Master 中的数据进行备份。当 Master 挂掉了,Slave 则会自动切换为 Master 去工作。所以这个 Broker 集群是主备集群。一个 Master 可以包含多个 Slave,但一个 Slave 只能隶属于一个 Master。 Master 与 Slave 的对应关系是通过指定相同的 BrokerName、不同的 BrokerId 来确定的。BrokerId 为 0 表 示 Master,非 0 表示 Slave。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
3.4. NameServer
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现。
RocketMQ 的思想来自于 Kafka,而 Kafka 是依赖了 Zookeeper 的。所以,在 RocketMQ 的早期版本,即在 MetaQ v1.0 与 v2.0 版本中,也是依赖于 Zookeeper 的。从 MetaQ v3.0,即 RocketMQ 开始去掉了 Zookeeper 依赖,使用了自己的 NameServer。
主要包括两个功能:
Broker 管理:接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据;提供心跳检测机制,检查 Broker 是否还存活。
路由信息管理:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Conumser 通过 NameServer 可以获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。
3.4.1. 路由注册
NameServer 通常也是以集群的方式部署,不过,NameServer 是无状态的,即 NameServer 集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。那各节点中的数据是如何进行数据同步的呢?在 Broker 节点启动时,轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在 NameServer 内部维护着⼀个 Broker 列表,用来动态存储 Broker 的信息。
注意,这是与其它像 zk、Eureka、Nacos 等注册中心不同的地方。
这种 NameServer 的无状态方式,有什么优缺点:
优点:NameServer 集群搭建简单,扩容简单。
缺点:对于 Broker,必须明确指出所有 NameServer 地址。否则未指出的将不会去注册。也正因为如此,NameServer 并不能随便扩容。因为,若 Broker 不重新配置,新增的 NameServer 对于 Broker 来说是不可见的,其不会向这个 NameServer 进行注册。
Broker 节点为了证明自己是活着的,为了维护与 NameServer 间的长连接,会将最新的信息以心跳包的方式上报给 NameServer,每 30 秒发送一次心跳。心跳包中包含 BrokerId、Broker 地址 (IP+Port)、 Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新心跳时间戳,记录这个 Broker 的最新存活时间。
3.4.2. 路由剔除
NameServer 中有⼀个定时任务,每隔 10 秒就会扫描⼀次 Broker 表,查看每一个 Broker 的最新心跳时间戳距离当前时间是否超过 120 秒,如果超过,则会判定 Broker 失效,然后将其从 Broker 列表中剔除。
扩展:对于 RocketMQ 日常运维工作,例如 Broker 升级,需要停掉 Broker 的工作。OP 需要怎么做?OP 需要将 Broker 的读写权限禁掉。一旦 client(Consumer 或 Producer) 向 broker 发送请求,都会收到 broker 的 NO_PERMISSION 响应,然后 client 会进行对其它 Broker 的重试。当 OP 观察到这个 Broker 没有流量后,再关闭它,实现 Broker 从 NameServer 的移除。
3.4.3. 路由发现
RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每 30 秒会拉取一次最新的路由。
扩展:
1)Push 模型:推送模型。其实时性较好,是一个“发布 - 订阅”模型,需要维护一个长连接。而长连接的维护是需要资源成本的。该模型适合于的场景: 实时性要求较高 Client 数量不多,Server 数据变化较频繁
2)Pull 模型:拉取模型。存在的问题是,实时性较差。
3)Long Polling 模型:长轮询模型。其是对 Push 与 Pull 模型的整合,充分利用了这两种模型的优势,屏蔽了它们的劣势。
对比 Nacos:服务注册与发现-6、Nacos
3.4.4. 客户端 NameServer 选择策略
这里的客户端指的是 Producer 与 Consumer
客户端在配置时必须要写上 NameServer 集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?客户端首先会生产一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用 round-robin 策略,逐个尝试着去连接其它节点。
首先采用的是随机策略进行的选择,失败后采用的是轮询策略。
扩展:Zookeeper Client 是如何选择 Zookeeper Server 的? 简单来说就是,经过两次 Shuffle,然后选择第一台 Zookeeper Server。详细说就是,将配置文件中的 zk server 地址进行第一次 Shuffle,然后随机选择一个。这个选择出的一般都是一个 hostname。然后获取到该 hostname 对应的所有 ip,再对这些 ip 进行第二次 Shuffle,从 Shuffle 过的结果中取第一个 server 地址进行连接。
4. 工作流程
4.1. 具体流程
1)启动 NameServer,NameServer 启动后开始监听端口,等待 Broker、Producer、Consumer 连接。
2)启动 Broker 时,Broker 会与所有的 NameServer 建立并保持长连接,然后每 ==30 秒==向 NameServer 定时发送心跳包。
3)发送消息前,可以先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,当然,在创建 Topic 时也会将 Topic 与 Broker 的关系写入到 NameServer 中。不过,这步是可选的,也可以在发送消息时自动创建 Topic。
4)Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取路由信息,即当前发送的 Topic 消息的 Queue 与 Broker 的地址(IP+Port)的映射关系。然后根据算法策略从队选择一个 Queue,与队列所在的 Broker 建立长连接从而向 Broker 发消息。当然,在获取到路由信息后,Producer 会首先将路由信息缓存到本地,再==每 30 秒从 NameServer 更新一次路由信息==。
5)Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取其所订阅 Topic 的路由信息,然后根据算法策略从路由信息中获取到其所要消费的 Queue,然后直接跟 Broker 建立长连接,开始消费其中的消息。Consumer 在获取到路由信息后,同样也会每 30 秒从 NameServer 更新一次路由信息。不过不同于 Producer 的是,Consumer 还会向 Broker 发送心跳,以确保 Broker 的存活状态。
4.2. Topic 的创建模式
手动创建 Topic 时,有两种模式:
集群模式:该模式下创建的 Topic 在该集群中,所有 Broker 中的 Queue 数量是相同的。
Broker 模式:该模式下创建的 Topic 在该集群中,每个 Broker 中的 Queue 数量可以不同。
自动创建 Topic 时,默认采用的是 Broker 模式,会为每个 Broker 默认创建 4 个 Queue。
4.3. 读/写队列
按数量大的创建 queue
从物理上来讲,读/写队列是同一个队列。所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念。一般情况下,读/写队列数量是相同的。
例如,创建 Topic 时设置的写队列数量为 8,读队列数量为 4,此时系统会创建 8 个 Queue,分别是 0 1 2 3 4 5 6 7。Producer 会将消息写入到这 8 个队列,但 Consumer 只会消费 0 1 2 3 这 4 个队列中的消息,4 5 6 7 中的消息是不会被消费到的。
再如,创建 Topic 时设置的写队列数量为 4,读队列数量为 8,此时系统会创建 8 个 Queue,分别是 0 1 2 3 4 5 6 7。Producer 会将消息写入到 0 1 2 3 这 4 个队列,但 Consumer 只会消费 0 1 2 3 4 5 6 7 这 8 个队列中的消息,但是 4 5 6 7 中是没有消息的。此时假设 Consumer Group 中包含两个 Consuer,Consumer1 消 费 0 1 2 3,而 Consumer2 消费 4 5 6 7。但实际情况是,Consumer2 是没有消息可消费的。
也就是说,当读/写队列数量设置不同时,总是有问题的。那么,为什么要这样设计呢?
其这样设计的目的是为了,方便 Topic 的 Queue 的缩容。
例如,原来创建的 Topic 中包含 16 个 Queue,如何能够使其 Queue 缩容为 8 个,还不会丢失消息?可以动态修改写队列数量为 8,读队列数量不变。此时新的消息只能写入到前 8 个队列,而消费者消费的却是 16 个队列中的数据。当发现后 8 个 Queue中的消息消费完毕后,就可以再将读队列数量动态设置为 8。整个缩容过程,没有丢失任何消息。
perm 用于设置对当前创建 Topic 的操作权限:2 表示只写,4 表示只读,6 表示读写。
5. 底层原理
5.1. 数据复制与刷盘策略⭐️🔴
5.1.1. 复制策略
复制策略是Broker的Master与Slave间的数据同步方式。分为同步复制与异步复制:
同步复制:消息写入 master 后,master 会等待 slave 同步数据成功后才向 producer 返回成功 ACK
异步复制:消息写入 master 后,master 立即向 producer 返回成功 ACK,无需等待 slave 同步数据成功异步复制策略会降低系统的写入延迟,RT 变小,提高了系统的吞吐量
5.1.2. 刷盘策略
刷盘策略指的是broker中消息的落盘方式,即消息发送到broker内存后消息持久化到磁盘的方式。
分为同步刷盘与异步刷盘:
同步刷盘:当消息持久化到 broker 的磁盘后才算是消息写入成功。
异步刷盘:当消息写入到 broker 的内存后即表示消息写入成功,无需等待消息持久化到磁盘。
1)异步刷盘策略会降低系统的写入延迟,RT 变小,提高了系统的吞吐量
2)消息写入到 Broker 的内存,一般是写入到了 PageCache
3)对于异步刷盘策略,消息会写入到 PageCache 后立即返回成功 ACK。但并不会立即做落盘操作,而是当 PageCache 到达一定量时会自动进行落盘。
5.2. Broker 集群模式
根据 Broker 集群中各个节点间关系的不同,Broker 集群可以分为以下几类:
5.2.1. 单 Master
只有一个 broker(其本质上就不能称为集群)。这种方式也只能是在测试时使用,生产环境下不能使用,因为存在单点问题。
5.2.2. 多 Master
broker 集群仅由多个 master 构成,不存在 Slave。同一 Topic 的各个 Queue 会平均分布在各个 master 节点上。
优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅(不可消费),消息实时性会受到影响。
以上优点的前提是,这些 Master 都配置了 RAID 磁盘阵列。如果没有配置,一旦出现某 Master 宕机,则会发生大量消息丢失的情况。
5.2.3. 多 Master 多 Slave 模式 - 异步复制
broker 集群由多个 master 构成,每个 master 又配置了多个 slave(在配置了 RAID 磁盘阵列的情况下,一个 master 一般配置一个 slave 即可)。master 与 slave 的关系是主备关系,即 master 负责处理消息的读写请求,而 slave 仅负责消息的备份与 master 宕机后的角色切换。
异步复制即前面所讲的复制策略中的异步复制策略,即消息写入 master 成功后,master 立即向 producer 返回成功 ACK,无需等待 slave 同步数据成功。
该模式的最大特点之一是,当 master 宕机后 slave 能够自动切换为 master。不过由于 slave 从 master 的同步具有短暂的延迟(毫秒级),所以当 master 宕机后,这种异步复制方式可能会存在少量消息的丢失问题。
Slave 从 Master 同步的延迟越短,其可能丢失的消息就越少。对于 Master 的 RAID 磁盘阵列,若使用的也是异步复制策略,同样也存在延迟问题,同样也可能会丢失消息。但 RAID 阵列的秘诀是微秒级的(因为是由硬盘支持的),所以其丢失的数据量会更少。
5.2.4. 多 Master 多 Slave 模式 - 同步双写
该模式是多 Master 多 Slave 模式的同步复制实现。所谓同步双写,指的是消息写入 master 成功后,master 会等待 slave 同步数据成功后才向 producer 返回成功 ACK,即 master 与 slave 都要写入成功后才会返回成功 ACK,也即双写。
该模式与异步复制模式相比,优点是消息的安全性更高,不存在消息丢失的情况。但单个消息的 RT 略高,从而导致性能要略低(大约低 10%)。
该模式存在一个大的问题:对于目前的版本,Master 宕机后,Slave 不会自动切换到 Master。
5.3. 消息的生产
5.3.1. 消息的生产过程
- Producer 发送消息之前,会先向 NameServer 发出获取消息 Topic 的路由信息的请求
- NameServer 返回该 Topic 的路由表及 Broker 列表
- Producer 根据代码中指定的 Queue 选择策略,从 Queue 列表中选出一个队列,用于后续存储消息
- Producer 对消息做一些特殊处理,例如,消息本身超过 4M,则会对其进行压缩
- Producer 向选择出的 Queue 所在的 Broker 发出 RPC 请求,将消息发送到选择出的 Queue
5.3.2. Producer 的负载均衡
❕ ^sjwepe
Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:
图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
5.3.3. Queue 选择算法
5.3.3.1. 轮询算法 - 默认
默认选择算法。该算法保证了每个 Queue 中可以均匀的获取到消息。
该算法存在一个问题:由于某些原因,在某些 Broker 上的 Queue 可能投递延迟较严重。从而导致 Producer 的缓存队列中出现较大的消息积压,影响消息的投递性能。
5.3.3.2. 最小投递延迟算法
该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的 Queue。
如果延迟相同,则采用轮询算法投递。该算法可以有效提升消息的投递性能。
该算法也存在一个问题:消息在 Queue 上的分配不均匀。投递延迟小的 Queue 其可能会存在大量的消息。而对该 Queue 的消费者压力会增大,降低消息的消费能力,可能会导致 MQ 中消息的堆积。
5.4. 消息的存储
RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是 CommitLog,ConsumeQueue 是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个 Topic 下的每个 Message Queue 都有一个对应的 ConsumeQueue 文件。
RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store 目录中。
abort:该文件在 Broker 启动后会自动创建,正常关闭 Broker,该文件会自动消失。若在没有启动
Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭。
checkpoint:其中存储着 commitlog、consumequeue、index 文件的最后刷盘时间戳
commitlog:其中存放着 commitlog 文件,而消息是写在 commitlog 文件中的
config:存放着 Broker 运行期间的一些配置数据
consumequeue:其中存放着 consumequeue 文件,队列就存放在这个目录中
index:其中存放着消息索引文件 indexFile
lock:运行期间使用到的全局资源锁
5.4.1. commitlog
说明:在很多资料中 commitlog 目录中的文件简单就称为 commitlog 文件。但在源码中,该文件被命名为 mappedFile。
5.4.1.1. 目录与文件
commitlog 目录中存放着很多的 mappedFile 文件,当前 Broker 中的所有消息都是落盘到这些 mappedFile 文件中的。mappedFile 文件大小为 1G(小于等于 1G),文件名由 20 位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。
第一个文件名一定是 20 位 0 构成的。因为第一个文件的第一条消息的偏移量 commitlog offset 为 0 当第一个文件放满时,则会自动生成第二个文件继续存放消息。假设第一个文件大小是 1073741820 字节(1G = 1073741824 字节),则第二个文件名就是 00000000001073741824。 以此类推,第 n 个文件名应该是前 n-1 个文件大小之和。 一个 Broker 中所有 mappedFile 文件的 commitlog offset 是连续的
需要注意的是,一个 Broker 中仅包含一个 commitlog 目录,所有的 mappedFile 文件都是存放在该目录中的。即无论当前 Broker 中存放着多少 Topic 的消息,这些消息都是被顺序写入到了 mappedFile 文件中的。也就是说,这些消息在 Broker 中存放时并没有被按照 Topic 进行分类存放。mappedFile 文件是顺序读写的文件,所有其访问效率很高 无论是 SSD 磁盘还是 SATA 磁盘,通常情况下,顺序存取效率都会高于随机存取。
5.4.1.2. 消息单元
5.4.2. consumequeue
5.4.2.1. 目录与文件
为了提高效率,会为每个 Topic 在~/store/consumequeue 中创建一个目录,目录名为 Topic 名称。在该 Topic 目录下,会再为每个该 Topic 的 Queue 建立一个目录,目录名为 queueId。每个目录中存放着若干 consumequeue 文件,consumequeue 文件是 commitlog 的索引文件,可以根据 consumequeue 定位到具体的消息。
对比:Kafka 每个 topic 一个文件
5.4.2.2. 索引条目
每个 consumequeue 文件可以包含 30w 个索引条目,每个索引条目包含了三个消息重要属性:
消息在 mappedFile 文件中的偏移量 CommitLog Offset
消息长度
消息 Tag 的 hashcode 值
这三个属性占 20 个字节,所以每个文件的大小是固定的 30w * 20 字节。
一个 consumequeue 文件中所有消息的 Topic 一定是相同的。但每条消息的 Tag 可能是不同的。
5.4.3. 对文件的读写
5.4.3.1. 消息写入
一条消息进入到 Broker 后经历了以下几个过程才最终被持久化。
- Broker 根据 queueId,获取到该消息对应索引条目要在 consumequeue 目录中的写入偏移量,即 QueueOffset
- 将 queueId、queueOffset 等数据,与消息一起封装为消息单元
- 将消息单元写入到 commitlog
- 同时,形成消息索引条目
- 将消息索引条目分发到相应的 consumequeue
5.4.3.2. 消息拉取
当 Consumer 来拉取消息时会经历以下几个步骤:
- Consumer 获取到其要消费消息所在 Queue 的消费偏移量 offset,计算出其要消费消息的
消息 offset
消费 offset 即消费进度,consumer 对某个 Queue 的消费 offset,即消费到了该 Queue 的第几 条消息 消息 offset = 消费 offset + 1
- Consumer 向 Broker 发送拉取请求,其中会包含其要拉取消息的 Queue、消息 offset 及消息 Tag。
- Broker 计算在该 consumequeue 中的 queueOffset。 queueOffset = 消息 offset * 20 字节
- 从该 queueOffset 处开始向后查找第一个指定 Tag 的索引条目。
- 解析该索引条目的前 8 个字节,即可定位到该消息在 commitlog 中的 commitlog offset
- 从对应 commitlog offset 中读取消息单元,并发送给 Consumer
5.4.3.3. 性能提示⭐️🔴⭐️🔴
RocketMQ 中,无论是消息本身还是消息索引,都是存储在磁盘上的。其不会影响消息的消费吗?当然不会。其实 RocketMQ 的性能在目前的 MQ 产品中性能是非常高的。因为系统通过一系列相关机制大大提升了性能。
首先,RocketMQ 对文件的读写操作是通过mmap 零拷贝进行的,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率。
其次,consumequeue 中的数据是顺序存放的,还引入了 PageCache 的预读取机制,使得对
consumequeue 文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。
PageCache 机制,页缓存机制,是 OS 对文件的缓存机制,用于加速对文件的读写操作。一般来说,程序对文件进行顺序读写的速度几乎接近于内存读写速度,主要原因是由于 OS 使用 PageCache 机制对读写访问操作进行性能优化,将一部分的内存用作 PageCache。
写操作:OS 会先将数据写入到 PageCache 中,随后会以异步方式由 pdæ ush(page dirty æ ush) 内核线程将 Cache 中的数据刷盘到物理磁盘
读操作:若用户要读取数据,其首先会从 PageCache 中读取,若没有命中,则 OS 在从物理磁 盘上加载该数据到 PageCache 的同时,也会顺序对其相邻数据块中的数据进行预读取。
RocketMQ 中可能会影响性能的是对 commitlog 文件的读取。因为对 commitlog 文件来说,读取消息时会产生大量的随机访问,而随机访问会严重影响性能。不过,如果选择合适的系统 IO 调度算法,比如设置调度算法为 Deadline(采用 SSD 固态硬盘的话),随机读的性能也会有所提升。
5.5. 消息的消费
分布式专题-MQ-RocketMQ-3、消息的消费5.6. 幂等性
https://www.bilibili.com/video/BV1L4411y7mn?t=155.5&p=100
6. 面试题
1 |
|
6.1. RocketMQ 如何保证消息有序性
分布式专题-MQ-RocketMQ-3、消息的消费6.2. RocketMQ 和 Kafka 的区别和相同点
失败重试、延时定时消息、分布式事务、消息查询
6.3. RocketMQ 如何保证高可用⭐️🔴
❕ ^06jcfj
https://www.bilibili.com/video/BV1YT411z7GV?p=36&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204
6.3.1. 架构层面
避免用单节点或者简单的一主一从架构,可以采取多主从的架构,并且主从之间采用同步复制的方式进行数据双写。
6.3.2. 刷盘策略
RocketMQ 默认的异步刷盘,可以改成同步刷盘 SYNC_FLUSH。
6.3.3. 生产消息的高可用
当消息发送失败了,在消息重试的时候,会尽量规避上一次发送的 Broker,选择还没推送过该消息的 Broker,以增大消息发送的成功率。
6.3.4. 消费消息的高可用
消费者获取到消息之后,可以等到整个业务处理完成,再进行 CONSUME_SUCCESS 状态确认,如果业务处理过程中发生了异常那么就会触发 broker 的重试机制。
对比 Rabbitmq:分布式专题-MQ-RabbitMQ-1、基本原理
6.4. RocketMQ 的存储机制
6.4.1. CommitLog
消息生产者发送消息到 broker,都是会按照顺序存储在 CommitLog 文件中,每个 commitLog 文件的大小为 1G。同时有一个异步线程监听 CommitLog 文件,有内容写入就会生成索引文件写入 ConsumeQueue 中。
CommitLog - 存储所有的消息元数据,包括 Topic、QueueId 以及 message
CosumerQueue - 消费逻辑队列:监听存储消息在 CommitLog 的 offset
IndexFile - 索引文件:存储消息的 key 和时间戳等信息,使得 RocketMq 可以采用 key 和时间区间来查询消息
也就是说,rocketMq 将消息均存储在 CommitLog 中,并分别提供了 CosumerQueue 和 IndexFile 两个索引,来快速检索消息。
6.4.2. ConsumeQueue
6.4.3. IndexFile
https://www.modb.pro/db/181270
6.4.4. ReputMessageServiceThreadLoop
每 1ms 扫描一次 CommitLog
文件,生成新插入内容的索引,写入到 ConsumeQueue
中
6.5. 为什么 Rocketmq 性能高⭐️🔴⭐️🔴
6.5.1. 顺序写
顺序写比随机写的性能会高很多,不会有大量寻址的过程
6.5.2. 异步刷盘
相比较于同步刷盘,异步刷盘的性能会高很多
6.5.3. 零拷贝
使用 mmap 的方式进行零拷贝,提高了数据传输的效率
6.5.4. CompletableFuture 提升同步双写性能
FutureTask
- 🚩 - CompleteFutureTask - 🏡 2023-04-04 14:45
#todo
6.5.5. Commitlog 写入时锁的配置
默认是异步刷盘,所以默认使用的是自旋锁
6.5.6. 读写分离之对外内存机制
6.6. 让你来设计一个消息队列,你会怎么设计
6.6.1. 数据存储角度
理论上,从速度来看,分布式文件系统>分布式 KV(持久化)>数据库,而可靠性却截然相反,如果追求性能可以基于文件系统的顺序写。
6.6.2. 高可用角度
分区 + 复制 + 选举的思想
6.6.3. 网络框架角度
选用高效的 Netty 框架,producer 同步异步发送消息,consumer 同步异步接收消息。同步能够保证结果,异步能够保证性能。
6.7. 有几百万消息持续积压几小时,怎么解决⭐️🔴
发生了线上故障,几千万条数据在 MQ 里积压很久。是修复 consumer 的问题,让他恢复消费速度,然后等待几个小时消费完毕?这是个解决方案。不过有时候我们还会进行临时紧急扩容。
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟是 18 万条。1000 多万条,所以如果积压了几百万到上千万的数据,即使消费者恢复了,也需要大概 1 小时的时间才能恢复过来。
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍或者 20 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。
这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
等快速消费完积压数据之后,再恢复原先部署架构,重新用原先的 consumer 机器来消费消息。
6.8. 什么是路由注册?RocketMQ 如何进行路由注册?
RocketMQ 的路由注册是通过 Broker 向 NameServer 发送心跳包实现的,首先 Broker 每隔 30s 向 NameServer 发送心跳语句,NameServer 每隔 10 秒扫描 BrokerLiveTable
,检查上次心跳时间与当前时间时差是否超过 120 秒,超过则路由踢除。
RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每 30 秒会拉取一次最新的路由。
对比 Nacos:服务注册与发现-6、Nacos
6.9. RocketMQ 的总体架构,以及每个组件的功能?
RocketMQ 一共由四个部分组成:NameServer、Broker、Producer、Consumer,它们分别对应着发现、存、发、收四个功能。这四部分的功能很像邮政系统,Producer 相当于负责发送信件的发件人,Consumer 相当于负责接收信件的收件人,Broker 相当于负责暂存信件传输的邮局,NameServer 相当于负责协调各个地方邮局的管理机构。一般情况下,为了保证高可用,每一部分都是以集群形式部署的。
6.10. 讲一讲 RocketMQ 中的分布式事务及实现
事务-2、分布式事务6.11. 丢数情况
6.12. 讲一讲 RocketMQ 中事务回查机制的实现
6.13. 订阅关系不一致问题
❕ ^nsewfi
RocketMQ 同一个消费组内的消费者订阅不同 tag,会有问题吗?
https://www.modb.pro/db/214714
会出现丢消息的问题
6.13.1. 不一致情况分 3 种
- 消费组 1 的 Consumer1 和 Consumer2 都订阅了 Topic1,但是订阅的 Tag 不一致。
- 消费组 2 的 Consumer1 和 Consumer2 订阅的 Topic 不一致。
- 消费组 3 的 Consumer1 和 Consumer2 订阅的 Topic 和 Tag 都一致,但是订阅 Tag 的顺序不一致。
7. 实战经验
8. 参考与感谢
❕ ^axhrcj
8.1. 尚硅谷
8.1.1. 视频
8.1.2. 资料
1 |
|
8.2. 黑马程序员
8.2.1. 视频
8.2.2. 资料
https://github.com/DillonDong/notes/tree/master/RocketMQ
1 |
|
8.3. 马士兵 - 带面试题
8.3.1. 视频 1
8.3.2. 资料 1
1 |
|
8.3.3. 视频 2
https://www.bilibili.com/video/BV1kB4y1D7Q2?p=23&vd_source=c5b2d0d7bc377c0c35dbc251d95cf204
8.3.4. 资料 2
1 |
|
8.4. 网络笔记
https://www.cnblogs.com/starcrm/p/13063833.html
https://www.codenong.com/cs109783051/