Container Compute Service:Event-driven auto scaling based on RocketMQ metrics (KEDA)

Last Updated: Nov 20, 2025

When you use RocketMQ, message accumulation can quickly drive up system load and even cause service crashes. To improve system reliability and stability, you can use KEDA (Kubernetes Event-driven Autoscaling) as the elastic scaling solution for your applications and implement automatic, efficient horizontal pod autoscaling (HPA) based on RocketMQ message accumulation metrics.

Feature introduction

RocketMQ is a high-performance, highly reliable, and highly scalable distributed messaging middleware that is widely used in enterprise applications. However, message accumulation can easily occur when you use RocketMQ, especially under high load. Accumulated messages drive up system load and can even cause service crashes.

In this scenario, you can use the Kubernetes event-driven autoscaling tool KEDA to trigger horizontal pod autoscaling based on custom RocketMQ message accumulation metrics. This solution helps you scale applications automatically and efficiently, thereby improving system reliability and stability. If you use open-source RocketMQ, you can achieve similar capabilities by exposing message accumulation data through a JMX-based Prometheus exporter.

This topic uses Alibaba Cloud Managed Service for Prometheus as the data source and describes how to configure a ScaledObject that scales a workload based on RocketMQ message accumulation.

Prerequisites

Before you begin, make sure that the following conditions are met: an ACS cluster is created and the KEDA component is installed in the cluster; an ApsaraMQ for RocketMQ instance is created, together with a topic and a consumer group (both named keda in this example); and Managed Service for Prometheus monitoring data is available for the RocketMQ instance.

Step 1: Deploy a workload

This example creates an Nginx sample application named sample-app.

  1. Log on to the ACS console. In the left-side navigation pane, click Clusters.

  2. On the Clusters page, find the cluster that you want to manage and click its ID. In the left-side navigation pane of the cluster details page, choose Workloads > Deployments.

  3. On the Deployments page, click Create From YAML, set Sample Template to Custom, and use the following content to create an Nginx application named sample-app.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sample-app
      namespace: default
      labels:
        app: sample-app
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: sample-app
      template:
        metadata:
          labels:
            app: sample-app
        spec:
          containers:
          - name: sample-app
            image: registry.cn-hangzhou.aliyuncs.com/acs-sample/nginx:latest # You can modify this to your actual RocketMQ consumer image as needed.
            resources:
              limits:
                cpu: "500m"

Step 2: Configure the ScaledObject scaling policy

You configure the KEDA scaling policy through a ScaledObject resource, which specifies the scaling target, the maximum and minimum numbers of replicas, the scaling threshold (in this example, a message accumulation threshold), and other settings. Before you configure the ScaledObject, obtain information such as the Prometheus endpoint that stores the metric data of your RocketMQ instance.

Obtain RocketMQ instance information

  1. Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select a region, such as China (Hangzhou). On the Instances page, click the name of the instance that you want to manage.

  3. On the Instance Details page, record the Instance ID in the Basic Information section. For example: rmq-cn-uax33****.

  4. In the left sidebar, click Access Control, and on the Intelligent Authentication tab, view and record the username and password of the current instance.

Obtain the Prometheus data source for the RocketMQ instance

  1. Log on to the ARMS console.

  2. In the left navigation pane, choose Managed Service for Prometheus > Instances.

  3. Click the target instance named Cloud Service-{{RegionId}}. In the left-side navigation pane, click Settings, and record the HTTP API URL (Grafana Read URL).

Create ScaledObject configuration

  1. Create ScaledObject.yaml.

    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: prometheus-scaledobject
      namespace: default
    spec:
      scaleTargetRef:
        name: sample-app
      maxReplicaCount: 10
      minReplicaCount: 2
      triggers:
      - type: prometheus
        metadata:
          serverAddress: {{RocketMQ instance's Prometheus data source HTTP API URL}}
          metricName: rocketmq_consumer_inflight_messages
          query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="{{RocketMQ instance ID}}",topic=~"keda"}) by (consumer_group)
          threshold: '30'

    Parameter descriptions:

    - scaleTargetRef.name: The scaling target. Set this to the sample-app application created in Step 1: Deploy a workload.
    - maxReplicaCount: The maximum number of replicas that the workload can be scaled out to.
    - minReplicaCount: The minimum number of replicas that the workload can be scaled in to.
    - serverAddress: The Prometheus endpoint that stores the RocketMQ metric data, that is, the HTTP API URL (Grafana Read URL) recorded earlier.
    - metricName: The name under which KEDA exposes the metric to the HPA.
    - query: The PromQL query used to calculate message accumulation, that is, the sum of ready and in-flight messages grouped by consumer group. Replace instance_id with your RocketMQ instance ID.
    - threshold: The scaling threshold, expressed as the average number of accumulated messages per replica. In this example, the threshold is 30; when the average exceeds 30, a scale-out is triggered. See the sketch after this list for how the threshold maps to a replica count.
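    To see how the threshold drives scaling, the following minimal Go sketch approximates the calculation the HPA performs for an average-value metric: the desired replica count is roughly the total accumulation divided by the threshold, rounded up and clamped between minReplicaCount and maxReplicaCount. The numbers are illustrative and are not taken from a live cluster.

      package main

      import (
      	"fmt"
      	"math"
      )

      // desiredReplicas approximates the HPA calculation for an average-value metric:
      // ceil(totalAccumulation / threshold), clamped to [minReplicas, maxReplicas].
      func desiredReplicas(totalAccumulation, threshold float64, minReplicas, maxReplicas int) int {
      	n := int(math.Ceil(totalAccumulation / threshold))
      	if n < minReplicas {
      		return minReplicas
      	}
      	if n > maxReplicas {
      		return maxReplicas
      	}
      	return n
      }

      func main() {
      	fmt.Println(desiredReplicas(327, 30, 2, 10)) // ceil(327/30) = 11, clamped to maxReplicaCount = 10
      	fmt.Println(desiredReplicas(90, 30, 2, 10))  // ceil(90/30) = 3
      	fmt.Println(desiredReplicas(10, 30, 2, 10))  // ceil(10/30) = 1, raised to minReplicaCount = 2
      }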

  2. Deploy and view the ScaledObject resource.

    1. Deploy the ScaledObject resource.

      kubectl apply -f ScaledObject.yaml
    2. Get the scaling configuration status.

      kubectl get ScaledObject

      Expected results:

      NAME                      SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS     AUTHENTICATION   READY   ACTIVE   FALLBACK   AGE
      prometheus-scaledobject   apps/v1.Deployment   sample-app        2     10    prometheus                    True    False    False      105s
    3. Check the HPA generation status.

      kubectl get hpa

      Expected results:

      NAME                               REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)   2         10        2          28m
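      If the TARGETS column shows <unknown>, or stays at 0 while messages are accumulating, it is usually worth confirming that the Prometheus endpoint and the PromQL query actually return data before debugging KEDA itself. The following Go sketch sends the same query to the standard Prometheus HTTP API path (/api/v1/query) that the KEDA prometheus scaler uses; the endpoint and instance ID are placeholders that you must replace with your own values.

        package main

        import (
        	"fmt"
        	"io"
        	"net/http"
        	"net/url"
        )

        func main() {
        	// Placeholders: the HTTP API URL (Grafana Read URL) and your RocketMQ instance ID.
        	serverAddress := "https://<your-prometheus-http-api-url>"
        	query := `sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="<your-instance-id>",topic=~"keda"}) by (consumer_group)`

        	// KEDA issues the PromQL query against the standard Prometheus HTTP API.
        	reqURL := serverAddress + "/api/v1/query?query=" + url.QueryEscape(query)
        	req, err := http.NewRequest(http.MethodGet, reqURL, nil)
        	if err != nil {
        		panic(err)
        	}
        	// If you enable token authentication (see the optional step below), also set the header:
        	// req.Header.Set("Authorization", "<your-prometheus-token>")

        	resp, err := http.DefaultClient.Do(req)
        	if err != nil {
        		panic(err)
        	}
        	defer resp.Body.Close()

        	body, _ := io.ReadAll(resp.Body)
        	// Expect HTTP 200 and a JSON body whose "result" array is non-empty while messages accumulate.
        	fmt.Println(resp.Status)
        	fmt.Println(string(body))
        }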
  3. (Optional) Configure Prometheus token authentication to secure metric reads from the Prometheus data source.

    1. Generate a Prometheus Token as prompted on the page.

    2. Create a Secret. The values of customAuthHeader (Authorization in this example) and customAuthValue must be Base64-encoded; see the sketch after the Secret manifest below for one way to generate them.

      apiVersion: v1
      kind: Secret
      metadata:
        name: keda-prom-secret
        namespace: default
      data:
        customAuthHeader: "QXV0Xxxxxxxlvbg=="
        customAuthValue: "kR2tpT2lJeFpXSmxaVFV6WlMTxxxxxxxxRMVFE0TUdRdE9USXpaQzFqWkRZd09EZ3dOVFV5WWpZaWZRLjlDaFBYU0Q2dEhWc1dQaFlyMGh3ZU5FQjZQZWVETXFjTlYydVNqOU82TTQ="
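      The following minimal Go sketch shows one way to produce the Base64-encoded strings for the Secret's data fields; the token value is a placeholder for the Prometheus token generated in the previous sub-step. You can obtain the same result on the command line with echo -n '<value>' | base64.

        package main

        import (
        	"encoding/base64"
        	"fmt"
        )

        func main() {
        	// Header name expected by the custom authentication mode.
        	header := "Authorization"
        	// Placeholder: replace with the Prometheus token generated in the console.
        	token := "<your-prometheus-token>"

        	fmt.Println("customAuthHeader:", base64.StdEncoding.EncodeToString([]byte(header)))
        	fmt.Println("customAuthValue:", base64.StdEncoding.EncodeToString([]byte(token)))
        }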
    3. Create a TriggerAuthentication resource that references the Secret, and deploy it to the cluster as shown in the following example code.

      apiVersion: keda.sh/v1alpha1
      kind: TriggerAuthentication
      metadata:
        name: keda-prom-creds
        namespace: default
      spec:
        secretTargetRef:
          - parameter: customAuthHeader
            name: keda-prom-secret
            key: customAuthHeader
          - parameter: customAuthValue
            name: keda-prom-secret
            key: customAuthValue
    4. In the ScaledObject YAML file, configure the authenticationRef field and fill in the created credential name.

      apiVersion: keda.sh/v1alpha1
      kind: ScaledObject
      metadata:
        name: prometheus-scaledobject
        namespace: default
      spec:
        scaleTargetRef:
          name: sample-app
        maxReplicaCount: 10
        minReplicaCount: 2
        triggers:
        - type: prometheus
          metadata:
            serverAddress: http://cn-beijing.arms.aliyuncs.com:9090/api/v1/prometheus/8cba801fff65546a3012e9a684****/****538168824185/cloud-product-rocketmq/cn-beijing
            metricName: rocketmq_consumer_inflight_messages
            query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="rmq-cn-uax3xxxxxx",topic=~"keda"}) by (consumer_group)
            threshold: '30'
            authModes: "custom"
          authenticationRef: # References the TriggerAuthentication resource.
            name: keda-prom-creds # The name of the TriggerAuthentication created in the previous sub-step.
      Note

      This example uses the Custom authentication type. You can refer to the KEDA community documentation to choose other authentication methods.

Step 3: Configure producers and consumers

RocketMQ 5.x instance

This example produces and consumes data based on the rocketmq-keda-sample project. Update the RocketMQ instance address, username, and password according to the comments.

Producer

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"os"
	"time"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx",  // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx",  // RocketMQ instance password
		}),
		producer.WithRetry(2),
	)

	if err != nil {
		fmt.Printf("failed to start producer: %s", err.Error())
		os.Exit(1)
	}

	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	// Produce messages continuously until the process is terminated.
	for {
		res, err := p.SendSync(context.Background(), primitive.NewMessage("keda",
			[]byte("Hello RocketMQ Go Client!")))

		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
		time.Sleep(100 * time.Millisecond)
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

Consumer

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Register for SIGINT/SIGTERM so that the consumer can shut down gracefully.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("keda"),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
		}),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
	)
	err := c.Subscribe("keda", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
			time.Sleep(1 * time.Second)
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	<-sig
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

RocketMQ 5.x Serverless instance

This example produces and consumes data based on the sample code provided in RocketMQ-Client. Update the RocketMQ instance address, username, and password according to the comments, and make modifications based on the information provided in Serverless instance Internet access instructions.

Producer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic     = "xxxxxx" // Topic Name
	Endpoint  = "xxxxxx" // RocketMQ instance address
	AccessKey = "xxxxxx" // RocketMQ instance username
	SecretKey = "xxxxxx" // RocketMQ instance password
)

func main() {
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases, you don't need to create many producers; the singleton pattern is recommended.
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint: Endpoint,
		NameSpace: "rmq-xx-xxxxxx", // RocketMQ instance ID
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start producer
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop producer
	defer producer.GracefulStop()

	for i := 0; i < 10; i++ {
		// new a message
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// set keys and tag
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// send message in sync
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for i := 0; i < len(resp); i++ {
			fmt.Printf("%#v\n", resp[i])
		}
		// wait a moment
		time.Sleep(time.Second * 1)
	}
}

Consumer

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic         = "xxxxxx" // Topic Name
	ConsumerGroup = "xxxxxx" // Consumer Group Name
	Endpoint      = "xxxxxx" // RocketMQ instance address
	AccessKey     = "xxxxxx" // RocketMQ instance username
	SecretKey     = "xxxxxx" // RocketMQ instance password
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should be greater than 20s
	invisibleDuration = time.Second * 20
	// receive messages in a loop
)

func main() {
	// log to console
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases, you don't need to create many consumers; the singleton pattern is recommended.
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		NameSpace:     "rmq-xx-xxxxxx", // RocketMQ instance ID
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithSimpleAwaitDuration(awaitDuration),
		rmq_client.WithSimpleSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// graceful stop simpleConsumer
	defer simpleConsumer.GracefulStop()

	go func() {
		for {
			fmt.Println("start receive message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				simpleConsumer.Ack(context.TODO(), mv)
				fmt.Println(mv)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	// run for a while
	time.Sleep(time.Minute)
}

Step 4: Implement scaling using production and consumption data

The following example demonstrates using a RocketMQ 5.x instance.

  1. Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar, select a region, such as China (Hangzhou), and then click the name of the target instance in the instance list to view and record the Endpoint and Network Information.

  3. In the navigation pane on the left, click Access Control, and then click the Intelligent Authentication tab to view and record the instance username and password.

  4. Run the Producer program to produce data and check the HPA scaling status.

    1. Produce data.

      go run producer.go
    2. Check the HPA scaling status.

      kubectl get hpa

      Expected results:

      NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   32700m/30 (avg)   2         10        10         47m

      As you can see, the TARGETS value 32700m/30 (avg) indicates an average of about 32.7 accumulated messages per replica, which exceeds the threshold of 30, so the sample-app application has scaled out to the maximum number of replicas configured in the ScaledObject.

  5. Close the Producer program, run the Consumer program, and check the HPA scaling status.

    1. Consume data.

      go run consumer.go
    2. Check the HPA scaling status.

      kubectl get hpa -w

      Expected results:

      NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   222500m/30 (avg)   2         10        10         50m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   232400m/30 (avg)   2         10        10         51m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        10         52m
      keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        2          57m

    As you can see, after the accumulated messages are consumed, the metric drops to 0, and after about 5 minutes (the default HPA scale-down stabilization window) the sample-app application scales in to the minimum number of replicas configured in the ScaledObject.

References

You can also use KEDA with RabbitMQ metrics, such as queue length and message rate. For more information, see Horizontal pod autoscaling based on RabbitMQ metrics.