带你学习Kafka

前言

本文先介绍了kafka是什么,以及kafka的优势,日常使用场景,以及kafka的一些基本概念,这些都是最基本的,也是初学者需要了解和掌握的。接着从整体架构入手,带你看看kafka的设计思想,包括工作流程存储机制。然后分别从生产者和消费者角度看看kafka的工作细节。

kafka介绍

Kafka最初由Linkedin公司开发,是一个分布式、支持分区、多副本、基于Zookeeper的分布式消息队列,最大特性就是可以实时处理大量数据以满足各种需求场景,它是用scala语言编写的。

特性

  • 1、高吞吐,低延迟:kafka每秒可以处理几十万条消息,延迟最低只有几毫秒。每个topic可以分多个partition并行操作,consumer group对partition进行消费操作。
  • 2、可扩展,伸缩性好:kafka集群支持热扩展。
  • 3、持久性、可靠性:生产者使用ack+ISR机制可保证消息不丢失,Exactly Once语义可保证消息有且只被消费一次。
  • 4、容错性:partition多副本选举机制可保证一台机器宕机不影响整体功能,因为一般partition多个副本都是分布在不同机器节点上。
  • 5、高并发:支持数千个客户端同时读写。

发布订阅模式

image.png
kafka是基于发布/订阅模式的,生产者将消息发布到Topic中,有多个消费者订阅该主题,发布到Topic的消息会被所有订阅者消费,被消费的数据不会立即从Topic中清除。

基本概念

  • Producer:消息生产者,向Kafka Broker发消息的客户端
  • Consumer:消息消费者,从Kafka Broker取消息的客户端。
  • Consumer Group:消费者组,消费者组内的每个消费者负责消费不同分区的数据,提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台Kafka机器就是一个Broker。一个集群由多个Broker组成,一个Broker可以容纳多个Topic。
  • Topic:主题,可以理解为一个队列,Topic将消息分类,生产者和消费者面向的是一个Topic
  • Partition:一个Topic可以分为多个partition,分布到不同的Broker上,每个Partition都是一个有序的队列。
  • Replica:副本,为实现备份的功能,保证集群中某个节点发生故障时,该节点上的Partition数据不丢死,且Kafka仍然能够继续工作,Kafka提供了副本机制,一个Topic的每个分区都有若干个副本,一个Leader和若干个Follower。
  • Leader:每个分区Partition多个副本中的“主”副本,生产者发送数据的对象以及消费者消费数据的对象都是Leader。
  • Follower:每个分区Partition多个副本中的“从”副本,实时从Leader同步数据,保持和Leader数据的同步,Leader发生故障时,某个Follower会被选举成为新的Leader。
  • Offset:消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再恢复的时候可以从上次的消费位置Offset继续消费。
  • Zookeeper:帮助Kafka存储和管理集群信息。

架构

image.png

kafka存储的消息来自多个Producer的进程,数据从而被存储到不同的Topic主题下的不同Partition分区中,在一个分区内,这些消息被索引并连同时间戳存储在一起,其他被称作Consumer的进程可以从分区中拉取消息。

存储机制

image.png

由于生产者生产的消息会不断追加到log文件末尾,为了防止log文件过大导致数据查找效率低下,kafka采取了分片和索引机制,将每个Partition分为多个Segment,每个Segment对应两个文件:.index索引文件和.log数据文件(实际是四个文件),这些文件位于同一个文件夹下,该文件夹的命名规则是topic名-分区号。例如first这个topic有三个分区,其对应的文件夹为first-0,first-1,first-2。

ls /root/data/kafka/first-0
00000000000000009014.index
00000000000000009014.log
00000000000000009014.timeindex
00000000000000009014.snapshot
leader-epoch-checkpoint
复制代码

如上图,index和log文件以当前Segment的第一条消息的Offset命名。下图为index文件和log文件的结构示意图

image.png

生产者

Producer我们来研究下是如何分区的以及为什么要分区,还有生产者是如何保证发送的消息能发送到指定的分区不丢失的。

分区策略

分区原因:

  • 方便在集群中扩展,topic只是一个逻辑上的概念,partition才是物理上的一个消息队列。增删topic相当于增删多个partition。对topic进行读写相当于以partition为单位进行读写。
  • 可以提高并发,对一个topic进行读写,相当于对多个partiton并行进行读写。

