这是我参与更文挑战的第2天,活动详情查看: 更文挑战
Kafka是用 Scala语言编写的高性能跨语言分布式消息队列,单机吞吐量可以到达 10w 级,消息延迟在 ms 级。Kafka 是完全的分布式系统,Broker
、Producer
、Consumer
都原生自动支持分布式,依赖于 ZooKeeper 做分布式协调。
Kafka 支持一写多读,消息可以被多个客户端消费,消息有可能会重复,但是不会丢失。
1. Kafka核心概念
首先 Kafka 消息队列由三个角色组成,左面的是消息的生产方 Producer
;中间是 Kafka 集群, Kafka 集群由多台 Kafka server 组成,每个 Server 称为一个 Broker
(Kafka支持水平扩展,一般Broker数量越多,集群吞吐率越高),也就是消息代理;右面的是消息的消费方 Consumer
。Kafka通过Zookeeper
管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。
Producer
使用push模式将消息发布到broker,Consumer
使用pull模式从broker订阅并消费消息。
1.2 Topic
每条消息发布到Kafka集群的消息都有一个类别,这个类别被称为Topic
。物理逻辑上我们可以认为,一个 Topic 就是一个 Queue。在实际应用中,不同业务数据就可以设置为不同的 Topic。一个 Topic 可以有多个消费方,当生产方在某个 Topic 发出一条消息后,所有订阅了这个 Topic 的消费方都可以收到这条消息。
1.3 Partition
为了提高并行能力,Kafka 为每个 Topic 维护了多个 Partition
分区,每个分区可以看作一份追加类型的日志。 每个分区中的消息保证 ID 唯一且有序,新消息不断追加到尾部。Partition 实际存储数据时,会对按大小进行分段(Segment),来保证总是对较小的文件进行写操作,提高性能,方便管理。
如图中间部分,Partition 分布于多个 Broker 上。图中绿色的模块表示 Topic1 被分为了 3 个 Partition。每个 Partition 会被复制多份存在于不同的 Broker 上,如图中红色的模块,这样可以保证主分区出现问题时进行容灾。每个 Broker 可以保存多个 Topic 的多个 Partition。
Kafka 只保证一个分区内的消息有序,不能保证一个 Topic 的不同分区之间的消息有序。为了保证较高的处理效率,所有的消息读写都是在主 Partition 中进行,其他副本分区只会从主分区复制数据。Kafka 会在 ZooKeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica),就是已同步的副本集。如果某个主分区不可用了,Kafka 就会从 ISR 集合中选择一个副本作为新的主分区。
1.4 Consumer Group
Consumer 按照 Group 来消费消息,Kafka 通过 Group Coordinator 来管理 Consumer 实际负责消费哪个 Partition,默认支持 Range 和轮询分。
Topic中的每一条消息可以被多个 Consumer Group
消费,如上图中的 GroupA 和 GroupB。但是每个 Partition 在一个 Group 中只能由一个 Consumer 消费。所以如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
2. Kafka进阶使用
2.1 如何保证消费顺序
还是来看上图的例子,这个 Topic 分为 4 个 Partition,就是图中绿色的 P1到 P4,上部的生产方根据规则选择一个 Partition 进行写入,默认规则是轮询策略。也可以由生产方指定 Partition 或者指定 key 来根据 Hash 值选择 Partition。
如果topic有多个partition,消费数据时就不能保证数据的消费顺序。在需要严格保证消息的消费顺序的场景下,如果不要求消费性能时,可以将partition数目设为1,将所有消息由Producer写入这个partition中进行消费。反之可以将同一类型的业务(比如对一个订单的所有操作)指定一个相同的routing key
(订单编号), 将消息发送到同一个partition中也可以保证消费顺序。
2.2 如何提升吞吐量
Producer可以通过对三个参数的配置提高吞吐量:
buffer.memory
:设置发送消息的缓冲区,默认值是33554432,就是32MBcompression.type
:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。batch.size
:批量发送消息大小,默认值是:16384,就是16KB,也就是一个batch满了16kb就发送出去。如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。
一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,如果一个批次设置大了,会有延迟。一般根据一条消息大小来设置。如果我们消息比较少。配合使用的参数linger.ms
,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。
2.3 如何保证消息不丢失
2.3.1 broker设置
replication.factor
>=3: 消息分区的副本,保证Broker的高可用。min.insync.replicas
>1:消息写入多个副本才算提交unclean.leader.election.enable
=false:数据缺失太多的broker不能成为leader
replication.factor
应该 > min.insync.replicas
,保证可用性,如果相等,任何一个副本挂了,那么往这个分区插入数据的时候会报错。
2.3.2 Producer设置
Producer在往broker发送消息时,可以通过retries
的设置解决网络抖动引发的问题,此外可以通过对request.required.acks
的设置保证消息保证消息发送的可靠性。
0
:只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。1
:发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。-1
:需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。可以通过对kafka服务端
2.3.3 consumer设置
消息写入Broker后,Consumer在消费后会提交offset标记消费进度。但如果我们选择的自动提交,如果是批量消费可能存在某个线程处理失败,但是offset还是被提交的情况,造成消息丢失。可以通过enable.auto.commit=false
关闭自动提交,在消息重复处理完后手动提交offset,但这样也会出现消息处理完还未提交,consumer挂掉的情况,造成重复消费。后面会提到。
2.4 如何保证消息不重复消费
Kafka提供的offset
可以非常直接实现Exactly once
(每条消息肯定会被传输一次且仅传输一次)。
-
每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,
kafka-0.10.1.X
版本之前版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。 -
kafka-0.10.1.X
之后的版本提交offset发送给kafka内部topic:__consumer_offsets
,提交过去的时候, key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据。
如果我们选择的自动提交,可能存在消息在消费过程中,消费者在自动提交偏移量之前异常退出,导致kafka未提交偏移量,进而出现重复消费的问题。我们可以对auto_commit_interval_ms
设一个较小的值来规避,但并未完全解决。鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),可以通过手动提交offset对偏移量更加灵活精准地控制,同时在业务代码中对消息进行幂等处理加以配合,保证消息不被重复处理且不丢失消息。
2.5 如何提升消费能力
- 增加topic的partition数量设置,并匹配consumer的数量
- 批量拉取消息,多线程消费。通过
max.poll.records
(每次拉取的条数)和max.poll.interval.ms
( 每次poll的最大时间间隔),让每个consumer一次可以拉取多条数据进行多线程消费。
实战代码可以阅读这篇文章。