ack-keda コンポーネントは、イベント駆動型スケーリングを提供し、イベントソースから定期的にデータを取得します。 保留中のメッセージが増加すると、ACK KEDA がトリガーされ、数秒以内にバッチジョブをスケーリングします。 RabbitMQ 向け ApsaraMQ と ack-keda を使用して、キューの長さとメッセージングレートのメトリックを監視できます。 このトピックでは、RabbitMQ 向け ApsaraMQ のメッセージキューの長さメトリックとイベント駆動型オートスケーラー KEDA に基づいて水平ポッド自動スケーリングを設定する方法について説明します。
前提条件
ack-keda がデプロイされていること。 詳細については、「ACK KEDA」をご参照ください。
RabbitMQ 向け ApsaraMQ インスタンスと関連リソースが作成されていること。
Go がインストールされていること。
ステップ 1:デプロイメントの作成とデプロイ
ACK コンソール にログインします。 左側のナビゲーションウィンドウで、[クラスター] をクリックします。
[クラスター] ページで、管理するクラスターを見つけ、その名前をクリックします。 左側のウィンドウで、 を選択します。
[デプロイメント] ページの左上隅にある [YAML から作成] をクリックします。
[作成] ページで、[サンプルテンプレート] を [カスタム] に設定します。 次の内容に基づいて sample-app という名前のデプロイメントを作成し、[作成] をクリックします。
apiVersion: apps/v1 kind: Deployment metadata: name: sample-app namespace: default labels: app: sample-app spec: replicas: 2 selector: matchLabels: app: sample-app template: metadata: labels: app: sample-app spec: containers: - name: consumer image: consumer # RabbitMQ 向け ApsaraMQ のコンシューマー アプリケーションで使用される実際のイメージに置き換えます。 resources: limits: cpu: "500m"
ステップ 2:RabbitMQ 向け ApsaraMQ のメトリックに基づく水平ポッド自動スケーリングの設定ApsaraMQ for RabbitMQ のメトリックに基づいて水平ポッド自動スケーリングを構成する
このステップでは、作成した RabbitMQ 向け ApsaraMQ インスタンスの情報を取得し、RabbitMQ 向け ApsaraMQ のメトリックに基づいてアプリケーションを自動的にスケーリングするように Horizontal Pod Autoscaler(HPA)を設定する方法を示します。
RabbitMQ 向け ApsaraMQ インスタンスの情報を取得します。
RabbitMQ 向け ApsaraMQ コンソール にログインします。 左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
[インスタンス] ページで、作成したインスタンスの名前をクリックして、インスタンス詳細ページに移動します。 [エンドポイント情報] タブの [エンドポイント] 列にある [パブリックエンドポイント] のエンドポイントを記録します。
説明RabbitMQ 向け ApsaraMQ インスタンスの作成時にパブリックエンドポイントを使用すると、インスタンスが悪意のある攻撃や不正アクセスに対して脆弱になる可能性があります。 この例では、デモ用にのみパブリックエンドポイントを使用しています。 アプリケーションが VPC(Virtual Private Cloud)にデプロイされていて、インターネットにアクセスする必要がない場合は、セキュリティを強化するためにインターネットアクセスを無効にすることをお勧めします。
左側のナビゲーションウィンドウで、[静的アカウント] をクリックし、インスタンスへのログインに使用する [ユーザー名] と [パスワード] を記録します。
作成されていない場合は、静的ユーザー名とパスワードの管理 を参照して作成してください。
左側のナビゲーションウィンドウで、[Vhost] をクリックし、仮想ホスト (vhost) の名前を記録します。 例:amq-test。
次のコマンドを実行して、接続認証トークンを作成します。
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64
rabbitmq-username
とrabbitmq-password
を記録したユーザー名とパスワードに置き換え、localhost
を記録したパブリックエンドポイントに置き換え、vhost
を vhost の名前に置き換えます。次の YAML コンテンツに基づいてシークレットを作成します。
apiVersion: v1 kind: Secret metadata: name: keda-rabbitmq-secret data: host: YW1x****** # 作成した接続認証トークン。
次の YAML コンテンツを使用して、TriggerAuthentication オブジェクトを作成します。
apiVersion: keda.sh/v1alpha1 kind: TriggerAuthentication metadata: name: keda-trigger-auth-rabbitmq-conn namespace: default spec: secretTargetRef: - parameter: host name: keda-rabbitmq-secret # keda-rabbitmq-secret は、前のステップで作成したシークレットです。 key: host
次のコマンドを実行して、クラスターに TriggerAuthentication オブジェクトをデプロイします。
kubectl apply -f secret.yaml kubectl apply -f rabbitmq-trigger-auth.yaml
TriggerAuthentication オブジェクトがデプロイされると、KEDA で RabbitMQ トリガーを使用できます。 また、TriggerAuthentication オブジェクトを参照して RabbitMQ 向け ApsaraMQ インスタンスに接続し、メトリックをクエリすることもできます。
ScaledObject.yaml という名前のファイルを作成し、次のコンテンツをファイルにコピーします。
apiVersion: keda.sh/v1alpha1 kind: ScaledObject metadata: name: rabbitmq-scaledobject namespace: default spec: scaleTargetRef: name: rabbitmq-deployment maxReplicaCount: 10 minReplicaCount: 1 triggers: - type: rabbitmq metadata: protocol: amqp queueName: queue-test mode: QueueLength value: "20" metricName: queue-test authenticationRef: name: keda-trigger-auth-rabbitmq-conn
パラメーター
説明
scaleTargetRef
スケーリングするオブジェクト。 この例では、値は ステップ 1:デプロイメントの作成とデプロイ で作成したデプロイメントの名前である gsample-appg に設定されています。
maxReplicaCount
複製ポッドの最大数。
minReplicaCount
複製ポッドの最小数。
protocol
ack-keda と RabbitMQ 向け ApsaraMQ インスタンスが通信に使用するプロトコル。 有効な値:auto、http、amqp。
queueName
データを読み取るキューの名前。
value
スケールアウトのしきい値。
metricName
HPA がアプリケーションをスケーリングする基準となるカスタムメトリックの名前。 この例では、キューの長さを示す queue-test メトリックが使用されます。
アプリケーションを設定し、ScaledObject と HPA のステータスを確認します。
次のコマンドを実行して、ScaledObject を作成します。
kubectl apply -f ScaledObject.yaml
次のコマンドを実行して、ScaledObject のステータスを確認します。
kubectl get ScaledObject
予期される出力:
NAME SCALETARGETKIND SCALETARGETNAME MIN MAX TRIGGERS AUTHENTICATION READY ACTIVE FALLBACK AGE rabbitmq-scaledobject apps/v1.Deployment sample-app 1 10 rabbitmq keda-trigger-auth-rabbitmq-conn True False False 17s
次のコマンドを実行して、HPA がデプロイされているかどうかを確認します。
kubectl get hpa
予期される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 2 2m35s
ステップ 3:データの作成と消費によるアプリケーションスケーリングのトリガー
このステップでは、プロデューサーコードとコンシューマーコードを使用して、RabbitMQ 向け ApsaraMQ のメッセージキューの長さに基づいて HPA がアプリケーションをスケーリングできるようにする方法を示します。
次のプロデューサーコードを使用してメッセージを生成します。
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // データを読み取るキューの名前に置き換えます。 numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ 向け ApsaraMQ インスタンスの URL。 URL は amqp://guest:password@localhost:5672/vhost 形式である必要があります。 ) func main() { conn, err := amqp.Dial(url) failOnError(err, "RabbitMQ への接続に失敗しました") //日本語コメント defer conn.Close() ch, err := conn.Channel() failOnError(err, "チャネルのオープンに失敗しました") //日本語コメント defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "キューの宣言に失敗しました") //日本語コメント go produce(ch, q) select {} } // メッセージを作成します。 //日本語コメント func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "メッセージの公開に失敗しました") //日本語コメント log.Printf("メッセージの公開に成功しました: %s", msg) //日本語コメント time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
次のコマンドを実行して、HPA に関する情報をクエリします。
プロデューサーコードを実行してメッセージを送信します。
go run producer.go
次のコマンドを実行して、HPA に関する情報をクエリします。
kubectl get hpa
予期される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s
出力は、sample-app アプリケーションの複製ポッドの数が KEDA で指定された最大値にスケーリングされたことを示しています。
プロデューサープログラムを終了し、次のコンシューマープログラムを実行してメッセージを受信します。
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( queueName = "queue-test" url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // RabbitMQ 向け ApsaraMQ インスタンスの URL。 URL は amqp://guest:password@localhost:5672/vhost 形式である必要があります。 ) func main() { // RabbitMQ 向け ApsaraMQ に接続します。 //日本語コメント conn, err := amqp.Dial(url) failOnError(err, "RabbitMQ への接続に失敗しました") //日本語コメント defer conn.Close() // チャネルを作成します。 //日本語コメント ch, err := conn.Channel() failOnError(err, "チャネルのオープンに失敗しました") //日本語コメント defer ch.Close() // キューが存在することを確認するためにキューを要求します。 //日本語コメント _, err = ch.QueueDeclare( queueName, // キュー名。 //日本語コメント true, // 永続ストレージを有効にするかどうかを指定します。 //日本語コメント false, // 排他的キューを有効にするかどうかを指定します。 //日本語コメント false, // メッセージをキューに保存するかどうかを指定します。 //日本語コメント false, // 自動削除を有効にするかどうかを指定します。 //日本語コメント nil, // 追加の属性。 //日本語コメント ) failOnError(err, "キューの宣言に失敗しました") //日本語コメント // メッセージチャネルを取得します。 //日本語コメント msgs, err := ch.Consume( queueName, // キュー名。 //日本語コメント "", // コンシューマー名。 このパラメーターは空のままにすることができます。 //日本語コメント true, // 自動確認を有効にします。 //日本語コメント false, // 排他的キューを有効にするかどうかを指定します。 //日本語コメント false, // メッセージをキューに保存するかどうかを指定します。 //日本語コメント false, // 優先キューを有効にするかどうかを指定します。 //日本語コメント nil, // 追加のパラメーター。 //日本語コメント ) failOnError(err, "コンシューマーの登録に失敗しました") //日本語コメント // 受信したメッセージを処理するゴルーチンを作成します。 //日本語コメント go func() { for msg := range msgs { log.Printf("メッセージを受信しました: %s", msg.Body) //日本語コメント // メッセージ消費ロジックに基づいて受信したメッセージを処理します。 //日本語コメント fmt.Printf("Processed message: %s\n", msg.Body) } }() // メインプログラムをブロックし、キューを実行し続けます。 //日本語コメント select {} } // 簡単なエラー処理関数。 //日本語コメント func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
コンシューマーコードを実行してメッセージを受信します。
go run consumer.go
次のコマンドを実行して、HPA がアプリケーションをスケールインするかどうかを確認します。
kubectl get hpa -w
予期される出力:
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE keda-hpa-rabbitmq-scaledobject Deployment/sample-app 443000m/20 (avg) 1 10 10 9m15s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 235000m/20 (avg) 1 10 10 9m51s keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 10 10m keda-hpa-rabbitmq-scaledobject Deployment/sample-app 0/20 (avg) 1 10 1 15m
出力は、データ消費が終了した後の時点である時点で、sample-app アプリケーションの複製ポッドの数が KEDA で指定された最小値にスケーリングされたことを示しています。
参考資料
RocketMQ 向け ApsaraMQ のメッセージ蓄積メトリックに基づいてアプリケーションをスケーリングするように HPA を設定する方法の詳細については、「RocketMQ メトリックに基づく Keda」をご参照ください。