消息返回

mandatoryimmediate是发布消息时的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。mandatory参数告诉服务器该消息至少能路由到一个队列中,而immediate参数告诉服务器要投递的队列必须有消费者。

mandatory

mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者;当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。

对于没有被正确路由而返回给生产者的消息,可以通过给channel添加监听器获取到那些消息。

immediate

immediate参数设为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return命令返回给生产者。

RabbitMQ从3.0的版本开始去掉了对immediate参数的支持。

备份交换器

生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失;如果设置了mandatory参数,那么需要添加监听器的编程逻辑,生产者的代码会很复杂。如果既不想丢失消息,也不想使代码复杂化,那么可以使用备份交换器,将未被路由的消息存储在RabbitMQ中,需要的时候再去处理。

备份交换器和普通的交换器没有什么太大的区别,声明一个备份交换器后,在声明普通交换器时添加alternate-exchange参数即可建立它们之间的联系。为了方便使用,备份交换器建议设置为fanout类型的。

过期时间(TTL)

RabbitMQ可以对消息和队列设置TTL。

设置消息的TTL

如果不设置TTL,则表示此消息不会过期,而如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。设置消息的TTL有两种方式:

  1. 通过队列属性设置,此时队列中所有消息都有相同的过期时间。
  2. 对消息本身进行单独设置,每条消息的TTL可以不同。

如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准,消息在队列中的生存时间一旦超过设置的TTL时,就会变成死信。

还要注意的是,对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,因为队列中己过期的消息肯定在队列头部;而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

设置队列的TTL

在声明队列时可以通过x-expires参数控制队列被自动删除前处于未使用状态的时间,未使用的意思是队列上没有任何消费者,队列也没有被重新声明,并且在过期时间段内也未调用过Basic.Get命令。队列的TTL不能像消息一样设置为0。

死信队列

当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX(Dead-Letter-Exchange,死信交换器),绑定DLX的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject / Basic.Nack),并且设置requeue参数为false
  • 消息过期
  • 队列达到最大长度值

DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到死信队列中。

延迟队列

延迟队列存储的对象是对应的延迟消息,即当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。RabbitMQ本身并没有直接支持延迟队列的功能,但是可以通过DLX和TTL模拟出延迟队列的功能。

生产者将消息发送到过期时间为n毫秒的队列中,这个队列并未有消费者来消费消息,当过期时间到达时,消息会通过死信交换器转发到死信队列中,而消费者从死信队列中消费消息。这个时候就达到了“生产者发布了消息,过了n毫秒后消费者消费了消息”的延迟队列的效果。

优先级队列

可以将队列声明为优先级队列,即在创建队列的时候添加参数x-max-priority指定最大的优先级,值为0-255,此时的规则如下:

  • 优先级高的消息具备优先被消费的特权
  • 没有指定优先级的消息会将优先级以0对待
  • 对于超过优先级队列所定最大优先级的消息,优先级以最大优先级对待
  • 对于相同优先级的消息,后进的排在前面

如果在消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

持久化

持久化用以解决因为服务器的异常崩溃导致的消息丢失。RabbitMQ的持久化分为交换器的持久化、队列的持久化和消息的持久化:

  • 交换器持久化:交换器的持久化是通过在声明交换器时将durable参数置为true实现的。
  • 队列持久化:队列的持久化是通过在声明队列时将durable参数置为true实现的。如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。但即使设置了该队列为持久化的,也不能保证内部存储的消息不会丢失。
  • 消息持久化:要确保消息不会丢失,需要通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。

就算将交换器、队列、消息都设置了持久化之后也不能百分百保证数据不会丢失。比如说消费者在订阅消费队列时将autoAck参数设置为true,那么当消费者接受到相关消息之后,还没来得及处理就宕机了,这样数据就丢失了。要解决这个问题可以将autoAck参数设置为false,并进行手动确认。

其次,在持久化的消息正确存入RabbitMQ之后,RabbitMQ并不会为每条消息都进行同步存盘(调用内核的fsync方法),此时仅仅保存到操作系统缓存之中,如果这时候发生了宕机,那么消息将会丢失。要解决这个问题可以使用RabbitMQ的镜像队列机制。

