官网:www.rabbitmq.com/tutorials/t…
在Channel启用发布者确认
发布者确认是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不启用它们。使用 confirmSelect 方法在Channel级别启用发布者确认:
Channel channel = connection.createChannel();
channel.confirmSelect();
复制代码
必须在您希望使用发布者确认的每个Channel上调用此方法。确认应该只启用一次,而不是为每个发布的消息启用。
策略 #1: 单独发布消息
让我们从最简单的使用确认发布的方法开始,即发布消息并同步等待其确认:
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
复制代码
在前面的示例中,我们像往常一样发布一条消息,并使用 Channel#waitForConfirmsOrDie(long) 方法等待其确认。一旦消息得到确认,该方法就会返回。如果消息在超时内没有得到确认或者它被 nack-ed(意味着代理由于某种原因无法处理它),该方法将抛出异常。异常的处理通常包括记录错误消息和/或重试发送消息。
这种技术非常简单,但也有一个主要缺点:它显着减慢了发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法不会提供超过每秒数百条已发布消息的吞吐量。尽管如此,这对于某些应用程序来说已经足够了。
策略 #2: 批量发布消息
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
复制代码
与等待单个消息的确认相比,等待一批消息得到确认大大提高了吞吐量(使用远程 RabbitMQ 节点最多 20-30 次)。一个缺点是我们不知道在失败的情况下到底出了什么问题,所以我们可能不得不在内存中保留一整批来记录一些有意义的东西或重新发布消息。而且这个方案还是同步的,所以会阻塞消息的发布。
策略 #3: 异步处理发布者确认
代理异步确认已发布的消息,只需要在客户端注册一个回调即可收到这些确认的通知:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
复制代码
有 2 个回调:一个用于确认消息,另一个用于 nack-ed 消息(可以被代理视为丢失的消息)。每个回调有2个参数:
- sequenceNumber:标识已确认或已确认消息的编号。
- multiple:这是一个布尔值。如果为 false,则仅confirmed/nack-ed 一条消息,如果为 true,则confirmed/nack-ed 所有具有较小或相等序列号的消息。
代理异步监听已发布的消息消费情况,也只需要在客户端注册一个回调即可收到这些返回的通知:
channel.addReturnListener(r -> {
System.err.println("===========================");
System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主题:" + new String(r.getBody()));
System.err.println("===========================");
});
复制代码
r 参数为一个 Return 对象,该监听发生在 Broker 的 Exchange 正确接收了消息,但是并没有进入 Queue 时。
发布前可以通过Channel#getNextPublishSeqNo()获取序列号:
int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);
复制代码
将消息与序列号相关联的一种简单方法是使用映射。假设我们要发布字符串,因为它们很容易变成用于发布的字节数组。下面是一个代码示例,它使用映射将发布序列号与消息的字符串正文相关联:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());
复制代码
发布代码现在使用map跟踪出站消息。我们需要在确认到达时清理此映射,并在消息被拒绝时执行诸如记录警告之类的操作:
// ConcurrentNavigableMap 根据key大小升序排列的map
// #headMap返回小于等于当前key的数据组成的map
// outstandingConfirms-原始map
// confirmed-新map
// confirmed.clear();
// outstandingConfirms中与confirmed中相同的数据被删除,confirmed中的数据全部被删除
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
// 新map中的数据全部被删除,存在于旧map中的相同数据也被删除
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
复制代码
上一个示例包含一个回调,用于在确认到达时清理map。请注意,此回调处理单个和多个确认。当确认到达时使用此回调(作为 Channel#addConfirmListener 的第一个参数)。nack-ed 消息的回调检索消息正文并发出警告。然后它重新使用之前的回调来清理未完成确认的映射(无论消息是confirmed还是 nack-ed,它们在映射中的相应条目都必须被删除。)