ApsaraMQ for RocketMQ: Step 3: Use SDKs to send and receive messages

Last Updated: Apr 22, 2024

ApsaraMQ for RocketMQ provides SDKs for multiple programming languages to send and receive messages of different types. This topic describes how to use the SDK for Java to connect to an ApsaraMQ for RocketMQ broker to send and receive normal messages.

Prerequisites

  • The required resources are created in ApsaraMQ for RocketMQ. For more information, see Step 2: Create resources.

  • IntelliJ IDEA is installed. For more information, see IntelliJ IDEA.

    You can use either IntelliJ IDEA or Eclipse. The examples in this topic use IntelliJ IDEA Ultimate.

  • JDK 1.8 or later is installed. For more information, see Java Downloads.

  • Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.

Install the Java dependency library

  1. Create a Java project in IntelliJ IDEA.

  2. Add the following dependency to the pom.xml file to import the Java dependency library:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.4</version>
    </dependency>

Produce messages

In the created Java project, create a program that sends normal messages and run the program. Sample code:

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an Elastic Compute Service (ECS) instance over an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access an instance over the Internet or from a data center, you can specify the public endpoint. If you access an instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you send messages to a topic, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access an instance by using a public endpoint, you must specify the username and password of the instance. You can obtain the instance username and password of an instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an ECS instance over an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a Serverless Edition instance, you must specify the username and password of the instance, regardless of whether you access the instance over the Internet or in a VPC. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * When you initialize a producer, you can declare the topics that the producer will use. The topic settings are then validated at startup, and a producer with invalid topics fails to start. 
         * For non-transactional messages, declaring topics is optional. The broker dynamically checks whether the topics are valid. 
         * Note: For transactional messages, you must declare the topics in advance. Otherwise, the operation that checks the status of transactional messages may fail. 
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Send a normal message. 
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // The message index key. You can use a keyword to accurately find the message. 
                .setKeys("messageKey")
                // The message tag. The consumer can use the tag to filter messages. 
                .setTag("messageTag")
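                // The message body, encoded explicitly as UTF-8 instead of the platform default charset. 
                .setBody("messageBody".getBytes(java.nio.charset.StandardCharsets.UTF_8))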
                .build();
        try {
            // Send the message. Check the send result and handle exceptions such as send failures. 
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}
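
The credential rules described in the comments of the preceding sample reduce to a single condition. The following is a minimal sketch of that condition, written with the standard library only. `CredentialRule`, `Network`, and `requiresCredentials` are hypothetical names used for illustration and are not part of the RocketMQ SDK.

```java
// Hypothetical helper for illustration; none of these names come from the RocketMQ SDK.
final class CredentialRule {
    enum Network { VPC, PUBLIC }

    /** Returns true when the rules in the sample comments call for an explicit username and password. */
    static boolean requiresCredentials(Network network, boolean serverlessEdition) {
        // Serverless Edition instances always need credentials.
        // Other instances need them only when accessed through the public endpoint.
        return serverlessEdition || network == Network.PUBLIC;
    }

    public static void main(String[] args) {
        System.out.println(requiresCredentials(Network.VPC, false));    // false
        System.out.println(requiresCredentials(Network.PUBLIC, false)); // true
        System.out.println(requiresCredentials(Network.VPC, true));     // true
    }
}
```

When the condition is true for your deployment, uncomment the `setCredentialProvider` line in the sample and supply the instance username and password.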

Consume messages

In the created Java project, create a program that subscribes to normal messages and run the program. ApsaraMQ for RocketMQ supports two consumer types: SimpleConsumer and PushConsumer. For more information, see SimpleConsumer and PushConsumer. Select either type to subscribe to messages. The following table describes the differences between the two types.

| Item | PushConsumer | SimpleConsumer |
| --- | --- | --- |
| API operation call | The SDK invokes a message listener callback, and the listener returns the consumption result. Consumption logic can run only within the scope of the message listener. | The business application processes messages itself and calls the corresponding operation to return the consumption result. |
| Consumption concurrency management | The ApsaraMQ for RocketMQ SDK manages the number of concurrent threads for message consumption. | The business application determines the number of concurrent consumption threads based on its own consumption logic. |
| API flexibility | The API operations are encapsulated and offer limited flexibility. | The atomic operations offer great flexibility. |
| Scenarios | Suitable for development scenarios that do not require a custom process. | Suitable for development scenarios that require custom processes. |
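
To make the difference between the two consumer types more concrete, the following sketch models the two calling styles with an in-memory queue. It does not use the RocketMQ SDK. `ConsumerStyles`, `pushStyle`, and `receive` are hypothetical names, and the real consumers add networking, retries, and thread management that are omitted here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Simplified, hypothetical model of the two consumption styles.
final class ConsumerStyles {
    enum Result { SUCCESS, FAILURE }

    /** Push style: the framework owns the loop and invokes the listener once per message. */
    static List<Result> pushStyle(Queue<String> queue, Function<String, Result> listener) {
        List<Result> results = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            results.add(listener.apply(message));
        }
        return results;
    }

    /** Simple style: the application requests a batch and decides how to process it. */
    static List<String> receive(Queue<String> queue, int maxMessageNum) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxMessageNum && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> pushed = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(pushStyle(pushed, m -> Result.SUCCESS)); // [SUCCESS, SUCCESS, SUCCESS]

        Queue<String> pulled = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(receive(pulled, 2)); // [a, b]
        System.out.println(receive(pulled, 2)); // [c]
    }
}
```

In the push style, the application only supplies the listener. In the simple style, the application controls when to fetch, how many messages to fetch, and how to process them.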

PushConsumer

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an ECS instance over an internal network, we recommend that you specify the VPC endpoint. 
         * If you access an instance over the Internet or from a data center, you can specify the public endpoint. If you access an instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to subscribe to. The topic must already exist in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. The consumer group must already exist in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access an instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the instance username and password of an instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an ECS instance over an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a Serverless Edition instance, you must specify the username and password of the instance, regardless of whether you access the instance over the Internet or in a VPC. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Specify the message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // When you no longer need the push consumer, close it to release its resources. 
        //pushConsumer.close();
    }
}
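
The preceding sample subscribes with the tag expression `*`, which matches all messages in the topic. RocketMQ TAG expressions can also list several tags separated by `||`, for example `TagA||TagB`. The following sketch illustrates how such an expression selects messages. It is a simplified model for illustration, not the broker's actual implementation, and `TagFilterSketch` and `matches` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of TAG-type filter matching.
final class TagFilterSketch {
    /** Returns true when a message tag satisfies an expression such as "*" or "TagA||TagB". */
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if (trimmed.equals("*")) {
            return true; // "*" subscribes to every message in the topic
        }
        if (messageTag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        Set<String> accepted = new HashSet<>();
        for (String part : trimmed.split("\\|\\|")) {
            accepted.add(part.trim());
        }
        return accepted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "messageTag"));          // true
        System.out.println(matches("TagA||TagB", "TagB"));       // true
        System.out.println(matches("TagA||TagB", "messageTag")); // false
    }
}
```

To receive only some messages, pass an expression other than `*` as the first argument of `FilterExpression` in the sample.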

SimpleConsumer

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an ECS instance over an internal network, we recommend that you specify the VPC endpoint. 
         * If you access an instance over the Internet or from a data center, you can specify the public endpoint. If you access an instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to subscribe to. The topic must already exist in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. The consumer group must already exist in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access an instance by using a public endpoint, you must specify the username and password of the instance. You can obtain the instance username and password of an instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If you access an instance whose client is deployed on an ECS instance over an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a Serverless Edition instance, you must specify the username and password of the instance, regardless of whether you access the instance over the Internet or in a VPC. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Specify the maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // Specify the invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker. 
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // When you no longer need the simple consumer, close it to release its resources. 
        // consumer.close();
    }
}
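
The preceding sample passes an invisible time to `receive` and then acknowledges each message. The following sketch models the effect of those two calls: a received message is hidden for the invisible time and becomes available again if it is not acknowledged. This is a simplified in-memory model under that assumption, not the broker's implementation. `InvisibleQueueSketch` and its methods are hypothetical names, and time is passed in explicitly so that the behavior is easy to follow.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of invisible time and acknowledgment.
final class InvisibleQueueSketch {
    // Message ID -> time in milliseconds at which the message becomes visible again.
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    void put(String messageId) {
        visibleAt.put(messageId, 0L);
    }

    /** Returns up to maxMessageNum visible messages and hides them for invisibleDuration. */
    List<String> receive(int maxMessageNum, Duration invisibleDuration, long nowMillis) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (batch.size() == maxMessageNum) {
                break;
            }
            if (entry.getValue() <= nowMillis) {
                entry.setValue(nowMillis + invisibleDuration.toMillis());
                batch.add(entry.getKey());
            }
        }
        return batch;
    }

    /** Acknowledging a message removes it, so it is not delivered again. */
    void ack(String messageId) {
        visibleAt.remove(messageId);
    }

    public static void main(String[] args) {
        InvisibleQueueSketch queue = new InvisibleQueueSketch();
        queue.put("m1");
        queue.put("m2");
        Duration invisible = Duration.ofSeconds(10);

        System.out.println(queue.receive(16, invisible, 0));      // [m1, m2]
        queue.ack("m1");                                          // m2 is left unacknowledged
        System.out.println(queue.receive(16, invisible, 5_000));  // []
        System.out.println(queue.receive(16, invisible, 10_000)); // [m2]
    }
}
```

In this model, choosing an invisible time that is longer than the expected processing time avoids receiving a message again while it is still being processed.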

Verify messages

After you consume a message, you can check the consumption status of the message in the ApsaraMQ for RocketMQ console.

  1. Log on to the ApsaraMQ for RocketMQ console. On the Instances page, select the instance that you want to manage.

  2. In the left-side navigation pane of the page that appears, click Message Tracing.

SDK references

For information about how to use SDKs for other programming languages to send and receive messages of other types, see Overview.