这是我参与更文挑战的第 26 天,活动详情查看: 更文挑战
日积月累,水滴石穿 ?
持久化
持久化可以提高 RabbitMQ
的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。
交换机的持久化
交换器的持久化是在声明交换器的时候,将 durable
属性设置为 true
。如果交换器不设置持久化,那么在 RabbitMQ
服务重启之后,相关的交换器就会被删除。对于长期使用的交换器来说,建议将其置为持久化。
原生Api
/**
* 参数1:交换机名称
* 参数2:交换机类型
* 参数3:是否持久化 默认 false
*/
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT,true);
复制代码
SpringBoot
@Bean
public TopicExchange payTopicExchange(){
/**
* 参数1:交换机类型
* 参数2:是否持久化 true是, 默认为 true
* 参数3:是否自动删除 true是, 默认为 false
*/
return new TopicExchange(exchangeMame,true,false);
}
复制代码
队列持久化
队列的持久化也是在声明队列的时候,将durable
参数设置为true
。如果队列不设置持久化,那么 RabbitMQ
服务重启之后,队列就会被删除,既然队列都不存在了,队列中的消息也会丢失。
原生Api
/**
* 参数1:String queue 队列名称 如果队列不存在会自动创建
* 参数2:boolean durable 队列是否持久化 true 持久化 false 不持久化 默认:false
* 参数3:boolean exclusive 是否独占队列 true 独占队列 false 不独占 默认:true
* 参数4:boolean autoDelete 是否在消费完成后自动删除 true 自动删除 默认:true
* 参数5:Map<String, Object> arguments 额外附加参数
*/
channel.queueDeclare("hello-1",true,false,false,null);
复制代码
SpringBoot
@Bean
public Queue dlQueue(){
/**
* 参数1:队列名称
* 参数2:是否持久化 默认:true
*/
return new Queue(dlQueue,true);
}
复制代码
消息的持久化
队列的持久化能保证其本身不会因重启、关闭、宕机的情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
在发送消息的时候,通过将BasicProperties
中的属性deliveryMode(投递模式)设置为 2
即可实现消息的持久化。
原生Api
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build(),
"ddf".getBytes());
复制代码
SpringBoot
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
//设置消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend("exchangeName","routingKey","消息内容",
messagePostProcessor);
复制代码
可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理,以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做权衡。
一般的系统也用不到对消息进行持久化。不过交换机和队列的持久化还是要支持的。
上述几种方式我们只是保证了消息发送到交换机、队列不会由于RabbitMQ的重启、关闭、宕机的情况而丢失消息。但如果消费者在消费的时候出现问题了呢?
补充
对于消费者来说,如果在订阅消息的时候,将autoAck
设置为了true
,消费者接收到相关消息后,还没有正式处理消息逻辑之前,就出现了异常挂掉了,但消息已经被自动确认了,这样也算数据丢失。这种情况的姐姐方式很多。
- 将
autoAck
参数设置为false
,进行手动确认。 - 将消息重试并设置死信队列(不在举例,可以看笔者前面几篇文章)。
原生aip
/**
* 参数1:队列名称
* 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
* 参数3:消费者的回调函数
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费-2:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
复制代码
SpringBoot
增加配置
spring:
rabbitmq:
listener:
simple:
# 手动确认
acknowledge-mode: manual
复制代码
编码
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
channel.basicAck(tag,false);
}
复制代码
参考文献
朱忠华老师《RabbitMQ实战指南》一书
- 如你对本文有疑问或本文有错误之处,欢迎评论留言指出。如觉得本文对你有所帮助,欢迎点赞和关注。