All Products
Search
Document Center

Message Service:Manage topics and use the QueueEndpoint endpoint type

Last Updated:Jan 15, 2024

This topic describes how to use Message Service (MNS) SDK for Python to create a topic, create a queue, create a subscription, publish messages, receive and delete the messages, delete the topic, and delete the queue.

Step 1: Prepare the environment

  1. Download the latest version of MNS SDK for Python, decompress the package, and then go to the mns_python_sdk subdirectory. In this example, Python 2.x is used.

  2. Open the sample.cfg file, and specify an AccessKey ID, an AccessKey secret, an endpoint, and a Security Token Service (STS) token.

    • AccessKey ID and AccessKey secret

      • The AccessKey pair that is used to call API operations in Alibaba Cloud.

      • If you use an Alibaba Cloud account, go to the AccessKey Pair page of the Alibaba Cloud Management Console to create and view your AccessKey pair.

      • If you use MNS as a Resource Access Management (RAM) user, log on to the RAM console to view the AccessKey pair.

    • Endpoint

    • STS token

      • The temporary access credential that is provided by RAM. If you access Alibaba Cloud services as a RAM user or by using an Alibaba Cloud account, you do not need to specify an STS token. For more information, see What is STS?

  3. Go to the sample directory in which all the sample code resides.

Step 2: Create a topic

Run the createtopic.py file to create a topic. The default name of the new topic is MySampleTopic. You can also specify another topic name. For more information, see Topic.

  • Run the following command:

    python createtopic.py MyTopic1         

    The following result is returned:

    Create Topic Succeed! TopicName:MyTopic1 
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    topic_meta = TopicMeta()
    try:
        topic_url = my_topic.create(topic_meta)
        print "Create Topic Succeed! TopicName:%s\n" % topic_name
    except MNSExceptionBase, e:
        if e.type == "TopicAlreadyExist":
            print "Topic already exist, please delete it before creating or use it directly."
            sys.exit(0)
        print "Create Topic Fail! Exception:%s\n" % e         

Step 3: Create a queue

Run the createqueue.py file to create a queue. The default name of the new queue is MySampleQueue. You can also specify another queue name. For more information, see Queue.

  • Run the following command:

    python createqueue.py MyQueue1      

    The following result is returned:

    Create Queue Succeed! QueueName:MyQueue1 
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    my_account = Account(endpoint, accid, acckey, token)
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
    my_queue = my_account.get_queue(queue_name)
    
    queue_meta = QueueMeta()
    try:
        queue_url = my_queue.create(queue_meta)
        print "Create Queue Succeed! QueueName:%s\n" % queue_name
    except MNSExceptionBase, e:
        if e.type == "QueueAlreadyExist":
            print "Queue already exist, please delete it before creating or use it directly."
            sys.exit(0)
        print "Create Queue Fail! Exception:%s\n" % e     

Step 4: Create a subscription

Run the subscribe.py file to create a subscription. The following list describes the parameters in the file. For more information, see Subscription.

  • The first parameter specifies the region of the queue. The region of the queue must be the same as that of the topic. The China (Hangzhou) region is specified in this example.

  • The second parameter specifies the name of the queue that is created in Step 3.

  • The third parameter specifies the name of the topic that is created in Step 2.

  • The fourth parameter specifies the name of the subscription. The default name is MySampleTopic-Sub.

  • Run the following command:

    python subscribe_queueendpoint.py cn-hangzhou MyQueue1 MyTopic1 MyTopic1-Sub1

    The following result is returned:

    Create Subscription Succeed! TopicName:MyTopic1 SubName:MyTopic1-Sub1 Endpoint:acs:mns:cn-hangzhou:123456789098****:queues/MyQueue1
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    region = sys.argv[1]
    queue_name = sys.argv[2]
    queue_endpoint = TopicHelper.generate_queue_endpoint(region, account_id, queue_name)
    
    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[3] if len(sys.argv) > 3 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    sub_name = sys.argv[4] if len(sys.argv) > 4 else "MySampleTopic-Sub"
    my_sub = my_topic.get_subscription(sub_name)
    
    sub_meta = SubscriptionMeta(queue_endpoint, notify_content_format = SubscriptionNotifyContentFormat.SIMPLIFIED)
    try:
        topic_url = my_sub.subscribe(sub_meta)
        print "Create Subscription Succeed! TopicName:%s SubName:%s Endpoint:%s\n" % (topic_name, sub_name, queue_endpoint)
    except MNSExceptionBase, e:
        if e.type == "TopicNotExist":
            print "Topic not exist, please create topic."
            sys.exit(0)
        elif e.type == "SubscriptionAlreadyExist":
            print "Subscription already exist, please unsubscribe or use it directly."
            sys.exit(0)
        print "Create Subscription Fail! Exception:%s\n" % e