生产者确认

当消息的生产者将消息发送出去之后,如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,这样生产者也就不知道消息有没有正确到达服务器。RabbitMQ针对这个问题,提供了两种解决方式:

  1. 通过事务机制实现
  2. 通过发送方确认(publisher confirm)机制实现

事务机制

RabbitMQ客户端中与事务机制相关的方法有三个:channel.txSelectchannel.txCommitchannel.txRollback。其中channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。

当开启事务并且提交成功,那么消息就一定到达了RabbitMQ中了,如果在事务提交之前由于RabbitMQ异常崩溃或者其它原因抛出异常,这个时候便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚,与此同时可以进行消息重发。

发送方确认机制

事务机制会严重降低RabbitMQ的消息吞吐量,而发送方确认机制则更加轻量级。

生产者通过Confirm.Select命令将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),当消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.ACK)给生产者(包含消息的唯一ID),生产者可以通过回调方法来处理该确认消息。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。如果RabbitMQ因为内部错误导致消息丢失,就会发送一条Basic.Nack命令,生产者同样也可以在回调方法中处理该命令。

RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的multiple参数(默认为true,也就是批量确认),表示到这个序号之前的所有消息都已经得到了处理。

事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。而发送方确认机制如果使用同步串行的编程方式实现,其实并没有比事务机制好多少,但是该机制的优势是在于并不一定需要同步确认,对此有以下两种改进方案:

  • 批量confirm方法:每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回(该方法会阻塞到最后一条消息得到确认或者得到nack才结束)。
  • 异步confirm方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理。

对于批量confirm方法,客户端程序需要定期或者定量(达到多少条),亦或者两者结合起来调用channel.waitForConfirms来等待RabbitMQ的确认返回,但是如果出现返回Basic.Nack或者超时情况时,客户端需要将这一批次的消息全部重发。

而对于异步confirm方法,它的编程比较复杂一些,但因为不会被阻塞,所以性能也略好一些。

消费者确认

上面的生产者确认是保证生产者的消息正确的到达了服务器,而为了保证消息从队列可靠地到达消费者,消费者在订阅队列时可以指定autoAck参数,当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费了这些消息;当autoAckfalse时,RabbitMQ会等待消费者显示地回复确认信号后才从内存(或者磁盘)中移除消息。RabbitMQ不会为未确认的消息设置过期时间,除非消费此消息的消费者已经断开连接,此时RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者。

消费者拒绝

在消费者接受到消息后,如果想明确拒绝当前的消息而不是确认,那么可以使用Basic.Reject命令,其中如果将参数requeue设置为false,则RabbitMQ立即会把消息从队列中删除,否则,RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。

Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令,将multiple设为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的信息。

注意:如果requeue设置为false,那么可以启用死信队列的功能。

Basic.Recover命令用来请求RabbitMQ重新发送还未被确认的消息,也具备可重入队列的特性。如果将其参数requeue设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一条消息来说,可能会被分配给与之前不同的消费者。如果将requeue参数设置为false,那么同一条消息会被分配给与之前相同的消费者。

消息分发

当RabbitMQ队列拥有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者,但轮询的分发机制在各台机器性能差异较大时效率很低,那么就可以使用Basic.Qos命令限制信道上的消费者所能保持的最大未确认消息的数量,此时RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么RabbitMQ就不会向这个消费者再发送任何消息,直到消费者确认了某条消息后,RabbitMQ将相应的计数减一,之后消费者才可以继续接收消息。

可靠性总结

为了提升数据的可靠性,从上文分析可以总结出以下几个途径:

  • 设置mandatory参数或者备份交换器来处理未能正确路由到队列的消息。
  • 设置事务机制或者publisher confirm机制保证生产者的消息正确的到达了RabbitMQ。
  • 设置交换器、队列和消息为持久化。
  • 设置消费端对应的autoAck参数为false,在消费完消息后手动确认。

参考资料