When using RocketMQ, message accumulation can easily lead to high system load. To avoid service crashes and improve system reliability and stability, you can implement automatic and efficient horizontal pod autoscaling (HPA) based on RocketMQ message accumulation metrics using KEDA (Kubernetes Event-driven Autoscaling) as an elastic scaling solution for your applications.
Feature introduction
RocketMQ is a high-performance, highly reliable, and highly scalable distributed message middleware that has been widely used in enterprise applications. However, when using RocketMQ, message accumulation issues can easily occur, especially under high load conditions. Message accumulation can lead to high system load and even service crashes.
In this scenario, you can use the Kubernetes event-driven autoscaling tool KEDA to trigger horizontal pod autoscaling based on custom RocketMQ message accumulation metrics. This solution helps you implement automatic and efficient application scaling, thereby improving system reliability and stability. If you are using open-source RocketMQ, you can achieve similar capabilities by exposing message accumulation data through the Prometheus JMX Exporter.
This topic uses Alibaba Cloud Prometheus as the data source and describes how to configure a ScaledObject that scales an application based on RocketMQ message accumulation.
Prerequisites
The ack-keda component is installed.
An ApsaraMQ for RocketMQ 5.x series instance is created, along with a topic and a group named keda.
RocketMQ 5.x Serverless instances scale resources quickly based on business load, and they allocate resources and calculate fees based on actual usage, which can effectively reduce costs.
The Alibaba Cloud RocketMQ (5.0) service is integrated in ARMS.
Step 1: Deploy a workload
This example creates an NGINX sample application named sample-app.
Log on to the ACS console. In the left-side navigation pane, click Clusters.
On the Clusters page, find the cluster that you want to manage and click its ID. In the left-side navigation pane of the cluster details page, choose .
On the Deployments page, click Create From YAML, select Custom for Sample Template as prompted, and use the following content to create an NGINX application named sample-app.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-app
  namespace: default
  labels:
    app: sample-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sample-app
  template:
    metadata:
      labels:
        app: sample-app
    spec:
      containers:
      - name: sample-app
        image: registry.cn-hangzhou.aliyuncs.com/acs-sample/nginx:latest # You can modify this to your actual RocketMQ consumer image as needed.
        resources:
          limits:
            cpu: "500m"
Step 2: Configure the ScaledObject scaling policy
You configure the KEDA scaling policy in a ScaledObject, which specifies the scaling target, the maximum and minimum number of replicas, the scaling threshold (here, the message accumulation threshold), and more. Before you configure the ScaledObject, obtain the required information, such as the address of the Prometheus data source that stores the RocketMQ instance metrics.
Obtain RocketMQ instance information
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.
In the top navigation bar, select a region, such as China (Hangzhou). On the Instances page, click the name of the instance that you want to manage.
On the Instance Details page, record the Instance ID in the Basic Information section. For example: rmq-cn-uax33****.
In the left sidebar, click Access Control, and on the Intelligent Authentication tab, view and record the username and password of the current instance.
Obtain the Prometheus data source for the RocketMQ instance
Log on to the ARMS console.
In the left navigation pane, choose .
Click the target instance Cloud Service-{{RegionId}}, and in the navigation pane on the left, click Settings, and record the HTTP API URL (Grafana Read URL).

