ApsaraMQ for RocketMQ:Sample code

Last Updated:Mar 11, 2026

Send and receive messages with the ApsaraMQ for RocketMQ 5.x SDK for Python. Each example covers a specific message type and is ready to run after you replace the placeholder values with your instance details.

Prerequisites

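Before you begin, make sure that you have an ApsaraMQ for RocketMQ instance, a topic and a consumer group created on that instance, and the instance username and password if your access method requires them.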

Parameters

Replace the following placeholders in each code example with your actual values.

| Parameter | Example value | Description |
| --- | --- | --- |
| <your-endpoint> | rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080 | The endpoint of your ApsaraMQ for RocketMQ instance. To get the endpoint, see Get the instance endpoint. Use the public endpoint for internet access or the VPC endpoint for VPC access. |
| <your-instance-id> | rmq-cn-xxx | The ID of your ApsaraMQ for RocketMQ instance. Required only for public network access on serverless instances. |
| <your-topic> | normal_test | The topic to which messages are sent or from which messages are consumed. Create the topic in advance. |
| <your-consumer-group> | GID_test | The consumer group used to consume messages. Create the consumer group in advance. |
| <your-ak> | 1XVg0hzgKm****** | The username of your instance. Required for internet access. For VPC access, required only if the instance is serverless and authentication-free in VPCs is disabled. See Get the instance username and password. |
| <your-sk> | ijSt8rEc45****** | The password of your instance. Same access conditions as the username. |
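
Rather than editing the placeholders in every example, you may prefer to read them once from the environment. The following is a minimal sketch using only the standard library. The environment variable names (ROCKETMQ_ENDPOINT and so on) and the RocketMQSettings class are hypothetical and are not defined by the SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Hypothetical variable names; choose whatever fits your deployment.
REQUIRED_VARS = ("ROCKETMQ_ENDPOINT", "ROCKETMQ_TOPIC", "ROCKETMQ_CONSUMER_GROUP")


@dataclass(frozen=True)
class RocketMQSettings:
    endpoint: str
    topic: str
    consumer_group: str
    access_key: Optional[str] = None   # instance username (<your-ak>)
    secret_key: Optional[str] = None   # instance password (<your-sk>)
    instance_id: Optional[str] = None  # only for serverless public access


def load_settings(env: Mapping[str, str] = os.environ) -> RocketMQSettings:
    """Build settings from a mapping and fail early if a required value is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return RocketMQSettings(
        endpoint=env["ROCKETMQ_ENDPOINT"],
        topic=env["ROCKETMQ_TOPIC"],
        consumer_group=env["ROCKETMQ_CONSUMER_GROUP"],
        access_key=env.get("ROCKETMQ_AK") or None,
        secret_key=env.get("ROCKETMQ_SK") or None,
        instance_id=env.get("ROCKETMQ_INSTANCE_ID") or None,
    )
```

The resulting fields can then be passed to Credentials and ClientConfiguration in place of the literal placeholder strings.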

Send normal messages

Normal messages have no special delivery semantics and are the most common message type.

Send synchronously

The send() method blocks until the broker acknowledges the message.

from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    # Instance endpoint
    endpoints = "<your-endpoint>"
    # Authenticate with your instance username and password
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            # Tag: secondary classifier of messages besides topic
            msg.tag = "tag"
            # Keys: alternative way to identify messages besides message ID
            msg.keys = "keys"
            # Custom user property
            msg.add_property("key", "value")
            for i in range(10):
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()
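
The example above calls shutdown() on three separate paths. One way to express the same guarantee once is a small context manager. This is only a sketch: the started() helper is hypothetical and not part of the SDK, and it relies solely on the startup() and shutdown() methods used in the example.

```python
from contextlib import contextmanager


@contextmanager
def started(client):
    """Start a client and always shut it down afterwards.

    Works with any object that exposes startup() and shutdown().
    shutdown() also runs if startup() itself raises, which mirrors
    the error handling in the example above.
    """
    try:
        client.startup()
        yield client
    finally:
        client.shutdown()


# Possible usage with the producer from the example (not executed here):
#
#     with started(Producer(config, (topic,))) as producer:
#         res = producer.send(msg)
```

Exceptions raised inside the with block still propagate to the caller, so logging or retry decisions stay with the calling code.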

Send asynchronously

The send_async() method returns immediately and invokes a callback when the broker responds. Use asynchronous sending for higher throughput when you do not need to wait for each acknowledgment.

from rocketmq import ClientConfiguration, Credentials, Message, Producer


def handle_send_result(result_future):
    try:
        # Avoid time-consuming logic in the callback; offload to another thread if needed
        res = result_future.result()
        print(f"Send message success. {res}")
    except Exception as exception:
        print(f"Send message failed: {exception}")


if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            for i in range(10):
                msg = Message()
                msg.topic = topic
                msg.body = "hello, rocketmq.".encode('utf-8')
                msg.tag = "tag"
                msg.keys = "keys"
                msg.add_property("send", "async")
                send_result_future = producer.send_async(msg)
                send_result_future.add_done_callback(handle_send_result)
        except Exception as e:
            print(f"Send message failed: {e}")
    except Exception as e:
        print(f"Producer startup failed: {e}")

    input("Press Enter to stop the application.")
    producer.shutdown()
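
The example waits on input() before shutting down so that pending callbacks have time to run. In a non-interactive program you could instead count completed callbacks and wait for all of them. The sketch below shows that pattern with the standard library only. It assumes the object returned by send_async() behaves like concurrent.futures.Future, as the result() and add_done_callback() calls above suggest. CompletionTracker and fake_send are hypothetical names, and fake_send merely stands in for a broker round trip.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional


class CompletionTracker:
    """Counts finished futures so the main thread knows when every callback has run."""

    def __init__(self, expected: int):
        self._remaining = expected
        self._lock = threading.Lock()
        self._all_done = threading.Event()
        self.succeeded = 0
        self.failed = 0
        if expected == 0:
            self._all_done.set()

    def on_done(self, future: Future) -> None:
        # Runs in the thread that completed the future; keep it short.
        with self._lock:
            if future.exception() is None:
                self.succeeded += 1
            else:
                self.failed += 1
            self._remaining -= 1
            if self._remaining == 0:
                self._all_done.set()

    def wait(self, timeout: Optional[float] = None) -> bool:
        """Return True if all expected futures finished within the timeout."""
        return self._all_done.wait(timeout)


def fake_send(i: int) -> str:
    """Stand-in for an asynchronous send; message 3 fails on purpose."""
    if i == 3:
        raise RuntimeError("simulated send failure")
    return f"receipt-{i}"


if __name__ == "__main__":
    tracker = CompletionTracker(expected=5)
    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(5):
            pool.submit(fake_send, i).add_done_callback(tracker.on_done)
    finished = tracker.wait(timeout=5)
    print(f"finished={finished} ok={tracker.succeeded} failed={tracker.failed}")
```

With the real producer, tracker.on_done would be registered on each future returned by send_async(), and producer.shutdown() would follow tracker.wait().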

Send ordered messages

Ordered messages are delivered in the order they are sent within the same message group. Set the message_group property to group related messages.

from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            msg.tag = "rocketmq-send-fifo-message"
            msg.keys = "keys"
            # Messages with the same message_group are delivered in order
            msg.message_group = "fifo-group"
            for i in range(10):
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

Note: The source code for ordered messages is available at fifo_producer_example.py.
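
The example uses one fixed group, "fifo-group", so all ten messages are ordered relative to each other. When ordering only matters per business entity (for example, per order), the group name can be derived from that entity's key. The sketch below is one possible approach, not something prescribed by the SDK: message_group_for, the bucket count, and the prefix are all hypothetical. It maps keys onto a bounded set of group names with a stable hash.

```python
import zlib


def message_group_for(business_key: str, group_count: int = 16,
                      prefix: str = "fifo-group") -> str:
    """Map a business key to one of `group_count` stable group names.

    zlib.crc32 is used instead of hash() because hash() of a str is
    randomized per process, which would break ordering across restarts.
    """
    if group_count < 1:
        raise ValueError("group_count must be at least 1")
    bucket = zlib.crc32(business_key.encode("utf-8")) % group_count
    return f"{prefix}-{bucket}"


# Possible usage with the producer example (not executed here):
#
#     msg.message_group = message_group_for(order_id)
```

Messages that share a key always land in the same group, so their relative order is preserved. Unrelated keys may share a group as well, which is harmless for ordering but serializes their delivery.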

Send scheduled and delayed messages

Scheduled and delayed messages become available to consumers only at a specified time. Set delivery_timestamp to that time as a Unix epoch timestamp in seconds.

import time

from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            msg.tag = "rocketmq-send-delay-message"
            # Deliver the message 10 seconds from now
            msg.delivery_timestamp = int(time.time()) + 10
            res = producer.send(msg)
            print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()
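
The example computes a relative delay inline. For scheduled delivery at a fixed wall-clock time, the same epoch-seconds value can be derived from a timezone-aware datetime. The two helpers below are a sketch with hypothetical names. They follow the seconds-based convention stated above and use only the standard library.

```python
import time
from datetime import datetime, timezone
from typing import Optional


def delivery_timestamp_after(delay_seconds: int, now: Optional[float] = None) -> int:
    """Epoch seconds for a message delayed by `delay_seconds` from now."""
    if delay_seconds < 0:
        raise ValueError("delay_seconds must not be negative")
    base = time.time() if now is None else now
    return int(base) + delay_seconds


def delivery_timestamp_at(when: datetime) -> int:
    """Epoch seconds for a message scheduled at a specific moment.

    Naive datetimes are rejected because their meaning depends on the
    local timezone of whichever machine runs the producer.
    """
    if when.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return int(when.timestamp())


if __name__ == "__main__":
    print(delivery_timestamp_after(10))
    print(delivery_timestamp_at(datetime(2026, 1, 1, tzinfo=timezone.utc)))
```

Either result can be assigned to msg.delivery_timestamp in place of the inline expression.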

Send transactional messages

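Transactional messages use a two-phase commit. The producer first sends a half message, runs its local transaction, and then commits or rolls back the message. If the broker receives neither a commit nor a rollback, it calls the producer's TransactionChecker to resolve the message.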

from rocketmq import (ClientConfiguration, Credentials, Message, Producer,
                      TransactionChecker, TransactionResolution)


class OrderTransactionChecker(TransactionChecker):
    """Check whether the local transaction succeeded and return COMMIT or ROLLBACK."""

    def check(self, message: Message) -> TransactionResolution:
        print(f"Transaction check for message: {message}. Committing.")
        return TransactionResolution.COMMIT


if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    # Pass the transaction checker to the producer
    producer = Producer(config, (topic,), checker=OrderTransactionChecker())

    try:
        producer.startup()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()
        # Stop here: a producer that failed to start cannot send messages
        raise SystemExit(1)

    try:
        # Begin a half-message transaction
        transaction = producer.begin_transaction()
        msg = Message()
        msg.topic = topic
        msg.body = "hello, rocketmq.".encode('utf-8')
        msg.tag = "rocketmq-send-transaction-message"
        res = producer.send(msg, transaction)
        print(f"Send half message success. {res}")

        # Option 1: Commit from client side
        transaction.commit()
        print(f"Committed message: {transaction.message_id}")

        # Option 2: Roll back instead
        # transaction.rollback()
        # print(f"Rolled back message: {transaction.message_id}")

        producer.shutdown()
    except Exception as e:
        print(f"Transaction send failed: {e}")
        producer.shutdown()
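
The OrderTransactionChecker in the example always commits, which is fine for a demonstration. A real checker would normally consult the state of the local transaction. The sketch below isolates that decision as a pure function so it can be tested without a broker. Decision is a local stand-in for the SDK's TransactionResolution, while decide, the "done" status value, and the status mapping are hypothetical. How the order ID is read from the message is left out.

```python
from enum import Enum
from typing import Mapping, Optional


class Decision(Enum):
    """Local stand-in for TransactionResolution.COMMIT / ROLLBACK."""
    COMMIT = "COMMIT"
    ROLLBACK = "ROLLBACK"


def decide(order_id: Optional[str], local_status: Mapping[str, str]) -> Decision:
    """Commit only when the local transaction is recorded as done.

    Simplifying assumption: anything else (unknown ID, missing record,
    any other status) is treated as a rollback.
    """
    if order_id is not None and local_status.get(order_id) == "done":
        return Decision.COMMIT
    return Decision.ROLLBACK


if __name__ == "__main__":
    statuses = {"order-1": "done", "order-2": "failed"}
    for oid in ("order-1", "order-2", "order-3"):
        print(oid, decide(oid, statuses).value)
```

Inside check(), the result would be translated to TransactionResolution.COMMIT or TransactionResolution.ROLLBACK before being returned.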

Send lite messages

Lite messages use a parent topic with sub-topics (lite topics) to categorize messages at a finer granularity. Set lite_topic on the message to specify the sub-topic.

from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    # Use the parent topic
    topic = "<your-topic>"
    producer = Producer(config, (topic,))

    try:
        producer.startup()
        try:
            msg = Message()
            msg.topic = topic
            msg.body = "hello, rocketmq.".encode('utf-8')
            for i in range(10):
                # Route each message to a lite topic under the parent topic
                msg.lite_topic = f"lite-test-{i}"
                res = producer.send(msg)
                print(f"Send message success. {res}")
            producer.shutdown()
        except Exception as e:
            print(f"Send message failed: {e}")
            producer.shutdown()
    except Exception as e:
        print(f"Producer startup failed: {e}")
        producer.shutdown()

Consume messages with a simple consumer

A simple consumer pulls messages from the broker with long polling. After processing each message, call ack() to acknowledge it.

from rocketmq import (ClientConfiguration, Credentials, FilterExpression,
                      SimpleConsumer)

if __name__ == '__main__':
    endpoints = "<your-endpoint>"
    credentials = Credentials("<your-ak>", "<your-sk>")
    config = ClientConfiguration(endpoints, credentials)
    topic = "<your-topic>"
    consumer_group = "<your-consumer-group>"
    # Use a singleton consumer instance in production; avoid creating multiple consumers
    simple_consumer = SimpleConsumer(config, consumer_group, {topic: FilterExpression()})

    try:
        simple_consumer.startup()
        try:
            # Optional: filter by tag
            # simple_consumer.subscribe(topic, FilterExpression("your-tag"))
            while True:
                try:
                    # Long poll for up to 32 messages; each message is invisible
                    # for 15 seconds after receipt
                    messages = simple_consumer.receive(32, 15)
                    if messages is not None:
                        for msg in messages:
                            simple_consumer.ack(msg)
                            print(f"Acknowledged message: [{msg.message_id}]")
                except Exception as e:
                    print(f"Receive or ack failed: {e}")
        except Exception as e:
            print(f"Consumer error: {e}")
            simple_consumer.shutdown()
    except Exception as e:
        print(f"Consumer startup failed: {e}")
        simple_consumer.shutdown()
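
The example acknowledges each message as soon as it is received. When processing can fail, a common variation is to acknowledge only after the handler succeeds, so that an unacknowledged message can be received again once its invisibility period ends. The helper below sketches that flow. The process_and_ack name and the handler callable are hypothetical, and ack is passed in as a callable (for example, simple_consumer.ack) so the function has no SDK dependency.

```python
from typing import Any, Callable, Iterable, Optional, Tuple


def process_and_ack(messages: Optional[Iterable[Any]],
                    handler: Callable[[Any], None],
                    ack: Callable[[Any], Any]) -> Tuple[int, int]:
    """Run `handler` on each message and ack only the ones that succeed.

    Returns (acked_count, failed_count). Failed messages are left
    unacknowledged on purpose.
    """
    acked = 0
    failed = 0
    for msg in messages or ():
        try:
            handler(msg)
        except Exception as exc:
            failed += 1
            print(f"Processing failed, message left unacknowledged: {exc}")
            continue
        ack(msg)
        acked += 1
    return acked, failed


# Possible usage inside the receive loop (not executed here):
#
#     messages = simple_consumer.receive(32, 15)
#     process_and_ack(messages, handle_order, simple_consumer.ack)
```

The handler should finish well within the invisibility period passed to receive(). Otherwise the message may be delivered again while it is still being processed.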

Consume lite messages with a push consumer

A push consumer receives messages automatically through a registered listener. For the lite message push consumer example, see lite_push_consumer_example.py.

Public network access for serverless instances

To access a serverless ApsaraMQ for RocketMQ instance over the public network, pass the instance ID as the third argument to ClientConfiguration:

config = ClientConfiguration(endpoints, credentials, "<your-instance-id>")

Replace <your-instance-id> with the actual ID of your instance, for example rmq-cn-xxx.
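
If the same code has to run against both access modes, the constructor arguments can be assembled conditionally. This is a small sketch with a hypothetical helper name. It only encodes the rule stated above: two arguments normally, and the instance ID as a third argument for serverless public access.

```python
from typing import Any, Optional, Tuple


def client_config_args(endpoints: str, credentials: Any,
                       instance_id: Optional[str] = None) -> Tuple[Any, ...]:
    """Return positional arguments for ClientConfiguration.

    The instance ID is appended only when one is provided.
    """
    if instance_id:
        return (endpoints, credentials, instance_id)
    return (endpoints, credentials)


# Possible usage (not executed here):
#
#     config = ClientConfiguration(*client_config_args(endpoints, credentials, instance_id))
```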

GitHub source files

The following table maps each message type to its GitHub source file.

| Message type | Send | Consume |
| --- | --- | --- |
| Normal messages | Sync: normal_producer_example.py; Async: async_producer_example.py | simple_consumer_example.py |
| Ordered messages | fifo_producer_example.py | - |
| Scheduled and delayed messages | delay_producer_example.py | - |
| Transactional messages | transaction_producer_example.py | - |
| Lite messages | lite_producer_example.py | lite_push_consumer_example.py |
