1 Overview
Kubernetes 是作为新的 resouceManager 集成到 Spark 中的,集成的思路跟将 YARN 集成是类似的,Spark 本身提供 Standalone 这种资源管理的模式,当然是不够的。
而集成 Kubernetes 的方式,其实是很好理解的,也就是在 Spark 中起一个 Http 的客户端从而和 Kubernetes 的 ApiSever 进行通信,从而把与 Appication 相关的一些配置,例如如何创建 Driver 和 Executor 的 Pod,当然也包括对 Pod 的 Watch 相关。
2 源码分析
Spark Kubernetes 的模块的代码其实并不多,建议大家到以下目录下利用 tree
简单看一下。
# 路径
path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
➜ spark git:(master) ✗ tree -d -L 3
.
├── deploy
│ └── k8s
│ ├── features // 包括 Driver/Executor, configMap, secret 等配置的步骤
│ └── submit // 跟 submit 有关
└── scheduler
└── cluster
└── k8s // 跟 executor pod 的调度,状态等有关
代码结构还是很清晰的,一部分是与 deploy 有关,一部分是跟 scheduler 有关。
本文重点解析以下 submit 相关的代码。
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
├── K8sSubmitOps.scala // spark submit 相关
├── KubernetesClientApplication.scala // spark submit 的封装
├── KubernetesDriverBuilder.scala // Driver builder
├── LoggingPodStatusWatcher.scala // Spark Pod 的状态 Watcher
└── MainAppResource.scala // 包含 Java/Python/R 的一些资源定义
然后看一下 Spark K8S 模式的入口类。
private[spark] class KubernetesClientApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs)
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf, None, None)) { kubernetesClient =>
val client = new Client(
kubernetesConf,
new KubernetesDriverBuilder(),
kubernetesClient,
waitForAppCompletion,
watcher)
client.run()
}
}
}
这一段是 Spark K8S 的入口 Main Class,重点关注 run()
方法。首先生成一个 kubernetesAppId
,为什么不是 spark app name,原因是这个关于 App 的标识,会以 Label 的方式,标注在关于这个 App 的所有资源上,包括 Driver/Executor Pod,ConfigMap,Secret 等。
可以留意一下 WAIT_FOR_APP_COMPLETION
这个配置,默认值为 true
。表示当选择 cluster mode 的时候,laucher 进程是否会等待 App 结束后才会退出,如果改为 false
,则 laucher 进程会马上结束。
private[spark] class Client(
conf: KubernetesDriverConf, builder: KubernetesDriverBuilder,
kubernetesClient: KubernetesClient, waitForAppCompletion: Boolean,
watcher: LoggingPodStatusWatcher) extends Logging {
def run(): Unit = {
// driver Pod 的配置是这里来的
val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
// configmap 从这里来的
val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
// Driver 容器的配置
val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container).addNewEnv()
.withName(ENV_SPARK_CONF_DIR)
.withValue(SPARK_CONF_DIR_INTERNAL)
.endEnv()
.addNewVolumeMount()
.withName(SPARK_CONF_VOLUME)
.withMountPath(SPARK_CONF_DIR_INTERNAL)
.endVolumeMount()
.build()
// Driver Pod 的配置
val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
.editSpec()
.addToContainers(resolvedDriverContainer)
.addNewVolume()
.withName(SPARK_CONF_VOLUME)
.withNewConfigMap()
.withName(configMapName)
.endConfigMap()
.endVolume()
.endSpec()
.build()
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
// including configMap
val otherKubernetesResources =
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
s"${resolvedDriverPod.getMetadata.getName}"
if (waitForAppCompletion) {
logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
// watcher
watcher.awaitCompletion()
logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
} else {
logInfo(s"Deployed Spark application ${conf.appName} with " +
s"submission ID ${sId} into Kubernetes.")
}
}
}
// K8S 的特性,其他的包括 configmap secret executor-pod 这些,如果 driver 挂了,其他都要删除干净
private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(driverPod.getMetadata.getName)
.withApiVersion(driverPod.getApiVersion)
.withUid(driverPod.getMetadata.getUid)
.withKind(driverPod.getKind)
.withController(true)
.build()
// 给每个 resource 都给个名字
resources.foreach { resource =>
val originalMetadata = resource.getMetadata
originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
}
}
// 创建 ConfigMap,主要是得到像 Hadoop/Spark Conf 之类的配置信息
private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
// Java 的 Prop?
val properties = new Properties()
conf.foreach { case (k, v) =>
properties.setProperty(k, v)
}
val propertiesWriter = new StringWriter()
properties.store(propertiesWriter,
s"Java properties built from Kubernetes config map with name: $configMapName")
new ConfigMapBuilder()
.withNewMetadata()
.withName(configMapName)
.endMetadata()
.addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
.build()
}
}
然后看看 Driver Pod 都需要配置些什么东西。
private[spark] class KubernetesDriverBuilder {
def buildFromFeatures(
conf: KubernetesDriverConf,
client: KubernetesClient): KubernetesDriverSpec = {
val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
.map { file =>
// Spark 3.0 开始可以支持传 Pod 的 Template 文件,而且 Template 后会覆盖之前的配置,Priority 高
KubernetesUtils.loadPodFromTemplate(
client,
new File(file),
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
}
.getOrElse(SparkPod.initialPod())
// 重点关注,这里是配置 Pod 的步骤清单
val features: Seq[KubernetesFeatureConfigStep] = Seq(
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new LocalDirsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new DriverCommandFeatureStep(conf),
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf))
val spec = KubernetesDriverSpec(
initialPod,
driverKubernetesResources = Seq.empty,
conf.sparkConf.getAll.toMap)
features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesDriverSpec(
configuredPod,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
}
}
然后看一下 Pod 状态的监听器。原理是创建一个 scheduler
后台线程池,按照配置的时间间隔,去监听 Pod 的状态。
private[k8s] class LoggingPodStatusWatcherImpl(
appId: String,
maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {
private val podCompletedFuture = new CountDownLatch(1)
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
private val logRunnable: Runnable = () => logShortStatus()
private var pod = Option.empty[Pod]
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
def start(): Unit = {
maybeLoggingInterval.foreach { interval =>
scheduler.scheduleAtFixedRate(logRunnable, , interval, TimeUnit.MILLISECONDS)
}
}
// 当异步接受到事件的时候,判断 Pod 是已经 Succeeded 还是 Failed
override def eventReceived(action: Action, pod: Pod): Unit = {
this.pod = Option(pod)
action match {
case Action.DELETED | Action.ERROR =>
closeWatch()
case _ =>
logLongStatus()
if (hasCompleted()) {
closeWatch()
}
}
}
后,看看一些新的特性,比如说可以用 spark submit 来 kill 掉整个 APP。
spark-submit --kill dbyin:spark-hdfs-* --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=dbyin
private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
with CommandLineLoggingUtils {
private def isGlob(name: String): Boolean = {
name.last == '*'
}
def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
submissionId.split(":", 2) match {
case Array(part1, part2@_*) =>
val namespace = if (part2.isEmpty) None else Some(part1)
val pName = if (part2.isEmpty) part1 else part2.headOption.get
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
namespace,
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None,
None)
) { kubernetesClient =>
implicit val client: KubernetesClient = kubernetesClient
if (isGlob(pName)) {
val ops = namespace match {
case Some(ns) =>
kubernetesClient
.pods
.inNamespace(ns)
case None =>
kubernetesClient
.pods
}
val pods = ops
.list()
.getItems
.asScala
.filter { pod =>
val meta = pod.getMetadata
meta.getName.startsWith(pName.stripSuffix("*")) &&
meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
op.executeOnPod(pName, namespace, sparkConf)
}
}
case _ =>
printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
}
}
// 这是可以 kill 掉 Spark App 的方法
override def kill(submissionId: String, conf: SparkConf): Unit = {
printMessage(s"Submitting a request to kill submission " +
s"${submissionId} in ${conf.get("spark.master")}. " +
s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
execute(submissionId, conf, new KillApplication)
}
// 这是可以看 App 状态的
override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
printMessage(s"Submitting a request for the status of submission" +
s" ${submissionId} in ${conf.get("spark.master")}.")
execute(submissionId, conf, new ListStatus)
}
override def supports(master: String): Boolean = {
master.startsWith("k8s://")
}
}
3 Summary
至此,关于 Spark 以 K8S 作为 ResourceManager 的时候,Spark submit 的流程的过程中,是如何创建 Driver 和 Executor,还有各种 Secret 和 ConfigMap 这些资源的过程就分析到这里。