我们要保证RabbitMQ消息的可靠性,首先要了解到以下的一些基本概念。
死信队列
- 死信队列:DLX,dead-letter-exchange
- 利用DLX,当消息在一个队列中变成死信后,它能重新publish到另外一个指定的Exchange,这个Exchange就是DLX
消息成为死信的情况
- 消息被拒绝(basic.reject/basic.nack),并且requeue=false
- 消息TTL过期
- 队列达到最大长度
保证可靠性需要解决的一些问题
如何保证顺序消费?
- 拆分多个queue,每个queue一个consumer,就是多一些queue,每个节点只消费一个queue,但是吞吐量会下降,但是可以通过在消费者内部采用多线程的方式去消费。
- 使用分布式锁,保证多个节点同时只有一个节点在执行。
如何保证发送端到队列的可靠性?
我们开启事务或者是其confirm,但是考虑到事务会导致性能很低下,所以我们一般会考虑使用confirm,下面我们介绍使用confirm需要注意的一些点
- 如果消息没有到exchange,则confirm回调,ack=false;
- 如果消息到达exchange,则confirm回调,ack=true;
- exchange到queue成功,则不回调return;
- exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,消息就丢失了);
所以我们不能简单的只通过ConfirmCallback的ack返回值为true就判定当前消息发送成功了
为了保证发送端消息的可靠性我们可以采用这种方式:
我们在发送消息的同时把相应的数据和消息ID存入到数据库当中,当我们接收到ReturnCallback的时候对数据库的数据的状态直接改为fail。然后再接收ConfirmCallback返回的ack,如果为false的话我们直接改数据库的为fail,如果返回true要判断ReturnCallback的状态。
因为会出现ReturnCallback返回失败的信息但是ConfirmCallback返回成功的情况,而且ReturnCallback 比 ConfirmCallback 先回调,所以我们要先判断ReturnCallback 的值再判断ConfirmCallback的值。
然后我们可以通过调度任务的方式对数据库中已经标识为失败的数据进行重新推送,建议设置一个阈值,如果超过三次都不成功的就不要重新推送了,对这些数据通过其他方式进行人为干预补偿。
如何保证队列的可靠性?:
因为我们的MQ可能是会出现宕机的情况的,所以我们要考虑到持久化的一个概念。需要注意的是交换器、队列、消息我们都需要持久化,持久化的配置我们会在下面的代码中有体现.
如何保证队列到消费端消息的可靠性?
关闭RabbitMQ自动ack(改成手动)。
- 手动ACK:消息一旦被接收,消费者自动发送ACK(适用于不太重要的消息,丢失也没影响,适用自动ACK会比较方便,而且对系统的损耗也没那么大)
- 自动ACK:消息接收后,不会发送ACK,需要手动调用(适用于重要的消息,不容丢失,那么最好在消费完成后再手动ACK,防止消息的丢失)
消息重试
消息消费失败后,我们可以设置其重试的次数,具体的配置见下文的配置文件。
保证消息不重复消费(幂等性)
每个消息用一个唯一标识来区分,消费前先判断标识有没有被消费过,若已消费过,则直接ACK。
多次消费失败后的处理方式
我们可以通过一个死信队列对我们的数据进行一个转发处理。当一条数据消费失败的时候我们记录其次数,如果次数超过我们系统指定的次数后,我们直接返回拒绝的应答,然后进入队列转发,此消息将会被转发到另外一个队列当中。
消费消息的三种回执方式
- basicAck
表示成功确认,使用此回执方法后,消息会被rabbitmq broker删除
void basicAck(long deliveryTag, boolean multiple)
复制代码
- deliveryTag表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。
- multiple表示是否批量确认,值为true则会一次性ack所有小于当前消息deliveryTag的消息。
- basicNack
表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列,如果选择不重新投递并且已经绑定了死信转发的队列会直接进行转发。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
复制代码
- deliveryTag表示消息投递序号;
- multiple表示是否批量确认;
- requeue表示是否将消息重新入队列。
3.basicReject
拒绝消息,与basicNack的区别在于不能批量操作
void basicReject(long deliveryTag, boolean requeue)
复制代码
- deliveryTag表示消息投递序号;
requeue表示是否将消息重新入队列。
代码样例
- RabbitMQ相关配置如下
- 队列配置如下
注意:此处Queue和Exchange中的durable参数我们都设置成了true,就是开启持久化的意思,如果想开启消息的持久化我们需要设置deliveryMode=2,但是Spring Boot中这个参数已经默认帮我们设置了,所以这里就无需设置了。
- 登录RabbitMQ的管理后台,看到Queue如下
可以看到有两个队列存在,并且看到DEAD_QUEUE里是有DLX和DLK等标识
- ReturnCallback相关代码如下
注意:这里我们只是简单的打印日志,正常情况下我们是需要记录一下失败的消息,方便后面的消息重发。
- ConfirmCallback相关代码如下
注意:这里我们只是简单的打印日志,正常情况下我们是需要记录一下失败的消息,方便后面的消息重发。
- 消息发送代码如下
- 发送完消息后我们可以收到ConfirmCallback的回调,如下
- 但是如果想看到ReturnCallback的回调会比较困难,我们可以模拟一下,我们首先要对交换器和路由进行解绑操作,操作如下
- 我们就可以看到ReturnCallback打印的日志,也可以确定ReturnCallback是在ConfirmCallback之前回调的,具体如下:
- 我们把上面的绑定关系进行还原后继续来看接收消息的代码:
说明:这里主要是利用重试机制进行重试,但是当次数超过三次后,我们会对消息进行拒绝,但是因为我们设置了私信转发队列,所以拒绝后消息会转发到另外一个队列当中。
- 相关的日志如下:
到此我们简单的Demo已经讲完了,中间可动态调整的东西其实很多,需要大家根据具体的业务来决定具体的实现。
感谢各位大佬的❤️关注+点赞❤️,原创不易,鼓励笔者创作更好的文章