Preface

In the Spark framework, every application is submitted through a Spark Driver, and the Driver's initialization revolves around the initialization of SparkContext. SparkContext is the engine of a Spark program: only once it exists can the program run. Within spark-core, SparkContext is of central importance; it provides many capabilities, such as creating RDDs and broadcast variables. Studying its components and startup flow therefore helps in dissecting the architecture of the entire Spark core.
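To make this concrete, here is a minimal usage sketch of a SparkContext creating an RDD and a broadcast variable; the master URL, application name, and data are arbitrary examples:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: a SparkContext creating an RDD and a broadcast variable.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("sparkcontext-demo"))
val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))   // broadcast variable shipped to executors
val labels = sc.parallelize(Seq(1, 2, 3))            // RDD
  .map(i => lookup.value.getOrElse(i, "?"))
  .collect()
sc.stop()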

Overview of SparkContext components

SparkContext contains several parts that are vital to the whole framework:

  • SparkEnv: Spark's runtime environment. Executors depend on it to run the tasks assigned to them, and it exists not only in Executors but also in the Driver, so that local-mode tasks can run as well
  • SparkUI: the monitoring page for Spark jobs. It is implemented purely on the backend rather than with a frontend stack, and is used to monitor and tune the current Spark job; from the page you can see the JVM information of the current Executors, the stage and task breakdown of every job, and the data processed by each task, which helps spot data skew
  • DAGScheduler: the DAG scheduler, one of the key components of the Spark job scheduling system. It creates jobs, splits them into stages according to RDD dependencies, submits those stages, and thereby organizes a job into a directed acyclic graph
  • TaskScheduler: the task scheduler, another key component of the job scheduling system. It dispatches the tasks created by the DAGScheduler to Executors according to the scheduling algorithm; the DAGScheduler acts as its upstream scheduler
  • SparkStatusTracker: provides monitoring of jobs and stages
  • ConsoleProgressBar: uses the monitoring information provided by SparkStatusTracker to print task progress to the terminal as log output
  • HeartbeatReceiver: the heartbeat receiver. Every Executor periodically sends heartbeats to it, which it uses to track live Executors; this information is continuously synchronized to the TaskScheduler so that suitable Executors are chosen when tasks are dispatched
  • ContextCleaner: the context cleaner, which asynchronously cleans up RDDs, ShuffleDependencies, and Broadcasts that have gone out of the application's scope
  • LiveListenerBus: the event bus inside SparkContext. It receives events from the various components and asynchronously matches them to the corresponding callbacks (see the listener sketch right after this list)
  • ShutdownHookManager: the shutdown hook manager, responsible for cleanup work such as releasing resources on shutdown
  • AppStatusStore: stores application status data; introduced in version 2.3.0
  • EventLoggingListener (optional): a listener that persists events to storage, controlled by spark.eventLog.enabled
  • ExecutorAllocationManager (optional): the Executor dynamic allocation manager, which adjusts the number of Executors according to the current workload; controlled by the properties spark.dynamicAllocation.enabled and spark.dynamicAllocation.testing
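To make the LiveListenerBus concrete, here is a minimal sketch of a custom listener registered through SparkContext.addSparkListener; the bus then delivers events such as job completion to it asynchronously. The listener class and its log message are made up for illustration:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener: the LiveListenerBus delivers events like job completion
// to it asynchronously once it is registered on the SparkContext.
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobEndLogger)   // sc: an already constructed SparkContext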

SparkContext initialization flow

Before digging into the initialization flow of SparkContext, let's first look at the fields of the class; they help us understand what work is done during initialization:

/* the SparkConf object */
private var _conf: SparkConf = _
/* directory where event logs are stored */
private var _eventLogDir: Option[URI] = None
/* compression codec for event logs */
private var _eventLogCodec: Option[String] = None
/* the SparkContext event bus */
private var _listenerBus: LiveListenerBus = _
/* the SparkEnv runtime environment */
private var _env: SparkEnv = _
/* job status tracker */
private var _statusTracker: SparkStatusTracker = _
/* progress bar that prints job status to the terminal */
private var _progressBar: Option[ConsoleProgressBar] = None
/* the Spark UI */
private var _ui: Option[SparkUI] = None
/* Hadoop configuration */
private var _hadoopConfiguration: Configuration = _
/* executor memory size */
private var _executorMemory: Int = _
/* backend that submits tasks to executors */
private var _schedulerBackend: SchedulerBackend = _
/* task scheduler */
private var _taskScheduler: TaskScheduler = _
/* heartbeat receiver */
private var _heartbeatReceiver: RpcEndpointRef = _
/* DAG scheduler */
@volatile private var _dagScheduler: DAGScheduler = _
/* application id and attempt id */
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
/* event logging listener */
private var _eventLogger: Option[EventLoggingListener] = None
/* executor allocation manager */
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
/* context cleaner */
private var _cleaner: Option[ContextCleaner] = None
/* flag indicating whether the event bus has started */
private var _listenerBusStarted: Boolean = false
/* extra jars submitted with the job */
private var _jars: Seq[String] = _
/* extra files submitted with the job */
private var _files: Seq[String] = _
/* shutdown hook reference */
private var _shutdownHookRef: AnyRef = _
/* app status store */
private var _statusStore: AppStatusStore = _

In essence, initializing SparkContext largely amounts to initializing the components above. Now let's walk through the startup flow in detail:

  1. Clone the config passed into the constructor into the private _conf field, and validate that the required settings in SparkConf are correct

    class SparkContext(config: SparkConf) extends Logging {}

    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

    // Set Spark driver host and port system properties. This explicitly sets the configuration
    // instead of relying on the default value of the config constant.
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten
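    As the checks above show, a SparkConf that lacks spark.master or spark.app.name fails validation immediately. A minimal sketch, where the app names and master URL are arbitrary examples:

    import org.apache.spark.{SparkConf, SparkContext}

    // Missing spark.master: construction fails with
    // SparkException("A master URL must be set in your configuration").
    // val broken = new SparkContext(new SparkConf().setAppName("no-master"))

    // Both required settings present, so validation passes.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("validated-app"))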
  2. Initialize the event-log directory and compression codec from the configuration

    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }
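    These two fields only matter once event logging is switched on. A minimal sketch of the relevant settings, where the HDFS directory is just a placeholder:

    import org.apache.spark.SparkConf

    // Minimal sketch: turning event logging on; the HDFS path is only a placeholder.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")
      .set("spark.eventLog.compress", "true")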
  3. Initialize the event bus

    _listenerBus = new LiveListenerBus(_conf)
  4. Initialize the AppStatusStore

    // Initialize the app status store and listener before SparkEnv is created so that it gets
    // all events.
    _statusStore = AppStatusStore.createLiveStore(conf)
    listenerBus.addToStatusQueue(_statusStore.listener.get)
  5. Initialize SparkEnv

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
  6. Initialize the SparkStatusTracker

    _statusTracker = new SparkStatusTracker(this, _statusStore)
  7. Initialize the ConsoleProgressBar

    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }
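    A small sketch of the configuration this block reads; besides the flag, the driver's log level must be above INFO when the SparkContext is constructed, per the !log.isInfoEnabled check:

    import org.apache.spark.SparkConf

    // Minimal sketch: the bar is created only when this flag is true AND the driver's
    // log level is above INFO (e.g. WARN in log4j.properties) at construction time.
    val conf = new SparkConf().set("spark.ui.showConsoleProgress", "true")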
  8. Initialize the SparkUI

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
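    The UI is enabled by default and binds to port 4040, probing subsequent ports if that one is taken. A minimal sketch of the two settings most commonly adjusted here:

    import org.apache.spark.SparkConf

    // Minimal sketch: disable the web UI, or pin it to a port.
    val conf = new SparkConf()
      .set("spark.ui.enabled", "true") // "false" skips creating the SparkUI (as in tests)
      .set("spark.ui.port", "4040")    // default port; Spark probes the next ports if busy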
  9. Initialize hadoopConfiguration

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
  10. Initialize executorMemory

    // Resolve the executor memory (in MB), with precedence:
    // spark.executor.memory > SPARK_EXECUTOR_MEMORY > SPARK_MEM (deprecated) > 1024 MB default
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
        .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
  11. Initialize the heartbeatReceiver

    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
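    A minimal sketch of the two settings that govern this heartbeat traffic; the values shown are the documented defaults:

    import org.apache.spark.SparkConf

    // Minimal sketch: how often executors heartbeat to this endpoint, and how long the
    // driver waits before treating an executor as lost. The interval must stay well
    // below the timeout.
    val conf = new SparkConf()
      .set("spark.executor.heartbeatInterval", "10s")
      .set("spark.network.timeout", "120s")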
  12. Create the task scheduler and start it

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
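    SparkContext.createTaskScheduler pattern-matches on the master URL to decide which TaskScheduler and SchedulerBackend to build. The function below is a heavily simplified, hypothetical classifier that only mirrors the shape of that match; the real method constructs a TaskSchedulerImpl together with backends such as LocalSchedulerBackend or StandaloneSchedulerBackend:

    // Hypothetical sketch of the match shape inside SparkContext.createTaskScheduler;
    // the real method builds a TaskSchedulerImpl plus a concrete SchedulerBackend.
    def backendFor(master: String): String = master match {
      case "local"                       => "LocalSchedulerBackend, single thread"
      case m if m.startsWith("local[")   => "LocalSchedulerBackend, N threads"
      case m if m.startsWith("spark://") => "StandaloneSchedulerBackend"
      case _                             => "backend supplied by an external cluster manager (e.g. YARN)"
    }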
  13. Initialize the applicationId

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    _ui.foreach(_.setAppId(_applicationId))
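    Once written into spark.app.id, the id is also available to user code. A minimal sketch, assuming sc is the constructed SparkContext:

    // The same id that was just stored into spark.app.id, e.g. "local-1621234567890"
    // in local mode or "application_..." on YARN.
    val appId: String = sc.applicationId
    println(s"Running as application $appId")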
  14. Initialize the BlockManager inside SparkEnv

    _env.blockManager.initialize(_applicationId)
  15. Start the metricsSystem inside SparkEnv

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
  16. Initialize the EventLoggingListener

    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }
  17. Initialize the ExecutorAllocationManager

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())
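    Utils.isDynamicAllocationEnabled checks spark.dynamicAllocation.enabled (and, for tests, spark.dynamicAllocation.testing), so the manager is off by default. A minimal sketch of a typical setup; the executor bounds are arbitrary example values:

    import org.apache.spark.SparkConf

    // Minimal sketch of enabling executor dynamic allocation; the bounds are example values.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      .set("spark.shuffle.service.enabled", "true") // usually required alongside it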
  18. Initialize the ContextCleaner

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
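    The cleaner tracks RDDs, shuffles, and broadcasts through weak references and cleans them asynchronously after they are garbage-collected on the driver. User code can also release resources eagerly instead of waiting for it; a minimal sketch, assuming sc is a live SparkContext:

    // Eager cleanup as an alternative to waiting for the ContextCleaner.
    val cached = sc.parallelize(1 to 1000).cache()
    cached.count()       // materialize the cached blocks
    cached.unpersist()   // drop them immediately instead of waiting for GC + the cleaner

    val bc = sc.broadcast(Map("k" -> "v"))
    bc.destroy()         // remove the broadcast's blocks from the driver and executors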
  19. Register the shutdown hook with ShutdownHookManager

    logDebug("Adding shutdown hook") // force eager creation of logger
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
    stop()
    } catch {
    case e: Throwable =>
    logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
    }
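    The hook is only a safety net that invokes stop() when the JVM exits before the application stopped the context itself; applications should still call stop() explicitly. A minimal sketch:

    // Prefer stopping the SparkContext explicitly; the shutdown hook above only covers
    // the case where the JVM exits without stop() having been called.
    try {
      sc.parallelize(1 to 100).sum()
    } finally {
      sc.stop()
    }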