Create ScaledObject configuration
Create ScaledObject.yaml.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: prometheus-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: sample-app
  maxReplicaCount: 10
  minReplicaCount: 2
  triggers:
  - type: prometheus
    metadata:
      serverAddress: {{RocketMQ instance's Prometheus data source HTTP API URL}}
      metricName: rocketmq_consumer_inflight_messages
      query: sum({__name__=~"rocketmq_consumer_ready_messages|rocketmq_consumer_inflight_messages",instance_id="{{RocketMQ instance ID}}",topic=~"keda"}) by (consumer_group)
      threshold: '30'
Parameter descriptions:
Parameter: Description
scaleTargetRef.name: The scaling target. Here, specify the sample-app application created in Step 1: Deploy a workload.
maxReplicaCount: The maximum number of replicas for scaling out.
minReplicaCount: The minimum number of replicas for scaling in.
serverAddress: The address of the Prometheus data source that stores the RocketMQ metric data, which is the HTTP API URL (Grafana Read URL) recorded earlier.
metricName: The metric data requested through PromQL.
query: The PromQL query that aggregates the requested metric data. Here, the query sums the ready and in-flight messages to obtain the message accumulation. Replace the instance_id value with the RocketMQ instance ID.
threshold: The threshold for scaling. In this example, a message accumulation of 30 is set as the threshold. When the accumulation exceeds 30, scaling out is triggered.
Deploy and view the ScaledObject resource.
Deploy the ScaledObject resource.
kubectl apply -f ScaledObject.yaml
Get the scaling configuration status.
kubectl get ScaledObject
Expected results:
NAME                      SCALETARGETKIND      SCALETARGETNAME   MIN   MAX   TRIGGERS     AUTHENTICATION   READY   ACTIVE   FALLBACK   AGE
prometheus-scaledobject   apps/v1.Deployment   sample-app        2     10    prometheus                    True    False    False      105s
Check the HPA generation status.
kubectl get hpa
Expected results:
NAME                               REFERENCE               TARGETS      MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)   2         10        2          28m
(Optional) To make data reads more secure, use a Prometheus token and configure Prometheus token verification.
Step 3: Configure producers and consumers
RocketMQ 5.x instance
This example produces and consumes data based on the rocketmq-keda-sample project. Update the RocketMQ instance address, username, and password according to the comments.
Producer
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
		}),
		producer.WithRetry(2),
	)
	if err != nil {
		fmt.Printf("failed to create producer: %s\n", err.Error())
		os.Exit(1)
	}
	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}
	// Stop sending on Ctrl+C so that the producer can shut down cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	// Send one message to the keda topic every 100 ms until interrupted.
	for ctx.Err() == nil {
		res, err := p.SendSync(context.Background(), primitive.NewMessage("keda",
			[]byte("Hello RocketMQ Go Client!")))
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
		time.Sleep(100 * time.Millisecond)
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s\n", err.Error())
	}
}
Consumer
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Buffered channel that receives Ctrl+C so that the consumer can shut down cleanly.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("keda"),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "xxxxxxxxxxx", // RocketMQ instance username
			SecretKey: "xxxxxxxxxxx", // RocketMQ instance password
		}),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rmq-cn-uaxxxxxxy02.cn-beijing.rmq.aliyuncs.com:8080"})), // RocketMQ instance address
	)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	err = c.Subscribe("keda", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
			// Simulate slow processing of each message.
			time.Sleep(1 * time.Second)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
	// Block until an interrupt signal arrives.
	<-sig
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown consumer error: %s\n", err.Error())
	}
}
RocketMQ 5.x Serverless instance
This example produces and consumes data based on the sample code provided in RocketMQ-Client. Update the RocketMQ instance address, username, and password according to the comments, and make modifications based on the information provided in Serverless instance Internet access instructions.
Producer
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic     = "xxxxxx" // Topic name
	Endpoint  = "xxxxxx" // RocketMQ instance address
	AccessKey = "xxxxxx" // RocketMQ instance username
	SecretKey = "xxxxxx" // RocketMQ instance password
)