分区原则:我们要将发送的数据封装成一个ProducerRecord对象,该对象需要指定一些参数:topic是必选的,其他的如partition,timestamp,key,value,headers都是可选的。
那么生产者怎么知道该发往哪个Partiton呢?

  • 当指明Partition时,直接将给定的值作为Partition的值
  • 没有指明Partition但有key的情况下,将key的hash值与分区数取余得到Partition的值。
  • 既没有Partition有没有Key的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个数与可用的分区数取余得到Partition的值。

数据可靠性保证

为了保证Producer发送的数据可靠的发送到指定Partition,Partition收到数据后会发送给Producer一个ACK,如果Producer收到ACK,就会进入下一轮发送,否则重新发送数据。

image.png

ISR机制

这个机制很简单,就是自动给每个Partition维护一个ISR列表,这个列表里一定有Leader,还包含跟Leader保持数据同步的Follower(注意是保持同步的),也就是说只要某个Follower一直跟Leader保持同步,那么就会存在于ISR中。如果Follower一些问题不能从Leader同步数据,那么这个Follower被认为是“out-of-sync”,会从ISR列表中剔除。

ACK应答机制

对于某些不太重要的数据,对数据可靠性要求不高,能够容忍少量数据丢失的场景,没必要等到ISR中的Follower全部同步数据成功,针对不同场景,kafka提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择合适的配置。

image.png

ACK参数有三个配置:

  • 0:Producer不等待Broker的ACK,是最低延迟。Broker一旦收到数据还没有写入磁盘就已经返回,当Broker故障的时候有可能丢失数据。
  • 1:Producer等待Broker的ACK,Partition的Leader一旦写入成功就返回ACK,如果在Follower同步成功之前Leader故障,那么将会丢失数据。
  • -1(all):Producer等待Broker的ACK,Partition的Leader和Follower全部写入成功之后才返回ACK。但是在Broker发送ACK时,Leader发生故障,则会造成数据重复。

副本数据同步策略

Partition就是通过副本数据同步保证数据从生产者发送到Broker的时候是不丢失的。确保Follower与Leader同步成功,Leader再返回ACK应答,这样才能保证在Leader挂掉之后,能在Follower中选举出新的Leader而不丢失数据。这里同步数据分两种方案,各有优缺点:

  • 半数以上Follower完成同步就发送ACK:优点是延迟低,缺点是选举新的Leader时,容忍n台节点故障,至少需要2n+1个副本。
  • 全部Follower完成同步才发送ACK:优点是选举新的Leader时,容忍n台节点故障,只需要n+1个副本。缺点是延迟高。

故障处理细节

image.png

首先解释下两个Offset:

  • LEO:每个副本中最大的Offset。
  • HW:消费者能见到的最大的Offset,也就是ISR队列中最小的LEO。

当Follower发生故障时,会被临时踢出ISR集合,等到Follower故障恢复后,Follower会读取本地磁盘记录上的上次的HW,并将log文件中高于HW的部分截取掉,从HW开始向Leader同步最新的数据。
当Leader发生故障时,会从ISR集合中选一个Follower作为新的Leader,之后为了保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

这些措施都是为了保证多副本之间的数据一致性。

Exactly Once语义

将Exactly Once之前,先来了解下At least once,At most once以及Exactly Once之间的区别,其实很简单

  • At least once:就是至少一次。将服务器的ACK级别设置为-1,可以保证Producer的数据至少成功发送一次。因为即使发送失败也会一直重试发送,直到发送成功。可以保证消息不丢失,但是消息可能重复。
  • At most once:就是至多一次。将服务器的ACK级别设置为0,可以保证Producer只会发送一次消息,不管成不成功都不再发送了。可以保证消息不重复,但是消息可能丢失。
  • Exactly Once:就是有且仅有一次。即Exactly Once = At Least Once+幂等性,它的意思就是Producer不论向Broker发送多少条重复的消息,Broker都只会持久化一条消息。

要启用幂等性,只需要将Producer的参数中国enable.idompotence设置为true即可,开启幂等性的Producer在初始化时会被分配一个PID,发往同一Partition的消息会附带Sequence Number。Broker端会对<PID,Partition,SegNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。但是PID重启后就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区会话的Exactly Once。

消费者

消费方式

消息队列的消费方式一般都有Pull和Push两种。

  • Consumer采取Pull(拉取)模式从Broker中读取数据。
  • Consumer采取Push(推送)模式,Broker给Consumer推送消息的速率由Broker决定,很难适应消费速率不同的消费者。

