すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:RabbitMQ 向け ApsaraMQ のメトリックに基づく水平ポッド自動スケーリング

最終更新日:May 31, 2025

ack-keda コンポーネントは、イベント駆動型スケーリングを提供し、イベントソースから定期的にデータを取得します。 保留中のメッセージが増加すると、ACK KEDA がトリガーされ、数秒以内にバッチジョブをスケーリングします。 RabbitMQ 向け ApsaraMQ と ack-keda を使用して、キューの長さとメッセージングレートのメトリックを監視できます。 このトピックでは、RabbitMQ 向け ApsaraMQ のメッセージキューの長さメトリックとイベント駆動型オートスケーラー KEDA に基づいて水平ポッド自動スケーリングを設定する方法について説明します。

前提条件

ステップ 1:デプロイメントの作成とデプロイ

  1. ACK コンソール にログインします。 左側のナビゲーションウィンドウで、[クラスター] をクリックします。

  2. [クラスター] ページで、管理するクラスターを見つけ、その名前をクリックします。 左側のウィンドウで、[ワークロード] > [デプロイメント] を選択します。

  3. [デプロイメント] ページの左上隅にある [YAML から作成] をクリックします。

  4. [作成] ページで、[サンプルテンプレート][カスタム] に設定します。 次の内容に基づいて sample-app という名前のデプロイメントを作成し、[作成] をクリックします。

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: consumer
            image: consumer  # RabbitMQ 向け ApsaraMQ のコンシューマー アプリケーションで使用される実際のイメージに置き換えます。
            resources:
              limits:
                cpu: "500m"

ステップ 2:RabbitMQ 向け ApsaraMQ のメトリックに基づく水平ポッド自動スケーリングの設定ApsaraMQ for RabbitMQ のメトリックに基づいて水平ポッド自動スケーリングを構成する

