Kafka是一个多分区、多副本的分布式消息系统。在实际应用中,kafka通常可以用作消息系统、存储系统和流式处理平台。本主要整理了kafka入门实践知识,以备查阅。
文章内容参考《深入理解Kafka:核心设计与实践原理》,欢迎大家购买阅读。
基本概念
Producer、Consumer、broker
通常情况下,一个kafka体系架构包括多个Producer、多个Consumer、多个broker以及一个Zookeeper集群。具体结构如下图所示:
- Producer:生产者,负责将消息发送到kafka中。
- Consumer:消费者,负责从kafka中拉取消息进行消费。
- Broker:Kafka服务节点,一个或多个Broker组成了一个Kafka集群
- Zookeeper集群:负责管理kafka集群元数据以及控制器选举等。
主题(Topic)、分区(Partition)和偏移量(offset)
生产者发送消息给kafka的时候,必须指定消息的主题(Topic),消费者也是通过订阅Topic才能进行消息消费的。
- 主题(Topic):Topic是消息的分类,它是一个逻辑上的概念。消息必须指定Topic才能发送和消费。
- 分区(Partition):Partition是物理上的概念,它表示消息真实存储的物理位置。一个Topic可以对应有多个Partition,每个Partition真实的存储了消息内容。每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的Partition。
- 偏移量(offset):offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性。
kafka只保证分区有序,不保证主题有序。
此外,Kafka为分区引入了多副本(Replica)机制,副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。当leader副本出故障时,会从follow副本中重新选举新的leader副本。
AR、ISR和OSR
- AR(Assigned Replicas):分区中的所有副本统称为AR。
- ISR(In-Sync Replicas):所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR,ISR 集合是 AR 集合中的一个子集。
- OSR(Out-of-Sync Replicas):与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR。AR=ISR+OSR。
leader副本负责维护和跟踪 ISR 集合中所有follower副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。
HW 和 LEO
- HW(High Watermark):高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。
- LEO(Log End Offset):它标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加1。
分区ISR 集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO 即为分区的HW,对消费者而言只能消费 HW 之前的消息。
生产者
ProducerRecord
在Java版本的kafka客户端中,构建消息的对象是ProducerRecord
,它并不是单纯意义上的消息,它包含了多个属性:
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Integer partition; //分区号
private final Headers headers; //消息头部
private final K key; //键
private final V value; //值
private final Long timestamp; //消息的时间戳
//省略其他成员方法和构造方法
}
复制代码
序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。在Java版本的kafka客户端中,序列化器通过实现org.apache.kafka.common.serialization.Serializer
接口实现。接口定义如下:
public interface Serializer<T> extends Closeable {
/**
* 配置
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* 将data转换为字节数组
*/
byte[] serialize(String topic, T data);
/**
* 关闭序列化器
*/
@Override
void close();
}
复制代码
通过key.serializer
和value.serializer
可以分别指定消息key和value的序列化器,指定之后,kafka客户端就会采用相应的序列化器对key和value执行序列化处理。
分区器
消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了partition
字段,那么就不需要分区器。如果没有指定,就需要依赖分区器,分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它实现了org.apache.kafka.clients.producer.Partitioner
接口,这个接口中定义了2个方法,具体如下所示。
public interface Partitioner extends Configurable, Closeable {
/**
* 根据主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息来计算分区
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
}
复制代码
默认的分区器会对key
进行哈希,最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果Key
为null
,则会随机分配一个分区。这个随机是在这个参数metadata.max.age.ms
的时间范围内随机选择一个。对于这个时间段内,如果key为null
,则只会发送到唯一的分区。这个值默认情况下是10分钟更新一次。
通过配置参数
partitioner.class
来配置分区器。
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,也可以用来在发送回调逻辑前做一些定制化的需求。其接口为org.apache.kafka.clients.producer. ProducerInterceptor
,包含以下3个接口:
/**
* 在消息发送之前,对消息内容进行前置处理
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
* 在消息被应答(Acknowledgement)之前或消息发送失败时会调用本方法。
* 优先于用户设定的 Callback 之前执行
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
复制代码
通过配置参数
interceptor.classes
来配置生产者拦截器。
重要生产者参数
参数名称 | 默认值 | 含义 |
---|---|---|
acks | 1 | 指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。 1: 生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。 0: 生产者发送消息之后不需要等待任何服务端的响应。 -1: 生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。 |
max.request.size | 1048576 | 限制生产者客户端能发送的消息的最大值 |
retries | 0 | 生产者重试的次数 |
retry.backoff.ms | 100 | 设定两次重试之间的时间间隔 |
更多可参考:juejin.cn/book/684473…
消费者
消费者和消费组
消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
反序列化
与生产者的序列化相反,消费者的反序列化是将字节数组转换为具体的对象的。反序列器实现了org.apache.kafka.common.serialization.Deserializer
接口。
public interface Deserializer<T> extends Closeable {
/**
* 配置
*/
void configure(Map<String, ?> configs, boolean isKey);
/**
* 反序列化,将data字节数组转换为实例T
*/
T deserialize(String topic, byte[] data);
@Override
void close();
}
复制代码
消息消费
消费者消费到的每条消息的类型为ConsumerRecord
,和生产者发送的消息类型ProducerRecord
相对应,不过 ConsumerRecord
中的内容更加丰富,具体的结构参考如下代码:
public class ConsumerRecord<K, V> {
private final String topic; // 主题
private final int partition; // 分区
private final long offset; // 偏移量
private final long timestamp; // 时间
private final TimestampType timestampType;
private final int serializedKeySize; // 序列化key大小
private final int serializedValueSize; // 序列化value大小
private final Headers headers; // 头
private final K key; // key
private final V value; // value
private volatile Long checksum; // 校验和
//省略若干方法
}
复制代码
位移提交
对于 Kafka中的分区而言,它的每条消息都有唯一的offset
,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset
的概念,表示消费到分区中某个消息所在的位置。
对于消息在分区中的位置,我们将 offset 称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。
KafkaConsumer
类提供了position(TopicPartition)
和committed(TopicPartition)
两个方法来分别获取上下一次拉取消息的位置和当前已经消费到的位置。
在Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit
配置,默认值为true
。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms
配置,默认值为5秒。
在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll()
方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。手动提交可以细分为同步提交和异步提交,对应于KafkaConsumer
中的commitSync()
和commitAsync()
两种类型的方法。
对于采用 commitSync() 的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync() 的另一个含参方法,具体定义如下:
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
复制代码
指定位移消费
在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset
的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。如果将 auto.offset.reset 参数配置为“earliest”,那么消费者会从起始处。
此外,KafkaConsumer
还提供了seek()
方法,支持追前消费或回溯消费。
public void seek(TopicPartition partition, long offset)
复制代码
总之,seek()
方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek()
方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,并且在再均衡发生期间,消费组内的消费者是无法读取消息的。
另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失,因此容易发生重复消费问题。在一个消息者消费了部分消息还没来得及提交位移时,如果发生再均衡,另一个消费者会重新消费这部分消息。
kafka提供了再均衡监听器ConsumerRebalanceListener
,允许在再均衡发生之前以及再均衡完成之后做一些处理逻辑。接口定义如下:
public interface ConsumerRebalanceListener {
/**
* 这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。
* 可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。
*/
void onPartitionsRevoked(Collection<TopicPartition> partitions);
/**
* 这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。
*/
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
复制代码
消费者拦截器
消费者拦截器主要在消费消息前或在提交消费位移后进行一些定制化的操作。接口为org.apache.kafka.clients.consumer. ConsumerInterceptor
,定义如下:
public interface ConsumerInterceptor<K, V> extends Configurable {
/**
* 在KafkaConsumer.poll()方法返回之前会调用本方法对消息进行相应的定制化操作。
* 比如修改返回的消息内容、按照某种规则过滤消息等。
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
* 在提交完消费位移之后调用。可以使用这个方法来记录跟踪所提交的位移信息。
*/
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
}
复制代码
通过配置参数
interceptor.classes
来配置消费者拦截器。
消费者多线程实现
KafkaProducer
是线程安全的,然而KafkaConsumer
却是非线程安全的。KafkaConsumer
中定义了一个私有acquire()
方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException
异常。KafkaConsumer
中的基本所有的公用方法在执行所要执行的动作之前都会调用这个acquire()
方法。另外,KafkaConsumer
还定义了私有release()
方法,跟acquire()
方法成对出现,可以理解成加锁操作和解锁操作。
为了提高整体的消费能力,我们可以通过多线程方式实现消息消费。
方式一:为每个线程实例化一个KafkaConsumer对象
一个线程对应一个KafkaConsumer
实例,一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
方式二:多个消费线程同时消费同一个分区
这个通过assign()
、seek()
等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,也并不推荐。
方式三:多线程处理消息
一般来说,处理消息才是消费的性能瓶颈。因此,我们可以使用多线程方式来异步处理消息,进而提高整个消费性能。当然这种方式的缺点一是无法保证消息的顺序处理,而是消费位移提交难以控制。
ConsumerRecords<String, String> records =
kafkaConsumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
executorService.submit(new RecordsHandler(records));
}
复制代码
为了实现消费位移控制,需要引入更加复杂的机制实现。这里暂且不说了,具体可以看看作者原文。
重要消费者参数
参数 | 默认值 | 含义 |
---|---|---|
fetch.min.bytes | 1 | 用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量。 |
fetch.max.bytes | 52428800 | 来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大数据量 |
fetch.max.wait.ms | 500 | 用于指定 Kafka 的等待时间 |
max.partition.fetch.bytes | 1048576 | 用来限制一次拉取中每个分区的消息大小 |
max.poll.records | 500 | 用来配置 Consumer 在一次拉取请求中拉取的最大消息数 |
更多参数可以参考:juejin.cn/book/684473…