All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive scheduled and delayed messages

Last Updated:Mar 11, 2026

This topic provides sample code for sending and receiving scheduled and delayed messages by using the ApsaraMQ for RocketMQ HTTP client SDK for Go.

Scheduled and delayed messages

Scheduled and delayed messages control when a message is delivered to consumers:

  • Delayed message: Delivered after a specified delay from the time it is sent. For example, send a message now and have it delivered 30 seconds later.

  • Scheduled message: Delivered at a specific point in time. For example, send a message now and have it delivered at 14:00 tomorrow.

Over HTTP, both types use the same mechanism: set the StartDeliverTime field to a future Unix timestamp in milliseconds. The broker holds the message until that timestamp, then delivers it.

For more information, see Scheduled messages and delayed messages.

Prerequisites

Before you begin, make sure that you have:

How it works

Both scheduled and delayed messages use the StartDeliverTime field to control delivery timing. Set this field to a Unix timestamp in milliseconds representing when the broker should deliver the message.

  • Delayed delivery: Add the desired delay (in milliseconds) to the current time.

      // Deliver 10 seconds from now
      msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
  • Scheduled delivery: Convert the target delivery time to a Unix timestamp in milliseconds.

      // Deliver at a specific time
      msg.StartDeliverTime = targetTime.UTC().Unix() * 1000
If StartDeliverTime is set to a timestamp earlier than the current time, the message is delivered immediately.

Send scheduled and delayed messages

The following example sends four delayed messages, each scheduled for delivery 10 seconds after being sent.

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
${HTTP_ENDPOINT}HTTP endpoint from the HTTP Endpoint section of the Instance Details pagehttp://1234567890.mqrest.cn-hangzhou.aliyuncs.com
${TOPIC}Topic name created in the ApsaraMQ for RocketMQ consoledelayed-msg-topic
${INSTANCE_ID}Instance ID. If the instance has a namespace, specify the ID. If not, set to null or an empty stringMQ_INST_1234567890_ABCDEF
package main

import (
    "fmt"
    "time"
    "strconv"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // HTTP endpoint from the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
    endpoint := "${HTTP_ENDPOINT}"
    // AccessKey pair from environment variables.
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topic created in the ApsaraMQ for RocketMQ console.
    topic := "${TOPIC}"
    // Instance ID. If the instance has a namespace, specify the ID.
    // If the instance does not have a namespace, set this to null or an empty string.
    // You can check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Cyclically send four messages.
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest

            msg = mq_http_sdk.PublishMessageRequest{
            MessageBody: "hello mq!",         // The message content.
            MessageTag:  "",                  // The message tag.
            Properties:  map[string]string{}, // The message properties.
            }
        // The message key.
            msg.MessageKey = "MessageKey"
            // The custom properties of the message.
            msg.Properties["a"] = strconv.Itoa(i)
            // Deliver 10 seconds from now. Set to a Unix timestamp in milliseconds.
            // For scheduled delivery, set this to the time difference between the scheduled point and the current time.
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000

        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}

Receive scheduled and delayed messages

The following example consumes scheduled and delayed messages using long polling. The consumer waits up to 3 seconds for new messages and processes up to 3 messages per batch.

Replace the following placeholders with your actual values:

PlaceholderDescriptionExample
${HTTP_ENDPOINT}HTTP endpoint from the HTTP Endpoint section of the Instance Details pagehttp://1234567890.mqrest.cn-hangzhou.aliyuncs.com
${TOPIC}Topic name created in the ApsaraMQ for RocketMQ consoledelayed-msg-topic
${INSTANCE_ID}Instance ID. If the instance has a namespace, specify the ID. If not, set to null or an empty stringMQ_INST_1234567890_ABCDEF
${GROUP_ID}Consumer group ID created in the ApsaraMQ for RocketMQ consoleGID_delayed_consumer
Each topic supports only one message type. A topic used for scheduled or delayed messages cannot send or receive other message types.
package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // HTTP endpoint from the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
    endpoint := "${HTTP_ENDPOINT}"
    // AccessKey pair from environment variables.
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Topic created in the ApsaraMQ for RocketMQ console.
    // Each topic can only send and receive messages of a specific type.
    topic := "${TOPIC}"
    // Instance ID. If the instance has a namespace, specify the ID.
    // If the instance does not have a namespace, set this to null or an empty string.
    // You can check the namespace on the Instance Details page in the ApsaraMQ for RocketMQ console.
    instanceId := "${INSTANCE_ID}"
    // Consumer group ID created in the ApsaraMQ for RocketMQ console.
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    // The message consumption logic.
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                    }

                    // If the broker does not receive an acknowledgment (ACK) before the NextConsumeTime
                    // elapses, the message is redelivered to the consumer.
                    // A unique timestamp is specified for the handle each time the message is consumed.
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        // If the handle times out, the broker fails to receive an ACK from the consumer.
                        fmt.Println(ackerr)
                        if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                           for _, errAckItem := range errAckItems {
                             fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                               errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                           }
                        } else {
                           fmt.Println("ack err =", ackerr)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    // No message is available for consumption in the topic.
                    if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout of consumer message ??")
                    endChan <- 1
                }
            }
        }()

        // Long polling: the default network timeout is 35 seconds.
        // If no message is available, the request is suspended on the broker for the specified period.
        // If a message becomes available during the wait, the broker responds immediately.
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // Maximum number of messages per batch (max: 16).
            3, // Long polling wait time in seconds (max: 30).
        )
        <-endChan
    }
}

What's next