Spark源码解析04-Submit提交流程及SparkContext准备流程

1、前言

由前面的文章Spark源码解析03-Submit提交流程及Driver启动流程我们已经知道了Spark的重要角色Driver的启动,既DriverWrapper

我们回过头看下cluster提交流程

  1. 任务提交后,Master(资源管理器)会找到一个Worker(节点)启动Driver进程,Driver启动后向Master注册应用程序
  2. Master根据submit脚本的资源配置,启用相应的Worker
  3. 在Worker上启动所有的Executor,Executor反向到Driver注册
  4. Driver开始执行main函数,之后执行到Action算子时,开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行

从提交流程我们知道,我们只启动了Driver,后续的注册操作还没有完成,下面我们将从SparkContext源码的角度解析后续的流程

2、SparkContext准备流程01

Driver的启动意味我们的代码将被执行,写过Spark程序的读者对下面这两段代码并不陌生,既创建SparkContext,下面我们将对SparkContext的创建进行源码跟踪,Spark看看在SparkContext中为我们准备哪些服务

val conf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
复制代码

本文使用spark源码版本为 2.3.4,我们直奔主题进入SparkContext,看下注释“Only one SparkContext may be active per JVM”,每个JVM只能有一个活动的SparkContext,既可以理解为一个Spark程序有且仅有一个SparkContext

11-SparkContext-01

接下来我们看下SparkContext的24个属性,可以看到SparkContext封装了一系列属性,其中不乏有SparkEnv这种贯穿整个计算框架的核心服务,还有在其他Spark文章都能看到的DAGScheduler用于Stage任务划分服务等,统统别封装在SparkContext

11-SparkContext-02

这边我们挑几个重点的讲解一下

// SparkEnv 计算层的核心服务,贯穿整个application计算【这个内容后续文章介绍】
private var _env: SparkEnv = _
// 内持有与DriverEndpoint、ClientEndpoint
private var _schedulerBackend: SchedulerBackend = _
// 内持有SchedulerBackend的引用
private var _taskScheduler: TaskScheduler = _
// DAGScheduler Stage划分的核心类
@volatile private var _dagScheduler: DAGScheduler = _
复制代码

3、SparkContext准备流程02

下面我们来看SchedulerBackend和TaskScheduler的初始化,来到SparkContext的492行的位置,可以看到两个属性分别被赋值sched和ts,是通过SparkContext.createTaskScheduler()方法,然后再调用_taskScheduler.start(),启动TaskScheduler服务

11-SparkContext-03

下面我们看下SparkContext.createTaskScheduler()方法,由于代码篇幅过长,这边我缩略了部分代码,延续上一篇文章Spark源码解析03-Submit提交流程及Driver启动流程的内容,我们使用的模式是cluster,既提交url是spark://127.0.0.1,匹配到的类型刚好就是截图里的第四个case

11-SparkContext-createTaskScheduler()

下面我们简化下代码流程

val scheduler = new TaskSchedulerImpl(sc)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
复制代码

进入scheduler.initialize(backend)方法,我们看到TaskSchedulerImpl中的backend属性被赋值为StandaloneSchedulerBackend

13-TaskScheduler.initialize()

从返回的tuple2可以知道,ts被赋值是TaskSchedulerImpl,sched被赋值StandaloneSchedulerBackend且内部持有TaskSchedulerImpl引用,我们用图将我们知道的属性汇总下

11-SparkContext-属性-01

4、SparkContext准备流程03

看完SparkContext.createTaskScheduler(),我们继续看下_taskScheduler.start(),启动TaskScheduler服务,这边我们忽略掉部分细节,只需知道start()方法中,主要调用了**backend.start()**方法,这里的backend就是我们在构造TaskScheduler.initialize()传入的StandaloneSchedulerBackend,既:TaskScheduler.start()实际是调用了StandaloneSchedulerBackend.start()方法

13-TaskScheduler.start()

进入StandaloneSchedulerBackend.start()方法,又调用了super.start()

从截图我们可以知道,StandaloneSchedulerBackend的父类就是CoarseGrainedSchedulerBackend,既:StandaloneSchedulerBackend.start()实际是调用了其父类CoarseGrainedSchedulerBackend的start()方法

14-StandaloneSchedulerBackend.start()

14-StandaloneSchedulerBackend.start()-02

下面我们再进入CoarseGrainedSchedulerBackend.start()方法,这边有个很熟悉的方法rpcEnv.setupEndpoint()方法,这里不做过多解释有兴趣的读者去看前面的文章Spark源码解析01-Master启动流程即可,我们需要关注的是DriverEndpoint注入到RpcEnv环境中,同时DriverEndpoint的onStart()会被后台线程执行

driverEndpoint = createDriverEndpointRef(properties)

rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
复制代码

看过资源层文章的读者肯定知道,所谓的Endpoint就是Rpc通信的一个端点,持有某个角色的端点就可以与该角色进行通信

15-CoarseGrainedSchedulerBackend.start()

下面我们把上面的SparkContext属性图补一补

11-SparkContext-属性-02

5、SparkContext准备流程04

我们继续回到StandaloneSchedulerBackend.start(),看到StandaloneSchedulerBackend类第114行代码,下面我们简化下代码逻辑

// 准备DriverEndpoint
super.start()

// 所需参数封装
......
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)

// 封装application信息
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// client赋值
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// 准备ClientEndpoint
client.start()
复制代码

14-StandaloneSchedulerBackend.start()-02

前面我们已经研究完,super.start()方法,现在我们来继续往下看,在构建StandaloneAppClient的时候,将command传入其中,这里我们记住org.apache.spark.executor.CoarseGrainedExecutorBackend被传入到StandaloneAppClient即可

下面继续,client.start()方法,StandaloneAppClient类第276行代码,ClientEndpoint注入到RpcEnv环境中,同时ClientEndpoint的onStart()会被后台线程执行

rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
复制代码

16-StandaloneAppClient.start()

下面我们把上面的SparkContext属性图补一补

11-SparkContext-属性-03

6、SparkContext准备流程05

继续我们的源码分析,从上面我们知道StandaloneAppClient中持有ClientEndpoint端点,且该端点的onStart()会被后台线程调起,下面我们来看下这个onStart()具体在做什么,StandaloneAppClient第86行代码,调用了registerWithMaster()方法

17-ClientEndpoint.onStart()

registerWithMaster()方法中,又调用了tryRegisterAllMasters()方法

17-ClientEndpoint.registerWithMaster()

进入tryRegisterAllMasters(),我们看到代码第106行跟107行,拿到master的引用,先master发送RegisterApplication消息,这不正符合我们提交流程里:Driver启动后向Master注册

// 拿到master端点引用
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// 向master发送RegisterApplication消息,注册application
masterRef.send(RegisterApplication(appDescription, self))
复制代码

17-ClientEndpoint.tryRegisterAllMasters()

至此,我们关于SparkContext的准备流程已经跟完了,SparkContext在准备过程中的准备了StandaloneSchedulerBackend就是为了构建与Master的通信且向Master发送注册消息(RegisterApplication),同时我们也知道SparkContext封装了很多属性,如:ShuffleManager、DAGScheduler等

11-SparkContext-属性-04

7、总结

Driver在运行我们的main方法是会涉及到SparkContext在准备,在此过程中SparkContext同时会准备好通信环境且向Master发送注册消息RegisterApplication,这也满足了提交流程里:Driver启动后向Master注册的步骤。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享