push方式以最快速度传递消息,很容易造成Consumer来不及处理消息,典型表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当速率消费消息,pull模式的不足之处是如果kafka没有数据,消费者可能会陷入循环,一直拉不到数据。

消费

订阅Topic是以一个消费组订阅的,一个消费组里面可以有多个消费者,但是一个Partition只会由消费组里的一个消费者消费。但是一个Partition可以同时被多个消费组消费。因此,如果消费组内的消费者>Partition,可能会有个别的消费者空闲,没有分配到Partition消费。但是如果一个消费组内的消费者<Partition,那么会有消费者分配到多个Partiton,这就涉及到分配的问题。

分区分配策略

一个Consumer Group有多个Consumer,一个Topic有多个Partition,所以必然会涉及到Partition的分配问题,即确定哪个Partition由哪个Consumer来消费。Kafka有两种分配策略,一个是RoundRobin,一个是Range,默认是Range。当消费组内的消费者发生变化时,会触发分区重新分配。

RoundRobin

image.png
RoundRobin轮询方式将分区所有作为一个整体进行Hash排序,消费者组内分配分配个数差别最多为1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题,是比较公平的一种分配方式。
但是,当消费者组内订阅不同主题时,可能造成消费混乱。如下图所示,Consumer0订阅主题A,Consumer1订阅主题B。

image.png

如上图,将A、B主题的分区排序后分配给消费者组,TopicB分区中的数据可能分配到Consumer0中。

Range

image.png

Range方式是按照主题来分的,不会产生轮询方式的消费混乱问题。但是可能会产生消息分配不对等问题。例如下图,Consumer0和Consumer1同时订阅了主题A和主题B,Consumer0被分配了4个Partition,Consumer1被分配了2个Partition。

image.png

Offset的保存

一个消费组消费Partition,需要保存Offset记录消费到哪,以前保存在Zookeeper,由于Zookeeper写性能不好,以前的解决办法是Consumer每隔一分钟上报一次,这里Zookeeper的性能严重影响了消费速度,而且很容易出现重复消费。在0.10版本之后,Kafka把offset的保存放在一个叫consumeroffsets topic的Topic中。
写进消息的Key由Groupid、Topic、Partition组成,Value是偏移量Offset。Topic配置的清理策略是Compact,总是保留最新的Key,其余删掉。一般情况下,每个key的Offset都是缓存在内存中,查询的时候不用遍历Partition。如果没有缓存,第一次就会遍历Partition建立缓存,然后查询返回。
确定Consumer Group位移信息写入consumer_offsets的哪个Partition,具体计算公式如下:

__consumers_offsets partition =
           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   
//groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
复制代码

分配Partition的过程及Rebalance

生产过程中Broker要分配Partition,消费中也要分配Partition给消费者。类似Broker中选了一个Controller出来,消费也要从Broker中选一个Coordinator,用于分配Partition。

选Coordinator

要看Offset保存在哪个Partition:该Partition Leader所在的Broker就是被选定的Coordinator。这里可以看到Consumer Group的Coordinator和保存Consumer Group Offset的Partition Leader是同一台机器。

分配过程

把Coordinator选出来就要分配了:

  • consumer启动或者Coordinator宕机了,Consumer会任意请求一个Broker,发送ConsumerMetadataRequest请求。Broker会按照上面的方法,选出这个Consumer对应的Coordinator地址。
  • Consumer发送心跳请求给Coordinator,返回illegalGenration就说明Consumer的信息是旧的了,需要重新加入进来,进行Rebalance。返回成功那么Consumer就从上次分配的Partition中继续执行。

Rebalance过程

当Partition或者消费者的数量发生变化时,都得进行Rebalance。

  • Consumer向Coordinator发送JoinGroupRequest请求。
  • 这时其他Consumer发送心跳请求过来时,Coordinator会告诉他们要Rebalance了。
  • 其他Consumer也发送JoinGroupRequest请求。
  • 所有记录在册的Consumer都发了JoinGroupRequest请求之后,Coordinator就会在其中选一个Consumer作为Leader,然后回复JoinGroupResponse,会告诉Consumer你是Follower还是Leader,还会把Follower的信息带给他,让他根据这些信息去分配Partition。
  • Consumer向Coordinator发送SyncGroupRequest,其中Leader的SyncGroupRequest中会包含分配的情况。
  • Coordinator回包,把分配的情况告诉Consumer,包括Leader。

