kube-prometheus与Prometheus Pushgateway集成:批处理任务监控方案

【免费下载链接】kube-prometheus prometheus-operator/kube-prometheus: kube-prometheus项目提供了在Kubernetes集群中部署Prometheus监控解决方案的一体化方法,包括Prometheus Server、Alertmanager、Grafana以及其他相关的监控组件,旨在简化在K8s环境下的监控配置与管理。 项目地址: https://gitcode.com/gh_mirrors/ku/kube-prometheus

引言:批处理任务监控的痛点与解决方案

在Kubernetes(K8s)环境中,传统的Prometheus(普罗米修斯)拉取模式(Pull Model)难以有效监控短暂运行的批处理任务(如定时数据处理、ETL作业、CI/CD流水线等)。这些任务通常具有以下特点:生命周期短(秒级/分钟级)、动态性强(按需调度)、无固定IP/端口,导致Prometheus无法持续抓取其metrics(指标)。

Prometheus Pushgateway(推送网关) 作为中间件,解决了这一难题。它允许批处理任务主动推送metrics,实现非持久化作业的可靠监控。本文将详细介绍如何在kube-prometheus生态中集成Pushgateway,构建完整的批处理任务监控方案。

读完本文你将掌握:

  • Pushgateway的核心原理与适用场景
  • 在kube-prometheus中部署与配置Pushgateway的两种方案
  • 批处理任务推送metrics的代码实现(Python/Shell示例)
  • 关键监控指标设计与Grafana仪表盘配置
  • 高可用与数据清理最佳实践

一、Pushgateway核心原理与架构

1.1 工作流程

Pushgateway作为metrics中转站,其工作流程如下:批处理任务在退出前将metrics通过HTTP推送到Pushgateway;Pushgateway持久保存每个分组最近一次推送的metrics;Prometheus再按常规拉取模式定期从Pushgateway抓取。整个链路中,任务本身无需暴露HTTP端点,也无需被Prometheus服务发现。

1.2 与拉取模式的对比

| 特性 | 拉取模式(Pull) | 推送模式(Pushgateway) |
| --- | --- | --- |
| 适用场景 | 长期运行服务(如API) | 短期批处理任务 |
| 服务发现 | 依赖K8s SD/文件发现 | 无需服务发现 |
| 健康检查 | 自动生成up指标 | 需手动实现健康状态监控 |
| 数据生命周期 | 随实例销毁自动清理 | 需手动/API清理过期数据 |
| 网络要求 | Prometheus可访问目标 | 任务可访问Pushgateway |

1.3 关键注意事项

根据Prometheus官方最佳实践,使用Pushgateway需注意:

  • 避免单点故障:Pushgateway本身需高可用部署
  • 数据清理:默认不会自动删除metrics,需通过API或任务结束时清理
  • 标签设计:服务级任务(如全量数据备份)应避免使用instance标签,防止数据残留
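上述标签设计原则可以用一段极简的Python草图说明(`push_group_path`为本文虚构的辅助函数,仅演示分组键如何映射到Pushgateway的URL路径,并非任何库的真实API):

```python
def push_group_path(job, grouping_key=None):
    """构造Pushgateway分组对应的URL路径。
    相同路径的推送会互相覆盖;不同路径各自独立残留,需分别清理。"""
    path = f"/metrics/job/{job}"
    for name, value in (grouping_key or {}).items():
        path += f"/{name}/{value}"
    return path

# 服务级任务:不带instance标签,多次推送始终覆盖同一分组
print(push_group_path("daily-backup"))
# 带instance标签:每个实例各占一个分组,实例销毁后数据仍残留在Pushgateway中
print(push_group_path("daily-backup", {"instance": "worker-01"}))
```

由此可见,`instance`一旦进入分组键,每个曾经出现过的实例都会在Pushgateway中留下一条永不过期的记录,这正是服务级任务应省略该标签的原因。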

二、在kube-prometheus中部署Pushgateway

kube-prometheus默认未包含Pushgateway组件,需通过以下两种方案集成:

2.1 方案一:原生Manifest部署(推荐)

2.1.1 创建命名空间与RBAC配置
# pushgateway-namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
  labels:
    name: monitoring
---
# pushgateway-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: pushgateway
  namespace: monitoring
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: pushgateway
rules:
- apiGroups: [""]
  resources: ["services", "endpoints"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: pushgateway
subjects:
- kind: ServiceAccount
  name: pushgateway
  namespace: monitoring
roleRef:
  kind: ClusterRole
  name: pushgateway
  apiGroup: rbac.authorization.k8s.io
2.1.2 部署Deployment与Service
# pushgateway-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    app: pushgateway
spec:
  replicas: 1  # 单实例(如需HA可部署多实例+负载均衡)
  selector:
    matchLabels:
      app: pushgateway
  template:
    metadata:
      labels:
        app: pushgateway
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/metrics"
        prometheus.io/port: "9091"
    spec:
      serviceAccountName: pushgateway
      containers:
      - name: pushgateway
        image: prom/pushgateway:v1.6.2  # 固定镜像版本,按需升级
        ports:
        - containerPort: 9091
        args:
        - --web.enable-admin-api  # 启用管理API(支持通过 /api/v1/admin/wipe 一键清空metrics;按分组DELETE无需此标志)
        - --persistence.file=/data/metrics  # 持久化metrics到文件
        - --persistence.interval=5m  # 持久化间隔
        volumeMounts:
        - name: data
          mountPath: /data
        resources:
          requests:
            cpu: 50m
            memory: 50Mi
          limits:
            cpu: 200m
            memory: 200Mi
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: pushgateway-pvc  # 需提前创建PVC
---
# pushgateway-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    app: pushgateway
spec:
  selector:
    app: pushgateway
  ports:
  - name: http  # 端口命名,供ServiceMonitor按名称引用
    port: 80
    targetPort: 9091
  type: ClusterIP  # 集群内访问(如需外部访问可配置NodePort/Ingress)
2.1.3 创建ServiceMonitor

让Prometheus自动发现Pushgateway:

# pushgateway-servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: pushgateway
  namespace: monitoring
  labels:
    release: kube-prometheus
spec:
  selector:
    matchLabels:
      app: pushgateway
  namespaceSelector:
    matchNames:
    - monitoring
  endpoints:
  - port: http
    interval: 15s  # 拉取间隔(与Prometheus配置一致)
    path: /metrics

2.2 方案二:Jsonnet自定义集成(高级)

对于使用Jsonnet管理kube-prometheus的场景,可通过以下步骤扩展组件:

  1. 创建Pushgateway组件定义
// jsonnet/kube-prometheus/components/pushgateway.libsonnet
local k = import 'ksonnet/ksonnet.beta.4/k.libsonnet';
local containers = import 'jsonnet/kube-prometheus/components/containers.libsonnet';

{
  pushgateway(
    name='pushgateway',
    namespace='monitoring',
    image='prom/pushgateway:v1.6.2',
    replicas=1,
    resources={
      requests: { cpu: '50m', memory: '50Mi' },
      limits: { cpu: '200m', memory: '200Mi' },
    },
  ):: {
    deployment: k.apps.v1.deployment.new(
      name,
      replicas,
      k.core.v1.podTemplateSpec.new(
        k.core.v1.container.new(name, image) +
        k.core.v1.container.withPorts([
          k.core.v1.containerPort.new('http', 9091),
        ]) +
        k.core.v1.container.withArgs([
          '--web.enable-lifecycle',
          '--persistence.file=/data/metrics',
        ]) +
        k.core.v1.container.withResources(resources) +
        k.core.v1.container.withVolumeMounts([
          k.core.v1.volumeMount.new('data', '/data'),
        ]),
        metadata={ labels: { app: name } },
      ) +
      k.core.v1.podTemplateSpec.withVolumes([
        k.core.v1.volume.new('data', k.core.v1.persistentVolumeClaimVolumeSource.new('pushgateway-pvc')),
      ]),
    ) +
    k.apps.v1.deployment.withSelector({ matchLabels: { app: name } }),

    service: k.core.v1.service.new(
      name,
      [k.core.v1.servicePort.new('http', 80, 9091)],
      { app: name },
    ) +
    k.core.v1.service.withNamespace(namespace),

    serviceMonitor: k.monitoring.coreos.com.v1.serviceMonitor.new(
      name,
      { matchLabels: { app: name } },
      [k.monitoring.coreos.com.v1.serviceMonitorEndpoint.new('http', 15)],
    ) +
    k.monitoring.coreos.com.v1.serviceMonitor.withNamespace(namespace) +
    k.monitoring.coreos.com.v1.serviceMonitor.withNamespaceSelector({ matchNames: [namespace] }),
  },
}
  2. 在主配置中引用组件
// jsonnet/kube-prometheus/main.libsonnet
local kp = import 'jsonnet/kube-prometheus/main.libsonnet';
local pushgateway = import 'jsonnet/kube-prometheus/components/pushgateway.libsonnet';

kp + {
  components+: {
    pushgateway: pushgateway.pushgateway(),
  },
}
  3. 生成Manifest并部署
jsonnet -J vendor -o manifests/pushgateway.json main.libsonnet
kubectl apply -f manifests/

三、批处理任务推送Metrics实现

3.1 核心API说明

Pushgateway提供REST API接收metrics,格式如下:

POST /metrics/job/{job_name}[/{label_name}/{label_value}...]
Content-Type: text/plain

# HELP job_duration_seconds Duration of the batch job
# TYPE job_duration_seconds gauge
job_duration_seconds 42.5
# HELP job_success 1 if job succeeded, 0 otherwise
# TYPE job_success gauge
job_success 1
  • job_name:任务名称(必填,如daily-backup)
  • 路径中可追加任意标签名/值对作为分组键(如instance/worker-01);服务级任务建议省略instance标签
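在不引入客户端库的情况下,也可以直接按上述API推送文本格式的metrics。下面是一段仅用标准库的示意草图(`build_exposition`与`push_metrics`为本文虚构的辅助函数;注意文本格式要求每行metrics以换行符结尾):

```python
import urllib.request

def build_exposition(metrics):
    """将 {名称: 值} 字典序列化为Prometheus文本格式(每行以\n结尾)。"""
    return "".join(f"{name} {value}\n" for name, value in metrics.items())

def push_metrics(gateway, job, metrics):
    """向Pushgateway的指定job分组POST一组metrics。"""
    body = build_exposition(metrics).encode("utf-8")
    req = urllib.request.Request(
        f"http://{gateway}/metrics/job/{job}",
        data=body,
        headers={"Content-Type": "text/plain"},
        method="POST",
    )
    urllib.request.urlopen(req)

# 仅演示序列化结果;实际推送时调用 push_metrics("pushgateway.monitoring.svc:80", "daily-backup", ...)
print(build_exposition({"job_success": 1, "job_duration_seconds": 42.5}), end="")
```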

3.2 代码示例

3.2.1 Python实现(使用prometheus-client)
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
import time
import uuid

def main():
    # 1. 创建指标注册表
    registry = CollectorRegistry()
    
    # 2. 定义指标
    job_success = Gauge(
        'batch_job_success', 
        '1 if job succeeded, 0 otherwise',
        registry=registry
    )
    job_duration = Gauge(
        'batch_job_duration_seconds', 
        'Duration of the batch job in seconds',
        registry=registry
    )
    records_processed = Gauge(
        'batch_job_records_processed', 
        'Number of records processed',
        registry=registry
    )

    start_time = time.time()
    job_id = str(uuid.uuid4())[:8]  # 生成唯一任务ID
    
    try:
        # 3. 执行批处理逻辑(示例:处理1000条记录)
        records = 1000
        time.sleep(5)  # 模拟处理耗时
        records_processed.set(records)
        
        # 4. 标记任务成功
        job_success.set(1)
        print(f"Job {job_id} succeeded. Processed {records} records.")
        
    except Exception as e:
        # 5. 标记任务失败
        job_success.set(0)
        print(f"Job {job_id} failed: {str(e)}")
    finally:
        # 6. 记录任务耗时
        job_duration.set(time.time() - start_time)
        
        # 7. 推送 metrics 到 Pushgateway
        push_to_gateway(
            'pushgateway.monitoring.svc:80',  # Pushgateway服务地址
            job='data-processing',  # 任务名称
            registry=registry,
            grouping_key={'job_id': job_id}  # 可选:分组键(用于区分同一任务的不同实例)
        )

if __name__ == "__main__":
    main()
3.2.2 Shell实现(使用curl)

适用于简单脚本任务:

#!/bin/bash
set -euo pipefail

# 任务元数据
JOB_NAME="log-cleanup"
JOB_ID=$(uuidgen | cut -c1-8)
PUSHGATEWAY_URL="http://pushgateway.monitoring.svc:80/metrics/job/${JOB_NAME}/instance/${HOSTNAME}"

# 定义指标
start_time=$(date +%s)
success=0
files_deleted=0

# 执行任务逻辑
trap 'echo "Job interrupted"; exit 1' SIGINT SIGTERM
echo "Starting log cleanup job ${JOB_ID}..."

files_deleted=$(find /var/log -name "*.log" -mtime +7 -delete -print | wc -l)
success=1  # 若前面命令执行失败,set -e 会直接退出,不会执行此行

# 计算耗时
duration=$(( $(date +%s) - start_time ))

# 推送指标到Pushgateway
cat <<EOF | curl --data-binary @- ${PUSHGATEWAY_URL}
# HELP batch_job_success 1 if job succeeded, 0 otherwise
# TYPE batch_job_success gauge
batch_job_success ${success}
# HELP batch_job_duration_seconds Duration of the job in seconds
# TYPE batch_job_duration_seconds gauge
batch_job_duration_seconds ${duration}
# HELP batch_job_files_deleted Number of files deleted
# TYPE batch_job_files_deleted gauge
batch_job_files_deleted ${files_deleted}
EOF

echo "Job ${JOB_ID} completed. Deleted ${files_deleted} files in ${duration}s."

3.3 多标签与分组推送

对于复杂场景(如多租户任务),可通过URL路径添加标签:

# 推送时附加环境标签(env)和租户标签(tenant)
curl -X POST http://pushgateway:9091/metrics/job/backup/env/prod/tenant/acme \
  --data-binary @- <<EOF
backup_size_bytes 1024000
backup_duration_seconds 30
EOF

在Prometheus中可通过以下查询筛选:

backup_size_bytes{job="backup", env="prod", tenant="acme"}
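需要注意标签值中的特殊字符:Pushgateway的URL路径语法支持用`<label_name>@base64/<编码值>`传递含`/`或为空的标签值。下面用标准库写一个构造分组URL的示意草图(`encode_label_value`与`group_url`为本文虚构的辅助函数,`@base64`语法本身来自Pushgateway的官方URL格式):

```python
import base64

def encode_label_value(name, value):
    """普通值走 name/value;含'/'或为空的值走Pushgateway的 name@base64/编码值 语法。"""
    if "/" in value or value == "":
        encoded = base64.urlsafe_b64encode(value.encode()).decode().rstrip("=")
        return f"{name}@base64/{encoded or '='}"  # 空值按规范编码为单个'='
    return f"{name}/{value}"

def group_url(base, job, labels):
    """拼出带任意分组标签的推送URL。"""
    parts = [f"{base}/metrics/job/{job}"]
    parts += [encode_label_value(n, v) for n, v in labels.items()]
    return "/".join(parts)

print(group_url("http://pushgateway:9091", "backup",
                {"env": "prod", "tenant": "acme"}))
print(group_url("http://pushgateway:9091", "backup",
                {"path": "/var/data"}))  # 值含'/',自动转base64路径
```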

四、监控指标设计与告警规则

4.1 核心指标体系

批处理任务应至少监控以下指标:

| 指标名称 | 类型 | 说明 | 示例值 |
| --- | --- | --- | --- |
| batch_job_success | Gauge | 任务成功状态(1=成功,0=失败) | 1 |
| batch_job_duration_seconds | Gauge | 任务持续时间(秒) | 42.5 |
| batch_job_retries_total | Counter | 任务重试次数 | 2 |
| batch_job_records_processed | Gauge | 处理记录数(适用于数据处理任务) | 1000 |
| batch_job_errors_total | Counter | 任务执行过程中的错误数 | 5 |
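按上表的命名与类型约定,一次任务运行推送的文本体大致如下。`render_job_metrics`是本文虚构的辅助函数,用标准库生成符合Prometheus文本格式的推送体(按`_total`后缀推断counter类型,其余视为gauge):

```python
def render_job_metrics(values):
    """根据指标名后缀推断类型(_total→counter,其余→gauge),生成TYPE行与样本行。"""
    lines = []
    for name, value in values.items():
        mtype = "counter" if name.endswith("_total") else "gauge"
        lines.append(f"# TYPE {name} {mtype}")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"

print(render_job_metrics({
    "batch_job_success": 1,
    "batch_job_duration_seconds": 42.5,
    "batch_job_retries_total": 2,
}), end="")
```

需要注意:经Pushgateway中转的Counter只保留最后一次推送的值,Prometheus侧的rate/increase等函数可能因覆盖写入而失真,短任务通常以每次运行的绝对值为准。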

4.2 Prometheus告警规则

prometheus-rules.yaml中添加以下规则:

apiVersion: monitoring.coreos.***/v1
kind: PrometheusRule
metadata:
  name: batch-job-alerts
  namespace: monitoring
  labels:
    release: kube-prometheus
spec:
  groups:
  - name: batch_jobs
    rules:
    - alert: BatchJobFailure
      expr: batch_job_success{job=~"batch.*"} == 0
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "批处理任务失败"
        description: "任务 {{ $labels.job }} (ID: {{ $labels.job_id }}) 执行失败"
        
    - alert: BatchJobDurationHigh
      expr: batch_job_duration_seconds{job=~"batch.*"} > 300  # 超过5分钟
      for: 1m
      labels:
        severity: warning
      annotations:
        summary: "批处理任务耗时过长"
        description: "任务 {{ $labels.job }} 耗时 {{ $value | humanizeDuration }},超过阈值5分钟"
        
    - alert: BatchJobNotRunning
      expr: absent_over_time(batch_job_success{job="daily-backup"}[24h])
      labels:
        severity: critical
      annotations:
        summary: "关键批处理任务未执行"
        description: "任务 daily-backup 过去24小时无运行记录,请检查调度"

五、Grafana仪表盘配置

5.1 导入仪表盘

  1. 创建batch-jobs-dashboard.json
{
  "annotations": {
    "list": [
      {
        "builtIn": 1,
        "datasource": {
          "type": "datasource",
          "uid": "grafana"
        },
        "enable": true,
        "hide": true,
        "iconColor": "rgba(0, 211, 255, 1)",
        "name": "Annotations & Alerts",
        "type": "dashboard"
      }
    ]
  },
  "editable": true,
  "fiscalYearStartMonth": 0,
  "graphTooltip": 0,
  "id": 100,
  "iteration": 1,
  "links": [],
  "panels": [
    {
      "collapsed": false,
      "datasource": null,
      "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 },
      "id": 20,
      "panels": [],
      "title": "批处理任务概览",
      "type": "row"
    },
    {
      "aliasColors": {},
      "bars": true,
      "dashLength": 10,
      "dashes": false,
      "datasource": { "type": "prometheus", "uid": "prometheus" },
      "fieldConfig": { "defaults": {}, "overrides": [] },
      "fill": 1,
      "fillGradient": 0,
      "gridPos": { "h": 8, "w": 12, "x": 0, "y": 1 },
      "hiddenSeries": false,
      "id": 2,
      "legend": { "avg": false, "current": false, "max": false, "min": false, "show": true, "total": false, "values": false },
      "lines": false,
      "linewidth": 1,
      "nullPointMode": "null",
      "options": { "alertThreshold": true },
      "percentage": false,
      "pluginVersion": "10.2.2",
      "pointradius": 2,
      "points": false,
      "renderer": "flot",
      "seriesOverrides": [],
      "spaceLength": 10,
      "stack": false,
      "steppedLine": false,
      "targets": [
        {
          "expr": "sum(batch_job_success{job=~\"$job\"}) by (job, instance)",
          "interval": "",
          "legendFormat": "{{job}}-{{instance}}",
          "refId": "A"
        }
      ],
      "thresholds": [],
      "timeFrom": null,
      "timeRegions": [],
      "timeShift": null,
      "title": "任务成功率",
      "tooltip": { "shared": true, "sort": 0, "value_type": "individual" },
      "type": "graph",
      "xaxis": { "buckets": null, "mode": "time", "name": null, "show": true, "values": [] },
      "yaxes": [
        { "format": "short", "label": null, "logBase": 1, "max": "1", "min": "0", "show": true },
        { "format": "short", "label": null, "logBase": 1, "max": null, "min": null, "show": true }
      ],
      "yaxis": { "align": false, "alignLevel": null }
    }
    // 更多面板配置...
  ],
  "refresh": "10s",
  "schemaVersion": 38,
  "style": "dark",
  "tags": ["batch", "jobs"],
  "templating": {
    "list": [
      {
        "allValue": null,
        "current": { "selected": false, "text": "All", "value": "$__all" },
        "datasource": { "type": "prometheus", "uid": "prometheus" },
        "definition": "label_values(batch_job_success, job)",
        "description": null,
        "error": null,
        "hide": 0,
        "includeAll": true,
        "label": "Job",
        "multi": false,
        "name": "job",
        "options": [],
        "query": { "query": "label_values(batch_job_success, job)", "refId": "StandardVariableQuery" },
        "refresh": 1,
        "regex": "",
        "skipUrlSync": false,
        "sort": 1,
        "tagValuesQuery": "",
        "tags": [],
        "tagsQuery": "",
        "type": "query",
        "useTags": false
      }
    ]
  },
  "time": { "from": "now-6h", "to": "now" },
  "timepicker": { "refresh_intervals": ["5s", "10s", "30s", "1m", "5m", "15m", "30m", "1h", "2h", "1d"] },
  "timezone": "",
  "title": "批处理任务监控",
  "uid": "batch-jobs",
  "version": 1
}
  2. 通过Grafana UI导入该JSON,或使用kube-prometheus的ConfigMap挂载:
apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-dashboards-batch-jobs
  namespace: monitoring
  labels:
    grafana_dashboard: "true"
data:
  batch-jobs.json: |-
    # 上述JSON内容

5.2 关键面板展示

最终仪表盘将包含以下核心视图:

  • 任务成功率趋势图(按作业/实例分组)
  • 任务耗时分布热力图
  • 失败任务详情表
  • 关键指标卡片(如总任务数、平均耗时、错误率)

六、高可用与最佳实践

6.1 Pushgateway高可用部署

为避免单点故障,可部署多实例Pushgateway + 负载均衡:

实现要点:
  • 多实例各自独立运行并挂载独立持久化存储(Pushgateway实例之间不会同步数据)
  • 负载均衡需保证同一分组(job+标签组合)的推送始终落到同一实例(如基于路径的会话保持)
  • Prometheus需抓取全部实例,并在抓取配置中设置honor_labels: true,保留推送时携带的job/instance标签

6.2 数据清理策略

由于Pushgateway默认不会自动删除metrics,需通过以下方式清理:

6.2.1 API删除(推荐)

任务结束后主动调用API删除:

# 删除整个任务的metrics
curl -X DELETE http://pushgateway:9091/metrics/job/{job_name}

# 删除特定实例的metrics
curl -X DELETE http://pushgateway:9091/metrics/job/{job_name}/instance/{instance_id}

Python示例:

import requests

def delete_metrics(job_name, job_id):
    url = f"http://pushgateway.monitoring.svc:80/metrics/job/{job_name}/job_id/{job_id}"
    response = requests.delete(url)
    if response.status_code == 202:
        print(f"Metrics for job {job_name} (ID: {job_id}) deleted successfully")
    else:
        print(f"Failed to delete metrics: {response.text}")
6.2.2 定时清理脚本

使用CronJob定期清理过期数据(如7天未更新的metrics):

apiVersion: batch/v1
kind: CronJob
metadata:
  name: pushgateway-cleanup
  namespace: monitoring
spec:
  schedule: "0 0 * * *"  # 每天凌晨执行
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: cleanup
            image: badouralix/curl-jq:latest  # 任意同时包含curl与jq的镜像均可
            command: ["/bin/sh", "-c"]
            args:
            - |
              # Pushgateway的分组列表可通过 /api/v1/metrics 获取,
              # 每个分组自带push_time_seconds指标(最后一次推送的Unix时间戳)
              cutoff=$(( $(date +%s) - 7*24*3600 ))
              curl -s http://pushgateway:9091/api/v1/metrics | \
                jq -r '.data[] | "\(.labels.job) \(.push_time_seconds.metrics[0].value | tonumber | floor)"' | \
                while read -r job ts; do
                  if [ "$ts" -lt "$cutoff" ]; then
                    echo "Deleting stale job: $job"
                    # 注意:带额外分组标签的分组需在DELETE路径中附上全部标签对
                    curl -X DELETE "http://pushgateway:9091/metrics/job/$job"
                  fi
                done
          restartPolicy: OnFailure
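若更偏好用Python实现同样的清理逻辑,可以把"筛选过期分组"抽成纯函数以便离线测试。`find_stale_groups`为本文虚构的辅助函数,示例数据按Pushgateway `/api/v1/metrics` 的返回结构假设(每个分组含labels与push_time_seconds):

```python
import json, urllib.request

def find_stale_groups(groups, now, max_age_seconds):
    """从 /api/v1/metrics 的分组列表中筛选出超期未推送的分组标签。"""
    stale = []
    for g in groups:
        ts = float(g["push_time_seconds"]["metrics"][0]["value"])  # 值可能是科学计数法字符串
        if now - ts > max_age_seconds:
            stale.append(g["labels"])
    return stale

# 纯函数部分可离线验证:
sample = [
    {"labels": {"job": "old-job"},
     "push_time_seconds": {"metrics": [{"value": "1.0e+06"}]}},
    {"labels": {"job": "fresh-job"},
     "push_time_seconds": {"metrics": [{"value": "2.0e+06"}]}},
]
print(find_stale_groups(sample, now=2.0e6, max_age_seconds=86400))
# 实际清理时获取真实数据:
# groups = json.load(urllib.request.urlopen(
#     "http://pushgateway:9091/api/v1/metrics"))["data"]
```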

6.3 安全最佳实践

  1. 网络隔离

    • 使用NetworkPolicy限制仅批处理任务可访问Pushgateway
    • 配置TLS加密(通过Ingress或直接挂载证书)
  2. 认证授权

    • 启用Basic Auth(通过--web.config.file配置)
    • 集成OIDC(适用于企业环境)
  3. 数据保护

    • 定期备份持久化数据目录
    • 启用审计日志记录所有推送/删除操作

七、常见问题与故障排查

7.1 Metrics未显示在Prometheus中

排查步骤:

  1. 检查Pushgateway是否收到数据:
    kubectl exec -it -n monitoring <pushgateway-pod> -- wget -qO- localhost:9091/metrics  # 镜像基于busybox,无curl时可用wget
    
  2. 验证ServiceMonitor配置:
    kubectl get servicemonitor pushgateway -n monitoring -o yaml
    
  3. 查看Prometheus靶标状态:
    • 访问Prometheus UI → Status → Targets → 搜索pushgateway
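排查连通性时,也可以先用一小段脚本确认Pushgateway本身可达。`check_gateway`为本文虚构的辅助函数;`/-/ready`与`/-/healthy`是Pushgateway内置的探活端点:

```python
import urllib.request, urllib.error

def check_gateway(base_url, timeout=3):
    """依次探测Pushgateway的就绪与存活端点,返回 {端点: 是否返回200}。"""
    results = {}
    for path in ("/-/ready", "/-/healthy"):
        try:
            with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
                results[path] = resp.status == 200
        except (urllib.error.URLError, OSError):
            results[path] = False
    return results

print(check_gateway("http://pushgateway.monitoring.svc:80"))
```

两个端点任一返回非200,应优先检查Pod状态与Service端口映射,再排查ServiceMonitor。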

7.2 数据重复或残留

解决方案:

  • 避免在服务级任务中使用instance标签
  • 确保任务结束时调用DELETE API
  • 启用持久化并定期清理过期数据

7.3 Pushgateway重启后数据丢失

确保:

  • 已正确挂载持久卷(PVC)
  • 检查日志确认数据持久化成功:
    level=info ts=2023-10-01T08:00:00Z caller=persistence.go:100 msg="persisted metrics to file" file=/data/metrics
    

八、总结与展望

通过集成Pushgateway,kube-prometheus实现了对批处理任务的完整监控闭环。本文介绍的方案涵盖部署配置、代码实现、指标设计、可视化与高可用实践,可直接应用于生产环境。

未来趋势

  • Prometheus Agent模式与Pushgateway的结合
  • 基于Service Mesh(如Istio)的透明推送代理
  • 云原生事件驱动监控(如Knative + Pushgateway)

批处理任务监控是DevOps与数据平台的关键环节,合理运用Pushgateway将显著提升系统可观测性与故障排查效率。


如果你觉得本文有帮助,请点赞、收藏并关注后续更新!
下期预告:《基于Prometheus Agent的轻量级批处理监控方案》
