This topic provides sample code for sending and receiving scheduled and delayed messages by using the ApsaraMQ for RocketMQ HTTP client SDK for Go.
Scheduled and delayed messages
Scheduled and delayed messages control when a message is delivered to consumers:
Delayed message: Delivered after a specified delay from the time it is sent. For example, send a message now and have it delivered 30 seconds later.
Scheduled message: Delivered at a specific point in time. For example, send a message now and have it delivered at 14:00 tomorrow.
Over HTTP, both types use the same mechanism: set the StartDeliverTime field to a future Unix timestamp in milliseconds. The broker holds the message until that timestamp, then delivers it.
For more information, see Scheduled messages and delayed messages.
Prerequisites
Before you begin, make sure that you have:
The Go HTTP SDK installed. For details, see Prepare the environment
An ApsaraMQ for RocketMQ instance, a topic, and a consumer group created in the ApsaraMQ for RocketMQ console
An AccessKey pair for your Alibaba Cloud account. For details, see Create an AccessKey pair
How it works
Both scheduled and delayed messages use the StartDeliverTime field to control delivery timing. Set this field to a Unix timestamp in milliseconds representing when the broker should deliver the message.
Delayed delivery: Add the desired delay (in milliseconds) to the current time.
// Deliver 10 seconds from now
msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
Scheduled delivery: Convert the target delivery time to a Unix timestamp in milliseconds.
// Deliver at a specific time
msg.StartDeliverTime = targetTime.UTC().Unix() * 1000
If StartDeliverTime is set to a timestamp earlier than the current time, the message is delivered immediately.
Send scheduled and delayed messages
The following example sends four delayed messages, each scheduled for delivery 10 seconds after being sent.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint from the HTTP Endpoint section of the Instance Details page | http://1234567890.mqrest.cn-hangzhou.aliyuncs.com |
| ${TOPIC} | Topic name created in the ApsaraMQ for RocketMQ console | delayed-msg-topic |
| ${INSTANCE_ID} | Instance ID. If the instance has a namespace, specify the ID. If not, set to null or an empty string | MQ_INST_1234567890_ABCDEF |
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// HTTP endpoint from the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// AccessKey pair from environment variables.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topic created in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// Instance ID. If the instance has a namespace, specify the ID.
// If the instance does not have a namespace, set this to null or an empty string.
// You can check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Cyclically send four messages.
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // The message content.
MessageTag: "", // The message tag.
Properties: map[string]string{}, // The message properties.
}
// The message key.
msg.MessageKey = "MessageKey"
// The custom properties of the message.
msg.Properties["a"] = strconv.Itoa(i)
// Deliver 10 seconds from now. Set to a Unix timestamp in milliseconds.
// For scheduled delivery, set this to the time difference between the scheduled point and the current time.
msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}Receive scheduled and delayed messages
The following example consumes scheduled and delayed messages using long polling. The consumer waits up to 3 seconds for new messages and processes up to 3 messages per batch.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| ${HTTP_ENDPOINT} | HTTP endpoint from the HTTP Endpoint section of the Instance Details page | http://1234567890.mqrest.cn-hangzhou.aliyuncs.com |
| ${TOPIC} | Topic name created in the ApsaraMQ for RocketMQ console | delayed-msg-topic |
| ${INSTANCE_ID} | Instance ID. If the instance has a namespace, specify the ID. If not, set to null or an empty string | MQ_INST_1234567890_ABCDEF |
| ${GROUP_ID} | Consumer group ID created in the ApsaraMQ for RocketMQ console | GID_delayed_consumer |
Each topic supports only one message type. A topic used for scheduled or delayed messages cannot send or receive other message types.
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// HTTP endpoint from the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// AccessKey pair from environment variables.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// Topic created in the ApsaraMQ for RocketMQ console.
// Each topic can only send and receive messages of a specific type.
topic := "${TOPIC}"
// Instance ID. If the instance has a namespace, specify the ID.
// If the instance does not have a namespace, set this to null or an empty string.
// You can check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
// Consumer group ID created in the ApsaraMQ for RocketMQ console.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// The message consumption logic.
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// If the broker does not receive an acknowledgment (ACK) before the NextConsumeTime
// elapses, the message is redelivered to the consumer.
// A unique timestamp is specified for the handle each time the message is consumed.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// If the handle times out, the broker fails to receive an ACK from the consumer.
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// No message is available for consumption in the topic.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// Long polling: the default network timeout is 35 seconds.
// If no message is available, the request is suspended on the broker for the specified period.
// If a message becomes available during the wait, the broker responds immediately.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // Maximum number of messages per batch (max: 16).
3, // Long polling wait time in seconds (max: 30).
)
<-endChan
}
}What's next
Learn about scheduled messages and delayed messages concepts and constraints
Create resources such as instances, topics, and consumer groups