Kafka的高性能

kakfa的高吞吐低延迟以及我们说Kafka快主要都体现在读写两个方面。具体来说是依靠以下几个方面:

  • 利用Partition实现并行处理
  • 磁盘顺序写
  • 充分利用PageCache
  • 零拷贝技术
  • 批处理以及数据压缩

利用Partition实现并行处理

我们知道Kakfa是一个发布订阅的消息系统,无论是发布还是订阅,都要指定Topic,每个Topic都包含一个或多个Partition,不同Partition可位于不同节点。一方面由于不同Partition可能位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面由于Partition在物理上对应一个文件夹,即使多个Partition位于同一节点,也可通过配置让同一节点上的不同Partition置于不同磁盘上,从而实现磁盘间的并行处理。

影响磁盘的关键因素是磁盘服务时间,即磁盘完成一个IO请求花费的时间,它由寻道时间、旋转延迟和数据传输时间三部分构成。机械键盘的连续读写性能很好,但随机读写性能很差,主要因为磁头移动到正确的磁道上需要时间,随机读写需要磁头不停移动,时间都浪费在了磁头寻址上,所以性能不高。

顺序写磁盘

image.png

Kafka中每个分区都是一个有序的、不可变的消息序列,新的消息不断追加到Partition的末尾,这个就是顺序写。由于磁盘有限,不可能保存所有数据,实际上作为消息系统的Kafka也没有必要保存所有数据,需要删除旧的数据。又由于是顺序写入,所以Kafka采用各种删除策略删除数据的时候,是将Partition分为多个Segment,每个Segment对应一个物理文件,通过删除整个文件的方式来删除Partition内的数据,这种清除数据的方式也避免了对文件的随机写操作。

充分利用Page Cache

引入Cache层的目的是为了提高Linux操作系统对磁盘访问的性能。Cache层在内存中缓存了磁盘上的部分数据,当数据的请求到达时,如果在Cache中存在该数据且是最新的,且直接将数据传递给应用程序,免除了对底层磁盘的操作,提高了性能。Cache层也正是磁盘IOPS为什么能突破200的主要原因之一。
在Linux层面,文件Cache分为两层,一是Page Cache,另一个是Buffer Cache,每一个Page Cache包含若干Buffer Cache,Page Cache主要用来作为文件系统上的文件数据的缓存来用,尤其是针对当进程对文件有read/write操作的时候,Buffer Cache主要是设计用来在系统对块设备进行读写时,对块进行数据缓存的系统来使用。

使用Page Cache有如下好处:

  • IO Scheduler会将连续的小块写组装成大块的物理写从而提高性能。
  • IO Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间。
  • 充分利用所有空闲内存(非JVM内存)
  • 读操作可直接在Page Cache内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过Page Cache)交换数据。
  • 如果进程重启,JVM内的Cache会失效,但Page Cache仍然可用。

Broker在收到数据后,写磁盘只是将数据写入Page Cache,并不保证数据一定完全写入磁盘。虽然会在机器宕机时有丢失数据的可能,但是这种丢失只发生在机器断电操作系统不工作的情况,这种场景可以由Replica机制来解决。

零拷贝技术

Kafka中存在大量的网路数据持久化到磁盘(Producer到Broker)和磁盘文件通过网络发送(Broker到Consumer)的过程。这两个过程的性能直接影响到Kafka的整体吞吐量。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的权限。为了避免用户进程直接操作内核,保证内核安全,操作系统将虚拟内存划分为两部分,一部分是内核空间,一部分是用户空间。

传统的Linux系统中,标准的IO接口都是基于数据拷贝操作的,即IO操作会导致数据在内核地址空间的缓冲区和用户地址空间的缓冲区之间进行拷贝。这样做的好处是如果请求的数据已经存放在内核的高速缓冲存储器中,那么就可以减少实际的IO操作,但坏处是数据拷贝过程会导致CPU开销。

网络数据持久化到磁盘

传统模式下,数据从网络传输到文件需要4次数据拷贝、4次上下文切换和两次系统调用。

data = socket.read()// 读取网络数据 
File file = new File() 
file.write(data)// 持久化到磁盘 
file.flush()
复制代码

这一过程实际上发生了四次数据拷贝:

  • 首先通过DMA copy将网络数据拷贝到内核态Socket Buffer。
  • 然后应用程序将内核态Buffer数据读入用户态(CPU 拷贝)
  • 接着用户程序将用户态Buffer拷贝到内核态(CPU 拷贝)
  • 最后通过DMA copy将数据拷贝到磁盘文件。

