全部产品
Search
文档中心

容器服务 Kubernetes 版 ACK:基于MongoDB事件源使用事件驱动伸缩

更新时间:Jan 19, 2026

在处理离线作业、流式数据等事件驱动型业务时,传统的基于CPU、内存利用率的HPA可能响应不够及时。ack-keda能够通过监控各类事件源(如消息队列、数据库等)的积压情况,在秒级内自动创建和Job或Deployment,并在任务完成后自动缩容至零,从而实现高效、实时的资源调度和成本优化。

工作原理

ack-keda是ACK集成的增强版KEDA(Kubernetes-based Event-driven Autoscaling),其核心原理是引入Scaler来充当事件源与应用之间的桥梁。

image
  1. 监控事件源:Scaler负责连接到一个外部事件源(如MongoDB),并周期性地查询指定指标(如满足特定条件的文档数量)。

  2. 驱动应用伸缩:

    • Scaler检测到事件积压(例如,查询到有待处理的数据),ack-keda会根据预设规则,快速伸缩ScaledJobScaledObject所关联的目标工作负载(如创建新Job或增加Deployment的Pod副本数)。

    • 当事件处理完毕,Scaler检测到事件积压消失,ack-keda会自动将工作负载缩容,对于Job任务则自动清理已完成的资源,有效避免资源浪费和元数据堆积。

核心优势:

  • 丰富的事件源支持:支持Kafka、MySQL、PostgreSQL、RabbitMQ、MongoDB等数据源。详见RabbitMQ Queue

  • 灵活的并发控制:通过maxReplicaCount等参数可控制任务的最大并发数,防止下游系统被突发流量冲垮。

  • 元数据自动清理:ScaledJob在任务完成后会自动清理已完成的Job及其Pod,避免元数据堆积对API Server造成压力。

原理

本文以一个模拟的视频转码场景为例。当有新的转码任务(一条"state":"waiting"的记录)插入MongoDB数据库时,ack-keda会自动创建一个Job Pod来执行转码任务,并在完成后修改数据状态为"state":"finished"。Job完成后,元数据将自动清理,降低对API Server的压力。

步骤一:部署ack-keda

  1. ACK集群列表页面,单击目标集群名称,在集群详情页左侧导航栏,选择应用 > Helm

  2. 单击创建,按照页面提示定位并选择ack-keda,选择最新Chart 版本,完成组件的安装。

步骤二:部署基于MongoDB事件源驱动弹性示例

1. 创建示例命名空间

本示例将使用mongodb命名空间部署数据库,使用mongodb-test命名空间部署弹性伸缩相关配置。

kubectl create ns mongodb
kubectl create ns mongodb-test

2. 部署MongoDB

如已有MongoDB服务,可跳过。

  1. 创建mongoDB.yaml。

    重要

    此MongoDB服务仅用于功能演示,不具备高可用性,请勿用于生产环境。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: mongodb
      namespace: mongodb
    spec:
      replicas: 1
      selector:
        matchLabels:
          name: mongodb
      template:
        metadata:
          labels:
            name: mongodb
        spec:
          containers:
          - name: mongodb
            image: registry-cn-shanghai.ack.aliyuncs.com/acs/mongo:v5.0.0
            imagePullPolicy: IfNotPresent
            ports:
            - containerPort: 27017
              name: mongodb
              protocol: TCP
    ---
    kind: Service
    apiVersion: v1
    metadata:
      name: mongodb-svc
      namespace: mongodb
    spec:
      type: ClusterIP
      ports:
      - name: mongodb
        port: 27017
        targetPort: 27017
        protocol: TCP
      selector:
        name: mongodb
  2. 部署Mongo DB。

    kubectl apply -f mongoDB.yaml

3. 初始化MongoDB数据库

  1. 获取MongoDB Pod名称。

    MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}')
    echo "MongoDB Pod name is: $MONGO_POD_NAME"
  2. test数据库中创建用户test_user并创建test_collection

    # 创建用户
    kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo --eval 'db.createUser({ user:"test_user",pwd:"test_password",roles:[{ role:"readWrite", db: "test"}]})'
    
    # 验证用户
    kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo --eval 'db.auth("test_user","test_password")'
    
    # 创建Collection
    kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.createCollection("test_collection")'

4. 配置TriggerAuthentication和ScaledJob