このステップでは、作成した RabbitMQ 向け ApsaraMQ インスタンスの情報を取得し、RabbitMQ 向け ApsaraMQ のメトリックに基づいてアプリケーションを自動的にスケーリングするように Horizontal Pod Autoscaler(HPA)を設定する方法を示します。

  1. RabbitMQ 向け ApsaraMQ インスタンスの情報を取得します。

    1. RabbitMQ 向け ApsaraMQ コンソール にログインします。 左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。

    2. [インスタンス] ページで、作成したインスタンスの名前をクリックして、インスタンス詳細ページに移動します。 [エンドポイント情報] タブの [エンドポイント] 列にある [パブリックエンドポイント] のエンドポイントを記録します。

      説明

      RabbitMQ 向け ApsaraMQ インスタンスの作成時にパブリックエンドポイントを使用すると、インスタンスが悪意のある攻撃や不正アクセスに対して脆弱になる可能性があります。 この例では、デモ用にのみパブリックエンドポイントを使用しています。 アプリケーションが VPC(Virtual Private Cloud)にデプロイされていて、インターネットにアクセスする必要がない場合は、セキュリティを強化するためにインターネットアクセスを無効にすることをお勧めします。

    3. 左側のナビゲーションウィンドウで、[静的アカウント] をクリックし、インスタンスへのログインに使用する [ユーザー名][パスワード] を記録します。

      作成されていない場合は、静的ユーザー名とパスワードの管理 を参照して作成してください。
    4. 左側のナビゲーションウィンドウで、[Vhost] をクリックし、仮想ホスト (vhost) の名前を記録します。 例:amq-test。

  2. 次のコマンドを実行して、接続認証トークンを作成します。

    echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64

    rabbitmq-usernamerabbitmq-password を記録したユーザー名とパスワードに置き換え、localhost を記録したパブリックエンドポイントに置き換え、vhost を vhost の名前に置き換えます。

  3. 次の YAML コンテンツに基づいてシークレットを作成します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: keda-rabbitmq-secret
    data:
      host: YW1x****** # 作成した接続認証トークン。
  4. 次の YAML コンテンツを使用して、TriggerAuthentication オブジェクトを作成します。

    apiVersion: keda.sh/v1alpha1
    kind: TriggerAuthentication
    metadata:
      name: keda-trigger-auth-rabbitmq-conn
      namespace: default
    spec:
      secretTargetRef:
        - parameter: host
          name: keda-rabbitmq-secret # keda-rabbitmq-secret は、前のステップで作成したシークレットです。
          key: host

    次のコマンドを実行して、クラスターに TriggerAuthentication オブジェクトをデプロイします。

    kubectl apply -f secret.yaml
    kubectl apply -f rabbitmq-trigger-auth.yaml

    TriggerAuthentication オブジェクトがデプロイされると、KEDA で RabbitMQ トリガーを使用できます。 また、TriggerAuthentication オブジェクトを参照して RabbitMQ 向け ApsaraMQ インスタンスに接続し、メトリックをクエリすることもできます。

  5. ScaledObject.yaml という名前のファイルを作成し、次のコンテンツをファイルにコピーします。

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: rabbitmq-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: rabbitmq-deployment
      maxReplicaCount: 10
      minReplicaCount: 1
      triggers:
      - type: rabbitmq
        metadata:
          protocol: amqp
          queueName: queue-test
          mode: QueueLength
          value: "20"
          metricName: queue-test
        authenticationRef:
          name: keda-trigger-auth-rabbitmq-conn

    パラメーター

    説明

    scaleTargetRef

    スケーリングするオブジェクト。 この例では、値は ステップ 1:デプロイメントの作成とデプロイ で作成したデプロイメントの名前である gsample-appg に設定されています。

    maxReplicaCount

    複製ポッドの最大数。

    minReplicaCount

    複製ポッドの最小数。

    protocol

    ack-keda と RabbitMQ 向け ApsaraMQ インスタンスが通信に使用するプロトコル。 有効な値:auto、http、amqp。

    queueName

    データを読み取るキューの名前。

    value

    スケールアウトのしきい値。

    metricName

    HPA がアプリケーションをスケーリングする基準となるカスタムメトリックの名前。 この例では、キューの長さを示す queue-test メトリックが使用されます。

  6. アプリケーションを設定し、ScaledObject と HPA のステータスを確認します。

    次のコマンドを実行して、ScaledObject を作成します。

    kubectl apply -f ScaledObject.yaml

    次のコマンドを実行して、ScaledObject のステータスを確認します。

    kubectl get ScaledObject

    予期される出力:

    NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
    rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10   rabbitmq   keda-trigger-auth-rabbitmq-conn    True    False    False      17s

    次のコマンドを実行して、HPA がデプロイされているかどうかを確認します。

    kubectl get hpa

    予期される出力:

    NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s

ステップ 3:データの作成と消費によるアプリケーションスケーリングのトリガー

