队列的结构

通常队列由rabbit_amqqueue_processbacking_queue两部分组成,前者负责协议相关的消息处理,即接受生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。

四种状态

如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断地流动,消息的状态会不断发生变化。RabbitMQ中的队列消息可能会处于以下四种状态:

  • alpha:消息内容(包括消息体、属性和headers)和消息索引都存储在内存中
  • beta:消息内容保存在磁盘中,消息索引保存在内存中
  • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有
  • delta:消息内容和索引都在磁盘中

其中,gamma状态的消息是只有持久化的消息才会有的状态。

RabbitMQ在运行时会根据统计的消息传送速率定期计算一个当前内存中能够保存的最大消息数量,如果alpha状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转换到beta、gamma或者delta状态。其中,delta状态需要执行两次I/O操作才能读取到消息,一次是读消息索引,一次是读消息内容;而对于beta和gamma状态都只需要一次I/O操作就可以读取到消息。

对于普通的没有设置优先级和镜像的队列来说,backing_queue内部通过5个子队列Q1、Q2、Delta、Q3和Q4来体现消息的各个状态,其中Q1、Q4只包含alpha状态的消息,Q2、Q3包含beta和gamma状态的消息,Delta只包含delta状态的消息,一般情况下,消息按照Q1->Q2->Delta->Q3->Q4这样的顺序步骤进行流动。如图:

从Q1到Q4基本经历了内存到磁盘,再从磁盘到内存的过程,如此可以在队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,而在负载降低的时候,这部分消息又渐渐回到内存被消费者获取,使得整个队列具有很好的弹性。

通常在负载正常时,如果消息被消费的速度不小于接受新消息的速度,对于非持久化的消息,通常只会处于alpha状态,而对于持久化的消息,一定会进入gamma状态,并且在开启生产端确认机制时,只有到了gamma状态时才会确认该消息已被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。

在系统负载较高时,这些消息若不能很快的被消费掉,就会进入到很深的队列中去,这样会增加处理每个消息的平均开销,因为要花更多的时间和资源处理堆积的消息,如此用来处理新流入的消息的能力就会降低,导致恶性循环。应对这一问题,RabbitMQ有一套流控机制,在下文会介绍。

惰性队列

队列具有两种模式,一个是default,一个是lazy。默认情况下,当生产者将消息发送到RabbitMQ中的时候,队列中的消息会尽可能地存储在内存之中,这样可以更加快速地将消息发送给消费者。当RabbitMQ需要释放内存时,将消息换入磁盘会耗费较长时间,也会阻塞队列的操作,进而无法接受新的消息。

RabbitMQ从3.6.0版本开始引入了惰性队列的概念,即将接受到的消息直接存入文件系统中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。惰性队列虽然减少了内存的消耗,但是增加了I/O的使用,因此对于持久化的消息,本身就不可避免磁盘I/O,使用惰性队列是较佳的选择。要注意的是,如果惰性队列中存储的是非持久化的消息,重启之后消息一样会丢失。

内存及磁盘告警

当内存使用超过配置的阈值或者磁盘剩余空间低于配置的阈值时,RabbitMQ会暂时阻塞客户端的连接并停止接收从客户端发来的消息。被阻塞的Connection的状态要么是blocking,要么是blocked,前者对应于并不试图发送消息的Connection,后者对应于一直有消息发送的Connection,这种状态下的Connection会被停止发送消息。注意在一个集群中,如果一个Broker节点的内存或者磁盘受限,都会引起整个集群中所有的Connection被阻塞。

内存告警

默认情况下内存阈值为0.4,表示当RabbitMQ使用的内存超过40%时,会产生内存告警并阻塞所有生产者的连接。一旦告警被解除(有消息被消费或者从内存转储到磁盘等情况的发生),一切都会恢复正常。

在某个Broker快达到内存阈值时,会先尝试将队列中的消息换页到磁盘以释放内存空间。默认情况下,在内存到达内存阈值的50%时会进行换页动作。

磁盘告警

当剩余磁盘空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务崩溃。默认情况下,磁盘阈值为50MB。RabbitMQ会定期检测磁盘剩余空间,检测的频率与上一次执行检测到的磁盘剩余空间大小有关,随着磁盘剩余空间与磁盘阈值的接近,检测频率会有所增加。

流控

