springboot-kafka消费数据提交offset

本文总结在使用 springboot-kafka 消费数据时,提交 offset 的相关设置。

关于提交偏移量,有好几个设置。如果 consumer property 中的 enable.auto.commit 被设置为 true,Kafka 根据其配置自动提交偏移量。

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    // highlight-start
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    // highlight-end
    return props;
}
复制代码

如果为 false,则 listener 容器支持多种 AckMode 设置。如下列表。默认的 AckMode 为 BATCH。在 springboot-kafka 的2.3版本及以后,enable.auto.commit 的默认值为 false,在此版本以前,默认值为 true

消费者的 poll() 方法,会返回一条及以上的 ConsumerRecords。每条 ConsumerRecords 都会调用 MessageListener ,以下列表描述了容器对每个 AckMode 采取的操作(未使用事务时):

  • RECORD:当 listener 处理完记录返回时提交偏移量。
  • BATCH:当 poll() 返回的所有记录都处理完后,提交偏移量。
  • TIME:当 poll() 返回的所有记录都被处理后,且超过了自上次提交以来的 ackTime,提交偏移量。
  • COUNT:当 poll() 返回的所有记录都已处理完毕时,且自上次提交以来已收到 ackCount 记录,提交偏移量。
  • COUNT_TIME:类似于 TIME 和 COUNT,但如果任一条件为真,则执行提交。
  • MANUAL:listener 负责调用Acknowledgment.acknowledge(),在调用后,提交行为类似 BATCH。
  • MANUAL_IMMEDIATE:当 listener 调用了 Acknowledgment.acknowledge() 方法后,立刻提交。
  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // highlight-start
    factory.getContainerProperties().setAckMode(AckMode.TIME);
    // highlight-end
    return factory;
  }
复制代码

使用事务时,偏移量被发送到事务,语义等同于 RECORD 或 BATCH,具体取决于 listener 类型(记录或批处理)。

:::tip
MANUAL 和 MANUAL_IMMEDIATE 模式, 要求侦听器是 AcknowledgeingMessageListener 或 BatchAcknowledgeingMessageListener
:::

消费者使用 commitSync()commitAsync(),是由 syncCommits 属性决定,默认为 true ,你可以通过 setSyncCommit 来设置,当设置为true时,你可以通过 setSyncCommitTimeout 设置同步超时时长。

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // highlight-start
    factory.getContainerProperties().setSyncCommits(true);
    factory.getContainerProperties().setSyncCommitTimeout(Duration.ofSeconds(1));
    // highlight-end
    return factory;
  }
复制代码

当使用异步提交时,可以设置回调方法,默认的回调为 LoggingCommitCallback ,当错误时会打印日志,在日志级别为 debug 时,会打印成功日志。

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.TIME);
    // highlight-start
    factory.getContainerProperties().setSyncCommits(false);
    factory.getContainerProperties().setCommitCallback(new OffsetCommitCallback(){

      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> map,
          Exception e) {
        LOG.INFO(map.values());
        LOG.ERROR(e.printStackTrace(););
      }
    });
     // highlight-end
    return factory;
  }
复制代码

Acknowledgment 有以下方法:

public interface Acknowledgment {
  // 此方法使侦听器可以控制何时提交偏移量。
  void acknowledge();

  // record listener 使用
  default void nack(long sleep) {
    throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
  }

  // batch listener 使用
  default void nack(int index, long sleep) {
    throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
  }
}
复制代码

使用 record listener 时,当调用 nack() 时,从上一次 poll() 拉取到的,所有正在处理的偏移量将被提交,其余的将被丢弃,本次处理失败和未处理的记录,将在下一次传递。通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。这与在容器配置了 SeekToCurrentErrorHandler 时引发异常的功能类似。

使用 batch listener 时,可以指定批处理中发生故障的索引。当调用 nack() 时,将为索引之前的记录提交偏移量,并在分区上执行失败和丢弃记录的搜索,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,后者只能寻找整个批次进行重新投递。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享