Kafka核心概念及进阶使用

这是我参与更文挑战的第2天,活动详情查看: 更文挑战

  Kafka是用 Scala语言编写的高性能跨语言分布式消息队列,单机吞吐量可以到达 10w 级,消息延迟在 ms 级。Kafka 是完全的分布式系统,BrokerProducerConsumer 都原生自动支持分布式,依赖于 ZooKeeper 做分布式协调。

  Kafka 支持一写多读,消息可以被多个客户端消费,消息有可能会重复,但是不会丢失。

1. Kafka核心概念

image.png

  首先 Kafka 消息队列由三个角色组成,左面的是消息的生产方 Producer;中间是 Kafka 集群, Kafka 集群由多台 Kafka server 组成,每个 Server 称为一个 Broker(Kafka支持水平扩展,一般Broker数量越多,集群吞吐率越高),也就是消息代理;右面的是消息的消费方 Consumer。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。

  Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

1.2 Topic

  每条消息发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理逻辑上我们可以认为,一个 Topic 就是一个 Queue。在实际应用中,不同业务数据就可以设置为不同的 Topic。一个 Topic 可以有多个消费方,当生产方在某个 Topic 发出一条消息后,所有订阅了这个 Topic 的消费方都可以收到这条消息。

1.3 Partition

  为了提高并行能力,Kafka 为每个 Topic 维护了多个 Partition 分区,每个分区可以看作一份追加类型的日志。 每个分区中的消息保证 ID 唯一且有序,新消息不断追加到尾部。Partition 实际存储数据时,会对按大小进行分段(Segment),来保证总是对较小的文件进行写操作,提高性能,方便管理。

  如图中间部分,Partition 分布于多个 Broker 上。图中绿色的模块表示 Topic1 被分为了 3 个 Partition。每个 Partition 会被复制多份存在于不同的 Broker 上,如图中红色的模块,这样可以保证主分区出现问题时进行容灾。每个 Broker 可以保存多个 Topic 的多个 Partition。

  Kafka 只保证一个分区内的消息有序,不能保证一个 Topic 的不同分区之间的消息有序。为了保证较高的处理效率,所有的消息读写都是在主 Partition 中进行,其他副本分区只会从主分区复制数据。Kafka 会在 ZooKeeper 上针对每个 Topic 维护一个称为 ISR(in-sync replica),就是已同步的副本集。如果某个主分区不可用了,Kafka 就会从 ISR 集合中选择一个副本作为新的主分区。

1.4 Consumer Group

image.png

  Consumer 按照 Group 来消费消息,Kafka 通过 Group Coordinator 来管理 Consumer 实际负责消费哪个 Partition,默认支持 Range 和轮询分。

  Topic中的每一条消息可以被多个 Consumer Group 消费,如上图中的 GroupA 和 GroupB。但是每个 Partition 在一个 Group 中只能由一个 Consumer 消费。所以如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

2. Kafka进阶使用

2.1 如何保证消费顺序

  还是来看上图的例子,这个 Topic 分为 4 个 Partition,就是图中绿色的 P1到 P4,上部的生产方根据规则选择一个 Partition 进行写入,默认规则是轮询策略。也可以由生产方指定 Partition 或者指定 key 来根据 Hash 值选择 Partition。

  如果topic有多个partition,消费数据时就不能保证数据的消费顺序。在需要严格保证消息的消费顺序的场景下,如果不要求消费性能时,可以将partition数目设为1,将所有消息由Producer写入这个partition中进行消费。反之可以将同一类型的业务(比如对一个订单的所有操作)指定一个相同的routing key(订单编号), 将消息发送到同一个partition中也可以保证消费顺序。

2.2 如何提升吞吐量

  Producer可以通过对三个参数的配置提高吞吐量:

  • buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB
  • compression.type:默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。
  • batch.size:批量发送消息大小,默认值是:16384,就是16KB,也就是一个batch满了16kb就发送出去。如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。

一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,如果一个批次设置大了,会有延迟。一般根据一条消息大小来设置。如果我们消息比较少。配合使用的参数linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。

2.3 如何保证消息不丢失

2.3.1 broker设置
  • replication.factor>=3: 消息分区的副本,保证Broker的高可用。
  • min.insync.replicas >1:消息写入多个副本才算提交
  • unclean.leader.election.enable=false:数据缺失太多的broker不能成为leader

replication.factor 应该 > min.insync.replicas ,保证可用性,如果相等,任何一个副本挂了,那么往这个分区插入数据的时候会报错。

2.3.2 Producer设置

Producer在往broker发送消息时,可以通过retries的设置解决网络抖动引发的问题,此外可以通过对request.required.acks的设置保证消息保证消息发送的可靠性。

  • 0 :只要请求已发送出去,就算是发送完了,不关心有没有写成功。性能很好,如果是对一些日志进行分析,可以承受丢数据的情况,用这个参数,性能会很好。
  • 1 :发送一条消息,当leader partition写入成功以后,才算写入成功。不过这种方式也有丢数据的可能。
  • -1:需要ISR列表里面,所有副本都写完以后,这条消息才算写入成功。可以通过对kafka服务端
2.3.3 consumer设置

消息写入Broker后,Consumer在消费后会提交offset标记消费进度。但如果我们选择的自动提交,如果是批量消费可能存在某个线程处理失败,但是offset还是被提交的情况,造成消息丢失。可以通过enable.auto.commit=false 关闭自动提交,在消息重复处理完后手动提交offset,但这样也会出现消息处理完还未提交,consumer挂掉的情况,造成重复消费。后面会提到。

2.4 如何保证消息不重复消费

   Kafka提供的offset可以非常直接实现Exactly once(每条消息肯定会被传输一次且仅传输一次)。

  • 每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,kafka-0.10.1.X版本之前版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。

  • kafka-0.10.1.X之后的版本提交offset发送给kafka内部topic:__consumer_offsets,提交过去的时候, key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据。

如果我们选择的自动提交,可能存在消息在消费过程中,消费者在自动提交偏移量之前异常退出,导致kafka未提交偏移量,进而出现重复消费的问题。我们可以对auto_commit_interval_ms设一个较小的值来规避,但并未完全解决。鉴于Kafka自动提交offset的不灵活性和不精确性(只能是按指定频率的提交),可以通过手动提交offset对偏移量更加灵活精准地控制,同时在业务代码中对消息进行幂等处理加以配合,保证消息不被重复处理且不丢失消息。

2.5 如何提升消费能力

  • 增加topic的partition数量设置,并匹配consumer的数量
  • 批量拉取消息,多线程消费。通过max.poll.records (每次拉取的条数)和max.poll.interval.ms( 每次poll的最大时间间隔),让每个consumer一次可以拉取多条数据进行多线程消费。

实战代码可以阅读这篇文章

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享