一、Controller选举
在 Kafka 启动的时候,会初始化 KafkaController 组件和 zkUtils 组件,前者用来管理 Kafka 集群,后者用来操作 Zookeeper ,因为 Kafka 是重度依赖于 Zk 的,很多的元数据信息都是放在 Zk 中的。
在 kafkaController 组件中,主要就是通过 controllerElector 进行选举,首先是会对 /controller 这个 node,进行监控,由leaderChangeListener 监听器对数据变更做出相应的处理
def startup {
inLock(controllerContext.controllerLock) {
// 在 /controller node 上,注册一个监听器
// 效果就是说,只要这个节点上的数据发生了变化,就会感知到,比如 controller 换人了,或者 controller 挂了,或者有人成为 controller
// 通过 leaderChangeListener 对变化进行相应的处理
controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
// 发起选举
elect
}
}
复制代码
在选举的时候,会首先尝试去 /controller 路径下获取 controller id ,如果存在的话,就直接返回 id ,如果不存在就返回 -1 , 通过 zkUtils 尝试去创建 /controller 以及下属的 broker 信息(”version” -> 1, “brokerid” -> brokerId, “timestamp” -> timestamp ), zk 的话,保证了同一时间只有一个可以创建成功,创建成功之后,就将自己所在的 broker 设置为 controller,如果 controller 发生了宕机的话,就会通过之前创建的监听器(leaderChangeListener ),通过 elect 进行重新选举。
def handleDataDeleted(dataPath: String) {
inLock(controllerContext.controllerLock) {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
.format(brokerId, dataPath))
if(amILeader)
onResigningAsLeader()
elect
}
}
复制代码
二、Broker 注册
在 kafka 启动的时候,还有一个叫做 KafkaHealthCheck 这样的一个组件,他用来负责在 Kafka 启动的时候,将 BrokerId,和 Broker 的一些基本信息 注册到 Zk 上面去,主要就是在 /broker/ids 下面创建自己BrokerId 的节点,如果 /broker/ids 路径还没有的话,会进行递归创建。
def registerBrokerInZk(id: Int,
host: String,
port: Int,
advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
jmxPort: Int,
rack: Option[String],
apiVersion: ApiVersion) {
val brokerIdPath = BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2
var jsonMap = Map("version" -> version,
"host" -> host,
"port" -> port,
"endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
"jmx_port" -> jmxPort,
"timestamp" -> timestamp
)
rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
val brokerInfo = Json.encode(jsonMap)
registerBrokerInZk(brokerIdPath, brokerInfo) // 创建临时节点
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
}
复制代码
三、集群感知
当 controller 选举完毕之后,会调用一个 onControllerFailover 回调函数,它会在 /broker/ids 节点上面增加一个 listener 用来监听数据变更,每次就计算一下,新注册的 Broker 和 死掉的 Broker ,然后就是封装自身的各种信息和数据结构,通过构建的 channelManager 将更新元数据的请求发送给集群中非 Controller 节点。
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds // 新注册的
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds // 死掉的
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
controllerContext.liveBrokers = curBrokers
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
.format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker) // 添加 broker
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker) // 删除 broker
if(newBrokerIds.size > 0)
// 如果有新的,处理
controller.onBrokerStartup(newBrokerIdsSorted)
if(deadBrokerIds.size > 0)
// 如果有死掉的,处理
controller.onBrokerFailure(deadBrokerIdsSorted)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
}
}
}
}
}
复制代码
在 KafkaApis 对象中,由 handleUpdateMetadataRequest 方法更新元数据请求进行处理,这里比较简单,就是将当前 Broker 内部缓存的集群节点信息清空,以这个请求过来的数据为准,在更新完毕之后,再通过 channel 将响应结果返回回去。
四、Topic
4.1 Topic 创建
创建 Topic 其实就是通过 TopicCommand 设置 name ,partition , replica 进行创建,但这里会保证分区副本的一个均匀分布,将 Topic 信息和分区分配的方案写入到 /brokers/topics/ 路径下。
Controller 在选举完毕之后,会初始化一个对于 Topic 路径的监听器,当发现数据发生变更的时候,会将这个 Topic 的新的信息发送给其余的 Broker ,这样就保证了每个 Broker 都有相应的元数据信息。
4.2 Leader 宕机
Producer 客户端往 Topic 发送数据的话,就是会从 Broker 中先获取元数据,根据 leader 封装 Batch ,然后发给 Broker ,leader 写入本地磁盘,follow 同步数据。
Broker 挂掉之后, Controller 会触发对这个 Broker 上面所有 leader 的重新选举,然后将选举结果推送给各个 Broker ,在选举完成之前, Producer 往这个 broker 上面写数据,肯定是一直失败的,Producer 失败之后,就会将连接断开,客户端一旦发现有断开的连接,就会去重新拉取元数据信息,这时就可以找到要写入数据所有的新的 leader ,然后在重试的时候,数据就可以写入。
4.3 副本重平衡
如果要进行分区副本重平衡的话,是需要通过脚本执行指定文件来做的,在实现的时候,会先获取到 Topic 之前的分配策略,将新的分配策略写入到 /admin/reassign_partitions 里面,在 Controller 选举完之后,会有一个 listener 去监听 /admin/reassign_partitions 这个路径,一旦数据发生变更,就会执行一个 rebalance
For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
* may go through the following transition.
* AR leader/isr
* {1,2,3} 1/{1,2,3} (initial state)
* {1,2,3,4,5,6} 1/{1,2,3} (step 2) 更新 ZK ,在ZK中放入新的副本
一旦副本发生了更新, Controller 是可以感知到的,并将元数据信息都推送给别的 Broker
当 broker 感知到自己负责的 follow 发生变更之后,就会通过 fetch 请求,和 leader 进行同步
* {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
* 这样的话,其实就是在新机器上开启了新副本,作为 follow ,在跟 leader 进行一个 sync 同步
* {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
* 等待上面的 follow 和 leader 完全一致的时候,会从新副本中选举出来 leader
* {1,2,3,4,5,6} 4/{4,5,6} (step 8)
* 在从 ISR 列表中,将老的副本删除掉
* {4,5,6} 4/{4,5,6} (step 10)
* 从 zk 中也删除老的副本
复制代码