A Brief Analysis of the Components and Creation Flow of SparkContext
Preface
In the Spark framework, submitting an application depends on the Spark Driver, and initializing the Driver revolves around initializing SparkContext. SparkContext is the engine of a Spark program: only once it exists can the program run. Within spark-core, SparkContext is of central importance; it provides many capabilities, such as creating RDDs and broadcast variables. Studying SparkContext's components and startup flow therefore helps in dissecting the architecture of the entire Spark core.
Overview of SparkContext Components
SparkContext contains several of the most important parts of the whole framework (a minimal usage sketch follows this list):
- SparkEnv: Spark's runtime environment. Executors depend on it to execute their assigned tasks; it lives not only in Executors but also in the Driver, so that local-mode tasks can run as well
- SparkUI: the monitoring page for Spark jobs. It is implemented purely in the backend, without a frontend framework, and is used to monitor and tune the current Spark job: from the page you can observe each Executor's JVM information, every job's stage and task breakdown, and the data processed by each task, which helps detect data skew
- DAGScheduler: the DAG scheduler, one of the key components of the Spark job scheduling system. It creates jobs, divides stages according to RDD dependencies, submits stages, and organizes a job into a directed acyclic graph
- TaskScheduler: the task scheduler, another key component of the job scheduling system. It dispatches the tasks created by DAGScheduler to Executors according to its scheduling algorithm; DAGScheduler is its upstream scheduler
- SparkStatusTracker: provides monitoring of jobs and stages
- ConsoleProgressBar: uses the monitoring information provided by SparkStatusTracker to print task progress to the terminal as log lines
- HeartbeatReceiver: the heartbeat receiver. All Executors periodically send it heartbeat messages, which it uses to track live Executors; this information is continuously relayed to TaskScheduler so that it picks healthy Executors when dispatching tasks
- ContextCleaner: the context cleaner, which asynchronously cleans up RDDs, ShuffleDependencies, and Broadcasts that have fallen out of the application's scope
- LiveListenerBus: the event bus inside SparkContext. It receives events from the various components and asynchronously matches them to listeners, invoking the corresponding callbacks
- ShutdownHookManager: the manager for shutdown hooks, which perform cleanup work such as releasing resources when the application exits
- AppStatusStore: stores application status data; introduced in version 2.3.0
- EventLoggingListener (optional): a listener that persists events to storage, controlled by spark.eventLog.enabled
- ExecutorAllocationManager (optional): the dynamic Executor allocation manager, which adjusts the number of Executors according to the workload; controlled by spark.dynamicAllocation.enabled and spark.dynamicAllocation.testing
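To see where these components come into play, consider a minimal driver program; this is only a sketch (the master URL, app name, and data are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")      // required: spark.master
      .setAppName("MinimalApp")   // required: spark.app.name
    val sc = new SparkContext(conf)       // runs the entire initialization flow below
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                 // DAGScheduler splits the job into stages here
    counts.collect().foreach(println)     // TaskScheduler dispatches the resulting tasks
    sc.stop()                             // shutdown hooks and cleaners release resources
  }
}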
SparkContext Initialization Flow
Before exploring the initialization flow, let's look at the attributes of this class; they help us understand what work it does during initialization (abridged to the fields that appear in the walkthrough below):
private var _conf: SparkConf = _                      /* the SparkConf object */
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _listenerBus: LiveListenerBus = _
private var _env: SparkEnv = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _statusStore: AppStatusStore = _
private var _shutdownHookRef: AnyRef = _
In essence, initializing SparkContext amounts to initializing the components above. Let's walk through the startup flow in detail.
First, assign the private _conf field from a clone of the config passed to the constructor, and validate that the required settings in SparkConf are present:
class SparkContext(config: SparkConf) extends Logging {
_conf = config.clone()
_conf.validateSettings()
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
// log out spark.app.name in the Spark driver logs
logInfo(s"Submitted application: $appName")
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten
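Because SparkContext works on a clone, mutating the original SparkConf afterwards has no effect on the running context; a small sketch (assuming local mode; the config key is arbitrary):

val conf = new SparkConf().setMaster("local[1]").setAppName("clone-demo")
val sc = new SparkContext(conf)
conf.set("spark.checkpoint.compress", "true")              // mutate the original conf after the fact
println(sc.getConf.contains("spark.checkpoint.compress"))  // prints false; getConf is a clone too
sc.stop()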
Next, initialize the event log directory and compression codec according to the configuration:

_eventLogDir =
  if (isEventLogEnabled) {
    val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
      .stripSuffix("/")
    Some(Utils.resolveURI(unresolvedDir))
  } else {
    None
  }
_eventLogCodec = {
  val compress = _conf.getBoolean("spark.eventLog.compress", false)
  if (compress && isEventLogEnabled) {
    Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
  } else {
    None
  }
}
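Both settings are plain configuration; a sketch of a conf that switches event logging on (the directory is a placeholder):

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")               // makes isEventLogEnabled true
  .set("spark.eventLog.dir", "hdfs:///spark-events")   // placeholder; resolved via Utils.resolveURI
  .set("spark.eventLog.compress", "true")              // picks a codec via CompressionCodec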
Initialize the event bus:

_listenerBus = new LiveListenerBus(_conf)
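User code can hook into this bus as well; a sketch that registers a custom listener through the public addSparkListener API (assuming a SparkContext sc is in scope):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

sc.addSparkListener(new SparkListener {
  // called asynchronously by the bus when a job completes
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
})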
Initialize the AppStatusStore:
// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf)
listenerBus.addToStatusQueue(_statusStore.listener.get)
Initialize SparkEnv:

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
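Once registered, the environment is reachable from anywhere in the process; a sketch using the SparkEnv.get accessor (a @DeveloperApi):

import org.apache.spark.SparkEnv

val env = SparkEnv.get   // the same instance just registered via SparkEnv.set
println(env.executorId)  // on the driver this prints "driver"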
Initialize the SparkStatusTracker:

_statusTracker = new SparkStatusTracker(this, _statusStore)
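The tracker backs Spark's low-level public monitoring API; a sketch of polling it from application code (assuming a SparkContext sc):

val tracker = sc.statusTracker  // exposes the _statusTracker created above
println(s"active jobs: ${tracker.getActiveJobIds().mkString(", ")}")
tracker.getExecutorInfos.foreach { e =>
  println(s"${e.host()}:${e.port()} running ${e.numRunningTasks()} tasks")
}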
Initialize the ConsoleProgressBar:
_progressBar =
  if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }
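UI_SHOW_CONSOLE_PROGRESS corresponds to the spark.ui.showConsoleProgress key; per the condition above, the bar only appears when the driver's log level is above INFO:

val conf = new SparkConf().set("spark.ui.showConsoleProgress", "true")
// and configure log4j with a WARN (or higher) level for the console appender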
Initialize the SparkUI:

_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
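By default the UI binds to port 4040 (or the next free port if that one is taken); a sketch of the relevant settings (values illustrative):

val conf = new SparkConf()
  .set("spark.ui.enabled", "true")  // the flag checked above
  .set("spark.ui.port", "4040")     // 4040 is the default; the bind() call claims it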
Initialize the hadoopConfiguration:

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
Initialize executorMemory:
_executorMemory = _conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
  .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
  .map(Utils.memoryStringToMb)
  .getOrElse(1024)
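The precedence here is spark.executor.memory, then the SPARK_EXECUTOR_MEMORY environment variable, then the legacy SPARK_MEM, then a 1024 MB default; Utils.memoryStringToMb normalizes the usual size suffixes:

val conf = new SparkConf().set("spark.executor.memory", "2g") // parsed to 2048 (MB)
// with nothing set anywhere, _executorMemory falls back to 1024 (MB)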
Initialize the HeartbeatReceiver:

_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
Initialize and start the task scheduler:

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
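createTaskScheduler pattern-matches on the master URL to choose the SchedulerBackend implementation; a few representative values (host and port are placeholders):

new SparkConf().setMaster("local[4]")           // LocalSchedulerBackend with 4 threads
new SparkConf().setMaster("spark://host:7077")  // StandaloneSchedulerBackend
new SparkConf().setMaster("yarn")               // backend supplied by the YARN integration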
Initialize the applicationId:

_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
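The resulting ID is also part of the public API; its format depends on the cluster manager:

println(sc.applicationId) // e.g. local-1558942345000 locally, application_<timestamp>_0001 on YARN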
Initialize the BlockManager in SparkEnv:

_env.blockManager.initialize(_applicationId)
Initialize the metricsSystem in SparkEnv:
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
Initialize the EventLoggingListener:

_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
    None
  }
Initialize the ExecutorAllocationManager:

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
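A sketch of a conf that enables this path (the limits are illustrative; on most cluster managers the external shuffle service must be enabled alongside it):

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")  // usually required with dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")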
Initialize the ContextCleaner:

_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
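This is why unreferenced broadcasts eventually disappear without any explicit call; a sketch contrasting the two cleanup paths (assuming a SparkContext sc):

val bc = sc.broadcast(Array(1, 2, 3))
bc.destroy() // explicit, immediate cleanup
// alternatively, once `bc` becomes unreachable, ContextCleaner removes its
// blocks asynchronously, driven by JVM weak references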
Initialize the ShutdownHookManager:

logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  try {
    stop()
  } catch {
    case e: Throwable =>
      logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
  }
}