ApsaraMQ for RocketMQ: Send normal messages in synchronous, asynchronous, or one-way mode

Last Updated: Mar 11, 2026

ApsaraMQ for RocketMQ supports three transmission modes for normal messages. Each mode offers a different trade-off between delivery reliability and throughput.

| Mode | Broker response | Reliability | Throughput | Best for |
| --- | --- | --- | --- | --- |
| Synchronous | Blocks until acknowledged | No message loss | High | Email notifications, registration confirmations, promotional messages |
| Asynchronous | Delivered through callback | No message loss | High | Response-time-sensitive workflows, video transcoding pipelines |
| One-way | None | Possible message loss | Highest | Log collection |

All three modes share the same producer setup. The only difference is the send call:

  • Synchronous: producer.send(msg) -- blocks and returns SendResult

  • Asynchronous: producer.send(msg, callback) -- returns immediately, result delivered through SendCallback

  • One-way: producer.sendOneway(msg) -- fire-and-forget, no return value
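
To make the difference between the three call shapes concrete, here is a toy model in plain Java. It does not use the RocketMQ client: `deliver`, `sendSync`, `sendAsync`, and the `"ACK:"` string are hypothetical stand-ins invented for this sketch, and a `CompletableFuture` plays the role of the broker round trip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class SendModesSketch {
    // Hypothetical stand-in for a broker round trip; not part of the RocketMQ client.
    public static CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body);
    }

    // Synchronous shape: block until the acknowledgement arrives, then return it.
    public static String sendSync(String body) {
        return deliver(body).join();
    }

    // Asynchronous shape: return immediately; the callback later receives
    // either the acknowledgement or the error.
    public static CompletableFuture<String> sendAsync(String body,
                                                      BiConsumer<String, Throwable> callback) {
        return deliver(body).whenComplete(callback);
    }

    // One-way shape: start the delivery and never look at the outcome.
    public static void sendOneway(String body) {
        deliver(body);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("a"));
        // join() is only here so the demo does not exit before the callback runs.
        sendAsync("b", (ack, err) ->
            System.out.println(err == null ? ack : "failed: " + err)).join();
        sendOneway("c");
    }
}
```

The caller of `sendOneway` has nothing to inspect afterwards, which is why that mode cannot confirm delivery.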

Prerequisites

Common producer setup

The producer initialization below applies to all three modes. Replace the following placeholders with your values:

| Placeholder | Description | Example |
| --- | --- | --- |
| YOUR GROUP ID | Group ID created in the ApsaraMQ for RocketMQ console | GID_example |
| YOUR ACCESS POINT | Endpoint from the ApsaraMQ for RocketMQ console | http://MQ_INST_XXXX.aliyuncs.com:80 |
| YOUR TOPIC | Topic created in the ApsaraMQ for RocketMQ console | topic_example |
| YOUR MESSAGE TAG | Tag for message filtering | tag_example |

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

// Retrieve credentials from environment variables
private static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}

// Create a producer with message trace enabled.
// To disable message trace, use: new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);

// Set the access channel to CLOUD for message trace on Alibaba Cloud.
// Skip this line if message trace is not needed.
producer.setAccessChannel(AccessChannel.CLOUD);

// Set the endpoint from the ApsaraMQ for RocketMQ console.
producer.setNamesrvAddr("YOUR ACCESS POINT");

producer.start();
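
Because `System.getenv` returns `null` for a variable that is not set, a missing credential would likely surface only later as an authentication problem. One possible safeguard is to check both variables before building the producer. The sketch below is plain Java; the `CredentialCheck` class and its `require` helper are hypothetical names, not part of the RocketMQ client.

```java
import java.util.Map;

public class CredentialCheck {
    // Returns the value of a required variable, or fails with a message naming it.
    public static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        try {
            require(env, "ALIBABA_CLOUD_ACCESS_KEY_ID");
            require(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET");
            System.out.println("Credentials found in the environment.");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing the environment as a `Map` keeps the helper easy to exercise without changing the real process environment.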

Synchronous transmission

The producer sends a message and blocks until the broker returns a response. This guarantees delivery confirmation before the next message is sent.

Trade-off: Reliable -- no messages are lost. However, throughput is lower than asynchronous mode because the producer waits for each response before sending the next message.

Use cases: Email notifications, registration confirmations, promotional messages -- scenarios where delivery confirmation is required before proceeding.

Sample code

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
     * and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a producer with message trace enabled.
        // To disable message trace: new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);

        // Set to CLOUD for message trace on Alibaba Cloud. Skip if not needed.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Endpoint from the ApsaraMQ for RocketMQ console (format: http://MQ_INST_XXXX.aliyuncs.com:80).
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // Resend or persist the message if delivery fails.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting the application. (Optional)
        producer.shutdown();
    }
}

producer.send(msg) blocks and returns a SendResult containing the message ID and send status.
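
The sample's catch block only notes that a failed message should be resent or persisted. One way to structure the resend part is a small bounded-retry wrapper around the blocking call. This is a sketch in plain Java under stated assumptions: `RetrySketch` and `callWithRetry` are hypothetical names, the linear backoff is an arbitrary choice, and the lambda in `main` only simulates a call such as `() -> producer.send(msg)`.

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Calls the action up to maxAttempts times, sleeping backoffMillis * attempt
    // after each failure. Returns the first successful result; if every attempt
    // fails, throws a RuntimeException that wraps the last failure.
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long backoffMillis) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis * attempt);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        throw new RuntimeException("All attempts failed", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send: fails twice, then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated send failure");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

When the wrapper finally gives up, the caller still has the original message in hand and can persist it for later delivery.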

Asynchronous transmission

The producer sends a message and immediately moves on without waiting for a broker response. Implement a SendCallback to handle the broker response asynchronously.

Trade-off: Same reliability as synchronous mode (no message loss), but the producer does not block between sends. The added complexity is implementing SendCallback for success and failure handling.

Use cases: Response-time-sensitive workflows with long-running downstream processes. For example, after a video is uploaded, a callback triggers transcoding, and another callback pushes the transcoding result.

Sample code

import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQAsyncProducer {
    /**
     * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
     * and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Create a producer with message trace enabled.
        // To disable message trace: new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);

        // Set to CLOUD for message trace on Alibaba Cloud. Skip if not needed.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Endpoint from the ApsaraMQ for RocketMQ console (format: http://MQ_INST_XXXX.aliyuncs.com:80).
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        int messageCount = 128;
        // Counts finished sends so that the producer is not shut down while callbacks are still pending.
        final CountDownLatch latch = new CountDownLatch(messageCount);

        for (int i = 0; i < messageCount; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override public void onSuccess(SendResult result) {
                        // Message delivered successfully.
                        latch.countDown();
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override public void onException(Throwable throwable) {
                        // Resend or persist the message if delivery fails.
                        latch.countDown();
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // The send could not be submitted. Resend or persist the message.
                latch.countDown();
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Wait for outstanding callbacks before shutting down.
        // The 10-second limit is an arbitrary choice for this sample.
        latch.await(10, TimeUnit.SECONDS);
        producer.shutdown();
    }
}

producer.send(msg, new SendCallback() {...}) does not block. The result is delivered through onSuccess or onException.
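
Because the callbacks run after the send call has returned, a failed message has to be captured somewhere the application can get back to it. The sketch below shows one possible pattern in plain Java: failures are pushed onto a thread-safe queue from the callback, and the caller waits for all callbacks before reading it. It does not use the RocketMQ client; `AsyncFailureSketch`, `fakeSendAsync`, and the rule that bodies containing "bad" fail are hypothetical, chosen only to make the example runnable.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncFailureSketch {
    // Hypothetical stand-in for an asynchronous send: bodies containing "bad" fail.
    static CompletableFuture<String> fakeSendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> {
            if (body.contains("bad")) {
                throw new IllegalStateException("simulated broker error");
            }
            return "id-" + body;
        });
    }

    // Submits every body without blocking between sends, waits for the callbacks
    // (up to timeoutSeconds), and returns the bodies whose send failed.
    public static Queue<String> sendAll(String[] bodies, long timeoutSeconds) {
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        CountDownLatch pending = new CountDownLatch(bodies.length);
        for (String body : bodies) {
            fakeSendAsync(body).whenComplete((msgId, err) -> {
                if (err != null) {
                    failed.add(body);   // plays the role of onException
                }
                pending.countDown();    // runs on success and on failure
            });
        }
        try {
            pending.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failed;
    }

    public static void main(String[] args) {
        Queue<String> failed = sendAll(new String[] {"a", "bad-1", "b", "bad-2"}, 5);
        System.out.println("messages to resend or persist: " + failed);
    }
}
```

With the real client, the same queue could be filled from `onException`, and its contents resent or written to durable storage once the batch has finished.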

One-way transmission

The producer sends a message and returns immediately. No broker response is returned and no callback is triggered. A message can be sent within microseconds.

Trade-off: Highest throughput of all three modes, but messages may be lost because the producer cannot confirm delivery.

Use cases: High-volume, low-criticality data streams where occasional message loss is acceptable, such as log collection.

Sample code

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOnewayProducer {
    /**
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
    * and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a producer with message trace enabled.
        // To disable message trace: new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);

        // Set to CLOUD for message trace on Alibaba Cloud. Skip if not needed.
        producer.setAccessChannel(AccessChannel.CLOUD);

        // Endpoint from the ApsaraMQ for RocketMQ console (format: http://MQ_INST_XXXX.aliyuncs.com:80).
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // Only client-side errors surface here. The broker sends no response in
                // one-way mode, so broker-side failures cannot be detected or retried.
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // Shut down the producer before exiting the application. (Optional)
        producer.shutdown();
    }
}

producer.sendOneway(msg) returns no SendResult and invokes no callback -- the message is sent on a best-effort basis.

Subscribe to normal messages

Regardless of the transmission mode used to send messages, consume them with DefaultMQPushConsumer. The broker pushes messages to the consumer, which processes them through a MessageListenerConcurrently listener.

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID
    * and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        // Create a consumer with message trace enabled.
        // To disable message trace:
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

        // Endpoint of the ApsaraMQ for RocketMQ instance.
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");

        // Set to CLOUD for message trace on Alibaba Cloud. Skip if not needed.
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // Topic created in the ApsaraMQ for RocketMQ console.
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}