The ack-keda component provides event-driven scaling by periodically polling event sources for data. When messages accumulate, ack-keda triggers scale-out of a batch of offline tasks within seconds. You can use Keda with ApsaraMQ for RabbitMQ to monitor metrics such as queue length and message rate. This topic describes how to use the ApsaraMQ for RabbitMQ queue length metric and the Keda event-driven autoscaler to automatically scale an application.
Prerequisites
The ack-keda component is deployed. For more information, see Event-driven scaling.
An ApsaraMQ for RabbitMQ instance and related resources are created. For more information, see Create resources.
You are connected to a Kubernetes cluster using kubectl. For more information, see Connect to a cluster using kubectl.
A Go runtime environment is installed.
Step 1: Deploy a workload to create an application
Log on to the ACK console. In the left navigation pane, click Clusters.
On the Clusters page, find the cluster you want and click its name. In the left-side pane, choose Workloads > Deployments.
On the Deployments page, click Create from YAML. Set Sample Template to Custom. Use the following YAML to create an application named sample-app, and then click Create.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-app
  namespace: default
  labels:
    app: sample-app
spec:
  replicas: 2
  selector:
    matchLabels:
      app: sample-app
  template:
    metadata:
      labels:
        app: sample-app
    spec:
      containers:
      - name: consumer
        image: consumer # Replace this with the actual image of your RabbitMQ consumer.
        resources:
          limits:
            cpu: "500m"
Step 2: Deploy a scaling example based on RabbitMQ metrics
These steps show how to obtain information about your ApsaraMQ for RabbitMQ instance and configure the Horizontal Pod Autoscaler (HPA) to automatically scale your application based on RabbitMQ metrics.
Obtain the ApsaraMQ for RabbitMQ instance information.
Log on to the ApsaraMQ for RabbitMQ console. In the navigation pane on the left, choose Instances.
Click the name of the target instance to go to the Instance Details page. On the Endpoint Information tab, view and record the Endpoint for the Public Endpoint.
Using a public endpoint creates security risks, such as external attacks or unauthorized access. This example uses a public endpoint for demonstration only. If your application runs inside an Alibaba Cloud VPC and does not need external access, do not enable public access.
In the navigation pane on the left, click Users and Permissions. View and record the Username and Password.
If you have not created a user, see Users and Permissions to create one.
In the navigation pane on the left, click Vhosts. View and record the Vhost value, for example, amq-test.
Run the following command to create and record the connection authentication string.
echo -n "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" | base64Replace
rabbitmq-usernameandrabbitmq-passwordwith the Username and Password that you obtained. Replacelocalhostwith the recorded endpoint. Replacevhostwith the name recorded in the previous step.Use the following YAML to create a Secret.
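If you prefer to generate the Base64 value programmatically, the following Go sketch is equivalent to the echo | base64 command above. The connection values are hypothetical placeholders; substitute the username, password, endpoint, and vhost that you recorded.

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Hypothetical placeholders: replace the username, password, endpoint,
	// and vhost with the values recorded in the previous steps.
	host := "amqp://rabbitmq-username:rabbitmq-password@your-endpoint:5672/amq-test"
	// Print the Base64-encoded connection string for the Secret's host field.
	fmt.Println(base64.StdEncoding.EncodeToString([]byte(host)))
}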
Use the following YAML to create a Secret and save it as secret.yaml.

apiVersion: v1
kind: Secret
metadata:
  name: keda-rabbitmq-secret
data:
  host: YW1x****** # The Base64-encoded connection authentication string that you created.

Use the following YAML to create a TriggerAuthentication object and save it as rabbitmq-trigger-auth.yaml.
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-rabbitmq-conn
  namespace: default
spec:
  secretTargetRef:
  - parameter: host
    name: keda-rabbitmq-secret # keda-rabbitmq-secret is the Secret created in the previous step.
    key: host

Run the following commands to deploy the YAML files to the cluster.
kubectl apply -f secret.yaml
kubectl apply -f rabbitmq-trigger-auth.yaml

After the deployment is complete, RabbitMQ triggers in Keda can reference this TriggerAuthentication object to connect to RabbitMQ and retrieve metrics data.
Create a YAML file named ScaledObject.yaml with the following content.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: sample-app
  maxReplicaCount: 10
  minReplicaCount: 1
  triggers:
  - type: rabbitmq
    metadata:
      protocol: amqp
      queueName: queue-test
      mode: QueueLength
      value: "20"
      metricName: queue-test
    authenticationRef:
      name: keda-trigger-auth-rabbitmq-conn
The following list describes the parameters.

scaleTargetRef: The object to scale. In this example, it is set to sample-app, the application created in Step 1: Deploy a workload to create an application.
maxReplicaCount: The maximum number of replicas.
minReplicaCount: The minimum number of replicas.
protocol: The communication protocol between the Keda component and RabbitMQ. Valid values: auto, http, and amqp.
queueName: The name of the queue from which to read messages.
value: The scaling threshold. In QueueLength mode, this is the target average number of messages per replica (see the sketch after this list).
metricName: The name of the custom metric that tells the HPA which metric to use for scaling decisions. In this example, it is queue-test, the name of the RabbitMQ queue whose length is retrieved.
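In QueueLength mode, the value parameter acts as a target average number of messages per replica: the HPA scales the workload so that the queue length divided by the replica count stays close to this value, within the configured bounds. The following Go sketch illustrates that calculation. It is an approximation for understanding the behavior, not KEDA's actual implementation.

package main

import (
	"fmt"
	"math"
)

// desiredReplicas approximates how a QueueLength trigger maps a queue
// backlog to a replica count: queue length divided by the target value,
// rounded up and clamped to the min/max bounds of the ScaledObject.
func desiredReplicas(queueLength, value, minReplicas, maxReplicas int) int {
	replicas := int(math.Ceil(float64(queueLength) / float64(value)))
	if replicas < minReplicas {
		replicas = minReplicas
	}
	if replicas > maxReplicas {
		replicas = maxReplicas
	}
	return replicas
}

func main() {
	// With value: "20", minReplicaCount: 1, and maxReplicaCount: 10, as in the ScaledObject above:
	fmt.Println(desiredReplicas(0, 20, 1, 10))   // Empty queue  -> 1 replica
	fmt.Println(desiredReplicas(90, 20, 1, 10))  // 90 messages  -> 5 replicas
	fmt.Println(desiredReplicas(500, 20, 1, 10)) // 500 messages -> 10 replicas (capped by maxReplicaCount)
}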
Apply the configuration and check the status of the ScaledObject and HPA resources.
Run the following command to create the resource.
kubectl apply -f ScaledObject.yaml

Run the following command to check the status of the scaling configuration.
kubectl get ScaledObject

Expected output:
NAME                    SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS   AUTHENTICATION                    READY   ACTIVE   FALLBACK   AGE
rabbitmq-scaledobject   apps/v1.Deployment   sample-app        1     10    rabbitmq   keda-trigger-auth-rabbitmq-conn   True    False    False      17s

READY is True, which indicates that the ScaledObject is configured correctly and Keda can reach RabbitMQ through the TriggerAuthentication object. ACTIVE is False because no messages are queued yet.

Run the following command to check the HPA status.
kubectl get hpa

Expected output:
NAME                             REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)   1         10        2          2m35s
Step 3: Produce and consume data to trigger scaling
The following producer and consumer code for an ApsaraMQ for RabbitMQ queue demonstrates how to use the queue length to scale containers.
Use the following producer code to produce RabbitMQ messages.
package main import ( "fmt" "log" "time" "github.com/streadway/amqp" ) const ( queueName = "queue-test" // Replace this with the name of the queue from which to read messages. numMsgs = 10000 pauseTime = 10 * time.Millisecond url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // The URL to access RabbitMQ. Format: amqp://username:password@localhost:5672/vhost ) func main() { conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( queueName, true, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") go produce(ch, q) select {} } // Produce messages. func produce(ch *amqp.Channel, q amqp.Queue) { for i := 0; i < numMsgs; i++ { msg := fmt.Sprintf("Message %d", i) err := ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) failOnError(err, "Failed to publish a message") log.Printf("Successed to publish a message: %s", msg) time.Sleep(pauseTime) } } func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }Initialize the Go environment and download dependencies.
go mod init producer
go mod tidy

Check the HPA details.
Run the producer code to produce messages for the queue.
go run producer.go

Run the following command to check the HPA details.
kubectl get hpa

Expected output:
NAME                             REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s

The expected output shows that sample-app has scaled out to the maximum number of replicas (maxReplicaCount) configured in the ScaledObject.
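In the TARGETS column, 443000m is the Kubernetes milli-unit notation for 443 messages. Because this far exceeds the target average of 20 messages per replica, the HPA keeps scaling out until it is capped by maxReplicaCount. If you want to verify the backlog that drives this metric yourself, the following Go sketch uses the same streadway/amqp client to inspect the queue. The queue name and URL format follow the producer example and are placeholders; substitute your own values.

package main

import (
	"log"

	"github.com/streadway/amqp"
)

const (
	queueName = "queue-test"
	url       = "amqp://rabbitmq-username:rabbitmq-password@localhost:5672/vhost" // Same format as in the producer example; substitute your own values.
)

func main() {
	conn, err := amqp.Dial(url)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}
	defer ch.Close()

	// QueueInspect returns the queue's current state, including the
	// number of messages that are ready for delivery.
	q, err := ch.QueueInspect(queueName)
	if err != nil {
		log.Fatalf("Failed to inspect the queue: %s", err)
	}
	log.Printf("Queue %s holds %d messages for %d consumers", q.Name, q.Messages, q.Consumers)
}

Run this sketch with go run while the producer is active to see the message count rise, or after the consumer starts to see it fall.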
Stop the producer program and run the following consumer program to consume messages from the queue.
package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( queueName = "queue-test" url = "amqp://Mjpt****:QT****@amqp-cn-zxux009.cn-beijing.amqp-0.net.mq.amqp.aliyuncs.com:5672/amq-test" // The URL to access RabbitMQ. Format: amqp://username:password@localhost:5672/vhost ) func main() { // Connect to RabbitMQ. conn, err := amqp.Dial(url) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // Create a channel. ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // Declare the queue to make sure it exists. _, err = ch.QueueDeclare( queueName, // Queue name true, // durable false, // exclusive false, // no-wait false, // auto-delete nil, // arguments ) failOnError(err, "Failed to declare a queue") // Get the message channel. msgs, err := ch.Consume( queueName, // Queue name "", // Consumer name. Leave it empty if not needed. true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // arguments ) failOnError(err, "Failed to register a consumer") // Create a goroutine to process received messages. go func() { for msg := range msgs { log.Printf("Received a message: %s", msg.Body) // You can process the received messages here, for example, by implementing business logic. fmt.Printf("Processed message: %s\n", msg.Body) } }() // Block the main program to keep it running. select {} } // Simple error handling function. func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }Run the consumer code to consume messages.
go run consumer.go

To monitor the HPA scale-in, run the following command.
kubectl get hpa -w

Expected output:
NAME                             REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   443000m/20 (avg)   1         10        10         9m15s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   235000m/20 (avg)   1         10        10         9m51s
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        10         10m
keda-hpa-rabbitmq-scaledobject   Deployment/sample-app   0/20 (avg)         1         10        1          15m

The expected output shows that after the backlog is consumed and the queue stays empty, sample-app scales in to the minimum number of replicas (minReplicaCount) configured in the ScaledObject. The roughly five-minute gap between the metric dropping to 0 (at 10m) and the scale-in (at 15m) corresponds to the HPA's default scale-down stabilization window of 300 seconds.
References
For information about how to configure HPA based on custom RocketMQ message accumulation metrics for more flexible scaling, see KEDA based on RocketMQ metrics.