本文正在参加「Java主题月 – Java Debug笔记活动」,详情查看活动链接
问题
之前做企业微信群发消息的时候,用的RabbitMq消息中间件,消费端代码如下,消费的时候如果有异常直接捕获,看着好像没啥问题。
@Component
@RabbitListener(queues = "cn.thinkjoy.wxcenter.article")
public class WxArticleSendBusiness {
private Logger logger = LoggerFactory.getLogger(WxArticleSendBusiness.class);
@RabbitHandler
public void receive(String json){
logger.info("mq接收到消息:{}",json);
try{
if(json!=null){
//业务处理
}
}catch (Exception e){
{
logger.info("MQ发送消息出现异常");
e.printStackTrace();
}
}
}
}
复制代码
那我们先来看看RabbitMq的消费端的消息确认,RabbitMQ提供消息确认机制。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者发回ack信号后才从内存(和磁盘,如果有持久化消息)中移去消息。
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。那么问题来了如果消息异常,我也没有显示的调用noAck=false,而且我消费端是一直在线的没有断开,那么我这个异常的消息就会一直重复消费,一直打印异常的消息日志!
发生异常的是在周五的一次测试消息调用,导致消费的时候抛出异常,而且这个消息是其他部门调用的测试消息体,下周一我收到测试服务器硬盘容量不足,都超过90%的使用空间了,我打开日志一看,好家伙测试环境一共50G的硬盘几乎都满了,我打开RabbitMQ的消费日志发现一直在打印错误日志,而且还是重复的错误,我当时第一反应是生产者是不是定时任务一直在发送错误的消息体,我先清了日志,去问开发他们说只有周五发送了一次消息,没有定时任务发送,当时我也才接触RabbitMQ对其消费也不明白原理,最后为了搞懂RabbitMQ的去啃了《RabbitMQ实战指南》这是后话,那么知道是异常没有处理需要显示调用通知mq将异常的消息从mq中干掉,代码如下:
@Component
@RabbitListener(queues = "cn.thinkjoy.wxcenter.article")
public class WxArticleSendBusiness {
@RabbitHandler
public void receive(String json, Channel channel, Message message) throws IOException {
logger.info("mq接收到消息:{}", json);
try {
if (json != null) {
//业务处理
}
} catch (Exception e) {
logger.info("mq接收到消息异常:"+e);
e.getStackTrace();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
复制代码
在异常的时候显示调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
DeliveryTag为Chanel发出的消息序号,每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
总结
当时这个bug还挺严重的,但是我写成水文了!!!