一、启动
1.1 Acceptor
在服务端启动的时候, 会初始化 SocketServer , 里面会初始化 acceptor 线程和 processor 线程, 用来处理连接请求.
/**
* 这里在初始化 Acceptor 的时候,首先会创建出来对应的 selector 和 channel
* 然后会启动默认的那三个 Porcessor 线程
* selector 默认监听 9092
*/
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
// 这里实际上是通过 java 去设置线程的属性并创建线程,然后调用 start 方法
Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
// 这里就是要阻塞在这里,等待线程创建完成
// 这里认为线程创建完成的标记为:上面初始化好的 channel 注册到了 selector 中
// 默认在 run 中关注的就是一个 ACCEPT 事件
acceptor.awaitStartup()
复制代码
这里其实比较核心的就是这个 Acceptor 线程, 它在初始化的时候, 内部会初始化好 selector 和 channel , 并在内部启动三个 (默认) 的 Processor 线程, 然后将 Acceptor 自身启动, 通过 CountDownLatch 调用 awit 方法, 阻塞在这, 等待线程启动完毕, 在 accpetor 线程的 run() 方法中, 将初始化好的 channel 注册到 selector 上并监听 OP_ACCEPT 事件后就通过 CountDownLact 的 countdown 方法, 表示线程创建完毕, 解除之前的阻塞, 程序继续执行.
1.2 Processor
像 Processor 初始化的话, 内部会有一个比较核心的 connection 的连接队列, 会有自己的一个 Selector , 主要也是通过 run() 方法, 一直循环, 处理各种事务
当一个 Kafka 客户端要发起连接的时候, 肯定是先经过 Accpetor 线程, 当 selector 收到连接请求的时候, 会对 processor 线程进行一个轮询, 将请求交给 processor 线程进行处理, processor 这的话, 基本逻辑就是会通过 selectionKey 获取到 channle 信息, 然后对内部的一个计数器 + 1, 代表连接的数量, 如果连接的数量大于了最大值的话, 是会抛出异常, 拒绝连接的.
通过 channle 进行 accept 连接的建立, 然后设置一些 channel 的基本属性, 最后将这个 channel 加入到 processor 线程中的 connectior 队列中去, 然后唤醒 selector (主要是有可能这个 processor 的 selector 阻塞在 poll 方法那了, 所以需要唤醒一下) .
Processor 线程在 run 方法的时候, 会从 connector 队列中进行 poll , 让获取到的 SocketChannel, 注册到自己内部的 KSelector 里去, 并监听 OP_READ 事件, 然后根据生成的 selectionKey 和 connectionId 等信息封装为一个 KafkaChannel , 并将 selectionKey 和 kafkaChannel 做一个关联, 最后有一个 Map 的数据结构, 存放 connectionId 和 KafkaChannel, 做了一个缓存.
Processor 线程的 run 方法也会一直对 selector 进行 Poll ,查看是否有客户端发送请求过来,主要还是通过 selectionKey 获取 channel 通过 channel 进行 Poll,获取到请求数据,将请求数据最终封装到 completedReceives 集合中去。这里有一点要注意的是,其实每次加入到 completedReceives 队列中的数据,只是请求队列中的第一个,如果****completedReceives 集合中的这个请求还没有收到响应结果的话,是不会放入第二个请求的。 在后续处理中,会将收到的请求放入 RequestQueue 里面。
1.3 KafkaRequestHandlerPool
在 kafka 服务启动的时候, 会初始化一个 KafkaApis 这样一个类, 里面主要就是负责处理各种请求, 之后会通过 apis 封装一个 KafkaRequestHandlePool , 用来处理 processor 封装好的请求 , 默认是有八个线程, 通过循环一直对上面封装好的 RequestQueue 进行 poll 获取到请求, 然后通过 kafkaApis 进行请求的处理.
通过 kafkaApi 处理比较关键的就是 replicaManager 将数据写入磁盘,然后调用回调函数,由于客户端发过来的请求是 <NodeId , Batch>,所以需要将其拆分为 <Partitions,Batch> ,然后进行处理,每个 Partition 都有自己对应的一个响应结果,请求的结果,会放入到每个 processor 线程对应的 responseQueue 中去。
之后 processor 会不断的从 response 队列中获取到响应信息, 然后通过 channel 获取到 Selector , 通过 selector 将响应消息暂存一下, 并监听 OP_WRITE 事件, 然后通过 processor 中的 poll 方法, 判断 channel 可以进行 write 操作之后, 就会将响应消息发送出去.
二、磁盘文件
在 Kafka 启动的时候, 会启动 LogManager 和 ReplicaManager 这两个类, 这两个类主要就是负责处理数据的读写, 首先是创建 LogManager , 然后 ReplicaManage 会包装住 LogManage.
2.1 LogManager
主要的逻辑就是, 根据我们配置的 log.dir 参数, 对每一个路径都会设置一个线程池, 默认数量的话就是 io thread 参数的值, 会读取路径下的文件, 封装 TopicAndPartition , 每个这个对象都会对应着他的文件.
private def loadLogs(): Unit = {
info("Loading logs.")
// 线程池任务
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
// job 任务集合 (线程池任务)
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- this.logDirs) {
// 每个路径都设置一个默认为8 的线程池
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
var recoveryPoints = Map[TopicAndPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception => {
warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
}
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
CoreUtils.runnable {
debug("Loading log '" + logDir.getName + "'")
// 处理路径下 topic和分区的关系
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
// 找到对应topic,parition 的日志文件
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
// 放入之前封装好的 map 中,就是 topicPartition对应 log 文件
val previous = this.logs.put(topicPartition, current)
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}
// submit 提交任务
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}
复制代码
2.2 ReplicaManager
当客户端发送一条消息, 经过上一阶段的网络连接之后, 会通过 KafkaApis 对不同请求进行处理, 是 PRODUCER 请求的话, 在建立好连接之后, 在最后会通过 replicaManager 对数据文件进行追加.
def appendMessages(timeout: Long, // 超时时长
requiredAcks: Short, // ack 值
internalTopicsAllowed: Boolean,
messagesPerPartition: Map[TopicPartition, MessageSet], // 每个分区对应的 消息
responseCallback: Map[TopicPartition, PartitionResponse] => Unit) { // 回调函数
/**
* 当前仅支持3中 ack
* -1 : 不管结果
* 1 : leader 成功即可
* 0 : leader 和 所有的 follow 都成功
*/
if (isValidRequiredAcks(requiredAcks)) {
val sTime = SystemTime.milliseconds
// 数据写入每个分区的磁盘文件中
// 会获取到本地磁盘文件写入的结果
val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
// 将每个写入磁盘文件的结果进行封装
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition ->
ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
}
// 如果有延迟操作
if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// we can respond immediately
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
// 封装响应回调
responseCallback(produceResponseStatus)
}
} else {
// ACK 传递参数超出规定范围
val responseStatus = messagesPerPartition.map {
case (topicAndPartition, messageSet) =>
(topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
LogAppendInfo.UnknownLogAppendInfo.firstOffset,
Message.NoTimestamp))
}
responseCallback(responseStatus)
}
}
复制代码
在后续通过 appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) 追加数据的时候, 会首先检查这个 TOPIC 是不是内部 Topic , 如果是内部 Topic 是不允许进行写入的, 如果不是的话, 就通过 partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) 对对应的分区文件进行追加, 它的内部核心是通过 Segment , 这样的一个对应, 对这个 Segment 进行追加, 完成之后更新 LEO.
如果 Segment 写满了或者写不完这一批数据的话, 就会获取到 LEO , 用这个 LEO 作为新文件的文字, 创建 .log 文件和 .index 文件, 并构建出来一个新的 LogSegment, 通过这个 Segment 来进行数据的追加.
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
// 如果需要追加 索引,要先追加索引文件
// 本质是一个稀疏索引,并不是每一条消息都有一个索引与之对应
// 默认就是写入 4096 个字节数据之后,就会创建一个稀疏索引
index.append(offset, log.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
// append the messages
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
复制代码
第一步:
先判断是否要写入 .index 的索引文件,因为这个 .index 索引是一个稀疏索引,不是说每一条数据都有一个索引与之对应,默认的话是每隔 4096 个字节写入一条稀疏索引,这里写稀疏索引主要是通过 MappedByteBuffer 这个来实现的,它是做了一个和 os cache 的映射,所以在写索引的时候,其实数据是写入到 cache 中,而不是写入到磁盘中去。index.append(offset, log.sizeInBytes()) 这个调用写索引的参数,offset 就代表索引文件中的 offset ,lg.sizeInBytes 就是用来确认 log 文件的物理位置。
def append(offset: Long, position: Int) {
inLock(lock) {
// 第1步:判断索引文件未写满
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
// 第2步:必须满足以下条件之一才允许写入索引项:
// 条件1:当前索引文件为空
// 条件2:要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的
if (_entries == 0 || offset > _lastOffset) {
debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
/**
* 这里就是基于 MappedByteBuffer 来实现的
* 实际上写入的就是 os cache 而不是磁盘文件
*/
mmap.putInt((offset - baseOffset).toInt) // 一个稀疏索引 对应一个文件的物理位置
mmap.putInt(position)
// 第4步:更新其他元数据统计信息,如当前索引项计数器_entries和当前索引项最新位移值_lastOffset _entries += 1
_entries += 1
_lastOffset = offset
require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
.format(offset, _entries, _lastOffset, _file.getAbsolutePath))
}
}
}
复制代码
第二步:
通过 Log 类将数据通过 .log 文件对应的 channel 将数据写入 OS Cache , 当数据写完之后, 更新 LEO 的值.
第三步:
判断未刷入磁盘的数据是否大于 flushInterval 的值, 如果大于的话, 通过 flush 方法, 将数据执行落盘操作, 如果是 .log 文件的话, 就是通过 channel 的 force() 将数据刷入磁盘, 如果是 索引文件的话, 就直接通过 mmap 的 force() 将数据落盘. 最后返回一个结果, 然后通过层层回调, 将结果封装成响应信息, 交由 KafkaApis 将响应信息返回.
********************************************************
**********************************************************