When launching a new Spark job, we need to create a context object. Whether it is spark-sql, spark-core, or spark-streaming, every context depends on a SparkConf object, and SparkConf plays a crucial role when tuning Spark jobs.

1. How core parameters are stored in the SparkConf object:

private val settings = new ConcurrentHashMap[String, String]()

All configuration items are stored in this ConcurrentHashMap, and both the keys and the values are of type String.
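
As a quick illustration, here is a minimal sketch (assuming a standard Spark dependency on the classpath; the keys and values are just placeholders): everything goes into the map as a String, and the typed getters parse it back out.

import org.apache.spark.SparkConf

val conf = new SparkConf(false)                       // skip loading system properties
conf.set("spark.executor.memory", "2g")               // stored as the String "2g"
conf.set("spark.sql.shuffle.partitions", "200")       // numeric values are still Strings in the map
println(conf.get("spark.executor.memory"))                 // "2g"
println(conf.getInt("spark.sql.shuffle.partitions", 100))  // 200, parsed from the stored String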

2. The core set method

private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
  if (key == null) {
    throw new NullPointerException("null key")
  }
  if (value == null) {
    throw new NullPointerException("null value for " + key)
  }
  if (!silent) {
    logDeprecationWarning(key)
  }
  settings.put(key, value)
  this
}
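
The method validates that neither the key nor the value is null, optionally logs a deprecation warning, and returns this so that calls can be chained. A short usage sketch against the public set(key, value) overload, which delegates to the method above with silent = false (the keys shown are standard Spark properties):

import org.apache.spark.SparkConf

// Because set returns the SparkConf itself, configuration calls chain naturally;
// passing a null key or value would throw a NullPointerException, as shown above.
val conf = new SparkConf(false)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.executor.cores", "4")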

3. Implementation of some commonly used methods

/**
 * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
 * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
 */
def setMaster(master: String): SparkConf = {
  set("spark.master", master)
}

/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
  set("spark.app.name", name)
}

/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
  for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
  set(JARS, jars.filter(_ != null))
}

/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
def setJars(jars: Array[String]): SparkConf = {
  setJars(jars.toSeq)
}

Under the hood these all call set(); they simply provide a convenient interface for setting some commonly used parameters.
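
A typical usage sketch of these helpers (the master URL and application name here are placeholders):

import org.apache.spark.SparkConf

// Each helper forwards to set() with a well-known key and returns the conf,
// so the calls compose in the usual builder style:
val conf = new SparkConf()
  .setMaster("local[4]")          // same as set("spark.master", "local[4]")
  .setAppName("ConfWalkthrough")  // same as set("spark.app.name", "ConfWalkthrough")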

4. The loadDefaults constructor

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {}

When constructing a SparkConf, a Boolean flag is passed in to indicate whether the corresponding Spark parameters should be loaded from the system properties; by default they are loaded. The specific logic is as follows:

/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

if (loadDefaults) {
  loadFromSystemProperties(false)
}

private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}

Utils.getSystemProperties retrieves the system properties of the current environment, and the guard in the for comprehension filters out irrelevant entries; in effect, every system property whose key starts with "spark." is imported.

The Utils.getSystemProperties method is defined as follows:


/**
 * Returns the system properties map that is thread-safe to iterator over. It gets the
 * properties which have been set explicitly, as well as those for which only a default value
 * has been defined.
 */
def getSystemProperties: Map[String, String] = {
  System.getProperties.stringPropertyNames().asScala
    .map(key => (key, System.getProperty(key))).toMap
}
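
A small sketch of this behavior (the property key and value are just an illustration; in practice such properties usually arrive via spark-submit or -D JVM options):

import org.apache.spark.SparkConf

// Any "spark."-prefixed system property set before construction is picked up,
// because new SparkConf() is new SparkConf(true), which runs loadFromSystemProperties.
System.setProperty("spark.executor.memory", "4g")
val conf = new SparkConf()
println(conf.get("spark.executor.memory"))         // "4g"

// With loadDefaults = false the system properties are ignored.
val empty = new SparkConf(false)
println(empty.contains("spark.executor.memory"))   // false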

5. The Cloneable trait

SparkConf mixes in the Cloneable trait and overrides the clone method, which is used to copy the object so it can be passed around.

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
  /** Copy this object */
  override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }
}
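
Note that the copy is built with new SparkConf(false), so system properties are not re-read, and every entry is copied with silent = true, so deprecation warnings are not logged a second time. A short sketch showing that the clone is an independent copy (the key used is a standard Spark property):

import org.apache.spark.SparkConf

val original = new SparkConf(false).set("spark.app.name", "original")
val copy = original.clone

// The clone starts with the same settings but has its own ConcurrentHashMap,
// so changing one does not affect the other.
copy.set("spark.app.name", "copy")
println(original.get("spark.app.name"))   // "original"
println(copy.get("spark.app.name"))       // "copy"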