当RabbitMQ出现内存或者磁盘资源达到阈值时,会触发流控机制,阻塞生产者的Connection,让生产者不能继续发送消息,直到内存或者磁盘资源得到释放。RabbitMQ基于Erlang开发,一个消息的生命周期中,会涉及多个进程间的转发,这些Erlang进程之间不共享内存,每个进程都有自己独立的内存空间,如果没有合适的流控机制,可能会导致某个进程占用内存过大,导致OOM。因此,要保证各个进程占用的内存在一个合理的范围。

RabbitMQ的流控机制的原理实质上就是通过监控各进程的mailbox,当某个进程负载过高来不及接收消息时,这个进程的mailbox就会开始堆积消息,当堆积到一定量时,就会阻塞住上游进程让其不得接收新消息,从而慢慢上游进程的mailbox也会开始积压消息,到了一定的量也会阻塞上游的上游的进程,最后就会使得负责网络数据包接收的进程阻塞掉,暂停接收数据。

从Connection到Channel到队列再到消息持久化存储形成了一个完整的流控链:

其中的各个进程如下所述:

  • rabbit_reader:Connection的处理进程,负责接收、解析AMQP协议数据包等
  • rabbit_channel:Channel的处理进程,负责处理AMQP协议的各种方法、进行路由解析等
  • rabbit_amqqueue_process:队列的处理进程,负责实现队列的所有逻辑
  • rabbit_msg_store:负责实现消息的持久化

对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。也就是说,如果某个进程达到性能瓶颈,必然会导致上游所有的进程被阻塞。所以我们可以利用流控机制的这个特点找出瓶颈所在。

一个Connection触发流控时会处于flow的状态,也就意味着这个Connection的状态每秒在blockedunblocked之间来回切换数次,这样可以将消息发送的速率控制在服务器能够支撑的范围之内。

镜像队列

RabbitMQ的集群在默认模式下,队列实例只存在于一个节点上,既不能保证该节点崩溃的情况下队列还可以继续运行,也不能线性扩展该队列的吞吐量。虽然RabbitMQ的队列实际只会在一个节点上,但元数据可以存在于各个节点上。举个例子来说,当创建一个新的交换器时,RabbitMQ会把该信息同步到所有节点上,这个时候客户端不管连接到哪个RabbitMQ节点,都可以访问到这个新的交换器,也就能找到交换器下的队列:

RabbitMQ内部的元数据主要有:

  1. 队列元数据:队列名称和属性
  2. 交换器元数据:交换器名称,类型和属性
  3. 绑定元数据:路由信息

尽管交换器和绑定关系能够在单点故障问题上幸免于难,但是队列和其上存储的消息却不行,它们仅存在于单个节点上。引入镜像队列的机制,可以将队列镜像到集群中的其它Broker节点之上,如果集群中的一个节点失效了,队列能够自动地切换到镜像中的另一个节点上以保证服务的可用性。通常情况下,针对每一个配置镜像的队列都包含一个主拷贝和若干个从拷贝,相应架构如下:

除了发送消息外的所有动作都只会向主拷贝发送,然后再由主拷贝将命令执行的结果广播给各个从拷贝,从拷贝实际只是个冷备(默认的情况下所有RabbitMQ节点上都会有镜像队列的拷贝),如果使用消息确认模式,RabbitMQ会在主拷贝和从拷贝都安全的接受到消息时才通知生产者。从这个结构上来看,如果从拷贝的节点挂了,实际没有任何影响,如果主拷贝挂了,那么会有一个重新选举的过程,这也是镜像队列的优点,除非所有节点都挂了,才会导致消息丢失。重新选举后,RabbitMQ会给消费者一个消费者取消通知(Consumer Cancellation),让消费者重连新的主拷贝。

实现原理

不同于普通的非镜像队列,镜像队列的实现结构如下:

所有对镜像队列主拷贝的操作,都会通过GM同步到各个slave节点,Coodinator负责组播结果的确认。GM是一种可靠的组播通信协议,该协议能够保证组播消息的原子性,即保证组内的存活节点要么都收到消息要么都收不到。

GM的组播并不是由master来负责通知所有slave的(目的是为了避免master压力过大,同时避免master失效导致消息无法最终ack),RabbitMQ把所有节点组成一个链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新节点上;当有节点失效时,相邻的节点会接管以保证本次广播的消息会复制到所有的节点。操作命令由master发起,也由master最终确认通知到了所有的slave,而中间过程则由slave接力的方式进行消息传播。

参考资料