All Products
Search
Document Center

ApsaraMQ for RocketMQ:Normal messages

Last Updated:Mar 11, 2026

Normal messages are the default message type in ApsaraMQ for RocketMQ. Unlike ordered, scheduled, delayed, and transactional messages, normal messages have no special features. Normal messages support asynchronous decoupled communications between producers and consumers. Use normal messages when your application requires reliable delivery but does not need strict sequencing or timed delivery.

When to use normal messages

Normal messages suit any scenario that prioritizes transmission reliability over processing order. Common examples include microservice decoupling, data integration, and event-driven architectures:

  • Microservice decoupling: An upstream service encapsulates order placement and payment as an independent normal message and sends it to the ApsaraMQ for RocketMQ broker. Downstream services subscribe independently and process the message according to their own logic. Each message is self-contained -- no cross-message dependencies.

    Asynchronous decoupling between an order system and downstream services

  • Data integration: An instrumentation component collects operation logs from frontend applications and forwards them to ApsaraMQ for RocketMQ. Each message is a discrete log data record. The broker stores and delivers records to downstream storage systems without transforming them. Subsequent tasks are processed by backend applications.

    Log data pipeline from frontend applications to downstream storage

Message lifecycle

A normal message passes through five stages between creation and deletion:

Lifecycle of a normal message
StageDescription
InitializedThe producer builds the message and prepares it for sending.
ReadyThe broker accepts the message. It is now visible and available to consumers.
InflightA consumer retrieves the message and begins processing it. If the broker receives no acknowledgment within a specific period of time, it retries delivery.
AckedThe consumer commits a consumption result. The broker marks whether the current message is consumed but does not delete it immediately.
DeletedThe message is removed when the retention period expires or storage space runs low. Deletion follows a rolling strategy that removes the oldest messages first from the physical file. For details, see Message storage and cleanup.

By default, ApsaraMQ for RocketMQ retains all messages even after they are acked. Consumers can re-consume any message that has not yet been deleted.

Code examples (Java)

The following examples use the Apache RocketMQ 5.x SDK to send and consume normal messages. Each example configures a client connection and performs a single operation. Replace placeholders with your actual values before running the code.

For the complete SDK reference and examples in other languages, see Apache RocketMQ 5.x SDKs.

Before you begin

Prepare the following resources in the ApsaraMQ for RocketMQ console:

ResourceWhere to find it
EndpointInstance Details page > Endpoints tab. Use the VPC endpoint for access from Elastic Compute Service (ECS) instances in the same virtual private cloud (VPC). Use the public endpoint for internet access (requires the internet access feature to be enabled).
TopicCreate a topic with MessageType set to Normal. Normal messages can only be sent to topics of this type.
Consumer groupRequired for consumer examples. Create one in the console.
Credentials (public endpoint only)Username and password from the Access Control page > Intelligent Authentication tab. Not required for VPC access, where the broker authenticates automatically. For serverless instances accessed over a VPC with the authentication-free in VPCs feature enabled, credentials are also not required.

Placeholder reference

PlaceholderDescriptionExample
<your-endpoint>Instance endpointrmq-cn-xxx.cn-hangzhou.rmq.aliyuncs.com:8080
<your-topic>Topic namenormal-topic-01
<your-consumer-group>Consumer group nameGID_normal_consumer
<your-username>Instance username (public endpoint only)--
<your-password>Instance password (public endpoint only)--

Sample code

Send a normal message

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        // Specify the instance endpoint.
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        // Uncomment the following line for public endpoint access:
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("<your-username>", "<your-password>"));
        ClientConfiguration configuration = builder.build();

        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();

        // Build a normal message with a key and tag for filtering and tracing.
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setKeys("messageKey")
                .setTag("messageTag")
                .setBody("messageBody".getBytes())
                .build();

        try {
            SendReceipt sendReceipt = producer.send(message);
            System.out.println("Send success. Message ID: " + sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

Expected output:

Send success. Message ID: 01BE0A3600F5762D0455025C2D1A0000

Consume with a push consumer

A push consumer receives messages through a MessageListener callback. The broker pushes messages to the consumer automatically.

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        // Uncomment the following line for public endpoint access:
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("<your-username>", "<your-password>"));
        ClientConfiguration clientConfiguration = builder.build();

        // Subscribe to all messages in the topic (tag = "*").
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    System.out.println("Received message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();

        // Keep the process running to continue receiving messages.
        Thread.sleep(Long.MAX_VALUE);

        // Close the consumer when it is no longer needed.
        // pushConsumer.close();
    }
}

Expected output:

Received message: MessageView{messageId=01BE0A3600F5762D0455025C2D1A0000, topic=<your-topic>, ...}

Consume with a simple consumer

A simple consumer pulls messages explicitly through receive() and acknowledges each message with ack(). This mode gives you full control over the consumption pace.

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    public static void main(String[] args) throws ClientException, IOException {
        String endpoints = "<your-endpoint>";
        String topic = "<your-topic>";
        String consumerGroup = "<your-consumer-group>";

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        // Uncomment the following line for public endpoint access:
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("<your-username>", "<your-password>"));
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setAwaitDuration(awaitDuration)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();

        int maxMessageNum = 16;
        Duration invisibleDuration = Duration.ofSeconds(10);

        // Pull and acknowledge messages in a loop.
        // For real-time consumption, use multiple threads to pull concurrently.
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    consumer.ack(message);
                    System.out.println("Acked message: " + messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }

        // Close the consumer when it is no longer needed.
        // consumer.close();
    }
}

Expected output:

Received message: MessageView{messageId=01BE0A3600F5762D0455025C2D1A0000, topic=<your-topic>, ...}
Acked message: 01BE0A3600F5762D0455025C2D1A0000

Best practices

Set a globally unique message key for every message

Assign a business-specific identifier -- such as an order ID or user ID -- as the message key. This lets you query individual messages and their traces in the ApsaraMQ for RocketMQ console.

Message message = provider.newMessageBuilder()
        .setTopic(topic)
        .setKeys("order-12345")   // Use a unique business identifier.
        .setTag("payment")
        .setBody(payload)
        .build();

Choose the right consumer mode

ModeHow it worksBest for
Push consumerThe broker pushes messages to a MessageListener callback.Low-latency, event-driven processing where the consumer handles each message as it arrives.
Simple consumerThe application calls receive() to pull a batch, then ack() each message after processing.Workloads that require manual flow control or batching.