All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive delayed messages

Last Updated:Jan 08, 2024

This topic provides sample code on how to send and receive delayed messages by using the TCP client SDK for Java.

Prerequisites

Before you start, make sure that the following operations are performed:

  • The SDK for Java is installed. For more information, see Prepare the environment.

  • The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.

  • The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.

  • Optional. Logging settings are configured. for more information, see Logging settings.

Background information

Delayed messages are delivered from an ApsaraMQ for RocketMQ broker to a client for consumption after a period of time, such as 3 seconds. Delayed messages are similar to delayed queues and can be used in scenarios in which a time window between message production and message consumption is required, or scenarios in which delayed tasks are triggered by messages.

For information about the terms that are used for delayed messages and the precautions that you must take when you use delayed messages, see Scheduled messages and delayed messages.

Note

If you are a new user of ApsaraMQ for RocketMQ, we recommend that you refer to the Demo project to create a project before you use ApsaraMQ for RocketMQ to send and receive messages.

Send delayed messages

For information about the sample code, see ApsaraMQ for RocketMQ code library.

The following sample code provides an example on how to send delayed messages by using the TCP client SDK for Java:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint. You can obtain the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");

        Producer producer = ONSFactory.createProducer(properties);
        // Before you send the message, call the start() method only once to start the producer. 
        producer.start();
        Message msg = new Message( 
                // The topic that you created in the ApsaraMQ for RocketMQ console. 
                "Topic",
                // The message tag. A message tag is similar to a Gmail tag and can be used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                "tag",
                // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods to serialize and deserialize a message body. 
                "Hello MQ".getBytes());
        // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
        // If you cannot receive a message as expected, you can use the key to query and resend the message in the ApsaraMQ for RocketMQ console. 
        // Note: You can send and receive a message even if you do not specify the key. 
        msg.setKey("ORDERID_100");
        try {
            // The delay time before the message is sent. The value must be later than the current time. Unit: milliseconds. The maximum value that you can specify is equal to 40 days. 
            // In the following example, the message is delivered after a delay of 3 seconds. 
            long delayTime = System.currentTimeMillis() + 3000;

            // The point in time when the broker starts to deliver the message. 
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // Send the message in synchronous transmission mode. If no exception is thrown, the message is sent. 
            if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
            } catch (Exception e) {
            // The logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }
        // Before you exit the application, destroy the producer. 
        // Note: If you destroy a producer, memory can be saved. If you need to frequently send messages, do not destroy the producer. 
        producer.shutdown();
    }
}           

Subscribe to delayed messages

The sample code for subscribing to delayed messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.