If an Alibaba Cloud account grants a RAM user permissions to operate topic resources on Message Queue for Apache RocketMQ instances, the RAM user cannot directly use the group IDs created by the Alibaba Cloud account. In this case, log on to the Message Queue for Apache RocketMQ console, view the instances and topics whose resource operation permissions have been granted to the RAM user, and create another group ID. Then, you can use the Message Queue for Apache RocketMQ SDK of the required language and version to send and subscribe to messages.

Prerequisites

  • You need the AccessKey pair that consists of an AccessKey ID and an AccessKey secret for authentication when using SDKs to send and subscribe to messages. Make sure that the Alibaba Cloud account has granted the programming and access permissions to your RAM user, so that the RAM user can use the AccessKey pair. For more information, see Create a RAM user.
  • The RAM user has been granted the operation permissions for the specified topic.

    If the RAM user is not granted the permissions, contact the Alibaba Cloud account to grant the permissions by referring to Grant permissions to RAM users.

Step 1: View the target instances and topics

  1. In your browser, open the RAM user logon portal, and log on with the RAM user account as instructed.
  2. In the product list, find Message Queue for Apache RocketMQ and then click Message Queue for Apache RocketMQ.
  3. In the left-side navigation pane in the Message Queue for Apache RocketMQ console, click Instances to view the basic information about the instances whose resource operation permissions have been granted to the RAM user.
  4. In the left-side navigation pane, click Topics to view the topics whose resource operation permissions have been granted to the RAM user.

Step 2: Create resources

Create a group ID

Next, you need to create a group ID for the message consumers or producers.
  • The group ID must be unique within the same instance.
  • Group IDs and topics implement N:N mapping. A consumer can subscribe to multiple topics and a topic can be subscribed by multiple consumers. A producer can send messages to multiple topics and a topic can receive messages from multiple producers.
  • A group ID is required for consumers but is optional for producers.
  1. In the left-side navigation pane, click Groups.
  2. On the Groups page, select an instance whose resource operation permissions have been granted to the RAM user, and then choose TCP > Create Group ID. This topic takes TCP as an example.
    Note The group ID of TCP-based instances cannot be used for HTTP-based instances and vice versa. You need to create the group IDs for TCP-based instances and for HTTP-based instances separately.
  3. In the Create Group ID dialog box, set Group ID and Description. Then, click OK.

Create an Alibaba Cloud AccessKey pair

The Alibaba Cloud AccessKey pair is used for identity authentication during message sending and subscription.

When using Message Queue for Apache RocketMQ SDKs to send and subscribe to messages, you not only need to specify the topic and group ID, but also need to enter the AccessKey pair you created in the Alibaba Cloud console for identity authentication.

For more information about how to create an AccessKey pair, see Create an AccessKey.

Step 3: Obtain an endpoint

After creating resources in the console, you need to obtain the endpoint of the instance or region in the console. To access services in an instance or a region when sending or subscribing to messages, you need to configure the endpoints for the producer and consumer.

  1. In the left-side navigation pane, click Instances.
  2. On the Instances page, select the instance you created.
  3. The Instance Information tab appears by default. In the Endpoint Information section, you can view the TCP endpoint and HTTP endpoint. The endpoints vary with the protocol. The following describes the endpoints when different protocols are used:
    • TCP: The endpoint displayed in the console is the endpoint of a specific instance in the region. Different instances in the same region have different endpoints.
    • HTTP: The endpoint displayed in the console is the endpoint of a region, instead of a specific instance. You need to configure an ID for the instance when sending and subscribing to messages.
  4. In the TCP Endpoint section, click Copy.

    As for TCP endpoints, you can click Sample Code to view how to set endpoints in different programming languages.

After completing the preceding preparations, you can run the sample code and use Message Queue for Apache RocketMQ to send and subscribe to messages.

Step 4: Send messages