Step 5: Publish messages

Run the publishmessage.py file to publish multiple messages to the topic. You must specify the name of the topic that is created in Step 2 for the first parameter. For more information, see TopicMessage.

  • Run the following command:

    python publishmessage.py MyTopic1        

    The following result is returned:

    ==========Publish Message To Topic==========
    TopicName:MyTopic1
    MessageCount:3
    
    Publish Message Succeed. MessageBody:I am test message 0. MessageID:F6EA56633844DBFC-1-154BDFB****-200000004
    Publish Message Succeed. MessageBody:I am test message 1. MessageID:F6EA56633844DBFC-1-154BDFB****-200000005
    Publish Message Succeed. MessageBody:I am test message 2. MessageID:F6EA56633844DBFC-1-154BDFB****-200000006 
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    msg_count = 3
    print "%sPublish Message To Topic%s\nTopicName:%s\nMessageCount:%s\n" % (10*"=", 10*"=", topic_name, msg_count)
    
    for i in range(msg_count):
        try:
            msg_body = "I am test message %s." % i
            msg = TopicMessage(msg_body)
            re_msg = my_topic.publish_message(msg)
            print "Publish Message Succeed. MessageBody:%s MessageID:%s" % (msg_body, re_msg.message_id)
        except MNSExceptionBase,e:
            if e.type == "TopicNotExist":
                print "Topic not exist, please create it."
                sys.exit(1)
            print "Publish Message Fail. Exception:%s" % e          

Step 6: Receive and delete the messages from the queue

After you perform Step 5, MNS pushes the messages to the queue that is specified in Step 4.

In this step, run the recvdelmessage.py file to receive and delete all the messages from the queue.

The first parameter specifies the name of the queue that is specified in Step 4. The second parameter specifies whether to perform Base64 decoding on the messages. In this example, the second parameter is set to FALSE because Base64 encoding was not performed when the messages were published. The requests to receive messages from the queue are long polling requests. If you set the WaitSeconds parameter to 3, the long-polling request times out after 3 seconds. For more information, see QueueMessage.

python recvdelmessage.py MyQueue1 false

The following result is returned:

==========Receive And Delete Message From Queue==========
QueueName:MyQueue1
WaitSeconds:3

Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 0. MessageID:E56AE055BAA638AC-1-1570CE4****-200000001
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTEtOA==
Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA== MessageBody:I am test message 1. MessageID:E56AE055BAA638AC-1-1570CE4****-200000002
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5NC0xNDczMzkwMjkyLTEtOA==
Receive Message Succeed! ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA== MessageBody:I am test message 2. MessageID:CDAC88D223C0F9E3-2-1570CE4****-200000001
Delete Message Succeed!  ReceiptHandle:1-ODU4OTkzNDU5My0xNDczMzkwMjkyLTItOA==
Queue is empty!

Step 7: Delete the topic

Run the deletetopic.py file to delete the topic. You must specify the name of the topic that is created in Step 2 for the first parameter.

  • Run the following command:

    python deletetopic.py MyTopic1

    The following result is returned:

    Delete Topic Succeed! TopicName:MyTopic1
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    my_account = Account(endpoint, accid, acckey, token)
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleTopic"
    my_topic = my_account.get_topic(topic_name)
    
    try:
        my_topic.delete()
        print "Delete Topic Succeed! TopicName:%s\n" % topic_name
    except MNSExceptionBase, e:
        print "Delete Topic Fail! Exception:%s\n" % e         

Step 8: Delete the queue

Run the deletequeue.py file to delete the queue.

  • Run the following command:

    python deletequeue.py MyQueue1

    The following result is returned:

    Delete Queue Succeed! QueueName:MyQueue1
  • Sample code:

    The endpoint, AccessKey ID, AccessKey secret, and STS token are retrieved from the sample.cfg file that is specified in Step 1.

    my_account = Account(endpoint, accid, acckey, token)
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "MySampleQueue"
    my_queue = my_account.get_queue(queue_name)
    
    #delete queue
    try:
        my_queue.delete()
        print "Delete Queue Succeed! QueueName:%s\n" % queue_name
    except MNSExceptionBase, e:
        print "Delete Queue Fail! Exception:%s\n" % e