Send and receive messages with the ApsaraMQ for RocketMQ 5.x SDK for Python. Each example covers a specific message type and is ready to run after you replace the placeholder values with your instance details.
Prerequisites
Before you begin, make sure that you have:
- The RocketMQ Python SDK installed. For version details, see Version guide.
- An ApsaraMQ for RocketMQ 5.x instance with an endpoint. For setup instructions, see Environment preparation.
- A topic and a consumer group created on the instance.
Parameters
Replace the following placeholders in each code example with your actual values.
| Parameter | Example value | Description |
|---|---|---|
| <your-endpoint> | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | The endpoint of your ApsaraMQ for RocketMQ instance. To get the endpoint, see Get the instance endpoint. Use the public endpoint for internet access or the VPC endpoint for VPC access. |
| <your-instance-id> | rmq-cn-xxx | The ID of your ApsaraMQ for RocketMQ instance. Required only for public network access on serverless instances. |
| <your-topic> | normal_test | The topic to which messages are sent or from which messages are consumed. Create the topic in advance. |
| <your-consumer-group> | GID_test | The consumer group used to consume messages. Create the consumer group in advance. |
| <your-ak> | 1XVg0hzgKm****** | The username of your instance. Required for internet access. For VPC access, required only if the instance is serverless and authentication-free in VPCs is disabled. See Get the instance username and password. |
| <your-sk> | ijSt8rEc45****** | The password of your instance. Same access conditions as the username. |
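Rather than editing the placeholders in every example, the values in the table above could be read from the environment once. The following is a minimal sketch, not part of the SDK: the environment variable names and the ConnectionSettings type are hypothetical, and only the standard library is used. It rejects values that still look like unreplaced placeholders.

```python
import os
from typing import Mapping, NamedTuple, Optional


class ConnectionSettings(NamedTuple):
    """Values that replace the placeholders in the examples (hypothetical helper type)."""
    endpoint: str
    topic: str
    consumer_group: str
    access_key: str             # instance username; may be empty for VPC access
    secret_key: str             # instance password; may be empty for VPC access
    instance_id: Optional[str]  # only for public access on serverless instances


def load_settings(env: Mapping[str, str] = os.environ) -> ConnectionSettings:
    """Read settings from environment variables (the variable names are assumptions)."""

    def required(name: str) -> str:
        value = env.get(name, "").strip()
        # Reject empty values and placeholders that were never replaced.
        if not value or value.startswith("<your-"):
            raise ValueError(f"{name} is not set to a real value")
        return value

    return ConnectionSettings(
        endpoint=required("ROCKETMQ_ENDPOINT"),
        topic=required("ROCKETMQ_TOPIC"),
        consumer_group=required("ROCKETMQ_CONSUMER_GROUP"),
        access_key=env.get("ROCKETMQ_AK", ""),
        secret_key=env.get("ROCKETMQ_SK", ""),
        instance_id=env.get("ROCKETMQ_INSTANCE_ID") or None,
    )
```

The resulting fields can then be passed where the examples use the literal placeholders, for instance settings.access_key and settings.secret_key in place of <your-ak> and <your-sk>.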
Send normal messages
Normal messages have no special delivery semantics and are the most common message type.
Send synchronously
The send() method blocks until the broker acknowledges the message.
from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    # Instance endpoint
    endpoints = "<your-endpoint>"
    # Authenticate with your instance username and password
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            # Tag: secondary classifier of messages besides topic
            msg.tag = "tag"
            # Keys: alternative way to identify messages besides message ID
            msg.keys = "keys"
            # Custom user property
            msg.add_property("key", "value")
            for i in range(10):
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

Send asynchronously
The send_async() method returns immediately and invokes a callback when the broker responds. Use asynchronous sending for higher throughput when you do not need to wait for each acknowledgment.
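The callback pattern can be tried without a broker. The sketch below assumes that the future returned by send_async() behaves like a standard concurrent.futures.Future, which is what the result() and add_done_callback() calls in the SDK example suggest. SendTracker, fake_send, and run_demo are hypothetical names, and fake_send is only a stand-in for a real send.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional, Tuple


class SendTracker:
    """Counts outstanding sends so the caller can wait for every callback."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending = 0
        self._all_done = threading.Event()
        self._all_done.set()  # nothing is pending yet
        self.succeeded = 0
        self.failed = 0

    def track(self, future: Future) -> None:
        with self._lock:
            self._pending += 1
            self._all_done.clear()
        # Registered outside the lock: the callback may run immediately
        # in this thread if the future is already done.
        future.add_done_callback(self._on_done)

    def _on_done(self, future: Future) -> None:
        # Keep the callback cheap: only update counters here.
        with self._lock:
            if future.cancelled() or future.exception() is not None:
                self.failed += 1
            else:
                self.succeeded += 1
            self._pending -= 1
            if self._pending == 0:
                self._all_done.set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        """Block until all tracked sends have completed or the timeout expires."""
        return self._all_done.wait(timeout)


def fake_send(i: int) -> str:
    """Stand-in for a real asynchronous send; fails once to exercise the error path."""
    if i == 3:
        raise RuntimeError("simulated broker error")
    return f"receipt-{i}"


def run_demo() -> Tuple[int, int]:
    tracker = SendTracker()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for i in range(5):
            tracker.track(pool.submit(fake_send, i))
    tracker.wait(timeout=5)
    return tracker.succeeded, tracker.failed
```

With a real producer, tracker.track(producer.send_async(msg)) followed by tracker.wait() before shutdown would be one way to avoid waiting on keyboard input, under the same assumption about the future type. The SDK example that follows keeps the simpler input() approach.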
from rocketmq import ClientConfiguration, Credentials, Message, Producer


def handle_send_result(result_future):
    try:
        # Avoid time-consuming logic in the callback; offload to another thread if needed
        res = result_future.result()
        print(f"Send message success. {res}")
    except Exception as exception:
        print(f"Send message failed: {exception}")


if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            for i in range(10):
                msg = Message()
                msg.topic = topic
                msg.body = "hello, rocketmq.".encode('utf-8')
                msg.tag = "tag"
                msg.keys = "keys"
                msg.add_property("send", "async")
                send_result_future = producer.send_async(msg)
                send_result_future.add_done_callback(handle_send_result)
        except Exception as e:
            print(f"Send message failed: {e}")
    except Exception as e:
        print(f"Producer startup failed: {e}")

    # Keep the process alive so that pending callbacks can run before shutdown
    input("Press Enter to stop the application.")
    producer.shutdown()

Send ordered messages
Ordered messages are delivered in the order they are sent within the same message group. Set the message_group property to group related messages.
from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            msg.tag = "rocketmq-send-fifo-message"
            msg.keys = "keys"
            # Messages with the same message_group are delivered in order
            msg.message_group = "fifo-group"
            for i in range(10):
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

The source code for ordered messages is available at fifo_producer_example.py.
Send scheduled and delayed messages
Scheduled and delayed messages are delivered after a specified timestamp. Set delivery_timestamp to a Unix epoch time in seconds.
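Because delivery_timestamp is an integer number of epoch seconds, a delay or a wall-clock time has to be converted first. The helpers below are a small sketch using only the standard library; the function names are hypothetical and not part of the SDK.

```python
import time
from datetime import datetime, timedelta, timezone
from typing import Optional


def delivery_timestamp_after(delay: timedelta, now: Optional[float] = None) -> int:
    """Epoch seconds for 'deliver this long from now' (a delayed message)."""
    seconds = delay.total_seconds()
    if seconds < 0:
        raise ValueError("delay must not be negative")
    base = time.time() if now is None else now
    return int(base + seconds)


def delivery_timestamp_at(when: datetime) -> int:
    """Epoch seconds for 'deliver at this moment' (a scheduled message)."""
    if when.tzinfo is None:
        # A naive datetime would be interpreted in the local time zone,
        # which is an easy source of off-by-hours errors.
        raise ValueError("use a timezone-aware datetime")
    return int(when.timestamp())


# Five minutes after a fixed reference time
print(delivery_timestamp_after(timedelta(minutes=5), now=1_700_000_000))  # 1700000300
# Midnight UTC on 1 January 2024
print(delivery_timestamp_at(datetime(2024, 1, 1, tzinfo=timezone.utc)))   # 1704067200
```

Either result can be assigned to msg.delivery_timestamp in place of the int(time.time()) + 10 expression used in the SDK example that follows.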
import time

from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            msg.tag = "rocketmq-send-delay-message"
            # Deliver the message 10 seconds from now
            msg.delivery_timestamp = int(time.time()) + 10
            res = producer.send(msg)
            print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

Send transactional messages
Transactional messages support two-phase commit. The broker checks back with the producer to confirm or roll back uncommitted messages through a TransactionChecker.
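A checker is only useful if its answer reflects the outcome of the local transaction. The sketch below isolates that decision as a plain function so it can be tested without a broker. It assumes that the application records the IDs of orders whose local transaction succeeded, that the order ID can be recovered from the checked message (how it is carried is up to the application), and that the local transaction has finished by the time the check runs. The function name and the string return values are hypothetical; a real check() method would map them to TransactionResolution.COMMIT and TransactionResolution.ROLLBACK.

```python
from typing import Optional, Set


def resolve_transaction(order_id: Optional[str], committed_orders: Set[str]) -> str:
    """Return "COMMIT" if the local transaction for order_id succeeded, else "ROLLBACK"."""
    if order_id is not None and order_id in committed_orders:
        return "COMMIT"
    # Unknown or missing order: no record of a successful local transaction.
    return "ROLLBACK"


# Orders whose local transaction is known to have succeeded
committed = {"order-1001", "order-1002"}

print(resolve_transaction("order-1001", committed))  # COMMIT
print(resolve_transaction("order-9999", committed))  # ROLLBACK
print(resolve_transaction(None, committed))          # ROLLBACK
```

The SDK example that follows uses a checker that always commits, which is convenient for a first test but would also confirm messages whose local transaction failed.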
from rocketmq import (ClientConfiguration, Credentials, Message, Producer,
                      TransactionChecker, TransactionResolution)


class OrderTransactionChecker(TransactionChecker):
    """Check whether the local transaction succeeded and return COMMIT or ROLLBACK."""

    def check(self, message: Message) -> TransactionResolution:
        print(f"Transaction check for message: {message}. Committing.")
        return TransactionResolution.COMMIT


if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    # Pass the transaction checker to the producer
    producer = Producer(config, (topic,), checker=OrderTransactionChecker())

    try:
        producer.startup()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()
        # Do not try to send with a producer that failed to start
        raise SystemExit(1)

    try:
        # Begin a half-message transaction
        transaction = producer.begin_transaction()
        msg = Message()
        msg.topic = topic
        msg.body = "hello, rocketmq.".encode('utf-8')
        msg.tag = "rocketmq-send-transaction-message"
        res = producer.send(msg, transaction)
        print(f"Send half message success. {res}")
        # Option 1: Commit from client side
        transaction.commit()
        print(f"Committed message: {transaction.message_id}")
        # Option 2: Roll back instead
        # transaction.rollback()
        # print(f"Rolled back message: {transaction.message_id}")
        producer.shutdown()
    except Exception as e:
        print(f"Transaction send failed: {e}")
        producer.shutdown()

Send lite messages
Lite messages use a parent topic with sub-topics (lite topics) to categorize messages at a finer granularity. Set lite_topic on the message to specify the sub-topic.
from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    # Use the parent topic
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            for i in range(10):
                # Route each message to a lite topic under the parent topic
                msg.lite_topic = f"lite-test-{i}"
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

Consume messages with a simple consumer
A simple consumer pulls messages from the broker with long polling. After processing each message, call ack() to acknowledge it.
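The "process, then ack" order matters: acknowledging before the work is done would lose a message if processing fails. The helper below is a broker-free sketch of that order. process_and_ack, handler, and ack are hypothetical names; in a real consumer, ack would presumably be the consumer's ack method and the messages would come from receive(). Messages whose handler raises are left unacknowledged, on the assumption that they become receivable again once the invisible duration expires.

```python
from typing import Callable, Iterable, List, Optional, TypeVar

M = TypeVar("M")


def process_and_ack(
    messages: Optional[Iterable[M]],
    handler: Callable[[M], None],
    ack: Callable[[M], None],
) -> List[M]:
    """Ack each message only after handler succeeds; return the ones left unacked."""
    unacked: List[M] = []
    for msg in messages or []:
        try:
            handler(msg)
        except Exception as exc:
            # Leave the message unacknowledged so it can be received again later.
            print(f"Processing failed, not acknowledging: {exc}")
            unacked.append(msg)
            continue
        ack(msg)
    return unacked


# Usage with stand-ins for real messages and a real consumer
acked: List[str] = []


def handle(body: str) -> None:
    if body == "bad":
        raise ValueError("cannot parse message")


left = process_and_ack(["a", "bad", "b"], handle, acked.append)
print(acked)  # ['a', 'b']
print(left)   # ['bad']
```

The SDK example that follows acknowledges every message immediately after receipt, which is enough to verify connectivity but performs no processing.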
from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
                      SimpleConsumer)

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    consumer_group = "<your-consumer-group>"
    # Use a singleton consumer instance in production; avoid creating multiple consumers
    simple_consumer = SimpleConsumer(config, consumer_group, {topic: FilterExpression()})

    try:
        simple_consumer.startup()
        try:
            # Optional: filter by tag
            # simple_consumer.subscribe(topic, FilterExpression("your-tag"))
            while True:
                try:
                    # Long poll for up to 32 messages; each message is invisible
                    # for 15 seconds after receipt
                    messages = simple_consumer.receive(32, 15)
                    if messages is not None:
                        for msg in messages:
                            simple_consumer.ack(msg)
                            print(f"Acknowledged message: [{msg.message_id}]")
                except Exception as e:
                    print(f"Receive or ack failed: {e}")
        except Exception as e:
            print(f"Consumer error: {e}")
            simple_consumer.shutdown()
    except Exception as e:
        print(f"Consumer startup failed: {e}")
        simple_consumer.shutdown()

Consume lite messages with a push consumer
A push consumer receives messages automatically through a registered listener. For the lite message push consumer example, see lite_push_consumer_example.py.
Public network access for serverless instances
To access a serverless ApsaraMQ for RocketMQ instance over the public network, pass the instance ID as the third argument to ClientConfiguration:
config = ClientConfiguration(endpoints, credentials, "<your-instance-id>")

Replace <your-instance-id> with the actual ID of your instance, for example rmq-cn-xxx.
GitHub source files
The following table maps each message type to its GitHub source file.