このステップでは、プロデューサーコードとコンシューマーコードを使用して、RabbitMQ 向け ApsaraMQ のメッセージキューの長さに基づいて HPA がアプリケーションをスケーリングできるようにする方法を示します。

  1. 次のプロデューサーコードを使用してメッセージを生成します。

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test" // データを読み取るキューの名前に置き換えます。
    	numMsgs   = 10000
    	pauseTime = 10 * time.Millisecond
            url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ 向け ApsaraMQ インスタンスの URL。 URL は amqp://guest:password@localhost:5672/vhost 形式である必要があります。
    )
    
    func main() {
    	conn, err := amqp.Dial(url)
    	failOnError(err, "RabbitMQ への接続に失敗しました") //日本語コメント
    	defer conn.Close()
    
    	ch, err := conn.Channel()
    	failOnError(err, "チャネルのオープンに失敗しました") //日本語コメント
    	defer ch.Close()
    
    	q, err := ch.QueueDeclare(
    		queueName,
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	failOnError(err, "キューの宣言に失敗しました") //日本語コメント
    	go produce(ch, q)
    	select {}
    }
    // メッセージを作成します。 //日本語コメント
    func produce(ch *amqp.Channel, q amqp.Queue) {
    	for i := 0; i < numMsgs; i++ {
    		msg := fmt.Sprintf("Message %d", i)
    		err := ch.Publish(
    			"",
    			q.Name,
    			false,
    			false,
    			amqp.Publishing{
    				ContentType: "text/plain",
    				Body:        []byte(msg),
    			},
    		)
    		failOnError(err, "メッセージの公開に失敗しました") //日本語コメント
    		log.Printf("メッセージの公開に成功しました: %s", msg) //日本語コメント
    		time.Sleep(pauseTime)
    	}
    }
    
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }
  2. 次のコマンドを実行して、HPA に関する情報をクエリします。

    プロデューサーコードを実行してメッセージを送信します。

    go run producer.go

    次のコマンドを実行して、HPA に関する情報をクエリします。

    kubectl get hpa

    予期される出力:

    NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app    443000m/20 (avg)   1         10        10         9m15s

    出力は、sample-app アプリケーションの複製ポッドの数が KEDA で指定された最大値にスケーリングされたことを示しています。

  3. プロデューサープログラムを終了し、次のコンシューマープログラムを実行してメッセージを受信します。

    package main
    
    import (
    	"fmt"
    	"log"
    	"github.com/streadway/amqp"
    )
    
    const (
    	queueName = "queue-test"
    	url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ 向け ApsaraMQ インスタンスの URL。 URL は amqp://guest:password@localhost:5672/vhost 形式である必要があります。
    )
    
    func main() {
    	// RabbitMQ 向け ApsaraMQ に接続します。 //日本語コメント
    	conn, err := amqp.Dial(url)
    	failOnError(err, "RabbitMQ への接続に失敗しました") //日本語コメント
    	defer conn.Close()
    
    	// チャネルを作成します。 //日本語コメント
    	ch, err := conn.Channel()
    	failOnError(err, "チャネルのオープンに失敗しました") //日本語コメント
    	defer ch.Close()
    
    	// キューが存在することを確認するためにキューを要求します。 //日本語コメント
    	_, err = ch.QueueDeclare(
    		queueName, // キュー名。 //日本語コメント
    		true,      // 永続ストレージを有効にするかどうかを指定します。 //日本語コメント
    		false,     // 排他的キューを有効にするかどうかを指定します。 //日本語コメント
    		false,     // メッセージをキューに保存するかどうかを指定します。 //日本語コメント
    		false,     // 自動削除を有効にするかどうかを指定します。 //日本語コメント
    		nil,       // 追加の属性。 //日本語コメント
    	)
    	failOnError(err, "キューの宣言に失敗しました") //日本語コメント
    
    	// メッセージチャネルを取得します。 //日本語コメント
    	msgs, err := ch.Consume(
    		queueName, // キュー名。 //日本語コメント
    		"",        // コンシューマー名。 このパラメーターは空のままにすることができます。 //日本語コメント
    		true,      // 自動確認を有効にします。 //日本語コメント
    		false,     // 排他的キューを有効にするかどうかを指定します。 //日本語コメント
    		false,     // メッセージをキューに保存するかどうかを指定します。 //日本語コメント
    		false,     // 優先キューを有効にするかどうかを指定します。 //日本語コメント
    		nil,       // 追加のパラメーター。 //日本語コメント
    	)
    	failOnError(err, "コンシューマーの登録に失敗しました") //日本語コメント
    
    	// 受信したメッセージを処理するゴルーチンを作成します。 //日本語コメント
    	go func() {
    		for msg := range msgs {
    			log.Printf("メッセージを受信しました: %s", msg.Body) //日本語コメント
    			// メッセージ消費ロジックに基づいて受信したメッセージを処理します。 //日本語コメント
    			fmt.Printf("Processed message: %s\n", msg.Body)
    		}
    	}()
    
    	// メインプログラムをブロックし、キューを実行し続けます。 //日本語コメント
    	select {}
    }
    
    // 簡単なエラー処理関数。 //日本語コメント
    func failOnError(err error, msg string) {
    	if err != nil {
    		log.Fatalf("%s: %s", msg, err)
    	}
    }

    コンシューマーコードを実行してメッセージを受信します。

    go run consumer.go

    次のコマンドを実行して、HPA がアプリケーションをスケールインするかどうかを確認します。

    kubectl get hpa -w

    予期される出力:

    NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
    keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

    出力は、データ消費が終了した後の時点である時点で、sample-app アプリケーションの複製ポッドの数が KEDA で指定された最小値にスケーリングされたことを示しています。

参考資料

RocketMQ 向け ApsaraMQ のメッセージ蓄積メトリックに基づいてアプリケーションをスケーリングするように HPA を設定する方法の詳細については、「RocketMQ メトリックに基づく Keda」をご参照ください。