All Products
Search
Document Center

ApsaraMQ for RocketMQ:Send and receive scheduled messages

Last Updated:Jan 08, 2024

This topic provides sample code on how to send and receive scheduled messages by using the TCP client SDK for Java.

Background information

Scheduled messages are consumed after a predefined timestamp. Scheduled messages can be used in scenarios in which a time window between message production and consumption is required, or in which scheduled tasks are triggered by using messages.

For information about the terms that are used for scheduled messages and the precautions that you must take when you use scheduled messages, see Scheduled messages and delayed messages.

Note

If you are a new user of ApsaraMQ for RocketMQ, we recommend that you refer to the Demo project to build an ApsaraMQ for RocketMQ project to send and receive messages.

Prerequisites

Before you start, make sure that the following operations are performed:

  • The SDK for Java is installed. For more information, see Prepare the environment.

  • The resources that you want to specify in the code are created in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.

  • The AccessKey pair of your Alibaba Cloud account is obtained. For more information, see Create an AccessKey pair.

  • Optional. Logging settings are configured. for more information, see Logging settings.

Send scheduled messages

For information about the sample code, see ApsaraMQ for RocketMQ code library.

The following sample code provides an example on how to send scheduled messages by using the TCP client SDK for Java:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");
        Producer producer = ONSFactory.createProducer(properties);
        // Before you send the message, call the start() method only once to start the producer. 
        producer.start();
        Message msg = new Message(
                // The topic in which the message is produced. 
                "Topic",
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                "tag",
                // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods to serialize and deserialize a message body. 
                "Hello MQ".getBytes());
        // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
        // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
        // Note: You can send and receive a message even if you do not specify the key. 
        msg.setKey("ORDERID_100");

        try {
            // The timestamp that indicates the time when the broker delivers the message to the consumer. Unit: milliseconds. For example, if you set this parameter to 2016-03-07 16:21:00, the broker delivers the message at 16:21:00 on March 7, 2016. The value must be later than the current time. If you set this parameter to a time that is earlier than the current time, the message is immediately delivered to the consumer. 
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();

            msg.setStartDeliverTime(timeStamp);
            // Send the message. If no exception is thrown, the message is sent. 
            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        }
        catch (Exception e) {
            // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        // Before you exit the application, destroy the producer. 
        // Note: Memory can be saved if you destroy a producer. If you need to frequently send messages, do not destroy a producer. 
        producer.shutdown();
    }
}       

Subscribe to scheduled messages

The sample code for subscribing scheduled messages is the same as that for subscribing to normal messages. For more information, see Subscribe to messages.