func main() {
	// Log to the console.
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases, you do not need to create many producers. A singleton is recommended.
	producer, err := rmq_client.NewProducer(&rmq_client.Config{
		Endpoint:  Endpoint,
		NameSpace: "rmq-xx-xxxxxx", // RocketMQ instance ID
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithTopics(Topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Start the producer.
	err = producer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// Gracefully stop the producer when main returns.
	defer producer.GracefulStop()
	for i := 0; i < 10; i++ {
		// Create a message.
		msg := &rmq_client.Message{
			Topic: Topic,
			Body:  []byte("this is a message : " + strconv.Itoa(i)),
		}
		// Set keys and tag.
		msg.SetKeys("a", "b")
		msg.SetTag("ab")
		// Send the message synchronously.
		resp, err := producer.Send(context.TODO(), msg)
		if err != nil {
			log.Fatal(err)
		}
		for _, receipt := range resp {
			fmt.Printf("%#v\n", receipt)
		}
		// Wait a moment.
		time.Sleep(time.Second * 1)
	}
}
Consumer
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	rmq_client "github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
)

const (
	Topic         = "xxxxxx" // Topic name
	ConsumerGroup = "xxxxxx" // Consumer group name
	Endpoint      = "xxxxxx" // RocketMQ instance address
	AccessKey     = "xxxxxx" // RocketMQ instance username
	SecretKey     = "xxxxxx" // RocketMQ instance password
)

var (
	// maximum waiting time for receive func
	awaitDuration = time.Second * 5
	// maximum number of messages received at one time
	maxMessageNum int32 = 16
	// invisibleDuration should > 20s
	invisibleDuration = time.Second * 20
)

func main() {
	// Log to the console.
	os.Setenv("mq.consoleAppender.enabled", "true")
	rmq_client.ResetLogger()
	// In most cases, you do not need to create many consumers. A singleton is recommended.
	simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
		Endpoint:      Endpoint,
		NameSpace:     "rmq-xx-xxxxxx", // RocketMQ instance ID
		ConsumerGroup: ConsumerGroup,
		Credentials: &credentials.SessionCredentials{
			AccessKey:    AccessKey,
			AccessSecret: SecretKey,
		},
	},
		rmq_client.WithSimpleAwaitDuration(awaitDuration),
		rmq_client.WithSimpleSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
			Topic: rmq_client.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// Start the simple consumer.
	err = simpleConsumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	// Gracefully stop the simple consumer when main returns.
	defer simpleConsumer.GracefulStop()
	// Receive messages in a loop.
	go func() {
		for {
			fmt.Println("start receive message")
			mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// Acknowledge each received message.
			for _, mv := range mvs {
				if err := simpleConsumer.Ack(context.TODO(), mv); err != nil {
					fmt.Println(err)
				}
				fmt.Println(mv)
			}
			fmt.Println("wait a moment")
			fmt.Println()
			time.Sleep(time.Second * 3)
		}
	}()
	// Run for a while.
	time.Sleep(time.Minute)
}
Step 4: Implement scaling using production and consumption data
The following example uses a RocketMQ 5.x instance.
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.
In the top navigation bar, select a region, such as China (Hangzhou), and then click the name of the target instance in the instance list to view and record the Endpoint and Network Information.
In the navigation pane on the left, click Access Control, and then click the Intelligent Authentication tab to view and record the instance username and password.
Run the Producer program to produce data and check the HPA scaling status.
Produce data.
go run producer.go
Check the HPA scaling status.
kubectl get hpa
Expected results:
NAME                               REFERENCE               TARGETS           MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-prometheus-scaledobject   Deployment/sample-app   32700m/30 (avg)   2         10        10         47m
As you can see, the sample-app application has scaled out to the maximum number of replicas set by the KEDA component.
Stop the Producer program, run the Consumer program, and check the HPA scaling status.
Consume data.
go run consumer.go
Check the HPA scaling status.
kubectl get hpa -w
Expected results:
NAME                               REFERENCE               TARGETS            MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-prometheus-scaledobject   Deployment/sample-app   222500m/30 (avg)   2         10        10         50m
keda-hpa-prometheus-scaledobject   Deployment/sample-app   232400m/30 (avg)   2         10        10         51m
keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        10         52m
keda-hpa-prometheus-scaledobject   Deployment/sample-app   0/30 (avg)         2         10        2          57m
As you can see, after the data consumption is completed (about 5 minutes), the sample-app application scales in to the minimum number of replicas set by the KEDA component.
References
You can also use KEDA with RabbitMQ metrics to scale applications based on queue length and message rate. For more information, see Horizontal pod autoscaling based on RabbitMQ metrics.
