Spark on Kubernetes Source Code Analysis Series - submit
2023-07-04 18:00:00

1 Overview

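Kubernetes is integrated into Spark as a new resource manager, following the same approach that was used for YARN. Spark does ship its own Standalone resource-management mode, but that alone is not enough.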

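The integration itself is easy to understand: Spark starts an HTTP client that talks to the Kubernetes API server. Through that client Spark submits the application-related configuration, such as how to create the Driver and Executor Pods, and it also watches those Pods.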
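To make the idea of "an HTTP client talking to the API server" concrete, here is a minimal sketch of the kind of REST requests involved. It only builds the requests and never sends them. The object and method names are hypothetical, authentication is left out, and Spark itself goes through a Kubernetes client library rather than hand-written requests; the URL paths are the standard core-API paths for Pods.

```scala
import java.net.URI
import java.net.http.HttpRequest

// Hypothetical helper, for illustration only: builds (but does not send)
// the REST requests a Kubernetes client issues for Pods.
object ApiServerRequests {
  // Core-API path for the Pods of one namespace.
  def podsPath(namespace: String): String =
    s"/api/v1/namespaces/$namespace/pods"

  // POST a Pod manifest (JSON) to create the Pod.
  def createPod(apiServer: String, namespace: String, podJson: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(apiServer + podsPath(namespace)))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(podJson))
      .build()

  // GET with watch=true to stream status changes of a single Pod.
  def watchPod(apiServer: String, namespace: String, podName: String): HttpRequest =
    HttpRequest.newBuilder(
        URI.create(apiServer + podsPath(namespace) +
          s"?watch=true&fieldSelector=metadata.name=$podName"))
      .header("Accept", "application/json")
      .GET()
      .build()
}
```

Creating the Driver Pod and watching it map onto the POST and the watch GET respectively; everything else in the submit path is about assembling the Pod manifest.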

2 Source Code Analysis

The Spark Kubernetes module does not contain much code. It is worth running tree in the following directory to get a quick overview.

# Path
path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
➜  spark git:(master) ✗ tree -d -L 3
.
├── deploy
│   └── k8s
│       ├── features // configuration steps for Driver/Executor, ConfigMap, Secret, etc.
│       └── submit // submit-related code
└── scheduler
    └── cluster
        └── k8s // scheduling and state tracking of executor Pods

The structure is clear: one part deals with deploy, the other with the scheduler.

This article focuses on the submit-related code listed below.

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
├── K8sSubmitOps.scala // spark-submit operations (kill, status)
├── KubernetesClientApplication.scala // entry point wrapping spark-submit
├── KubernetesDriverBuilder.scala // Driver builder
├── LoggingPodStatusWatcher.scala // status watcher for the Spark Pod
└── MainAppResource.scala // resource definitions for Java/Python/R applications

Next, look at the entry class for Spark's K8S mode.

private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf, None, None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}

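This is the entry main class for Spark on K8S; the part to focus on is the run() method. It first generates a kubernetesAppId. Why not use the Spark app name? Because this identifier is attached as a Label to every resource that belongs to the application, including the Driver/Executor Pods, ConfigMaps, and Secrets, and the app name is free-form and not guaranteed to be unique.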
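The ID-plus-label idea can be sketched in a few lines. The ID expression is the one from run() above; the object, the helper names, and the label keys are assumptions chosen for illustration.

```scala
import java.util.UUID

// Hypothetical helper, for illustration only.
object AppIdLabels {
  // Same expression as in run(): "spark-" followed by 32 hex characters.
  def newAppId(): String =
    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Assumed label keys; treat them as placeholders.
  val AppIdLabel = "spark-app-selector"
  val RoleLabel = "spark-role"

  // Labels that every resource of one application would carry.
  def labelsFor(appId: String, role: String): Map[String, String] =
    Map(AppIdLabel -> appId, RoleLabel -> role)
}
```

Because the ID is random and contains only lowercase letters, digits, and a hyphen, it is safe to use as a label value, and a label selector on it picks out exactly the resources of one submission.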

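Note the WAIT_FOR_APP_COMPLETION setting, which defaults to true. In cluster mode it controls whether the launcher process waits for the application to finish before exiting; when it is set to false, the launcher exits as soon as the application has been submitted.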

private[spark] class Client(
    conf: KubernetesDriverConf, builder: KubernetesDriverBuilder,
    kubernetesClient: KubernetesClient, waitForAppCompletion: Boolean,
    watcher: LoggingPodStatusWatcher) extends Logging {
  def run(): Unit = {
    // The driver Pod spec is built here
    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

    // The ConfigMap is built here
    val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)

    // Driver container configuration: expose the Spark conf directory
    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
      .addNewEnv()
        .withName(ENV_SPARK_CONF_DIR)
        .withValue(SPARK_CONF_DIR_INTERNAL)
        .endEnv()
      .addNewVolumeMount()
        .withName(SPARK_CONF_VOLUME)
        .withMountPath(SPARK_CONF_DIR_INTERNAL)
        .endVolumeMount()
      .build()

    // Driver Pod configuration
    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
      .editSpec()
        .addToContainers(resolvedDriverContainer)
        .addNewVolume()
          .withName(SPARK_CONF_VOLUME)
          .withNewConfigMap()
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()

    Utils.tryWithResource(
      kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)

      try {
        // including configMap
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
        case NonFatal(e) =>
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
        s"${resolvedDriverPod.getMetadata.getName}"
      if (waitForAppCompletion) {
        logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
        // watcher
        watcher.awaitCompletion()
        logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
      } else {
        logInfo(s"Deployed Spark application ${conf.appName} with " +
          s"submission ID ${sId} into Kubernetes.")
      }
    }
  }

  // Kubernetes owner references: the ConfigMap, Secrets, executor Pods, etc. are owned by
  // the driver Pod, so they are all cleaned up once the driver Pod is gone
  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
    val driverPodOwnerReference = new OwnerReferenceBuilder()
      .withName(driverPod.getMetadata.getName)
      .withApiVersion(driverPod.getApiVersion)
      .withUid(driverPod.getMetadata.getUid)
      .withKind(driverPod.getKind)
      .withController(true)
      .build()
    // Set the driver Pod as the owner of every resource
    resources.foreach { resource =>
      val originalMetadata = resource.getMetadata
      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
    }
  }

  // Build the ConfigMap, which mainly carries configuration such as the Hadoop/Spark conf
  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
    // java.util.Properties
    val properties = new Properties()

    conf.foreach { case (k, v) =>
      properties.setProperty(k, v)
    }
    val propertiesWriter = new StringWriter()
    properties.store(propertiesWriter,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(configMapName)
        .endMetadata()
      .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
      .build()
  }
}
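The core of buildConfigMap is plain java.util.Properties serialization. The sketch below isolates that step and adds a parse method to show that the text written into the ConfigMap can be read back unchanged. The object and method names are hypothetical, and no Kubernetes types are involved.

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties

// Hypothetical helper, for illustration only.
object ConfAsProperties {
  // Serialize a key/value map the same way buildConfigMap does.
  def render(conf: Map[String, String], comment: String): String = {
    val properties = new Properties()
    conf.foreach { case (k, v) => properties.setProperty(k, v) }
    val writer = new StringWriter()
    properties.store(writer, comment)
    writer.toString
  }

  // Parse the text back, as a reader of the mounted file could do.
  def parse(text: String): Map[String, String] = {
    val properties = new Properties()
    properties.load(new StringReader(text))
    val names = properties.stringPropertyNames().iterator()
    var result = Map.empty[String, String]
    while (names.hasNext) {
      val name = names.next()
      result += (name -> properties.getProperty(name))
    }
    result
  }
}
```

Characters such as ":" and "=" are escaped by store() and unescaped by load(), so values like a k8s:// master URL survive the round trip.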

Next, look at what the Driver Pod needs to have configured.

private[spark] class KubernetesDriverBuilder {

  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient): KubernetesDriverSpec = {

    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
      .map { file =>
        // Since Spark 3.0 a Pod template file can be supplied. The template is only the
        // initial Pod; the feature steps below are applied on top of it and take priority.
        KubernetesUtils.loadPodFromTemplate(
          client,
          new File(file),
          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
      }
      .getOrElse(SparkPod.initialPod())

    // Important: this is the list of steps that configure the Pod
    val features: Seq[KubernetesFeatureConfigStep] = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new LocalDirsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new HadoopConfDriverFeatureStep(conf),
      new KerberosConfDriverFeatureStep(conf),
      new PodTemplateConfigMapStep(conf))

    val spec = KubernetesDriverSpec(
      initialPod,
      driverKubernetesResources = Seq.empty,
      conf.sparkConf.getAll.toMap)

    features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
      val addedResources = feature.getAdditionalKubernetesResources()

      KubernetesDriverSpec(
        configuredPod,
        spec.driverKubernetesResources ++ addedResources,
        spec.systemProperties ++ addedSystemProperties)
    }
  }
}
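The foldLeft at the end of buildFromFeatures is the heart of the builder: each step receives the Pod produced by the previous one and may contribute extra system properties and resources. Here is a stripped-down model of that pattern; the types Pod, Spec, and FeatureStep and the two example steps are simplified, hypothetical stand-ins, not the Spark classes.

```scala
// Hypothetical, simplified model of the feature-step pipeline.
object FeatureStepFold {
  final case class Pod(labels: Map[String, String])
  final case class Spec(
      pod: Pod,
      resources: Seq[String],
      systemProperties: Map[String, String])

  trait FeatureStep {
    def configurePod(pod: Pod): Pod
    def additionalSystemProperties: Map[String, String] = Map.empty
    def additionalResources: Seq[String] = Seq.empty
  }

  // Overwrites one label and leaves the others untouched.
  object RoleLabelStep extends FeatureStep {
    def configurePod(pod: Pod): Pod =
      pod.copy(labels = pod.labels + ("spark-role" -> "driver"))
  }

  // Leaves the Pod alone but contributes a resource and a property.
  object ServiceStep extends FeatureStep {
    def configurePod(pod: Pod): Pod = pod
    override def additionalSystemProperties: Map[String, String] =
      Map("spark.driver.host" -> "driver-svc")
    override def additionalResources: Seq[String] = Seq("Service/driver-svc")
  }

  // Thread the spec through every step, accumulating as we go.
  def build(initial: Pod, conf: Map[String, String], steps: Seq[FeatureStep]): Spec =
    steps.foldLeft(Spec(initial, Seq.empty, conf)) { (spec, step) =>
      Spec(
        step.configurePod(spec.pod),
        spec.resources ++ step.additionalResources,
        spec.systemProperties ++ step.additionalSystemProperties)
    }
}
```

Starting the fold from a template Pod shows why step-provided values win: a label set in the template survives only if no later step overwrites it.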

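Next, look at the Pod status watcher. It creates a single-thread scheduled executor in the background that logs the Pod status at the configured interval, while status changes themselves arrive asynchronously through eventReceived.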

private[k8s] class LoggingPodStatusWatcherImpl(
  appId: String,
  maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {

  private val podCompletedFuture = new CountDownLatch(1)
  // start timer for periodic logging
  private val scheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
  private val logRunnable: Runnable = () => logShortStatus()

  private var pod = Option.empty[Pod]

  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

  def start(): Unit = {
    maybeLoggingInterval.foreach { interval =>
      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
    }
  }

  // When an event arrives asynchronously, check whether the Pod has Succeeded or Failed
  override def eventReceived(action: Action, pod: Pod): Unit = {
    this.pod = Option(pod)
    action match {
      case Action.DELETED | Action.ERROR =>
        closeWatch()

      case _ =>
        logLongStatus()
        if (hasCompleted()) {
          closeWatch()
        }
    }
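  }

  // Remaining members (logShortStatus, logLongStatus, hasCompleted, closeWatch,
  // awaitCompletion) are omitted here.
}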
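The interplay between eventReceived and awaitCompletion comes down to a CountDownLatch: the submitting thread blocks on the latch, and the event thread releases it once a terminal phase is seen. The following sketch shows only that mechanism. PhaseWatcher is a hypothetical stand-in that receives phases as plain strings, and the timeout parameter is an addition for the demo.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical, simplified watcher, for illustration only.
object CompletionLatchSketch {
  final class PhaseWatcher {
    private val completed = new CountDownLatch(1)
    @volatile private var phase: String = "unknown"

    def currentPhase: String = phase

    // Called from the event thread whenever a new phase is observed.
    def eventReceived(newPhase: String): Unit = {
      phase = newPhase
      if (newPhase == "Succeeded" || newPhase == "Failed") completed.countDown()
    }

    // Blocks until a terminal phase arrives; returns false on timeout.
    def awaitCompletion(timeoutMs: Long): Boolean =
      completed.await(timeoutMs, TimeUnit.MILLISECONDS)
  }

  // Simulates two events delivered from another thread.
  def demo(): (Boolean, String) = {
    val watcher = new PhaseWatcher
    val events = Executors.newSingleThreadScheduledExecutor()
    try {
      events.schedule(
        new Runnable { def run(): Unit = watcher.eventReceived("Running") },
        10L, TimeUnit.MILLISECONDS)
      events.schedule(
        new Runnable { def run(): Unit = watcher.eventReceived("Succeeded") },
        50L, TimeUnit.MILLISECONDS)
      val done = watcher.awaitCompletion(5000L)
      (done, watcher.currentPhase)
    } finally {
      events.shutdownNow()
    }
  }
}
```

The "Running" event updates the phase without releasing the latch, so the caller keeps waiting until "Succeeded" arrives.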

Finally, look at some newer features, for example using spark-submit to kill an entire application.

spark-submit --kill dbyin:spark-hdfs-* --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=dbyin

private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
  with CommandLineLoggingUtils {

  private def isGlob(name: String): Boolean = {
    name.last == '*'
  }

  def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    submissionId.split(":", 2) match {
      case Array(part1, part2@_*) =>
        val namespace = if (part2.isEmpty) None else Some(part1)
        val pName = if (part2.isEmpty) part1 else part2.headOption.get
        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
          master,
          namespace,
          KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
          SparkKubernetesClientFactory.ClientType.Submission,
          sparkConf,
          None,
          None)
        ) { kubernetesClient =>
          implicit val client: KubernetesClient = kubernetesClient
          if (isGlob(pName)) {
            val ops = namespace match {
              case Some(ns) =>
                kubernetesClient
                  .pods
                  .inNamespace(ns)
              case None =>
                kubernetesClient
                  .pods
            }
            val pods = ops
              .list()
              .getItems
              .asScala
              .filter { pod =>
                val meta = pod.getMetadata
                meta.getName.startsWith(pName.stripSuffix("*")) &&
                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
              }.toList
            op.executeOnGlob(pods, namespace, sparkConf)
          } else {
            op.executeOnPod(pName, namespace, sparkConf)
          }
        }
      case _ =>
        printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
    }
  }

  // Kills a Spark application
  override def kill(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request to kill submission " +
      s"${submissionId} in ${conf.get("spark.master")}. " +
      s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
    execute(submissionId, conf, new KillApplication)
  }

  // Prints the status of an application
  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request for the status of submission" +
      s" ${submissionId} in ${conf.get("spark.master")}.")
    execute(submissionId, conf, new ListStatus)
  }

  override def supports(master: String): Boolean = {
    master.startsWith("k8s://")
  }
}
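The submission ID handling in execute() can be reduced to two small functions: split an optional "namespace:" prefix off the Pod name, then treat a trailing "*" as a prefix match. The sketch below mirrors that logic on plain strings. Target, parse, and matching are hypothetical names, and endsWith is used instead of name.last so that an empty name does not throw.

```scala
// Hypothetical helper, for illustration only.
object SubmissionIdSketch {
  final case class Target(namespace: Option[String], podName: String, isGlob: Boolean)

  // "ns:name" yields a namespace; a bare "name" does not.
  def parse(submissionId: String): Target =
    submissionId.split(":", 2) match {
      case Array(ns, name) => Target(Some(ns), name, name.endsWith("*"))
      case Array(name)     => Target(None, name, name.endsWith("*"))
    }

  // A glob matches by prefix; otherwise the name must match exactly.
  def matching(target: Target, driverPodNames: Seq[String]): Seq[String] =
    if (target.isGlob) {
      val prefix = target.podName.stripSuffix("*")
      driverPodNames.filter(_.startsWith(prefix))
    } else {
      driverPodNames.filter(_ == target.podName)
    }
}
```

With the command shown earlier, "dbyin:spark-hdfs-*" resolves to namespace dbyin and selects every driver Pod whose name starts with "spark-hdfs-".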

3 Summary

That concludes the analysis of how, with K8S as the resource manager, the spark-submit flow creates the Driver and Executor along with resources such as Secrets and ConfigMaps.
