Kafka Producer 服务端原理剖析

一、启动

1.1 Acceptor

在服务端启动的时候, 会初始化 SocketServer , 里面会初始化 acceptor 线程和 processor 线程, 用来处理连接请求.

         /**
         *  这里在初始化 Acceptor 的时候,首先会创建出来对应的 selector 和 channel
         *  然后会启动默认的那三个 Porcessor 线程
         *  selector 默认监听 9092
         */
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        // 这里实际上是通过 java 去设置线程的属性并创建线程,然后调用 start 方法
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()

        // 这里就是要阻塞在这里,等待线程创建完成
        // 这里认为线程创建完成的标记为:上面初始化好的 channel 注册到了 selector 中
        // 默认在 run 中关注的就是一个 ACCEPT 事件
        acceptor.awaitStartup()
复制代码

这里其实比较核心的就是这个 Acceptor 线程, 它在初始化的时候, 内部会初始化好 selector 和 channel , 并在内部启动三个 (默认) 的 Processor 线程, 然后将 Acceptor 自身启动, 通过 CountDownLatch 调用 awit 方法, 阻塞在这, 等待线程启动完毕, 在 accpetor 线程的 run() 方法中, 将初始化好的 channel 注册到 selector 上并监听 OP_ACCEPT 事件后就通过 CountDownLact 的 countdown 方法, 表示线程创建完毕, 解除之前的阻塞, 程序继续执行.

1.2 Processor

像 Processor 初始化的话, 内部会有一个比较核心的 connection 的连接队列, 会有自己的一个 Selector , 主要也是通过 run() 方法, 一直循环, 处理各种事务

无标题-2020-12-17-2209.png

当一个 Kafka 客户端要发起连接的时候, 肯定是先经过 Accpetor 线程, 当 selector 收到连接请求的时候, 会对 processor 线程进行一个轮询, 将请求交给 processor 线程进行处理, processor 这的话, 基本逻辑就是会通过 selectionKey 获取到 channle 信息, 然后对内部的一个计数器 + 1, 代表连接的数量, 如果连接的数量大于了最大值的话, 是会抛出异常, 拒绝连接的.

通过 channle 进行 accept 连接的建立, 然后设置一些 channel 的基本属性, 最后将这个 channel 加入到 processor 线程中的 connectior 队列中去, 然后唤醒 selector (主要是有可能这个 processor 的 selector 阻塞在 poll 方法那了, 所以需要唤醒一下) .

无标题-2020-12-17-2209 (2).png

Processor 线程在 run 方法的时候, 会从 connector 队列中进行 poll , 让获取到的 SocketChannel, 注册到自己内部的 KSelector 里去, 并监听 OP_READ 事件, 然后根据生成的 selectionKey 和 connectionId 等信息封装为一个 KafkaChannel , 并将 selectionKey 和 kafkaChannel 做一个关联, 最后有一个 Map 的数据结构, 存放 connectionId 和 KafkaChannel, 做了一个缓存.

Processor 线程的 run 方法也会一直对 selector 进行 Poll ,查看是否有客户端发送请求过来,主要还是通过 selectionKey 获取 channel 通过 channel 进行 Poll,获取到请求数据,将请求数据最终封装到 completedReceives 集合中去。这里有一点要注意的是,其实每次加入到 completedReceives 队列中的数据,只是请求队列中的第一个,如果****completedReceives 集合中的这个请求还没有收到响应结果的话,是不会放入第二个请求的。 在后续处理中,会将收到的请求放入 RequestQueue 里面。

Producer 连接原理 (2).png

1.3 KafkaRequestHandlerPool

在 kafka 服务启动的时候, 会初始化一个 KafkaApis 这样一个类, 里面主要就是负责处理各种请求, 之后会通过 apis 封装一个 KafkaRequestHandlePool , 用来处理 processor 封装好的请求 , 默认是有八个线程, 通过循环一直对上面封装好的 RequestQueue 进行 poll 获取到请求, 然后通过 kafkaApis 进行请求的处理.

通过 kafkaApi 处理比较关键的就是 replicaManager 将数据写入磁盘,然后调用回调函数,由于客户端发过来的请求是 <NodeId , Batch>,所以需要将其拆分为 <Partitions,Batch> ,然后进行处理,每个 Partition 都有自己对应的一个响应结果,请求的结果,会放入到每个 processor 线程对应的 responseQueue 中去。

之后 processor 会不断的从 response 队列中获取到响应信息, 然后通过 channel 获取到 Selector , 通过 selector 将响应消息暂存一下, 并监听 OP_WRITE 事件, 然后通过 processor 中的 poll 方法, 判断 channel 可以进行 write 操作之后, 就会将响应消息发送出去.

Producer 连接原理 (3).png

二、磁盘文件

在 Kafka 启动的时候, 会启动 LogManager 和 ReplicaManager 这两个类, 这两个类主要就是负责处理数据的读写, 首先是创建 LogManager , 然后 ReplicaManage 会包装住 LogManage.

2.1 LogManager

主要的逻辑就是, 根据我们配置的 log.dir 参数, 对每一个路径都会设置一个线程池, 默认数量的话就是 io thread 参数的值, 会读取路径下的文件, 封装 TopicAndPartition , 每个这个对象都会对应着他的文件.