You can send messages in the following ways:

  • To quickly verify the availability of the topics, you can send messages in the console.
    1. In the left-side navigation pane, click Topics.
    2. On the Topics page, find the topic that you created and click Send in the Actions column.
    3. In the Send Message dialog box, enter the message content in the Message Body field and click OK.

    The console returns a success prompt message and the corresponding message ID once the message is sent.

  • In the production environment, you can call SDKs to send messages in Message Queue for Apache RocketMQ.

    This topic describes how to call a TCP Java SDK to send messages.

    Send messages by using the TCP Java SDK

    1. Use either of the following methods to add the dependency of the Java SDK:
      • Use Maven:
        <dependency>
         <groupId>com.aliyun.openservices</groupId>
         <artifactId>ons-client</artifactId>
         <version>"XXX"</version>
            // Enter the latest Java SDK version.
        </dependency>                           

        For more information about the latest Java SDK version, see Release notes.

      • Download a dependency JAR package:

        For more information about the download URL of the latest Java SDK, see Release notes.

    2. Set related parameters and run the sample code according to the following instructions:
          import com.aliyun.openservices.ons.api.Message;
          import com.aliyun.openservices.ons.api.Producer;
          import com.aliyun.openservices.ons.api.SendResult;
          import com.aliyun.openservices.ons.api.ONSFactory;
          import com.aliyun.openservices.ons.api.PropertyKeyConst;
      
          import java.util.Properties;
      
          public class ProducerTest {
              public static void main(String[] args) {
                  Properties properties = new Properties();
                  // The group ID you created in the console.
                  properties.put(PropertyKeyConst.GROUP_ID, "XXX");
                  // The AccessKey ID used for RAM user authentication, which is created by the Alibaba Cloud account.
                  properties.put(PropertyKeyConst.AccessKey,"XXX");
                  // The AccessKey secret used for RAM user authentication, which is created by the Alibaba Cloud account.
                  properties.put(PropertyKeyConst.SecretKey, "XXX");
                  // Set the TCP endpoint: Go to the Instances page in the Message Queue for Apache RocketMQ console, select the target instance, and view the endpoint in the Endpoint Information section.
                  properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
      
                  Producer producer = ONSFactory.createProducer(properties);
                  // Before sending a message, call the start method once to start the producer.
                  producer.start();
      
                  // Send messages cyclically.
                  while(true){
                      Message msg = new Message( //
                          // The topic you created in the console. This is the name of the topic to which the message belongs.
                          "TopicTestMQ",
                          // The message tag.
                          // The message tag is similar to a Gmail tag. It is used to sort messages, enabling the consumer to filter messages on the Message Queue for Apache RocketMQ broker based on the specified criteria.
                          "TagA",
                          // The message body.
                          // It may be any data in binary form, and Message Queue for Apache RocketMQ does not process the message body.
                          // The producer and consumer must negotiate the consistent serialization and deserialization methods.
                          "Hello MQ".getBytes());
                      // The message key, which must be globally unique, so that you can query a message and resend it in the console if you fail to receive the message.
                      // Note: Messages can still be sent and received even if this attribute is not set.
                      msg.setKey("ORDERID_100");
                      // The message sending result, which is successful if no exception occurs.
                      // Print the message ID to facilitate querying the message sending status.
                      SendResult sendResult = producer.send(msg);
                      System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
                  }
      
                  // You can destroy the producer before exiting the application.
                  // Note: You can choose not to destroy the producer object.
                  producer.shutdown();
              }
          }                    
    3. Check whether messages are sent.

      Once a message is sent, you can check its sending status in the console by performing the following operations:

      1. In the left-side navigation pane, choose Message Query > By Message ID.
      2. In the search box, enter the message ID returned after the message is sent, and click Search to query the sending status of the message.

        Storage Time indicates the time when the Message Queue for Apache RocketMQ broker stores the message. If the message can be queried out, the message has been sent to the Message Queue for Apache RocketMQ broker.

      Notice This step demonstrates the situation where Message Queue for Apache RocketMQ is used for the first time but the consumer has not been started yet. Therefore, no consumption data is displayed in the message status information. To start the consumer and subscribe to messages, see Step 5. For more information about the message status, see Query messages and Query a message trace.

Step 5: Subscribe to messages by using an SDK

Once a message is sent, you need to start the consumer to subscribe to messages. The following uses the TCP Java SDK as an example.

  1. To subscribe to messages by using the TCP Java SDK, you can run the following sample code to start the consumer and test the message subscription function. You must set related parameters correctly according to the instructions.
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The group ID you created in the console.
            properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // The AccessKey ID used for RAM user authentication, which is created by the Alibaba Cloud account.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret used for RAM user authentication, which is created by the Alibaba Cloud account.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set the TCP endpoint: Go to the Instances page in the Message Queue for Apache RocketMQ console, select the target instance, and view the endpoint in the Endpoint Information section.
            properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
    
            Consumer consumer = ONSFactory.createConsumer(properties);
            consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
                public Action consume(Message message, ConsumeContext context) {
                    System.out.println("Receive: " + message);
                    return Action.CommitMessage;
                }
            });
            consumer.start();
            System.out.println("Consumer Started");
        }
    }            
  2. Check whether message subscription is successful.

    Once the preceding steps have been completed, you can check whether the consumer has been started in the console, that is, whether the message subscription is successful.

    1. In the left-side navigation pane, click Groups.
    2. Find the group ID you want to view, and click Subscription in the Actions column.

      If Online is displayed and Subscription Consistency is Yes, the subscription is successful. Otherwise, the subscription fails.

References