(DMA:直接存储器访问。DMA是一种无需CPU的参与,让外设和系统内存之间进行双向数据传输的硬件机制。使用DMA可以使系统CPU从实际的IO数据传输过程中摆脱出来,从而大大提高系统的吞吐率)

image.png

数据落盘通常都是非实时的,充分利用Page Cache技术提高IO效率。对于Kafka来说,Producer生产的数据存到Broker,这个过程读取到socket buffer的网络数据,其实可以在内核空间完成落盘,并没有必要将socket buffer的网络数据,读取到应用进程缓冲区。在此场景下,接收到来自socket buffer的网络数据,应用进程不需要中间处理,可以使用mmap内存文件映射。
Memory Mapped Files:简称mmap,也有叫MMFile的,使用mmap的目的是将内核中读缓冲区的地址与用户空间的缓冲区进行映射。从而实现内核缓冲区和应用程序内存的共享,省去了将数据从内核读缓冲区拷贝到用户缓冲区的过程,工作原理是直接利用操作系统的Page来实现文件到物理内存的映射。完成映射之后你对物理内存的操作会被同步到磁盘上。使用这种方式可以获取很大的IO提升,省去了用户空间到内核空间复制的开销。mmap也有一个明显的缺陷:不可靠,写到mmap的数据并没有真正被写到硬盘,操作系统会在程序主动调用Flush的时候才把数据真正的写到硬盘。

image.png

零拷贝技术指在计算机执行操作时,CPU不需要先将数据从一个内存区域复制到另一个内存区域,从而可以减少上下文切换以及CPU的拷贝时间。它的作用是在数据从网络设备到用户程序空间传递的过程中,减少数据拷贝次数,减少系统调用,实现CPU的零参与,彻底消除CPU在这方面的负载。
目前零拷贝技术主要有三种类型:

  • 直接IO:数据跨过内核,在用户地址空间和IO设备之间传递,内核只是进行必要的虚拟存储配置等辅助工作。
  • 避免内核和用户空间之间的数据拷贝:当应用程序不需要对数据进行访问时,则可以避免将数据从内核拷贝到用户空间,mmap、sendfile、splice&&tee、sockmap。
  • copy on write:写时拷贝技术,数据不需要提前拷贝,而是当需要修改的时候再进行部分拷贝。

磁盘文件通过网络发送(Broker到Consumer)

传统方式实现:先读取磁盘,再用socket发送,实际也是四次Copy。

buffer = File.read 
Socket.send(buffer)
复制代码

这个过程类比上面的生产消息

  • 首先通过系统调用将文件数据读入内核态Buffer(DMA拷贝)
  • 然后应用程序将内核态Buffer数据读入到用户态Buffer(CPU拷贝)
  • 接着用户应用程序通过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝)
  • 最后通过DMA拷贝将数据拷贝到NIC Buffer。

Linux 2.4+内核通过sendfile系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝,这也是零拷贝这一说法的来源。除了减少数据拷贝外,整个读文件和网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,因此大大提高了性能。

image.png

kafka在这里采用的方案是通过NIO的transferTo/transferFrom调用操作系统的sendfile实现零拷贝。总共发生了两次内核数据拷贝、两次上下文切换和一次系统调用,消除了CPU数据拷贝。

批处理和数据压缩

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO。因此,除了操作系统提供的低级批处理之外,Kafka客户端和Broker还会在通过网络发送数据之前,在一个批处理中累积多条记录(包括读和写)。记录的批处理分摊了网络往返的开销,使用了更大的数据包从而提高了带宽利用率。
Producer可以将数据压缩之后发送给Broker,从而减少网络传输代价,目前支持的压缩算法有:Snappy,Gzip,LZ4。

结语

到这里,我们大概了解Kakfa设计的核心思想了,高可靠、高性能、高吞吐量是怎么做到的了?Partition-Segment-log/index文件的分区存储机制,ISR机制和ACK应答机制,Partition是怎么分配的以及动态的Rebalance机制,消费消息的分区分配的RoundRobin策略和Range策略,Kafka高性能高吞吐的五种设计:多partition并行处理、顺序写磁盘、充分利用Page Cache、零拷贝技术、批处理和数据压缩。你学会了吗?

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享