在处理离线作业、流式数据等事件驱动型业务时,传统的基于CPU、内存利用率的HPA可能响应不够及时。ack-keda能够通过监控各类事件源(如消息队列、数据库等)的积压情况,在秒级内自动创建和Job或Deployment,并在任务完成后自动缩容至零,从而实现高效、实时的资源调度和成本优化。
工作原理
ack-keda是ACK集成的增强版KEDA(Kubernetes-based Event-driven Autoscaling),其核心原理是引入Scaler来充当事件源与应用之间的桥梁。
监控事件源:
Scaler负责连接到一个外部事件源(如MongoDB),并周期性地查询指定指标(如满足特定条件的文档数量)。驱动应用伸缩:
当
Scaler检测到事件积压(例如,查询到有待处理的数据),ack-keda会根据预设规则,快速伸缩ScaledJob或ScaledObject所关联的目标工作负载(如创建新Job或增加Deployment的Pod副本数)。当事件处理完毕,
Scaler检测到事件积压消失,ack-keda会自动将工作负载缩容,对于Job任务则自动清理已完成的资源,有效避免资源浪费和元数据堆积。
核心优势:
丰富的事件源支持:支持Kafka、MySQL、PostgreSQL、RabbitMQ、MongoDB等数据源。详见RabbitMQ Queue。
灵活的并发控制:通过
maxReplicaCount等参数可控制任务的最大并发数,防止下游系统被突发流量冲垮。元数据自动清理:
ScaledJob在任务完成后会自动清理已完成的Job及其Pod,避免元数据堆积对API Server造成压力。

