1 Overview
spark-on-k8s-operator,下文简称 Spark Operator, 背景知识就不介绍太多了,本文主要分享一下 Spark Operator 的指标系统是如何构建的,之后可以按照 Spark Operator 的方法,给自己创建的 Operator 配上指标系统
2 Metrics
2.1 Spark Metrics
Spark Operator 目前提供几种自定义的指标,这里自定义的意思是以 Spark Application 这个自定义资源对象为监控对象,围绕 Spark Application 创建的一些监控指标,来让 Spark Operator 的维护者,更好的监控 Operator 中 CRD 对象的情况。
下面是目前 Spark Operator 的指标。自定义指标基本都在 sparkapp_metrics.go
里定义。
type sparkAppMetrics struct {
...
sparkAppSubmitCount *prometheus.CounterVec
sparkAppSuccessCount *prometheus.CounterVec
sparkAppFailureCount *prometheus.CounterVec
sparkAppFailedSubmissionCount *prometheus.CounterVec
sparkAppRunningCount *util.PositiveGauge
sparkAppSuccessExecutionTime *prometheus.SummaryVec
sparkAppFailureExecutionTime *prometheus.SummaryVec
sparkAppExecutorRunningCount *util.PositiveGauge
sparkAppExecutorFailureCount *prometheus.CounterVec
sparkAppExecutorSuccessCount *prometheus.CounterVec
}
可以看到 Spark Operator 记录了,Spark App 的提交数、成功数、失败数、提交 失败数、当前运行数、运行成功的时间统计、运行失败的时间统计、运行的 Executor 数、失败的 Executor 数以及成功的 Executor 数。
newSparkAppMetrics
new 实际就是去注册的意思。按照指标的类型,CounterVec
或者 GaugeVec
等,配置好 TYPE
和 HELP
,或者 LABEL
等。
熟悉 Prometheus 的同学应该知道,Counter, Gauge, Summary, Histogram 几种类型,但是我们从上面的 sparkAppMetrics
上还看到了 PositiveGauge
。
有两个比较特殊的指标。一个是 sparkAppRunningCount
和 sparkAppExecutorRunningCount
,因为他们都是 Gauge
类型的,但是是不会降低到0以下的,所以这里注册的类型,是自定义的 PostiveGauge
。
sparkAppRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_running_count"), "Spark App Running Count via the Operator", validLabels)
sparkAppExecutorRunningCount := util.NewPositiveGauge(util.CreateValidMetricNameLabel(prefix, "spark_app_executor_running_count"), "Spark App Running Executor Count via the Operator", validLabels)
这个是 Spark Operator 自实现的一个只用于正数的 Gauge,因为大家都知道 Gauge 其实是可以亦正亦负的。这个指标收集一些会增长会减少,但是不会跌破0的类型。
type PositiveGauge struct {
mux sync.RWMutex
name string
gaugeMetric *prometheus.GaugeVec
}
实现的原理很简单,就是一个读写锁 mux
和一个指标名 name
,以及一个正常的可以亦正亦负的 gaugeMetrics
。下面是其构造方法 NewPositiveGauge
。
func NewPositiveGauge(name string, description string, labels []string) *PositiveGauge {
validLabels := make([]string, len(labels))
for i, label := range labels {
validLabels[i] = CreateValidMetricNameLabel("", label)
}
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: name,
Help: description,
},
validLabels,
)
return &PositiveGauge{
gaugeMetric: gauge,
name: name,
}
}
2.2 Workqueue Metrics
这里的 Workqueue Metrics 是指 client-go 库里的 workqueue 包里的 metrics.go
。因为 Spark Operator 实现的 Controller 里,用到了 rate limiting workqueue 这个工作队列。
https://github.com/kubernetes/client-go/blob/master/util/workqueue/metrics.go
Spark Operator 里自定义的 WorkQueueMetrics
主要是用于暴露这个工作队列的指标,其实就是给原来的 workqueque 的指标加上一个 prefix,这样后面收集指标和使用指标时候会更方便。
WorkQueueMetrics
的指标的结构体。
type WorkQueueMetrics struct {
prefix string
}
下面是 Workqueque Metrics 的几种类型。
// Depth Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewDepthMetric(name string) workqueue.GaugeMetric {
...
}
// Adds Count Metrics for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewAddsMetric(name string) workqueue.CounterMetric {
...
}
// Latency Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewLatencyMetric(name string) workqueue.SummaryMetric {
...
}
// WorkDuration Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewWorkDurationMetric(name string) workqueue.SummaryMetric {
...
}
// Retry Metric for the kubernetes workqueue.
func (p *WorkQueueMetrics) NewRetriesMetric(name string) workqueue.CounterMetric {
...
}
func (p *WorkQueueMetrics) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
...
}
func (p *WorkQueueMetrics) NewLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
...
}
2.3 指标初始化
Spark Metrics 和 Workqueue Metrics 两部分指标的初始化都在 InitializeMetrics
方法里。Spark Metrics 初始化,是普通的 Prometheus 指标收集初始化的方式,Workqueue Metrics 则是通过 Workqueue 的 Provider 来填充。
func InitializeMetrics(metricsConfig *MetricConfig) {
// Start the metrics endpoint for Prometheus to scrape
http.Handle(metricsConfig.MetricsEndpoint, promhttp.Handler())
go http.ListenAndServe(fmt.Sprintf(":%s", metricsConfig.MetricsPort), nil)
glog.Infof("Started Metrics server at localhost:%s%s", metricsConfig.MetricsPort, metricsConfig.MetricsEndpoint)
workQueueMetrics := WorkQueueMetrics{prefix: metricsConfig.MetricsPrefix}
workqueue.SetProvider(&workQueueMetrics)
}
2.4 其他
func CreateValidMetricNameLabel(prefix, name string) string {
// "-" is not a valid character for prometheus metric names or labels.
return strings.Replace(prefix+name, "-", "_", -1)
}
根据 Prometheus 的指引,Metric Name 除了可以用大小写字母和数字以为,还可以用下划线_,但是不能是中划线。CreateValidMetricNameLabel
方法是用来矫正指标名的,以防制造不符合规范的指标名,导致指标无法被 Prometheus 拉取。
The metric name specifies the general feature of a system that is measured (e.g. http_requests_total - the total number of HTTP requests received). It may contain ASCII letters and digits, as well as underscores and colons. It must match the regex [a-zA-Z_:][a-zA-Z0-9_:]*
func RegisterMetric(metric prometheus.Collector) {
if err := prometheus.Register(metric); err != nil {
// Ignore AlreadyRegisteredError.
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
return
}
glog.Errorf("failed to register metric: %v", err)
}
}
构建指标体系,需要有一个注册 registry 的过程。传入的就是一个 metrics 类型,然后通过注册接口注册即可。
type MetricConfig struct {
MetricsEndpoint string
MetricsPort string
MetricsPrefix string
MetricsLabels []string
}
MeticsConfig
是 Spark Operator 的指标配置信息类型,包括 Operator 应用暴露的指标 Endpoint,指标端口 Port,指标的前缀(可以用于快速过滤)以及指标的 Labels(注意是一个数组,意思是指标会被打上这个数组的里的名字作为 Label)。
func fetchGaugeValue(m *prometheus.GaugeVec, labels map[string]string) float64 {
// Hack to get the current value of the metric to support PositiveGauge
pb := &prometheusmodel.Metric{}
m.With(labels).Write(pb)
return pb.GetGauge().GetValue()
}
2.5 工作时的指标
很重要的,就是根据 Spark App 的 CRD 对象的状态来输出指标了,这是指标体系重要的部分。这里看不懂,前面都白看。
func (sm *sparkAppMetrics) exportMetrics(oldApp, newApp *v1beta2.SparkApplication) {
oldState := oldApp.Status.AppState.State
newState := newApp.Status.AppState.State
if newState != oldState {
switch newState {
case v1beta2.SubmittedState:
// sparkAppSubmitCount + 1
case v1beta2.RunningState:
// sparkAppRunningCount + 1
case v1beta2.SucceedingState:
// sparkAppSuccessCount + 1
// sparkAppRunningCount - 1
case v1beta2.FailingState:
// sparkAppFailureCount + 1
// sparkAppRunningCount - 1
case v1beta2.FailedSubmissionState:
// sparkAppFailedSubmissionCount + 1
}
}
// Potential Executor status updates
// 不赘述了
for executor, newExecState := range newApp.Status.ExecutorState {
switch newExecState {
case v1beta2.ExecutorRunningState:
case v1beta2.ExecutorCompletedState:
case v1beta2.ExecutorFailedState:
}
}
}
指标构建和初始化完成了,也配置好什么时候该输出指标的方法了,然后看看 exportMetrics
这个方法是在什么时候被调用的。
// updateStatusAndExportMetrics updates the status of the SparkApplication and export the metrics.
func (c *Controller) updateStatusAndExportMetrics(oldApp, newApp *v1beta2.SparkApplication) error {
// Skip update if nothing changed.
if equality.Semantic.DeepEqual(oldApp, newApp) {
return nil
}
updatedApp, err := c.updateApplicationStatusWithRetries(oldApp, func(status *v1beta2.SparkApplicationStatus) {
*status = newApp.Status
}, c.k8sMinorVersion)
// Export metrics if the update was successful.
if err == nil && c.metrics != nil {
// 调用
c.metrics.exportMetrics(oldApp, updatedApp)
}
return err
}
当 Spark App CRD 对象在对 status
字段进行更新的时候,除了更新 status
以外,还会调用 exportMetics
方法来输出指标。
显然 updateStatusAndExportMetrics
这个方法是 Controller 在同步 CRD 对象的时候调用的。
if appToUpdate != nil {
glog.V(2).Infof("Trying to update SparkApplication %s/%s, from: [%v] to [%v]", app.Namespace, app.Name, app.Status, appToUpdate.Status)
// 当 appToUpdate 不为 nil,证明需要去更新 CRD 对象的 status 了
// 这个时候同时输出指标
err = c.updateStatusAndExportMetrics(app, appToUpdate)
if err != nil {
glog.Errorf("failed to update SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return err
}
}
3 Summary
给 Operator 加指标,总体来说不是非常难的事情,难的地方在于判断自己到底需要收集什么指标,做什么样的监控,如果需要自定义指标系统,可以参考 Spark Operator 的做法,自定义 CRD 层面的 Metrics,如果还需要监控工作队列,直接通过 client-go 的 workqueue 的接口去做即可。
下面是一份真实的 Spark Operator 输出的指标,供参考。
...
# HELP spark_app_executor_failure_count Spark App Failed Executor Count via the Operator
# TYPE spark_app_executor_failure_count counter
spark_app_executor_failure_count{project="Unknown"} 7
# HELP spark_app_executor_running_count Spark App Running Executor Count via the Operator
# TYPE spark_app_executor_running_count gauge
spark_app_executor_running_count{project="Unknown"} 7
spark_app_executor_running_count{project="demo"} 22
# HELP spark_app_executor_success_count Spark App Successful Executor Count via the Operator
# TYPE spark_app_executor_success_count counter
spark_app_executor_success_count{project="demo"} 65
# HELP spark_app_running_count Spark App Running Count via the Operator
# TYPE spark_app_running_count gauge
spark_app_running_count{project="demo"} 0
# HELP spark_app_submit_count Spark App Submits via the Operator
# TYPE spark_app_submit_count counter
spark_app_submit_count{project="demo"} 1
# HELP spark_app_success_count Spark App Success Count via the Operator
# TYPE spark_app_success_count counter
spark_app_success_count{project="demo"} 1
# HELP spark_app_success_execution_time_microseconds Spark App Successful Execution Runtime via the Operator
# TYPE spark_app_success_execution_time_microseconds summary
spark_app_success_execution_time_microseconds{project="demo",quantile="0.5"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.9"} NaN
spark_app_success_execution_time_microseconds{project="demo",quantile="0.99"} NaN
spark_app_success_execution_time_microseconds_sum{project="demo"} 5.83e+08
spark_app_success_execution_time_microseconds_count{project="demo"} 1
# HELP spark_application_controller_adds Total number of adds handled by workqueue: spark-application-controller
# TYPE spark_application_controller_adds counter
spark_application_controller_adds 120
# HELP spark_application_controller_depth Current depth of workqueue: spark-application-controller
# TYPE spark_application_controller_depth gauge
spark_application_controller_depth 0
# HELP spark_application_controller_latency Latency for workqueue: spark-application-controller
# TYPE spark_application_controller_latency summary
spark_application_controller_latency{quantile="0.5"} NaN
spark_application_controller_latency{quantile="0.9"} NaN
spark_application_controller_latency{quantile="0.99"} NaN
spark_application_controller_latency_sum 6.150365e+06
spark_application_controller_latency_count 120
# HELP spark_application_controller_longest_running_processor_microseconds Longest running processor microseconds: spark-application-controller
# TYPE spark_application_controller_longest_running_processor_microseconds gauge
spark_application_controller_longest_running_processor_microseconds 0
# HELP spark_application_controller_retries Total number of retries handled by workqueue: spark-application-controller
# TYPE spark_application_controller_retries counter
spark_application_controller_retries 472
# HELP spark_application_controller_unfinished_work_seconds Unfinished work seconds: spark-application-controller
# TYPE spark_application_controller_unfinished_work_seconds gauge
spark_application_controller_unfinished_work_seconds 0
# HELP spark_application_controller_work_duration How long processing an item from workqueue spark-application-controller takes.
# TYPE spark_application_controller_work_duration summary
spark_application_controller_work_duration{quantile="0.5"} NaN
spark_application_controller_work_duration{quantile="0.9"} NaN
spark_application_controller_work_duration{quantile="0.99"} NaN
spark_application_controller_work_duration_sum 1.2046801e+07
spark_application_controller_work_duration_count 120