ack-keda提供事件驅動彈效能力,會從事件來源中進行資料的周期性消費,當訊息出現堆積,即可秒級觸發一個批次的離線任務伸縮。RabbitMQ和Keda搭配,可以監控隊列長度和訊息速率指標。本文介紹如何利用RabbitMQ指標訊息佇列長度和事件驅動自動調整工具Keda實現應用的Auto Scaling。
前提條件
已部署ack-keda組件,請參見事件驅動彈性。
已建立雲訊息佇列 RabbitMQ 版執行個體及相關資源,請參見建立資源。
通過kubectl已串連Kubernetes叢集,請參見通過kubectl串連叢集。
已安裝Go語言運行環境。
步驟一:部署工作負載建立應用
登入Container Service管理主控台,在左側導覽列選擇叢集列表。
在叢集列表頁面,單擊目的地組群名稱,然後在左側導覽列,選擇。
在無狀態頁面,單擊使用YAML建立資源,選擇樣本模板為自訂,使用如下內容建立名為sample-app的應用,然後單擊建立。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-deploy spec: replicas: 2 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: consumer image: consumer # 修改為業務真實的RabbitMQ的消費者鏡像。 resources: limits: cpu: "500m"
步驟二:部署基於RabbitMQ指標的彈性樣本
以下步驟通過擷取已建立RabbitMQ訊息佇列的執行個體資訊,對接RabbitMQ與HPA伸縮指標,實現將RabbitMQ指標轉換為HPA可用指標,並實現容器自動調整。
查看訊息佇列RabbitMQ執行個體資訊。
登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表。
單擊目標執行個體名稱,進入執行個體詳情頁面。在存取點資訊頁簽,查看並記錄公網存取點的Endpoint。
使用公網存取點會增加安全風險,可能會受到外部攻擊或未授權訪問。本執行個體使用公網存取點僅作為示範。如果應用主要在阿里雲VPC內運行,無需外部存取,建議不開啟公網訪問。
在左側導覽列,單擊使用者和許可權管理,查看並記錄使用者名稱和密碼。
如未建立,可參見使用者和許可權管理建立。
在左側導覽列中單擊Vhost 列表,查看並記錄Vhost的值,例如amq-test。
執行如下指令,建立並記錄串連認證字串。
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64其中
rabbitmq-username和rabbitmq-password分別為此前擷取的使用者名稱和密碼,localhost為記錄的Endpoint,vhost為上一步記錄的名稱。使用如下YAML建立Secret。
apiVersion: v1 kind: Secret metadata: name: keda-rabbitmq-secret data: host: YW1x****** # 已建立的串連認證字串。使用如下YAML建立TriggerAuthentication對象,並將該YAML檔案部署到叢集中。
apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: keda-trigger-auth-rabbitmq-conn namespace: default spec: secretTargetRef: - parameter: host name: keda-rabbitmq-secret # keda-rabbitmq-secret為上一步建立的Secret。 key: host執行如下命令,將YAML檔案部署到叢集中。
kubectl apply -f secret.yaml kubectl apply -f rabbitmq-trigger-auth.yaml部署完成後,可在Keda中使用RabbitMQ觸發器,並且可以通過引用TriggerAuthentication對象來串連RabbitMQ擷取指標資料。
使用如下內容,建立YAML檔案ScaledObject.yaml。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: rabbitmq-scaledobject namespace: default spec: scaleTargetRef: name: sample-app maxReplicaCount: 10 minReplicaCount: 1 triggers: - type: rabbitmq metadata: protocol: amqp queueName: queue-test mode: QueueLength value: "20" metricName: queue-test authenticationRef: name: keda-trigger-auth-rabbitmq-conn參數
說明
scaleTargetRef配置擴縮容的對象。本樣本配置為步驟一:部署工作負載建立應用已建立的應用sample-app。
maxReplicaCount最大副本數。
minReplicaCount最小副本數。
protocolkeda組件與RabbitMQ之間的通訊協定。取值範圍:auto、http、amqp。
queueName待讀取資訊的隊列名稱。
value觸發擴容的閾值。
metricName自訂指標的名稱,用於告訴HPA從哪個指標擷取資料來決策擴縮容。在這裡是RabbitMQ訊息佇列名稱queue-test,用於擷取隊列長度。
應用配置,查看ScaledObject、HPA資源狀態。
執行如下命令,建立資源。
kubectl apply -f ScaledObject.yaml執行如下命令,擷取伸縮配置狀態。
kubectl get ScaledObject預期輸出:
NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE rabbitmq-scaledobject apps/v1.Deployment sample-app 1 10 rabbitmq keda-trigger-auth-rabbitmq-conn True False False 17s執行如下命令,查看HPA的產生情況。
kubectl get hpa預期輸出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 2 2m35s
步驟三:生產及消費資料實現擴縮容
下面通過RabbitMQ訊息佇列的生產者與消費者代碼,展示利用RabbitMQ訊息佇列長度實現容器的伸縮效果。
基於以下producer代碼生產RabbitMQ訊息。
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // 替換為待讀取資訊的隊列名稱 numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // 訪問的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost ) func main() { conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") go produce(ch, q) select {} } // 生產訊息佇列 func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "Failed to publish a message") log.Printf("Successed to publish a message: %s", msg) time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }初始化Go環境並下載依賴。
go mod init producer go mod tidy查看HPA詳情。
運行producer代碼生產訊息佇列。
go run producer.go執行如下命令,查看HPA詳情
kubectl get hpa預期輸出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s預期輸出表明,sample-app已經擴容到keda組件設定的最大值。
關閉producer程式,運行下面consumer程式消費隊列。
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( queueName = "queue-test" url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // 訪問的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost ) func main() { // 串連到 RabbitMQ conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 建立一個通道 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 聲明隊列(確保隊列存在) _, err = ch.QueueDeclare( queueName, // 隊列名稱 true, // 是否持久化 false, // 是否排外 false, // 是否在隊列中等待 false, // 是否自動刪除 nil, // 額外屬性 ) failOnError(err, "Failed to declare a queue") // 擷取訊息通道 msgs, err := ch.Consume( queueName, // 隊列名稱 "", // 消費者的名字,如果不需要可以留空 true, // 自動確認 false, // 是否排外 false, // 是否在隊列中等待 false, // 是否必須優先 nil, // 額外的參數 ) failOnError(err, "Failed to register a consumer") // 建立一個 goroutine 處理接收到的訊息 go func() { for msg := range msgs { log.Printf("Received a message: %s", msg.Body) // 在這裡可以處理接收到的訊息,例如進行商務邏輯處理 fmt.Printf("Processed message: %s\n", msg.Body) } }() // 阻塞主程式,保持運行 select {} } // 簡單的錯誤處理函數 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }運行consumer代碼消費訊息。
go run consumer.go執行如下命令,監控HPA縮容,
kubectl get hpa -w預期輸出:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 235000m/20 (avg) 1 10 10 9m51s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 10 10m keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 1 15m預期輸出表明,在資料消費結束一段時間後,sample-app縮容至Keda組件設定的最小值。
相關文檔
您還可以根據自訂的 RocketMQ 訊息堆積指標配置 HPA,實現更靈活的訊息驅動擴縮容,請參見基於RocketMQ指標的KEDA。