1、前言
上一章我们已经跟踪完了Master的启动过程以及相应的一些RpcEnv环境准备工作,我们知道Master是RpcEndpoint,其它角色想要跟Master进行通信,就必须有Master的引用,既RpcEndpointRef类,同时RpcEndpointRef必须有发送消息的方法,而RpcEndpoint则必须有接收消息并且返回的方法,下面我们回到这两个类的源代码
RpcEndpoint可以清晰的看到两个方法:
// 接收消息不用返回
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
// 接收消息需要返回信息
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
复制代码
回到RpcEndpointRef类,也可以清晰的看到两个方法:
// 对应RpcEndpoint.receive()方法
def send(message: Any): Unit
// 对应RpcEndpoint.receiveAndReply()方法
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
复制代码
这里我们可以推测下,Worker需要与Master进行通信,既Worker需要有RpcEnv,又需要持有Master的引用才能实现与Master进行通信,下面我们将针对我们的猜测进行跟踪
2、Worker启动流程
打开Worker类图,可见Worker也是一个伴生对象,直接从main()方法入手
与Master类型,调用了startRpcEnvAndEndpoint()方法
点进Worker.startRpcEnvAndEndpoint()方法,可以看到两个熟悉的方法RpcEnv.create(),这个方法我们在上一篇文章已经解释过了,就是准备RpcEnv通信环境,准备好相应的服务,另外一个就是rpcEnv.setupEndpoint()将端点注册到Rpc环境并通过调用该端点的OnStart()方法启动该端点实例
看下Worker的类属性,可以看到Worker也是一个RpcEndpoint是类,同时我们也可以看到一个重要的属性:
masterRpcAddresses: Array[RpcAddress]
复制代码
一个数组,用户存放Master的地址,且类型是RpcAddress,这与Inbox的一个属性一致,只要拿到了Master地址才能与Master进行Rpc通信
下面我们继续来看下Worker启动需要与Master进行哪些通信,进入Worker.onStart()方法,可以看到我们要关注的重点registerWithMaster()方法
进入Worker.registerWithMaster()方法,找到我们关注的另一个方法 tryRegisterAllMasters(),从方法名我们可以得知,向所有Master进行注册
进入Worker.tryRegisterAllMasters() 方法,这边我们要解释下代码逻辑:
// 1、遍历所有Master地址
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
// 2、根据Master地址获取Master引用,为了通信
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
/**
* 向Master发送注册消息
*/
sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
复制代码
从上面代码逻辑可以看到,Worker保存有Master的通信地址,就相当于拿到了跟Master进行通信的入口
进入Worker.sendRegisterMessageToMaster()方法,可以看到拿到Master引用,调用send()方法,向Master发送RegisterWorker()消息,其中包含Worker的信息
这边我们回到Master的receive()方法,找到相应的消息,查看处理逻辑:
if (state == RecoveryState.STANDBY) {
// Master 是STANBY模式,返回Master消息
} else if (idToWorker.contains(id)) {
// 该Worker已经注册,返回已注册消息
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
// 记录以及校验Worker信息
if (registerWorker(worker)) {
// 加入注册成功集合
persistenceEngine.addWorker(worker)
/**
* 注册成功,发送一条消息给Worker
*/
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
schedule()
} else {
// 该Worker已经注册过,返回注册失败消息
}
复制代码
这边我们还需要关注下,Master.registerWorker()方法,用于剔除该Worker的旧的历史信息,保留最新的一份,并且更新workerAddress,以便后续Master调用Worker分配资源等操作
回到主流程,Worker注册完后,Master还有后续操作,向Worker发送注册成功的消息RegisteredWorker;
我们到Worker.receive()寻找RegisteredWorker消息的处理逻辑,可以看到RegisteredWorker是RegisterWorkerResponse的实现,处理逻辑走handleRegisterResponse()方法
进入Worker.handleRegisterResponse()方法,这边我们照样简化下代码逻辑:
// 注册成功
registered = true
// 修改Master的地址
changeMaster(masterRef, masterWebUiUrl, masterAddress)
// 启动调度发送心跳
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
// 注册成功,开始调度发送心跳,向自己发送SendHeartbeat消息
// 这里大家肯定会有异或,为什么是向自己发送,直接发送给Master不就好了?
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
// 后续操作
复制代码
Worker收到Master的注册成功的消息RegisteredWorker后,会更新本地记录的Master地址,与当前Master保持一致,同时启动调度,定期向Master发送心跳信息
self.send(SendHeartbeat),向自己发送消息原因:
- 异步解耦,调度器负责定时将心跳消息放入消息队列里面,至于心跳消息的发送,由消费线程去处理消息并发送心跳
回到Worker.receive()方法,可以看到sendToMaster()向Master发送心跳,并且将workerId带过去
最后再回到Master.receive()方法,查看Heartbeat消息处理逻辑,这个逻辑就比较简单了,如果存在则更新最后心跳的时间,如果不存则要求重新注册
至此,当Worker心跳消息完成通信后,也代表着Worker启动完成,且与Master成功完成通信
跟踪完Master与Worker启动,也就证明了Spark的资源层的启动我们全部跟完了
下面将worker加入到我们的源码总结中
3、总结
- RpcEndpoint 拥有receive()和receiveAndReply()方法,与之对应的是RpcEndpointRef的send()和ask()方法
- 要想与其他角色通信必须拥有RpcEnv环境,这个是通信的基础,其次必须持有要通信对象的引用RpcEndpointRef,属性中包含了通信地址RpcAddress
- Worker同样具有RpcEnv环境,且Worker属性中记录有所有Master的通信地址,用户创建Master引用类用于与Master通信