ApsaraMQ for RocketMQ: Send and receive normal messages

Last Updated: Mar 11, 2026

Normal messages are the default message type in ApsaraMQ for RocketMQ. Unlike scheduled, delayed, ordered, and transactional messages, they carry no special delivery semantics.

This topic provides sample code for sending and receiving normal messages by using the HTTP client SDK for Go.

Prerequisites

Before you begin, make sure that you have:

Configuration parameters

Both examples require the following parameters. Replace the placeholders with your actual values before running the code.

Placeholder | Description | Where to find it
<your-http-endpoint> | The HTTP endpoint of your instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console
<your-topic> | The topic to send messages to | Must be created in the console. Each topic supports only one message type
<your-instance-id> | The ID of your instance | Instance Details page. If the instance has no namespace, set this to null or an empty string
<your-group-id> | The ID of your consumer group (consumer only) | Must be created in the console

AccessKey credentials are read from environment variables:

  • ALIBABA_CLOUD_ACCESS_KEY_ID

  • ALIBABA_CLOUD_ACCESS_KEY_SECRET

Send normal messages

The following example initializes a producer and sends four messages to a topic. Each message includes a body, an optional tag, a message key, and custom properties.

package main

import (
    "fmt"
    "os"
    "strconv"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    producer := client.GetProducer(instanceId, topic)

    for i := 0; i < 4; i++ {
        msg := mq_http_sdk.PublishMessageRequest{
            MessageBody: "hello mq!",
            MessageTag:  "",                  // Optional: filter tag
            Properties:  map[string]string{}, // Optional: custom properties
        }
        msg.MessageKey = "MessageKey"
        msg.Properties["a"] = strconv.Itoa(i)

        ret, err := producer.PublishMessage(msg)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n",
            ret.MessageId, ret.MessageBodyMD5)

        time.Sleep(100 * time.Millisecond)
    }
}

Key points:

  • PublishMessage is a synchronous call that returns MessageId and MessageBodyMD5 on success.

  • Set MessageTag to route messages to specific consumers that subscribe with a matching tag filter.

  • Use MessageKey to set a custom identifier for the message.
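
The returned MessageBodyMD5 can serve as a sanity check that the broker stored the body you sent. The sketch below assumes the field is the hex-encoded MD5 digest of the message body bytes; this topic does not state the encoding or letter case, so the comparison ignores case. bodyMD5Matches is a hypothetical helper, not an SDK function.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"strings"
)

// bodyMD5Matches reports whether reported equals the hex MD5 digest of body.
// Assumption: the service reports the digest as a hex string. Letter case is
// ignored because this topic does not specify it.
func bodyMD5Matches(body, reported string) bool {
	sum := md5.Sum([]byte(body))
	return strings.EqualFold(hex.EncodeToString(sum[:]), reported)
}

func main() {
	// In the producer example, reported would be ret.MessageBodyMD5.
	reported := "900150983CD24FB0D6963F7D28E17F72" // MD5 of "abc"
	fmt.Println(bodyMD5Matches("abc", reported)) // digest matches the body
	fmt.Println(bodyMD5Matches("abd", reported)) // digest does not match
}
```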

Consume normal messages

The following example initializes a consumer and continuously polls for messages using long polling. After processing each batch, it acknowledges the messages to prevent redelivery.

package main

import (
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
    "github.com/gogap/errors"
)

func main() {
    endpoint := "<your-http-endpoint>"
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    topic := "<your-topic>"
    instanceId := "<your-instance-id>"
    groupId := "<your-group-id>"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    consumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)

        go func() {
            select {
            case resp := <-respChan:
                // Process the batch
                var handles []string
                fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                for _, v := range resp.Messages {
                    handles = append(handles, v.ReceiptHandle)
                    fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                        "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                        "\tBody: %s\n"+
                        "\tProps: %s\n",
                        v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                        v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                }

                // Acknowledge processed messages
                ackerr := consumer.AckMessage(handles)
                if ackerr != nil {
                    fmt.Println(ackerr)
                    if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                        for _, errAckItem := range errAckItems {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                        }
                    } else {
                        fmt.Println("ack err =", ackerr)
                    }
                    time.Sleep(3 * time.Second)
                } else {
                    fmt.Printf("Ack ---->\n\t%s\n", handles)
                }
                endChan <- 1

            case err := <-errChan:
                if strings.Contains(err.Error(), "MessageNotExist") {
                    fmt.Println("\nNo new message, continue!")
                } else {
                    fmt.Println(err)
                    time.Sleep(3 * time.Second)
                }
                endChan <- 1

            case <-time.After(35 * time.Second):
                fmt.Println("Timed out waiting for a consume response")
                endChan <- 1
            }
        }()

        // Long polling: wait up to 3 seconds for new messages, fetch up to 3 per batch
        consumer.ConsumeMessage(respChan, errChan,
            3, // Max messages per batch (up to 16)
            3, // Long polling wait time in seconds (up to 30)
        )
        <-endChan
    }
}

How long polling works

In long polling mode, if no message is available, the request is held on the broker for the duration you specify (up to 30 seconds). As soon as a message arrives, the broker responds immediately. This reduces empty responses compared to short polling. The default network timeout is 35 seconds.

Acknowledgment and redelivery

After you process a message, call AckMessage with the receipt handle to confirm consumption. If the broker does not receive an acknowledgment before the NextConsumeTime deadline, it redelivers the message. Each redelivery assigns a new receipt handle.

Note: If the receipt handle expires before you acknowledge the message, the acknowledgment fails. The error response includes ErrorHandle, ErrorCode, and ErrorMsg for each failed handle.
