This topic provides C# sample code for sending and receiving scheduled and delayed messages through the ApsaraMQ for RocketMQ HTTP client SDK.
Scheduled vs. delayed messages
Scheduled and delayed messages both defer delivery, but differ in how you specify the delivery time:
| Type | Delivery behavior | How to set StartDeliverTime |
|---|---|---|
| Delayed message | Delivered after a fixed delay from the time of sending | Current timestamp + delay duration (in milliseconds) |
| Scheduled message | Delivered at a specific point in time | Target delivery timestamp (in milliseconds) |
Over HTTP, both types use the same StartDeliverTime property -- an absolute timestamp in milliseconds. The broker holds the message until that timestamp, then delivers it to consumers.
For more information, see Scheduled messages and delayed messages.
Use cases
Order timeout: Send a delayed message that checks payment status after 30 minutes. Cancel the order if unpaid.
Retry with backoff: After a transient failure, schedule a retry with an increasing delay.
Timed notifications: Deliver a reminder at a specific future time.
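For the retry-with-backoff case, the delay usually grows with each attempt. The following is a minimal sketch that assumes exponential growth with an upper bound. `BackoffSketch`, `BackoffDelayMs`, and the base and cap values are hypothetical choices for illustration, not SDK features or recommended settings.

```csharp
using System;

public static class BackoffSketch
{
    // Delay in milliseconds before retry number `attempt` (1-based):
    // baseMs, 2 * baseMs, 4 * baseMs, ... never more than maxMs.
    public static long BackoffDelayMs(int attempt, long baseMs, long maxMs)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        long delay = baseMs;
        for (int i = 1; i < attempt; i++)
        {
            // Doubling again would exceed the cap, so stop here.
            if (delay > maxMs / 2) return maxMs;
            delay *= 2;
        }
        return Math.Min(delay, maxMs);
    }

    public static void Main()
    {
        long nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        for (int attempt = 1; attempt <= 5; attempt++)
        {
            long delayMs = BackoffDelayMs(attempt, 1000, 60000);
            // The absolute delivery timestamp is the current time plus the delay.
            Console.WriteLine("Attempt " + attempt + ": delay " + delayMs
                + " ms, deliver at " + (nowMs + delayMs));
        }
    }
}
```

The computed delay is a duration, so it still has to be added to the current timestamp before it is used as a delivery time.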
Prerequisites
Before you begin, make sure that you have:
The C# HTTP client SDK installed. For setup instructions, see Prepare the environment
An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the ApsaraMQ for RocketMQ console
An Alibaba Cloud AccessKey pair. For instructions, see Create an AccessKey pair
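The producer sample in this topic reads the AccessKey pair from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. As a minimal sketch, you can confirm that both variables are set before you run a sample. `CredentialCheckSketch` and `RequireEnv` are hypothetical names and are not part of the SDK.

```csharp
using System;

public static class CredentialCheckSketch
{
    // Returns the value of the named environment variable,
    // or throws if the variable is unset or empty.
    public static string RequireEnv(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrEmpty(value))
        {
            throw new InvalidOperationException("Environment variable " + name + " is not set.");
        }
        return value;
    }

    public static void Main()
    {
        try
        {
            RequireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID");
            RequireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            Console.WriteLine("AccessKey pair found in the environment.");
        }
        catch (InvalidOperationException ex)
        {
            // Report which variable is missing without printing any secret value.
            Console.WriteLine(ex.Message);
        }
    }
}
```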
Send scheduled or delayed messages
The StartDeliverTime property controls when the broker delivers each message. Set it to a Unix timestamp in milliseconds.
For a delayed message, add the delay duration to the current timestamp: AliyunSDKUtils.GetNowTimeStamp() + delayMs.
For a scheduled message, set the property to the target delivery time itself, expressed as a Unix timestamp in milliseconds.
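Both cases reduce to a single absolute timestamp. The sketch below shows the arithmetic using only the .NET base class library. `DeliverTimeSketch`, `DelayedDeliverTime`, and `ScheduledDeliverTime` are hypothetical helper names, not part of the SDK. `DateTimeOffset.ToUnixTimeMilliseconds()` stands in for `AliyunSDKUtils.GetNowTimeStamp()`, on the assumption that both return Unix time in milliseconds.

```csharp
using System;

public static class DeliverTimeSketch
{
    // Delayed message: the current Unix time (ms) plus the delay.
    public static long DelayedDeliverTime(DateTimeOffset now, TimeSpan delay)
    {
        return now.ToUnixTimeMilliseconds() + (long)delay.TotalMilliseconds;
    }

    // Scheduled message: the target instant itself, as Unix time (ms).
    public static long ScheduledDeliverTime(DateTimeOffset target)
    {
        return target.ToUnixTimeMilliseconds();
    }

    public static void Main()
    {
        DateTimeOffset now = DateTimeOffset.UtcNow;

        // Deliver 10 seconds from now.
        long delayed = DelayedDeliverTime(now, TimeSpan.FromSeconds(10));

        // Deliver at an example instant: 2030-01-01 00:00:00 UTC.
        long scheduled = ScheduledDeliverTime(
            new DateTimeOffset(2030, 1, 1, 0, 0, 0, TimeSpan.Zero));

        Console.WriteLine("Delayed StartDeliverTime:   " + delayed);
        Console.WriteLine("Scheduled StartDeliverTime: " + scheduled);
    }
}
```

Either value can be assigned to StartDeliverTime as is. Using `DateTimeOffset` with an explicit offset avoids depending on the local time zone of the machine that sends the message.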
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-http-endpoint> | HTTP endpoint from the Instance Details page in the ApsaraMQ for RocketMQ console | http://xxxx.mqrest.cn-hangzhou.aliyuncs.com |
| <your-topic> | Topic name | scheduled-topic |
| <your-instance-id> | Instance ID (set to null or "" if the instance has no namespace) | MQ_INST_xxxx |
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
// HTTP endpoint. Get this from the Instance Details page in the ApsaraMQ for RocketMQ console.
private const string _endpoint = "<your-http-endpoint>";
// Get AccessKey credentials from environment variables.
// Do not hardcode credentials in your source code.
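// These fields are static readonly rather than const because the values are read at run time.
private static readonly string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static readonly string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");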
// Topic to publish to. Create this in the ApsaraMQ for RocketMQ console.
private const string _topicName = "<your-topic>";
// Instance ID. If the instance has no namespace, set this to null or "".
// Check the Instance Details page to determine whether your instance has a namespace.
private const string _instanceId = "<your-instance-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
sendMsg = new TopicMessage("hello mq");
// Set a custom property on the message.
sendMsg.PutProperty("a", i.ToString());
// Key line: set the delivery time.
// Delayed message: add the delay (in ms) to the current timestamp.
// Scheduled message: set the property to the target delivery time as a Unix timestamp in ms.
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000; // 10-second delay
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("Message published: " + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}
Receive scheduled or delayed messages
Consume scheduled and delayed messages the same way you consume normal messages. The broker delivers them only after StartDeliverTime, so no special consumer configuration is required.
The following example uses long polling to consume messages and acknowledge each batch.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <your-http-endpoint> | HTTP endpoint from the Instance Details page | http://xxxx.mqrest.cn-hangzhou.aliyuncs.com |
| <your-topic> | Topic name | scheduled-topic |
| <your-instance-id> | Instance ID (set to null or "" if the instance has no namespace) | MQ_INST_xxxx |
| <your-group-id> | Consumer group ID | GID_scheduled_consumer |
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
// HTTP endpoint. Get this from the Instance Details page in the ApsaraMQ for RocketMQ console.
private const string _endpoint = "<your-http-endpoint>";
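// Get AccessKey credentials from environment variables.
// Do not hardcode credentials in your source code.
private static readonly string _accessKeyId = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static readonly string _secretAccessKey = Environment.GetEnvironmentVariable("ALIBABA_CLOUD_ACCESS_KEY_SECRET");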
// Topic to consume from. Create this in the ApsaraMQ for RocketMQ console.
private const string _topicName = "<your-topic>";
// Instance ID. If the instance has no namespace, set this to null or "".
private const string _instanceId = "<your-instance-id>";
// Consumer group ID. Create this in the ApsaraMQ for RocketMQ console.
private const string _groupId = "<your-group-id>";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// Consume messages in a loop. For production use, run multiple threads concurrently.
while (true)
{
try
{
// Long polling: if no messages are available, the request waits on the broker
// for up to the specified number of seconds before returning.
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // Max messages per batch (up to 16)
3 // Long polling timeout in seconds (up to 30)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// Process each message.
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Acknowledge messages. If the broker does not receive an ACK
// before Message.nextConsumeTime, it redelivers the message.
// Each delivery assigns a new receipt handle.
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// An ACK can fail if the receipt handle has expired.
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}
See also
Scheduled messages and delayed messages -- Concepts and limitations
Create resources -- Set up instances, topics, and consumer groups
Prepare the environment -- Install and configure the C# HTTP client SDK