全部產品
Search
文件中心

Container Service for Kubernetes:基於RabbitMQ指標的容器水平伸縮

更新時間:Nov 01, 2025

ack-keda提供事件驅動彈效能力,會從事件來源中進行資料的周期性消費,當訊息出現堆積,即可秒級觸發一個批次的離線任務伸縮。RabbitMQ和Keda搭配,可以監控隊列長度和訊息速率指標。本文介紹如何利用RabbitMQ指標訊息佇列長度和事件驅動自動調整工具Keda實現應用的Auto Scaling。

前提條件

步驟一:部署工作負載建立應用

  1. 登入Container Service管理主控台,在左側導覽列選擇叢集列表

  2. 叢集列表頁面,單擊目的地組群名稱,然後在左側導覽列,選擇工作負載 > 無狀態

  3. 無狀態頁面,單擊使用YAML建立資源,選擇樣本模板自訂,使用如下內容建立名為sample-app的應用,然後單擊建立

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-deploy
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # 修改為業務真實的RabbitMQ的消費者鏡像。
            resources:
              limits:
                cpu: "500m"

步驟二:部署基於RabbitMQ指標的彈性樣本

以下步驟通過擷取已建立RabbitMQ訊息佇列的執行個體資訊,對接RabbitMQ與HPA伸縮指標,實現將RabbitMQ指標轉換為HPA可用指標,並實現容器自動調整。

  1. 查看訊息佇列RabbitMQ執行個體資訊。

    1. 登入雲訊息佇列 RabbitMQ 版控制台,然後在左側導覽列選擇实例列表

    2. 單擊目標執行個體名稱,進入執行個體詳情頁面。在存取點資訊頁簽,查看並記錄公網存取點Endpoint

      使用公網存取點會增加安全風險,可能會受到外部攻擊或未授權訪問。本執行個體使用公網存取點僅作為示範。如果應用主要在阿里雲VPC內運行,無需外部存取,建議不開啟公網訪問。

    3. 在左側導覽列,單擊使用者和許可權管理,查看並記錄使用者名稱密碼

      如未建立,可參見使用者和許可權管理建立。
    4. 在左側導覽列中單擊Vhost 列表,查看並記錄Vhost的值,例如amq-test

  2. 執行如下指令,建立並記錄串連認證字串。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    其中rabbitmq-usernamerabbitmq-password分別為此前擷取的使用者名稱密碼localhost為記錄的Endpoint,vhost為上一步記錄的名稱。

  3. 使用如下YAML建立Secret。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1x****** # 已建立的串連認證字串。
  4. 使用如下YAML建立TriggerAuthentication對象,並將該YAML檔案部署到叢集中。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret為上一步建立的Secret。
          key: host

    執行如下命令,將YAML檔案部署到叢集中。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    部署完成後,可在Keda中使用RabbitMQ觸發器,並且可以通過引用TriggerAuthentication對象來串連RabbitMQ擷取指標資料。

  5. 使用如下內容,建立YAML檔案ScaledObject.yaml。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: sample-app
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: queue-test
          mode: QueueLength
          value: "20"
          metricName: queue-test 
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    參數

    說明

    scaleTargetRef

    配置擴縮容的對象。本樣本配置為步驟一:部署工作負載建立應用已建立的應用sample-app。

    maxReplicaCount

    最大副本數。

    minReplicaCount

    最小副本數。

    protocol

    keda組件與RabbitMQ之間的通訊協定。取值範圍:auto、http、amqp。

    queueName

    待讀取資訊的隊列名稱。

    value

    觸發擴容的閾值。

    metricName

    自訂指標的名稱,用於告訴HPA從哪個指標擷取資料來決策擴縮容。在這裡是RabbitMQ訊息佇列名稱queue-test,用於擷取隊列長度。

  6. 應用配置,查看ScaledObject、HPA資源狀態。

    執行如下命令,建立資源。

    kubectl apply -f ScaledObject.yaml  

    執行如下命令,擷取伸縮配置狀態。

    kubectl get ScaledObject

    預期輸出:

    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10   rabbitmq   keda-trigger-auth-rabbitmq-conn    True    False    False      17s

    執行如下命令,查看HPA的產生情況。

    kubectl get hpa

    預期輸出:

    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

步驟三:生產及消費資料實現擴縮容

下面通過RabbitMQ訊息佇列的生產者與消費者代碼,展示利用RabbitMQ訊息佇列長度實現容器的伸縮效果。

  1. 基於以下producer代碼生產RabbitMQ訊息。

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"    // 替換為待讀取資訊的隊列名稱
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
            url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // 訪問的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "Failed to declare a queue")
    	go produce(ch, q)
    	select {}
    }
    // 生產訊息佇列
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "Failed to publish a message")
    		log.Printf("Successed to publish a message: %s", msg)
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. 初始化Go環境並下載依賴。

    go mod init producer
    go mod tidy
  3. 查看HPA詳情。

    運行producer代碼生產訊息佇列。

    go run producer.go

    執行如下命令,查看HPA詳情

    kubectl get hpa

    預期輸出:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app    443000m/20 (avg)   1         10        10         9m15s

    預期輸出表明,sample-app已經擴容到keda組件設定的最大值。

  4. 關閉producer程式,運行下面consumer程式消費隊列。

    package main
    
    import (
    	"fmt"
    	"log"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"
    	url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test"   // 訪問的RabbitMQ的url,拼接方式amqp://username:password@localhost:5672/vhost
    )
    
    func main() {
    	// 串連到 RabbitMQ
    	conn, err := amqp.Dial(url)
    	failOnError(err, "Failed to connect to RabbitMQ")
    	defer conn.Close()
    
    	// 建立一個通道
    	ch, err := conn.Channel()
    	failOnError(err, "Failed to open a channel")
    	defer ch.Close()
    
    	// 聲明隊列(確保隊列存在)
    	_, err = ch.QueueDeclare(
    		queueName, // 隊列名稱
    		true,      // 是否持久化
    		false,     // 是否排外
    		false,     // 是否在隊列中等待
    		false,     // 是否自動刪除
    		nil,       // 額外屬性
    	)
    	failOnError(err, "Failed to declare a queue")
    
    	// 擷取訊息通道
    	msgs, err := ch.Consume(
    		queueName, // 隊列名稱
    		"",        // 消費者的名字,如果不需要可以留空
    		true,      // 自動確認
    		false,     // 是否排外
    		false,     // 是否在隊列中等待
    		false,     // 是否必須優先
    		nil,       // 額外的參數
    	)
    	failOnError(err, "Failed to register a consumer")
    
    	// 建立一個 goroutine 處理接收到的訊息
    	go func() {
    		for msg := range msgs {
    			log.Printf("Received a message: %s", msg.Body)
    			// 在這裡可以處理接收到的訊息,例如進行商務邏輯處理
    			fmt.Printf("Processed message: %s\n", msg.Body)
    		}
    	}()
    
    	// 阻塞主程式,保持運行
    	select {}
    }
    
    // 簡單的錯誤處理函數
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }

    運行consumer代碼消費訊息。

    go run consumer.go

    執行如下命令,監控HPA縮容,

    kubectl get hpa -w

    預期輸出:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    預期輸出表明,在資料消費結束一段時間後,sample-app縮容至Keda組件設定的最小值。

相關文檔

您還可以根據自訂的 RocketMQ 訊息堆積指標配置 HPA,實現更靈活的訊息驅動擴縮容,請參見基於RocketMQ指標的KEDA