本文以一个模拟的视频转码场景为例。当有新的转码任务(一条"state":"waiting"的记录)插入MongoDB数据库时,ack-keda会自动创建一个Job Pod来执行转码任务,并在完成后修改数据状态为"state":"finished"。Job完成后,元数据将自动清理,降低对API Server的压力。
步骤一:部署ack-keda
在ACK集群列表页面,单击目标集群名称,在集群详情页左侧导航栏,选择。
单击创建,按照页面提示定位并选择ack-keda,选择最新Chart 版本,完成组件的安装。
步骤二:部署基于MongoDB事件源驱动弹性示例
1. 创建示例命名空间
本示例将使用mongodb命名空间部署数据库,使用mongodb-test命名空间部署弹性伸缩相关配置。
kubectl create ns mongodb
kubectl create ns mongodb-test2. 部署MongoDB
如已有MongoDB服务,可跳过。
创建mongoDB.yaml。
重要此MongoDB服务仅用于功能演示,不具备高可用性,请勿用于生产环境。
apiVersion: apps/v1 kind: Deployment metadata: name: mongodb namespace: mongodb spec: replicas: 1 selector: matchLabels: name: mongodb template: metadata: labels: name: mongodb spec: containers: - name: mongodb image: registry-cn-shanghai.ack.aliyuncs.com/acs/mongo:v5.0.0 imagePullPolicy: IfNotPresent ports: - containerPort: 27017 name: mongodb protocol: TCP --- kind: Service apiVersion: v1 metadata: name: mongodb-svc namespace: mongodb spec: type: ClusterIP ports: - name: mongodb port: 27017 targetPort: 27017 protocol: TCP selector: name: mongodb部署Mongo DB。
kubectl apply -f mongoDB.yaml
3. 初始化MongoDB数据库
获取MongoDB Pod名称。
MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}') echo "MongoDB Pod name is: $MONGO_POD_NAME"在
test数据库中创建用户test_user并创建test_collection。# 创建用户 kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo --eval 'db.createUser({ user:"test_user",pwd:"test_password",roles:[{ role:"readWrite", db: "test"}]})' # 验证用户 kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo --eval 'db.auth("test_user","test_password")' # 创建Collection kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.createCollection("test_collection")'
4. 配置TriggerAuthentication和ScaledJob
ack-keda通过TriggerAuthentication资源来安全地管理事件源的连接凭证。ScaledJob是ack-keda的核心资源,用于定义伸缩规则、轮询间隔以及待执行的Job任务模板。
创建auth.yaml,部署TriggerAuthentication。
例如对于MongoDB事件源,TriggerAuthentication中的
secretTargetRef字段会将指定Secret中的连接方式读取到ack-keda中,完成对MongoDB的登录认证。apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: mongodb-trigger namespace: mongodb-test spec: secretTargetRef: - parameter: connectionString name: mongodb-secret key: connect --- apiVersion: v1 kind: Secret metadata: name: mongodb-secret namespace: mongodb-test type: Opaque data: connect: bW9uZ29kYjovL3Rlc3RfdXNlcjp0ZXN0X3Bhc3N3b3JkQG1vbmdvZGItc3ZjLm1vbmdvZGIuc3ZjLmNsdXN0ZXIubG9jYWw6MjcwMTcvdGVzdA==部署TriggerAuthentication。
kubectl apply -f auth.yaml部署ScaledJob。
ScaledJob主要用于配置Job模板以及指定查询的数据库及查询表达式等。以下示例配置的是从test数据库中的test_collection中,查询满足
{"type":"mp4","state":"waiting"}的待转码数据。创建scaledJob.yaml。
apiVersion: keda.sh/v1alpha1 kind: ScaledJob metadata: name: mongodb-job namespace: mongodb-test spec: jobTargetRef: # Job模板配置 template: spec: containers: - name: mongo-update image: registry-cn-shanghai.ack.aliyuncs.com/acs/mongo-update:v6 args: - --dataBase=test - --collection=test_collection - --operation=updateMany - --update={"$set":{"state":"finished"}} env: - name: MONGODB_CONNECTION_STRING value: mongodb://test_user:test_password@mongodb-svc.mongodb.svc.cluster.local:27017/test imagePullPolicy: IfNotPresent restartPolicy: Never backoffLimit: 1 pollingInterval: 15 maxReplicaCount: 5 successfulJobsHistoryLimit: 0 failedJobsHistoryLimit: 10 triggers: - type: mongodb metadata: dbName: test # 要查询的数据库 collection: test_collection # 要查询的collection query: '{"type":"mp4","state":"waiting"}' # 会对查询转码类型为mp4且状态是waiting的数据拉起job进行处理 queryValue: "1" authenticationRef: name: mongodb-triggerquery:配置数据条目。当ack-keda查询到Mongo数据库中有满足该条件的数据条目时,将启动Job资源。部署ScaledJob。
kubectl apply -f scaledJob.yaml
步骤三:模拟事件并触发弹性伸缩
向
mongodb命名空间中的MongoDB插入5条待处理的数据,模拟新任务的到来。MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}') # 插入5条待转码数据 kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.test_collection.insert([ {"type":"mp4","state":"waiting","createTimeStamp":"1610352740","fileName":"My Love"}, {"type":"mp4","state":"waiting","createTimeStamp":"1610350740","fileName":"Harker"}, {"type":"mp4","state":"waiting","createTimeStamp":"1610152940","fileName":"The World"}, {"type":"mp4","state":"waiting","createTimeStamp":"1610390740","fileName":"Mother"}, {"type":"mp4","state":"waiting","createTimeStamp":"1610344740","fileName":"Jagger"} ])'验证弹性效果。
持续观察mongodb-test命名空间下的Job资源。
watch kubectl get job -n mongodb-test预期输出中,系统会创建5个Job,并在任务执行完毕后被自动清理。
NAME STATUS COMPLETIONS DURATION AGE mongodb-job-4wxgx Complete 1/1 3s 10s mongodb-job-9bs8r Complete 1/1 3s 10s mongodb-job-p6pnb Complete 1/1 3s 10s mongodb-job-pshkv Complete 1/1 4s 10s mongodb-job-t6fs8 Complete 1/1 4s 10s查询数据库中的数据状态。
MONGO_POD_NAME=$(kubectl get pods -n mongodb -l name=mongodb -o jsonpath='{.items[0].metadata.name}') kubectl exec -n mongodb ${MONGO_POD_NAME} -- mongo test --eval 'db.test_collection.find({"type":"mp4"}).pretty()'预期输出中,所有记录的
state字段都已从waiting更新为finished。