本文总结在使用 springboot-kafka 消费数据时,提交 offset 的相关设置。
关于提交偏移量,有好几个设置。如果 consumer property 中的 enable.auto.commit
被设置为 true,Kafka 根据其配置自动提交偏移量。
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
// highlight-start
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
// highlight-end
return props;
}
复制代码
如果为 false,则 listener 容器支持多种 AckMode 设置。如下列表。默认的 AckMode 为 BATCH。在 springboot-kafka 的2.3版本及以后,enable.auto.commit
的默认值为 false,在此版本以前,默认值为 true。
消费者的 poll()
方法,会返回一条及以上的 ConsumerRecords。每条 ConsumerRecords 都会调用 MessageListener ,以下列表描述了容器对每个 AckMode 采取的操作(未使用事务时):
- RECORD:当 listener 处理完记录返回时提交偏移量。
- BATCH:当 poll() 返回的所有记录都处理完后,提交偏移量。
- TIME:当 poll() 返回的所有记录都被处理后,且超过了自上次提交以来的 ackTime,提交偏移量。
- COUNT:当 poll() 返回的所有记录都已处理完毕时,且自上次提交以来已收到 ackCount 记录,提交偏移量。
- COUNT_TIME:类似于 TIME 和 COUNT,但如果任一条件为真,则执行提交。
- MANUAL:listener 负责调用Acknowledgment.acknowledge(),在调用后,提交行为类似 BATCH。
- MANUAL_IMMEDIATE:当 listener 调用了 Acknowledgment.acknowledge() 方法后,立刻提交。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// highlight-start
factory.getContainerProperties().setAckMode(AckMode.TIME);
// highlight-end
return factory;
}
复制代码
使用事务时,偏移量被发送到事务,语义等同于 RECORD 或 BATCH,具体取决于 listener 类型(记录或批处理)。
:::tip
MANUAL 和 MANUAL_IMMEDIATE 模式, 要求侦听器是 AcknowledgeingMessageListener 或 BatchAcknowledgeingMessageListener
:::
消费者使用 commitSync()
或 commitAsync()
,是由 syncCommits 属性决定,默认为 true ,你可以通过 setSyncCommit 来设置,当设置为true时,你可以通过 setSyncCommitTimeout 设置同步超时时长。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// highlight-start
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(1));
// highlight-end
return factory;
}
复制代码
当使用异步提交时,可以设置回调方法,默认的回调为 LoggingCommitCallback ,当错误时会打印日志,在日志级别为 debug 时,会打印成功日志。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.TIME);
// highlight-start
factory.getContainerProperties().setSyncCommits(false);
factory.getContainerProperties().setCommitCallback(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map,
Exception e) {
LOG.INFO(map.values());
LOG.ERROR(e.printStackTrace(););
}
});
// highlight-end
return factory;
}
复制代码
Acknowledgment 有以下方法:
public interface Acknowledgment {
// 此方法使侦听器可以控制何时提交偏移量。
void acknowledge();
// record listener 使用
default void nack(long sleep) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}
// batch listener 使用
default void nack(int index, long sleep) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}
}
复制代码
使用 record listener 时,当调用 nack() 时,从上一次 poll() 拉取到的,所有正在处理的偏移量将被提交,其余的将被丢弃,本次处理失败和未处理的记录,将在下一次传递。通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。这与在容器配置了 SeekToCurrentErrorHandler 时引发异常的功能类似。
使用 batch listener 时,可以指定批处理中发生故障的索引。当调用 nack() 时,将为索引之前的记录提交偏移量,并在分区上执行失败和丢弃记录的搜索,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,后者只能寻找整个批次进行重新投递。