Use the Python HTTP client SDK to send and receive scheduled and delayed messages in ApsaraMQ for RocketMQ. This topic provides complete producer and consumer code examples.
How scheduled and delayed messages work
Scheduled and delayed messages defer delivery to consumers until a specified time:
Delayed message: Delivered after a fixed delay. For example, a message with a 10-second delay becomes available to consumers 10 seconds after it is sent.
Scheduled message: Delivered at a specific point in time. For example, a message scheduled for 2024-01-15 15:00:00 is delivered at that time.
Over HTTP, both types use the same API. Set start_deliver_time to a future Unix timestamp in milliseconds:
For a delayed message, calculate the timestamp as current_time + delay_duration.
For a scheduled message, set the timestamp directly to the target delivery time.
For more information, see Scheduled messages and delayed messages.
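The two calculations described above can be sketched in plain Python. The helper names delayed_deliver_time_ms and scheduled_deliver_time_ms are illustrative and not part of the SDK, and the UTC target time is only an example. The result is the kind of value you would pass as start_deliver_time.

```python
import time
from datetime import datetime, timezone


def delayed_deliver_time_ms(delay_seconds, now_ms=None):
    """Return the current time plus a delay, as a Unix timestamp in milliseconds."""
    if now_ms is None:
        now_ms = int(round(time.time() * 1000))
    return now_ms + int(delay_seconds * 1000)


def scheduled_deliver_time_ms(target):
    """Convert a timezone-aware datetime to a Unix timestamp in milliseconds."""
    if target.tzinfo is None:
        # A naive datetime would be interpreted in the local time zone,
        # which is easy to get wrong, so this sketch rejects it.
        raise ValueError("target must be timezone-aware")
    return int(round(target.timestamp() * 1000))


# Delayed message: deliver 10 seconds from now.
print(delayed_deliver_time_ms(10))

# Scheduled message: deliver at 2024-01-15 15:00:00 UTC (example target).
print(scheduled_deliver_time_ms(datetime(2024, 1, 15, 15, 0, 0, tzinfo=timezone.utc)))
```

Requiring a timezone-aware datetime is a design choice of this sketch: it keeps the scheduled time unambiguous when the producer and the people reading the schedule are in different time zones.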
Prerequisites
Before you begin, make sure that you have:
The Python HTTP client SDK installed. For details, see Prepare the environment.
An ApsaraMQ for RocketMQ instance, topic, and consumer group created in the ApsaraMQ for RocketMQ console. For details, see Create resources.
An AccessKey pair for your Alibaba Cloud account. For details, see Create an AccessKey pair.
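The examples in this topic read the AccessKey pair from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. A quick pre-flight check such as the following sketch can confirm that both are set before you run them. The missing_credentials helper is hypothetical and not part of the SDK.

```python
import os

# Variable names used by the producer and consumer examples in this topic.
REQUIRED_VARS = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_credentials()
if missing:
    print("Set these environment variables first: %s" % ", ".join(missing))
else:
    print("AccessKey environment variables are set.")
```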
Send scheduled or delayed messages
The following example sends four delayed messages, each delivered 10 seconds after being sent. To send a scheduled message instead, set start_deliver_time to the target delivery time as a Unix timestamp in milliseconds.
Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <http-endpoint> | HTTP endpoint of your instance. Find it on the Instance Details page in the ApsaraMQ for RocketMQ console. | http://xxxxx.mqrest.cn-hangzhou.aliyuncs.com |
| <topic> | Name of the topic to send messages to. | scheduled-msg-topic |
| <instance-id> | ID of the instance that owns the topic. Required if the instance has a namespace. If the instance does not have a namespace, pass an empty string. Check the Instance Details page for the namespace status. | MQ_INST_xxxxx |
import sys
import os
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *

# Initialize the producer client
mq_client = MQClient(
    # HTTP endpoint. Find it in the HTTP Endpoint section
    # of the Instance Details page in the ApsaraMQ for RocketMQ console.
    "<http-endpoint>",
    # AccessKey ID and AccessKey secret for authentication.
    # Store credentials in environment variables instead of hardcoding them.
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "<topic>"
instance_id = "<instance-id>"
producer = mq_client.get_producer(instance_id, topic_name)

# Send four messages
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        msg = TopicMessage(
            "I am test message %s.hello" % i,  # Message content
            "tag1"  # Message tag
        )
        # Set a custom message attribute
        msg.put_property("a", "i")
        # Set the message key for tracing
        msg.set_message_key("MessageKey")
        # Deliver this message after a 10-second delay.
        # The value is a Unix timestamp in milliseconds.
        # For a scheduled message, replace the calculation below
        # with the target delivery time in milliseconds.
        msg.set_start_deliver_time(int(round(time.time() * 1000)) + 10 * 1000)
        re_msg = producer.publish_message(msg)
        print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic does not exist. Create it first.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)

Key parameter: set_start_deliver_time
Controls when the broker delivers the message. Pass a Unix timestamp in milliseconds:
| Delivery type | How to set the timestamp | Example |
|---|---|---|
| Delayed delivery | Current time plus the delay duration | int(round(time.time() * 1000)) + 10 * 1000 |
| Scheduled delivery | Absolute timestamp of the target delivery time | 1700000000000 |
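Because the value is a bare number, it is easy to pass seconds instead of milliseconds or to pick the wrong instant. One way to sanity-check a timestamp before sending is to convert it back to a readable time, as in the sketch below. The describe_deliver_time helper is illustrative and not part of the SDK.

```python
from datetime import datetime, timezone


def describe_deliver_time(timestamp_ms):
    """Format a millisecond Unix timestamp as an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).isoformat()


# The scheduled-delivery example value from the table above.
print(describe_deliver_time(1700000000000))  # 2023-11-14T22:13:20+00:00
```

If the printed year is far from what you expect (for example 1970), the value was probably given in seconds rather than milliseconds.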
Receive scheduled and delayed messages
The broker holds scheduled and delayed messages until the delivery time arrives, then makes them available for consumption. Consume them the same way as normal messages.
The following example uses long polling to receive and acknowledge messages. Replace the following placeholders with your actual values:
| Placeholder | Description | Example |
|---|---|---|
| <http-endpoint> | HTTP endpoint of your instance. | http://xxxxx.mqrest.cn-hangzhou.aliyuncs.com |
| <topic> | Name of the topic to consume from. | scheduled-msg-topic |
| <group-id> | ID of the consumer group. | GID_scheduled_msg |
| <instance-id> | ID of the instance. Pass an empty string if the instance has no namespace. | MQ_INST_xxxxx |
import os
import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the consumer client
mq_client = MQClient(
    "<http-endpoint>",
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
topic_name = "<topic>"
group_id = "<group-id>"
instance_id = "<instance-id>"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# Long polling timeout in seconds (max: 30)
wait_seconds = 3
# Maximum messages per request (max: 16)
batch = 3
print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n"
      % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))

while True:
    try:
        # Long polling: the request blocks on the broker until a message
        # arrives or wait_seconds elapses, whichever comes first.
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s "
                  "\nMessageTag: %s\nConsumedTimes: %s "
                  "\nPublishTime: %s\nBody: %s "
                  "\nNextConsumeTime: %s "
                  "\nReceiptHandle: %s "
                  "\nProperties: %s\n"
                  % (msg.message_id, msg.message_body_md5,
                     msg.message_tag, msg.consumed_times,
                     msg.publish_time, msg.message_body,
                     msg.next_consume_time, msg.receipt_handle, msg.properties))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue
        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # Acknowledge consumed messages.
    # If the broker does not receive an ACK before msg.next_consume_time,
    # it redelivers the message. Each delivery generates a new receipt handle.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s"
                      % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))

How long polling works
consume_message blocks on the broker for up to wait_seconds. If a message becomes available during that period, the broker responds immediately. Otherwise, the request returns after the timeout.
| Parameter | Description | Range |
|---|---|---|
| wait_seconds | Long polling timeout in seconds | Maximum: 30 |
| batch | Maximum messages returned per request | Maximum: 16 |
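If these values come from configuration rather than constants, it can help to cap them at the documented maximums before calling consume_message. The sketch below shows one way to do that. The clamp_poll_params helper is hypothetical, and the lower bound of 1 for both parameters is an assumption of this sketch, not a documented limit.

```python
MAX_BATCH = 16         # documented maximum messages per request
MAX_WAIT_SECONDS = 30  # documented maximum long polling timeout


def clamp_poll_params(batch, wait_seconds):
    """Cap batch and wait_seconds at the documented maximums."""
    # Assumption: values below 1 are treated as configuration errors.
    if batch < 1 or wait_seconds < 1:
        raise ValueError("batch and wait_seconds must be at least 1")
    return min(batch, MAX_BATCH), min(wait_seconds, MAX_WAIT_SECONDS)


print(clamp_poll_params(3, 3))    # (3, 3)
print(clamp_poll_params(50, 60))  # (16, 30)
```

The returned pair can then be passed on in the same order the consumer example uses: consume_message(batch, wait_seconds).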
How message acknowledgment works
After processing a message, acknowledge it by sending the receipt handle back to the broker. If no ACK arrives before next_consume_time, the broker redelivers the message with a new receipt handle. Always use the handle from the most recent delivery.
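When processing takes long enough that a message may be redelivered, the application has to remember which receipt handle is the newest one for each message. The following sketch shows one possible bookkeeping approach in plain Python. The ReceiptHandleTracker class and the sample IDs and handles are hypothetical and not part of the SDK.

```python
class ReceiptHandleTracker:
    """Keep only the newest receipt handle seen for each message ID."""

    def __init__(self):
        self._latest = {}

    def record(self, message_id, receipt_handle):
        # A redelivery of the same message overwrites the older handle.
        self._latest[message_id] = receipt_handle

    def pop_handles(self, message_ids):
        """Return and forget the newest handles for the given message IDs."""
        return [self._latest.pop(mid) for mid in message_ids if mid in self._latest]


tracker = ReceiptHandleTracker()
tracker.record("msg-1", "handle-A")  # first delivery
tracker.record("msg-1", "handle-B")  # redelivery replaces the old handle
tracker.record("msg-2", "handle-C")
print(tracker.pop_handles(["msg-1", "msg-2"]))  # ['handle-B', 'handle-C']
```

In a real consumer, record would be called with msg.message_id and msg.receipt_handle for each received message, and the list returned by pop_handles would be passed to ack_message.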
What's next
Scheduled messages and delayed messages -- concepts and feature details
Prepare the environment -- SDK setup for other languages