Normal messages are the default message type in ApsaraMQ for RocketMQ with no special delivery semantics. Unlike scheduled, delayed, ordered, and transactional messages, normal messages do not carry additional delivery behavior.
This topic provides sample code for sending and receiving normal messages by using the HTTP client SDK for Go.
Prerequisites
Before you begin, make sure that you have:
- The Go HTTP client SDK installed. For more information, see Prepare the environment.
- An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the console. For more information, see Create resources.
- An AccessKey pair for your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Configuration parameters
Both examples require the following parameters. Replace the placeholders with your actual values before running the code.
| Placeholder | Description | Where to find it |
|---|---|---|
| `<your-http-endpoint>` | The HTTP endpoint of your instance | Instance Details page > HTTP Endpoint section in the ApsaraMQ for RocketMQ console |
| `<your-topic>` | The topic to send messages to | Must be created in the console. Each topic supports only one message type |
| `<your-instance-id>` | The ID of your instance | Instance Details page. If the instance has no namespace, set this to null or an empty string |
| `<your-group-id>` | The ID of your consumer group (consumer only) | Must be created in the console |
AccessKey credentials are read from environment variables:
- `ALIBABA_CLOUD_ACCESS_KEY_ID`
- `ALIBABA_CLOUD_ACCESS_KEY_SECRET`
Send normal messages
The following example initializes a producer and sends four messages to a topic. Each message includes a body, an optional tag, a message key, and custom properties.
```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
	endpoint := "<your-http-endpoint>"
	accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
	topic := "<your-topic>"
	instanceId := "<your-instance-id>"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
	producer := client.GetProducer(instanceId, topic)

	for i := 0; i < 4; i++ {
		msg := mq_http_sdk.PublishMessageRequest{
			MessageBody: "hello mq!",
			MessageTag:  "",                  // Optional: filter tag
			Properties:  map[string]string{}, // Optional: custom properties
		}
		msg.MessageKey = "MessageKey"
		msg.Properties["a"] = strconv.Itoa(i)

		ret, err := producer.PublishMessage(msg)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s\n",
			ret.MessageId, ret.MessageBodyMD5)
		time.Sleep(100 * time.Millisecond)
	}
}
```

Key points:
- `PublishMessage` is a synchronous call that returns `MessageId` and `MessageBodyMD5` on success.
- Set `MessageTag` to route messages to specific consumers that subscribe with a matching tag filter.
- Use `MessageKey` to set a custom identifier for the message.
Consume normal messages
The following example initializes a consumer and continuously polls for messages using long polling. After processing each batch, it acknowledges the messages to prevent redelivery.
```go
package main

import (
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/aliyunmq/mq-http-go-sdk"
	"github.com/gogap/errors"
)

func main() {
	endpoint := "<your-http-endpoint>"
	accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
	topic := "<your-topic>"
	instanceId := "<your-instance-id>"
	groupId := "<your-group-id>"

	client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
	consumer := client.GetConsumer(instanceId, topic, groupId, "")

	for {
		endChan := make(chan int)
		respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
		errChan := make(chan error)

		go func() {
			select {
			case resp := <-respChan:
				// Process the batch
				var handles []string
				fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
				for _, v := range resp.Messages {
					handles = append(handles, v.ReceiptHandle)
					fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
						"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
						"\tBody: %s\n"+
						"\tProps: %s\n",
						v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
						v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
				}

				// Acknowledge processed messages
				ackerr := consumer.AckMessage(handles)
				if ackerr != nil {
					fmt.Println(ackerr)
					if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
						for _, errAckItem := range errAckItems {
							fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
								errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
						}
					} else {
						fmt.Println("ack err =", ackerr)
					}
					time.Sleep(3 * time.Second)
				} else {
					fmt.Printf("Ack ---->\n\t%s\n", handles)
				}
				endChan <- 1
			case err := <-errChan:
				if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
					fmt.Println("\nNo new message, continue!")
				} else {
					fmt.Println(err)
					time.Sleep(3 * time.Second)
				}
				endChan <- 1
			case <-time.After(35 * time.Second):
				fmt.Println("Timed out waiting for a consume response")
				endChan <- 1
			}
		}()

		// Long polling: wait up to 3 seconds for new messages, fetch up to 3 per batch
		consumer.ConsumeMessage(respChan, errChan,
			3, // Max messages per batch (up to 16)
			3, // Long polling wait time in seconds (up to 30)
		)
		<-endChan
	}
}
```

How long polling works
In long polling mode, if no message is available, the request is held on the broker for the duration you specify (up to 30 seconds). As soon as a message arrives, the broker responds immediately. This reduces empty responses compared to short polling. The default network timeout is 35 seconds.
Acknowledgment and redelivery
After you process a message, call `AckMessage` with the receipt handle to confirm consumption. If the broker does not receive an acknowledgment before the `NextConsumeTime` deadline, it redelivers the message. Each redelivery assigns a new receipt handle.
Note: If the receipt handle expires before you acknowledge the message, the acknowledgment fails. The error response includes `ErrorHandle`, `ErrorCode`, and `ErrorMsg` for each failed handle.