ack-keda通过TriggerAuthentication资源来安全地管理事件源的连接凭证。ScaledJob是ack-keda的核心资源,用于定义伸缩规则、轮询间隔以及待执行的Job任务模板。

  1. 创建auth.yaml,部署TriggerAuthentication。

    例如对于MongoDB事件源,TriggerAuthentication中的secretTargetRef字段会将指定Secret中的连接方式读取到ack-keda中,完成对MongoDB的登录认证。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: mongodb-trigger
      namespace: mongodb-test
    spec:
      secretTargetRef:
        - parameter: connectionString
          name: mongodb-secret
          key: connect
    ---
    apiVersion: v1
    kind: Secret
    metadata:
      name: mongodb-secret
      namespace: mongodb-test
    type: Opaque
    data:
      connect: bW9uZ29kYjovL3Rlc3RfdXNlcjp0ZXN0X3Bhc3N3b3JkQG1vbmdvZGItc3ZjLm1vbmdvZGIuc3ZjLmNsdXN0ZXIubG9jYWw6MjcwMTcvdGVzdA==
  2. 部署TriggerAuthentication。

    kubectl apply -f auth.yaml
  3. 部署ScaledJob。

    ScaledJob主要用于配置Job模板以及指定查询的数据库及查询表达式等。以下示例配置的是从test数据库中的test_collection中,查询满足{"type":"mp4","state":"waiting"}的待转码数据。

    1. 创建scaledJob.yaml。

      apiVersion: keda.sh/v1alpha1
      kind: ScaledJob
      metadata:
        name: mongodb-job
        namespace: mongodb-test
      spec:
        jobTargetRef:
          # Job模板配置
          template:
            spec:
              containers:
                - name: mongo-update
                  image: registry-cn-shanghai.ack.aliyuncs.com/acs/mongo-update:v6
                  args:
                    - --dataBase=test
                    - --collection=test_collection
                    - --operation=updateMany
                    - --update={"$set":{"state":"finished"}}
                  env:
                    - name: MONGODB_CONNECTION_STRING
                      value: mongodb://test_user:test_password@mongodb-svc.mongodb.svc.cluster.local:27017/test
                  imagePullPolicy: IfNotPresent
              restartPolicy: Never
          backoffLimit: 1
        pollingInterval: 15
        maxReplicaCount: 5
        successfulJobsHistoryLimit: 0
        failedJobsHistoryLimit: 10
        triggers:
          - type: mongodb
            metadata:
              dbName: test                               # 要查询的数据库
              collection: test_collection                # 要查询的collection
              query: '{"type":"mp4","state":"waiting"}'  # 会对查询转码类型为mp4且状态是waiting的数据拉起job进行处理
              queryValue: "1"
            authenticationRef:
              name: mongodb-trigger

      query:配置数据条目。当ack-keda查询到Mongo数据库中有满足该条件的数据条目时,将启动Job资源。

    2. 部署ScaledJob。

      kubectl apply -f scaledJob.yaml

步骤三:模拟事件并触发弹性伸缩

  1. mongodb命名空间中的MongoDB插入5条待处理的数据,模拟新任务的到来。

    MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}')
    
    # 插入5条待转码数据
    kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.test_collection.insert([
      {"type":"mp4","state":"waiting","createTimeStamp":"1610352740","fileName":"My Love"},
      {"type":"mp4","state":"waiting","createTimeStamp":"1610350740","fileName":"Harker"},
      {"type":"mp4","state":"waiting","createTimeStamp":"1610152940","fileName":"The World"},
      {"type":"mp4","state":"waiting","createTimeStamp":"1610390740","fileName":"Mother"},
      {"type":"mp4","state":"waiting","createTimeStamp":"1610344740","fileName":"Jagger"}
    ])'

  2. 验证弹性效果。

    持续观察mongodb-test命名空间下的Job资源。

    watch kubectl get job -n mongodb-test

    预期输出中,系统会创建5个Job,并在任务执行完毕后被自动清理。

    NAME                STATUS     COMPLETIONS   DURATION   AGE
    mongodb-job-4wxgx   Complete   1/1           3s         10s
    mongodb-job-9bs8r   Complete   1/1           3s         10s
    mongodb-job-p6pnb   Complete   1/1           3s         10s
    mongodb-job-pshkv   Complete   1/1           4s         10s
    mongodb-job-t6fs8   Complete   1/1           4s         10s
  3. 查询数据库中的数据状态。

    MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}')
    kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.test_collection.find({"type":"mp4"}).pretty()'

    预期输出中,所有记录的state字段都已从waiting更新为finished

相关文档