private def loadLogs(): Unit = {
    info("Loading logs.")
        // 线程池任务
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
   // job 任务集合 (线程池任务)
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]

    for (dir <- this.logDirs) {
      // 每个路径都设置一个默认为8 的线程池
      val pool = Executors.newFixedThreadPool(ioThreads)
      threadPools.append(pool)

      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)

      var recoveryPoints = Map[TopicAndPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read
      } catch {
        case e: Exception => {
          warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
          warn("Resetting the recovery checkpoint to 0")
        }
      }

      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        CoreUtils.runnable {
          debug("Loading log '" + logDir.getName + "'")
                    // 处理路径下 topic和分区的关系 
          val topicPartition = Log.parseTopicPartitionName(logDir)
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
                    // 找到对应topic,parition 的日志文件
          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
          // 放入之前封装好的 map 中,就是 topicPartition对应 log 文件
          val previous = this.logs.put(topicPartition, current)

          if (previous != null) {
            throw new IllegalArgumentException(
              "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
      }
            // submit 提交任务
      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
    }
复制代码

2.2 ReplicaManager

当客户端发送一条消息, 经过上一阶段的网络连接之后, 会通过 KafkaApis 对不同请求进行处理, 是 PRODUCER 请求的话, 在建立好连接之后, 在最后会通过 replicaManager 对数据文件进行追加.

 def appendMessages(timeout: Long, // 超时时长
                     requiredAcks: Short, // ack 值
                     internalTopicsAllowed: Boolean,
                     messagesPerPartition: Map[TopicPartition, MessageSet], // 每个分区对应的 消息
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit) { // 回调函数

    /**
     *  当前仅支持3中 ack
     *  -1 : 不管结果
     *  1 : leader 成功即可
     *  0 : leader 和 所有的 follow 都成功
     */
    if (isValidRequiredAcks(requiredAcks)) {
      val sTime = SystemTime.milliseconds

      // 数据写入每个分区的磁盘文件中
      // 会获取到本地磁盘文件写入的结果
      val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
      debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
            // 将每个写入磁盘文件的结果进行封装
      val produceStatus = localProduceResults.map { case (topicPartition, result) =>
        topicPartition ->
                ProducePartitionStatus(
                  result.info.lastOffset + 1, // required offset
                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
      }
            // 如果有延迟操作
      if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

      } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        // 封装响应回调
        responseCallback(produceResponseStatus)
      }
    } else {
      // ACK 传递参数超出规定范围
      val responseStatus = messagesPerPartition.map {
        case (topicAndPartition, messageSet) =>
          (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
                                                      LogAppendInfo.UnknownLogAppendInfo.firstOffset,
                                                      Message.NoTimestamp))
      }
      responseCallback(responseStatus)
    }
  }
复制代码

在后续通过 appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks) 追加数据的时候, 会首先检查这个 TOPIC 是不是内部 Topic , 如果是内部 Topic 是不允许进行写入的, 如果不是的话, 就通过 partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks) 对对应的分区文件进行追加, 它的内部核心是通过 Segment , 这样的一个对应, 对这个 Segment 进行追加, 完成之后更新 LEO.

如果 Segment 写满了或者写不完这一批数据的话, 就会获取到 LEO , 用这个 LEO 作为新文件的文字, 创建 .log 文件和 .index 文件, 并构建出来一个新的 LogSegment, 通过这个 Segment 来进行数据的追加.

def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // 如果需要追加 索引,要先追加索引文件
        // 本质是一个稀疏索引,并不是每一条消息都有一个索引与之对应
        // 默认就是写入 4096 个字节数据之后,就会创建一个稀疏索引
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }
复制代码
第一步:

先判断是否要写入 .index 的索引文件,因为这个 .index 索引是一个稀疏索引,不是说每一条数据都有一个索引与之对应,默认的话是每隔 4096 个字节写入一条稀疏索引,这里写稀疏索引主要是通过 MappedByteBuffer 这个来实现的,它是做了一个和 os cache 的映射,所以在写索引的时候,其实数据是写入到 cache 中,而不是写入到磁盘中去。index.append(offset, log.sizeInBytes()) 这个调用写索引的参数,offset 就代表索引文件中的 offset ,lg.sizeInBytes 就是用来确认 log 文件的物理位置。

def append(offset: Long, position: Int) {
    inLock(lock) {
      // 第1步:判断索引文件未写满
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
      // 第2步:必须满足以下条件之一才允许写入索引项: 
      // 条件1:当前索引文件为空 
      // 条件2:要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的
      if (_entries == 0 || offset > _lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))

        /**
         *  这里就是基于 MappedByteBuffer 来实现的
         *  实际上写入的就是 os cache 而不是磁盘文件
         */
        mmap.putInt((offset - baseOffset).toInt) // 一个稀疏索引 对应一个文件的物理位置
        mmap.putInt(position)
        // 第4步:更新其他元数据统计信息,如当前索引项计数器_entries和当前索引项最新位移值_lastOffset _entries += 1
        _entries += 1
        _lastOffset = offset
        require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
      }
    }
  }
复制代码

写入 .index 索引文件.png

第二步:

通过 Log 类将数据通过 .log 文件对应的 channel 将数据写入 OS Cache , 当数据写完之后, 更新 LEO 的值.

第三步:

判断未刷入磁盘的数据是否大于 flushInterval 的值, 如果大于的话, 通过 flush 方法, 将数据执行落盘操作, 如果是 .log 文件的话, 就是通过 channel 的 force() 将数据刷入磁盘, 如果是 索引文件的话, 就直接通过 mmap 的 force() 将数据落盘. 最后返回一个结果, 然后通过层层回调, 将结果封装成响应信息, 交由 KafkaApis 将响应信息返回.

********************************************************
*********************************************************
*

运营活动审批流程 (2).png

05_Kafka服务器的源码剖析 (17).jpg

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享