Integrating kube-prometheus with the Prometheus Pushgateway: A Batch Job Monitoring Solution
【Free download link】kube-prometheus — prometheus-operator/kube-prometheus: the kube-prometheus project provides an all-in-one way to deploy a Prometheus monitoring stack on a Kubernetes cluster, including Prometheus Server, Alertmanager, Grafana, and related components, aiming to simplify monitoring setup and management on K8s. Project page: https://gitcode.com/gh_mirrors/ku/kube-prometheus
Introduction: The Pain of Monitoring Batch Jobs, and a Solution
In a Kubernetes (K8s) environment, Prometheus' traditional pull model struggles to monitor short-lived batch jobs (scheduled data processing, ETL jobs, CI/CD pipelines, and so on). Such jobs typically share these traits: short lifetimes (seconds to minutes), high dynamism (scheduled on demand), and no fixed IP or port, so Prometheus cannot scrape their metrics continuously.
The Prometheus Pushgateway, acting as an intermediary, solves this problem: it lets batch jobs push their metrics proactively, enabling reliable monitoring of non-persistent workloads. This article walks through integrating Pushgateway into the kube-prometheus stack to build a complete batch job monitoring solution.
What you will learn from this article:
- The core principles of Pushgateway and when to use it
- Two ways to deploy and configure Pushgateway within kube-prometheus
- How batch jobs push metrics, with Python and Shell examples
- Key metric design and Grafana dashboard configuration
- High-availability and data-cleanup best practices
1. Pushgateway Core Principles and Architecture
1.1 Workflow
Pushgateway acts as a staging area for metrics. The typical flow: (1) the batch job pushes its metrics to Pushgateway over HTTP before it exits; (2) Pushgateway caches the most recently pushed value of every metric group; (3) Prometheus scrapes Pushgateway at its regular interval, like any other target.
1.2 Comparison with the pull model

| Aspect | Pull model | Push model (Pushgateway) |
|---|---|---|
| Typical workload | Long-running services (e.g. APIs) | Short-lived batch jobs |
| Service discovery | Requires K8s SD / file-based discovery | No service discovery needed |
| Health checking | up metric generated automatically | Job health must be tracked manually |
| Data lifecycle | Cleaned up automatically when the instance disappears | Stale data must be removed manually or via the API |
| Network requirement | Prometheus must be able to reach the target | The job must be able to reach Pushgateway |
1.3 Key caveats
According to the official Prometheus best practices, note the following when using Pushgateway:
- Avoid a single point of failure: Pushgateway itself should be deployed highly available
- Data cleanup: metrics are never deleted automatically; remove them via the API or when the job finishes
- Label design: service-level jobs (such as a full data backup) should not set an instance label, to keep stale series from lingering
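To make the label caveat concrete, here is a minimal Python sketch of how Pushgateway maps a job name plus grouping labels onto a URL path. The helper name build_push_path is mine, not part of any library; it only percent-encodes values, whereas the real Pushgateway also offers a base64 scheme for values containing "/". Every distinct path is a separate metric group that lives until it is deleted, which is why an unnecessary instance label multiplies leftover groups.

```python
from urllib.parse import quote


def build_push_path(job, grouping_key=None):
    # Mirrors the Pushgateway URL scheme: /metrics/job/<job>/<label>/<value>...
    # Each distinct path identifies one metric group on the gateway.
    path = f"/metrics/job/{quote(job, safe='')}"
    for label, value in sorted((grouping_key or {}).items()):
        path += f"/{quote(label, safe='')}/{quote(str(value), safe='')}"
    return path


# A service-level job: one stable group, overwritten on every run
print(build_push_path("daily-backup"))
# A per-instance push: one group PER instance value, each lingering separately
print(build_push_path("backup", {"env": "prod", "tenant": "acme"}))
```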
2. Deploying Pushgateway in kube-prometheus
kube-prometheus does not ship a Pushgateway component by default; it can be integrated in either of two ways:
2.1 Option 1: Plain manifests (recommended)
2.1.1 Create the namespace and RBAC configuration
```yaml
# pushgateway-namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
  labels:
    name: monitoring
---
# pushgateway-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pushgateway
  namespace: monitoring
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pushgateway
rules:
  - apiGroups: [""]
    resources: ["services", "endpoints"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pushgateway
subjects:
  - kind: ServiceAccount
    name: pushgateway
    namespace: monitoring
roleRef:
  kind: ClusterRole
  name: pushgateway
  apiGroup: rbac.authorization.k8s.io
```
2.1.2 Deploy the Deployment and Service
```yaml
# pushgateway-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    app: pushgateway
spec:
  replicas: 1  # single instance (run multiple replicas for HA; see section 6.1)
  selector:
    matchLabels:
      app: pushgateway
  template:
    metadata:
      labels:
        app: pushgateway
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/metrics"
        prometheus.io/port: "9091"
    spec:
      serviceAccountName: pushgateway
      containers:
        - name: pushgateway
          image: prom/pushgateway:v1.6.2  # use a recent stable release
          ports:
            - name: http
              containerPort: 9091
          args:
            - --web.enable-admin-api            # enable the admin API (e.g. wiping metrics)
            - --persistence.file=/data/metrics  # persist metrics to a file
            - --persistence.interval=5m         # persistence interval
          volumeMounts:
            - name: data
              mountPath: /data
          resources:
            requests:
              cpu: 50m
              memory: 50Mi
            limits:
              cpu: 200m
              memory: 200Mi
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: pushgateway-pvc  # create this PVC beforehand
---
# pushgateway-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    app: pushgateway
spec:
  selector:
    app: pushgateway
  ports:
    - name: http  # the ServiceMonitor references this port name
      port: 80
      targetPort: 9091
  type: ClusterIP  # in-cluster access (use NodePort/Ingress for external access)
```
2.1.3 Create a ServiceMonitor
Let Prometheus discover Pushgateway automatically:
```yaml
# pushgateway-servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    release: kube-prometheus
spec:
  selector:
    matchLabels:
      app: pushgateway
  namespaceSelector:
    matchNames:
      - monitoring
  endpoints:
    - port: http
      interval: 15s  # scrape interval (align with the Prometheus config)
      path: /metrics
      honorLabels: true  # keep the job/instance labels pushed by batch jobs
```
2.2 Option 2: Custom Jsonnet integration (advanced)
For setups that manage kube-prometheus with Jsonnet, extend the stack as follows:
- Create the Pushgateway component definition:
```jsonnet
// jsonnet/kube-prometheus/components/pushgateway.libsonnet
local k = import 'ksonnet/ksonnet.beta.4/k.libsonnet';

{
  pushgateway(
    name='pushgateway',
    namespace='monitoring',
    image='prom/pushgateway:v1.6.2',
    replicas=1,
    resources={
      requests: { cpu: '50m', memory: '50Mi' },
      limits: { cpu: '200m', memory: '200Mi' },
    },
  ):: {
    deployment: k.apps.v1.deployment.new(
      name,
      replicas,
      k.core.v1.podTemplateSpec.new(
        k.core.v1.container.new(name, image) +
        k.core.v1.container.withPorts([
          k.core.v1.containerPort.new('http', 9091),
        ]) +
        k.core.v1.container.withArgs([
          '--web.enable-admin-api',
          '--persistence.file=/data/metrics',
        ]) +
        k.core.v1.container.withResources(resources) +
        k.core.v1.container.withVolumeMounts([
          k.core.v1.volumeMount.new('data', '/data'),
        ]),
        metadata={ labels: { app: name } },
      ) +
      k.core.v1.podTemplateSpec.withVolumes([
        k.core.v1.volume.new('data', k.core.v1.persistentVolumeClaimVolumeSource.new('pushgateway-pvc')),
      ]),
    ) +
    k.apps.v1.deployment.withSelector({ matchLabels: { app: name } }),
    service: k.core.v1.service.new(
      name,
      [k.core.v1.servicePort.new('http', 80, 9091)],
      { app: name },
    ) +
    k.core.v1.service.withNamespace(namespace),
    serviceMonitor: k.monitoring.coreos.com.v1.serviceMonitor.new(
      name,
      { matchLabels: { app: name } },
      [k.monitoring.coreos.com.v1.serviceMonitorEndpoint.new('http', 15)],
    ) +
    k.monitoring.coreos.com.v1.serviceMonitor.withNamespace(namespace) +
    k.monitoring.coreos.com.v1.serviceMonitor.withNamespaceSelector({ matchNames: [namespace] }),
  },
}
```
- Reference the component from the main configuration:

```jsonnet
// main.libsonnet (top-level build file)
local kp = import 'kube-prometheus/main.libsonnet';  // resolved from vendor/ via -J vendor
local pushgateway = import 'jsonnet/kube-prometheus/components/pushgateway.libsonnet';

kp + {
  components+: {
    pushgateway: pushgateway.pushgateway(),
  },
}
```
- Generate the manifests and deploy:

```shell
jsonnet -J vendor -o manifests/pushgateway.json main.libsonnet
kubectl apply -f manifests/
```
3. Pushing Metrics from Batch Jobs
3.1 Core API
Pushgateway accepts metrics through a REST API in the following format:

```
POST /metrics/job/{job_name}[/{instance}]
Content-Type: text/plain

# HELP job_duration_seconds Duration of the batch job
# TYPE job_duration_seconds gauge
job_duration_seconds 42.5
# HELP job_success 1 if job succeeded, 0 otherwise
# TYPE job_success gauge
job_success 1
```

- job_name: the job name (required, e.g. daily-backup)
- instance: an instance identifier (optional, e.g. worker-01; omit it for service-level jobs)
3.2 代码示例
3.2.1 Python实现(使用prometheus-client)
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import time
import uuid
def main():
# 1. 创建指标注册表
registry = CollectorRegistry()
# 2. 定义指标
job_su***ess = Gauge(
'batch_job_su***ess',
'1 if job su***eeded, 0 otherwise',
registry=registry
)
job_duration = Gauge(
'batch_job_duration_seconds',
'Duration of the batch job in seconds',
registry=registry
)
records_processed = Gauge(
'batch_job_records_processed',
'Number of records processed',
registry=registry
)
start_time = time.time()
job_id = str(uuid.uuid4())[:8] # 生成唯一任务ID
try:
# 3. 执行批处理逻辑(示例:处理1000条记录)
records = 1000
time.sleep(5) # 模拟处理耗时
records_processed.set(records)
# 4. 标记任务成功
job_su***ess.set(1)
print(f"Job {job_id} su***eeded. Processed {records} records.")
except Exception as e:
# 5. 标记任务失败
job_su***ess.set(0)
print(f"Job {job_id} failed: {str(e)}")
finally:
# 6. 记录任务 duration
job_duration.set(time.time() - start_time)
# 7. 推送 metrics 到 Pushgateway
push_to_gateway(
'pushgateway.monitoring.svc:80', # Pushgateway服务地址
job='data-processing', # 任务名称
registry=registry,
grouping_key={'job_id': job_id} # 可选:分组键(用于区分同一任务的不同实例)
)
if __name__ == "__main__":
main()
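Besides push_to_gateway, prometheus_client ships two related helpers worth knowing before settling on one. The sketch below only illustrates the call shapes; the actual network calls are commented out and the gateway address is a placeholder:

```python
from prometheus_client import (
    CollectorRegistry,
    Gauge,
    push_to_gateway,
    pushadd_to_gateway,
    delete_from_gateway,
)

registry = CollectorRegistry()
Gauge('batch_job_success', '1 if job succeeded', registry=registry).set(1)

GATEWAY = 'pushgateway.monitoring.svc:80'  # placeholder address

# push_to_gateway (HTTP PUT): replaces ALL metrics in the target group
# push_to_gateway(GATEWAY, job='data-processing', registry=registry)

# pushadd_to_gateway (HTTP POST): replaces only metrics with the same names,
# leaving the group's other metrics untouched
# pushadd_to_gateway(GATEWAY, job='data-processing', registry=registry)

# delete_from_gateway: removes the whole group, e.g. after the job finishes
# delete_from_gateway(GATEWAY, job='data-processing')
```

In practice, push_to_gateway is the safest default for a job that owns its whole group; pushadd_to_gateway suits several scripts contributing different metrics to one group.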
3.2.2 Shell implementation (using curl)
Suitable for simple scripted jobs:

```shell
#!/bin/bash
set -euo pipefail

# Job metadata
JOB_NAME="log-cleanup"
JOB_ID=$(uuidgen | cut -c1-8)
PUSHGATEWAY_URL="http://pushgateway.monitoring.svc:80/metrics/job/${JOB_NAME}/instance/${HOSTNAME}"

# Metric values
start_time=$(date +%s)
success=0
files_deleted=0

# Run the job
trap 'echo "Job interrupted"; exit 1' SIGINT SIGTERM
echo "Starting log cleanup job ${JOB_ID}..."
files_deleted=$(find /var/log -name "*.log" -mtime +7 -delete -print | wc -l)
success=1  # if a command above fails, set -e exits before this line runs

# Compute the duration
duration=$(( $(date +%s) - start_time ))

# Push the metrics to Pushgateway
cat <<EOF | curl --data-binary @- ${PUSHGATEWAY_URL}
# HELP batch_job_success 1 if job succeeded, 0 otherwise
# TYPE batch_job_success gauge
batch_job_success ${success}
# HELP batch_job_duration_seconds Duration of the job in seconds
# TYPE batch_job_duration_seconds gauge
batch_job_duration_seconds ${duration}
# HELP batch_job_files_deleted Number of files deleted
# TYPE batch_job_files_deleted gauge
batch_job_files_deleted ${files_deleted}
EOF

echo "Job ${JOB_ID} completed. Deleted ${files_deleted} files in ${duration}s."
```
3.3 Multiple labels and grouped pushes
For more complex scenarios (e.g. multi-tenant jobs), extra labels can be appended to the URL path:

```shell
# Push with an environment label (env) and a tenant label (tenant)
curl -X POST http://pushgateway:9091/metrics/job/backup/env/prod/tenant/acme \
  --data-binary @- <<EOF
backup_size_bytes 1024000
backup_duration_seconds 30
EOF
```

These series can then be filtered in Prometheus:

```
backup_size_bytes{job="backup", env="prod", tenant="acme"}
```
4. Metric Design and Alerting Rules
4.1 Core metric set
Every batch job should expose at least the following metrics:

| Metric name | Type | Description | Example value |
|---|---|---|---|
| batch_job_success | Gauge | Job success status (1 = success, 0 = failure) | 1 |
| batch_job_duration_seconds | Gauge | Job duration in seconds | 42.5 |
| batch_job_retries_total | Counter | Number of job retries | 2 |
| batch_job_records_processed | Gauge | Records processed (for data-processing jobs) | 1000 |
| batch_job_errors_total | Counter | Errors encountered during the run | 5 |
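As a sketch of the table above, the counter-type metrics can be declared with prometheus_client like this (the names follow the table; the registry arrangement is one possible choice). Counters only ever increase within a run, while Gauges are set to a final value; prometheus_client appends the _total suffix to counter names automatically:

```python
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest

registry = CollectorRegistry()

# Counters for cumulative events within one run (exposed as ..._total)
retries = Counter('batch_job_retries', 'Number of job retries', registry=registry)
errors = Counter('batch_job_errors', 'Errors encountered during the run', registry=registry)
# Gauge for a point-in-time value
duration = Gauge('batch_job_duration_seconds', 'Job duration in seconds', registry=registry)

retries.inc(2)
errors.inc(5)
duration.set(42.5)

# The exposition text is what gets pushed to Pushgateway
text = generate_latest(registry).decode()
print(text)
```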
4.2 Prometheus alerting rules
Add the following rules in prometheus-rules.yaml:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: batch-job-alerts
  namespace: monitoring
  labels:
    release: kube-prometheus
spec:
  groups:
    - name: batch_jobs
      rules:
        - alert: BatchJobFailure
          expr: batch_job_success{job=~"batch.*"} == 0
          for: 5m
          labels:
            severity: critical
          annotations:
            summary: "Batch job failed"
            description: "Job {{ $labels.job }} (ID: {{ $labels.job_id }}) failed."
        - alert: BatchJobDurationHigh
          expr: batch_job_duration_seconds{job=~"batch.*"} > 300  # longer than 5 minutes
          for: 1m
          labels:
            severity: warning
          annotations:
            summary: "Batch job running too long"
            description: "Job {{ $labels.job }} took {{ $value | humanizeDuration }}, above the 5-minute threshold"
        - alert: BatchJobNotRunning
          expr: absent_over_time(batch_job_success{job="daily-backup"}[24h])
          labels:
            severity: critical
          annotations:
            summary: "Critical batch job did not run"
            description: "Job daily-backup produced no data in the past 24 hours; check its schedule"
```
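The failure alert can be checked offline with promtool before it is deployed. The sketch below assumes the groups: section of the PrometheusRule above has been extracted into a plain rule file named batch-job-alerts.yaml (promtool reads plain rule files, not the CRD wrapper):

```yaml
# batch-job-alerts-test.yaml — run with: promtool test rules batch-job-alerts-test.yaml
rule_files:
  - batch-job-alerts.yaml
evaluation_interval: 1m
tests:
  - interval: 1m
    input_series:
      - series: 'batch_job_success{job="batch-daily-backup"}'
        values: '0x10'   # failing (0) for 10 minutes
    alert_rule_test:
      - eval_time: 6m    # past the 5m "for" window, so the alert should be firing
        alertname: BatchJobFailure
        exp_alerts:
          - exp_labels:
              severity: critical
              job: batch-daily-backup
```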
5. Grafana Dashboard Configuration
5.1 Import the dashboard
- Create batch-jobs-dashboard.json:
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "datasource",
"uid": "grafana"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 100,
"iteration": 1,
"links": [],
"panels": [
{
"collapsed": false,
"datasource": null,
"gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
"id": 20,
"panels": [],
"title": "Batch job overview",
"type": "row"
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": { "type": "prometheus", "uid": "prometheus" },
"fieldConfig": { "defaults": {}, "overrides": [] },
"fill": 1,
"fillGradient": 0,
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 1 },
"hiddenSeries": false,
"id": 2,
"legend": { "avg": false, "current": false, "max": false, "min": false, "show": true, "total": false, "values": false },
"lines": false,
"linewidth": 1,
"nullPointMode": "null",
"options": { "alertThreshold": true },
"percentage": false,
"pluginVersion": "10.2.2",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(batch_job_success{job=~\"$job\"}) by (job, instance)",
"interval": "",
"legendFormat": "{{job}}-{{instance}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Job success rate",
"tooltip": { "shared": true, "sort": 0, "value_type": "individual" },
"type": "graph",
"xaxis": { "buckets": null, "mode": "time", "name": null, "show": true, "values": [] },
"yaxes": [
{ "format": "short", "label": null, "logBase": 1, "max": "1", "min": "0", "show": true },
{ "format": "short", "label": null, "logBase": 1, "max": null, "min": null, "show": true }
],
"yaxis": { "align": false, "alignLevel": null }
}
// more panel definitions ...
],
"refresh": "10s",
"schemaVersion": 38,
"style": "dark",
"tags": ["batch", "jobs"],
"templating": {
"list": [
{
"allValue": null,
"current": { "selected": false, "text": "All", "value": "$__all" },
"datasource": { "type": "prometheus", "uid": "prometheus" },
"definition": "label_values(batch_job_success, job)",
"description": null,
"error": null,
"hide": 0,
"includeAll": true,
"label": "Job",
"multi": false,
"name": "job",
"options": [],
"query": { "query": "label_values(batch_job_success, job)", "refId": "StandardVariableQuery" },
"refresh": 1,
"regex": "",
"skipUrlSync": false,
"sort": 1,
"tagValuesQuery": "",
"tags": [],
"tagsQuery": "",
"type": "query",
"useTags": false
}
]
},
"time": { "from": "now-6h", "to": "now" },
"timepicker": { "refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m", "30m", "1h", "2h", "1d"] },
"timezone": "",
"title": "Batch Job Monitoring",
"uid": "batch-jobs",
"version": 1
}
- Import the JSON through the Grafana UI, or mount it with a ConfigMap picked up by kube-prometheus:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-dashboards-batch-jobs
  namespace: monitoring
  labels:
    grafana_dashboard: "true"
data:
  batch-jobs.json: |-
    # paste the dashboard JSON above here
```
5.2 Key panels
The finished dashboard contains the following core views:
- Job success-rate trend (grouped by job/instance)
- Job duration heatmap
- Failed-job detail table
- Stat cards for key figures (total runs, average duration, error rate)
6. High Availability and Best Practices
6.1 Highly available Pushgateway
To avoid a single point of failure, run multiple Pushgateway instances. Key points:
- Pushgateway instances do not replicate data to each other, so naively load-balancing pushes across replicas produces inconsistent metric groups
- Route each job (or job family) consistently to the same instance, for example via session affinity or dedicated per-team instances
- Give each instance its own persistent volume so cached metrics survive restarts
6.2 Data cleanup strategy
Pushgateway never deletes pushed metrics on its own, so clean them up in one of the following ways:
6.2.1 Delete via the API (recommended)
Have the job delete its metrics when it finishes:

```shell
# Delete all metrics of a job (the group identified by the job label)
curl -X DELETE http://pushgateway:9091/metrics/job/{job_name}
# Delete the metrics of one specific instance
curl -X DELETE http://pushgateway:9091/metrics/job/{job_name}/instance/{instance_id}
```

Python example:

```python
import requests

def delete_metrics(job_name, job_id):
    # The grouping key used when pushing (here job_id) must appear in the path
    url = f"http://pushgateway.monitoring.svc:80/metrics/job/{job_name}/job_id/{job_id}"
    response = requests.delete(url)
    if response.status_code == 202:
        print(f"Metrics for job {job_name} (ID: {job_id}) deleted successfully")
    else:
        print(f"Failed to delete metrics: {response.text}")
```
6.2.2 Scheduled cleanup job
Use a CronJob to purge stale groups periodically (for example, anything not pushed for 7 days). Pushgateway's JSON API at /api/v1/metrics lists every metric group together with the synthetic push_time_seconds metric recording its last push:

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: pushgateway-cleanup
  namespace: monitoring
spec:
  schedule: "0 0 * * *"  # run daily at midnight
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: cleanup
              image: badouralix/curl-jq:latest  # any image providing curl and jq works
              command: ["/bin/sh", "-c"]
              args:
                - |
                  cutoff=$(( $(date +%s) - 7 * 24 * 3600 ))
                  # List job names whose last push is older than the cutoff
                  curl -s http://pushgateway:9091/api/v1/metrics \
                    | jq -r --argjson cutoff "$cutoff" \
                        '.data[]
                         | select(((.push_time_seconds.metrics[0].value // "0") | tonumber) < $cutoff)
                         | .labels.job' \
                    | sort -u \
                    | while read -r job; do
                        echo "Deleting stale job: $job"
                        curl -X DELETE "http://pushgateway:9091/metrics/job/$job"
                      done
          restartPolicy: OnFailure
```
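The same staleness check is easier to unit-test in Python. This is a sketch assuming the Pushgateway v1 /api/v1/metrics response shape ({"status": ..., "data": [{"labels": {...}, "push_time_seconds": {...}}, ...]}); the helper name find_stale_jobs is mine:

```python
import time


def find_stale_jobs(api_response, max_age_seconds=7 * 24 * 3600, now=None):
    """Return the job names whose last push is older than max_age_seconds."""
    now = time.time() if now is None else now
    stale = set()
    for group in api_response.get("data", []):
        job = group.get("labels", {}).get("job")
        # push_time_seconds is a synthetic metric Pushgateway attaches to every group
        metrics = group.get("push_time_seconds", {}).get("metrics", [])
        pushed_at = float(metrics[0]["value"]) if metrics else 0.0
        if job and now - pushed_at > max_age_seconds:
            stale.add(job)
    return stale
```

A CronJob could fetch the JSON with requests, feed it to this function, and issue the DELETE calls; keeping the filtering logic pure makes it trivial to test against a canned response.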
6.3 Security best practices
- Network isolation:
  - Use a NetworkPolicy to restrict Pushgateway access to batch-job workloads
  - Enable TLS encryption (via an Ingress or by mounting certificates directly)
- Authentication and authorization:
  - Enable basic auth (configured via --web.config.file)
  - Integrate OIDC (for enterprise environments)
- Data protection:
  - Back up the persistence data directory regularly
  - Enable audit logging of all push/delete operations
7. Common Issues and Troubleshooting
7.1 Metrics do not show up in Prometheus
Troubleshooting steps:
- Check that Pushgateway received the data: kubectl exec -it -n monitoring <pushgateway-pod> -- curl localhost:9091/metrics
- Verify the ServiceMonitor configuration: kubectl get servicemonitor pushgateway -n monitoring -o yaml
- Check the Prometheus target status: open the Prometheus UI → Status → Targets and search for pushgateway
7.2 Duplicate or stale data
Solutions:
- Do not set an instance label for service-level jobs
- Call the DELETE API when the job finishes
- Enable persistence and purge stale data regularly
7.3 Data lost after Pushgateway restarts
Make sure that:
- The persistent volume (PVC) is mounted correctly
- The logs confirm that persistence succeeded, e.g.:
level=info ts=2023-10-01T08:00:00Z caller=persistence.go:100 msg="persisted metrics to file" file=/data/metrics
8. Summary and Outlook
With Pushgateway integrated, kube-prometheus closes the monitoring loop for batch jobs. The approach described here covers deployment, instrumentation, metric design, visualization, and high availability, and can be applied directly in production.
Looking ahead:
- Combining Prometheus Agent mode with Pushgateway
- Transparent push proxies built on a service mesh (e.g. Istio)
- Cloud-native, event-driven monitoring (e.g. Knative + Pushgateway)
Batch job monitoring is a key part of DevOps and data platforms; used well, Pushgateway significantly improves observability and speeds up troubleshooting.
If you found this article helpful, please like, bookmark, and follow for future updates!
Next up: "A Lightweight Batch Monitoring Solution Based on Prometheus Agent"