ApsaraMQ for RocketMQ: Synchronize messages between ApsaraMQ for RocketMQ instances

Last Updated: Mar 11, 2026

When you operate multiple ApsaraMQ for RocketMQ instances across regions or environments, you may need to replicate messages between them for disaster recovery, data aggregation, or environment migration. A message inflow task reads messages from a source instance, optionally filters and transforms them, and delivers them to a target instance -- without writing custom consumer-producer bridges.

How it works

A message inflow task runs as a four-stage pipeline:

Source (poll messages) --> Filter (match by event pattern) --> Transform (cleanse and reshape) --> Sink (deliver to target)
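
As a rough mental model, the four stages can be pictured as a single loop over the source messages. The following is an illustrative sketch only: `run_pipeline`, `matches`, and `transform` are hypothetical names, and the in-memory lists merely stand in for the source and target topics. It does not use any ApsaraMQ SDK.

```python
from typing import Callable, Iterable, List


def run_pipeline(
    source: Iterable[dict],
    matches: Callable[[dict], bool],
    transform: Callable[[dict], dict],
    sink: List[dict],
) -> int:
    """Push each source message through filter and transform into the sink.

    Returns the number of messages delivered.
    """
    delivered = 0
    for message in source:                # Source: poll messages
        if not matches(message):          # Filter: drop non-matching messages
            continue
        sink.append(transform(message))   # Transform, then Sink: deliver
        delivered += 1
    return delivered


# Hypothetical in-memory stand-ins for the source topic and the target topic.
source_topic = [
    {"tag": "test_tag", "body": "order-1"},
    {"tag": "other", "body": "order-2"},
    {"tag": "test_tag", "body": "order-3"},
]
target_topic: List[dict] = []

count = run_pipeline(
    source_topic,
    matches=lambda m: m["tag"] == "test_tag",
    transform=lambda m: {"body": m["body"].upper()},
    sink=target_topic,
)
```

Messages that fail the filter never reach the transform or the sink, which is why only two of the three sample messages are delivered.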

The system polls messages from the source instance in configurable batches (1--10,000 messages). Batches are forwarded to the sink at a configurable interval (0--15 seconds). Setting the interval to 0 delivers messages immediately after aggregation.
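
The interaction between batch size and interval can be sketched as follows. This is a simplified model under one assumption that this topic does not confirm: a batch is flushed when it reaches the configured size or when the interval has elapsed since the first message in the batch, whichever comes first. The `Batcher` class is hypothetical, and timestamps are passed in explicitly so the behavior is deterministic.

```python
from typing import List, Optional


class Batcher:
    """Aggregate messages and flush by size or elapsed time (hypothetical model)."""

    def __init__(self, max_messages: int, interval_seconds: float) -> None:
        if not 1 <= max_messages <= 10000:
            raise ValueError("max_messages must be between 1 and 10000")
        if not 0 <= interval_seconds <= 15:
            raise ValueError("interval_seconds must be between 0 and 15")
        self.max_messages = max_messages
        self.interval_seconds = interval_seconds
        self._pending: List[str] = []
        self._first_arrival: Optional[float] = None

    def add(self, message: str, now: float) -> Optional[List[str]]:
        """Add a message at time `now`; return a batch if one is due, else None."""
        if not self._pending:
            self._first_arrival = now
        self._pending.append(message)
        return self.poll(now)

    def poll(self, now: float) -> Optional[List[str]]:
        """Return the pending batch if the size or time threshold is reached."""
        if not self._pending:
            return None
        full = len(self._pending) >= self.max_messages
        expired = now - self._first_arrival >= self.interval_seconds
        if full or expired:
            batch, self._pending = self._pending, []
            self._first_arrival = None
            return batch
        return None


batcher = Batcher(max_messages=3, interval_seconds=3)
first = batcher.add("m1", now=0.0)    # batch not full, interval not elapsed
second = batcher.add("m2", now=1.0)
third = batcher.add("m3", now=2.0)    # size threshold reached
fourth = batcher.add("m4", now=2.5)   # starts a new batch
timed = batcher.poll(now=5.5)         # 3 seconds since "m4" arrived

# With an interval of 0, every message is flushed as soon as it is added.
immediate = Batcher(max_messages=100, interval_seconds=0).add("x", now=0.0)
```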

Prerequisites

Before you begin, make sure that you have:

  • Two ApsaraMQ for RocketMQ instances (source and target), purchased, deployed, and in the Running state. For more information, see Create resources.

Create a message inflow task

  1. Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, choose Message Integration > Tasks.

  2. In the top navigation bar, select a region, such as China (Hangzhou). On the Tasks page, click Create Task.

  3. On the Create Task page, specify a Task Name and Description, then complete the following steps.

Step 1: Configure the source

In the Source step, set Data Provider to ApsaraMQ for RocketMQ and configure the following parameters. Then click Next Step.

| Parameter | Description | Example |
| --- | --- | --- |
| Region | Auto-filled with the region you selected for the message inflow task. | China (Hangzhou) |
| Version | Instance version. Select RocketMQ 4.x or RocketMQ 5.x. | RocketMQ 5.x |
| Instance | The source instance where messages are produced. | rmq-cn-\*\*\*\* |
| Topic | The topic on the source instance to consume messages from. | topic |
| Tag | A tag to filter messages on the source instance. | test\_tag |
| Group ID | The consumer group used to track consumption progress. Options: Quickly Create (recommended), which auto-creates a group named GID_EVENTBRIDGE_xxx; Use Existing Group, which lets you select an existing group that is not actively in use. | Quickly Create |
| Consumer Offset | The starting position for message consumption. Options: Latest Offset, which starts from the most recent message; Earliest Offset, which starts from the oldest available message; Timestamp, which starts from a specific point in time and requires the Consumed At parameter. | Latest Offset |
| Consumed At | The timestamp from which to start consuming. Required only when Consumer Offset is set to Timestamp. | 2024-06-18 15:28:29 |
| Data Format (Body) | Encoding applied to binary message data before delivery. Options: JSON (default), which encodes binary data as JSON using UTF-8 and places it in the payload; Text, which encodes binary data as a UTF-8 string and places it in the payload; Binary, which encodes binary data as a Base64 string and places it in the payload. | JSON |
| Messages | Maximum number of messages delivered to the sink in one batch. A batch is sent when the number of accumulated messages reaches this value. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | Interval at which aggregated messages are delivered to the sink. Valid values: 0 to 15. Set to 0 to deliver immediately after aggregation. | 3 |

Warning

Selecting Use Existing Group with a group that is actively in use affects the publishing and subscription of existing messages. Select a group that is not in use, or use Quickly Create instead.
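
The three Data Format (Body) options in this step can be illustrated with a short sketch. The `encode_body` function is hypothetical and only mirrors the descriptions given for each option. How the service handles a body that is not valid JSON when JSON is selected is not stated in this topic, so the sketch simply lets the parsing error propagate.

```python
import base64
import json
from typing import Any


def encode_body(raw: bytes, data_format: str = "JSON") -> Any:
    """Turn a raw message body into the value placed in the payload."""
    if data_format == "JSON":
        # Decode as UTF-8, then parse the text as JSON.
        return json.loads(raw.decode("utf-8"))
    if data_format == "Text":
        # Decode as a plain UTF-8 string.
        return raw.decode("utf-8")
    if data_format == "Binary":
        # Keep arbitrary bytes intact by encoding them as Base64 text.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unsupported data format: {data_format}")


as_json = encode_body(b'{"id": 1}', "JSON")
as_text = encode_body(b'{"id": 1}', "Text")
as_binary = encode_body(b"\x00\x01", "Binary")
```

Binary is the safest choice when message bodies are not UTF-8 text, because Base64 encoding does not depend on the content being decodable.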

Step 2: Configure filtering

In the Filtering step, define a data pattern to select which messages pass through the pipeline. Only messages that match the pattern are forwarded to the transformation stage.

For more information, see Event patterns.
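
To give a feel for pattern-based filtering, here is a deliberately simplified matcher. It assumes a pattern is a nested object whose leaf values are lists of allowed values, and that an event matches when every field named in the pattern holds one of the listed values. The actual pattern syntax is defined in the Event patterns topic and may support more operators than this sketch. The `matches_pattern` function is hypothetical, and the event shape borrows the `data.systemProperties.TAGS` path used in the sink examples of this topic.

```python
from typing import Any, Dict


def matches_pattern(event: Dict[str, Any], pattern: Dict[str, Any]) -> bool:
    """Return True if every field in the pattern matches the event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the matching nested object.
            if not isinstance(actual, dict) or not matches_pattern(actual, expected):
                return False
        elif actual not in expected:
            # Leaf: the event value must be one of the allowed values.
            return False
    return True


pattern = {"data": {"systemProperties": {"TAGS": ["test_tag", "audit"]}}}

forwarded = {"data": {"body": "hello", "systemProperties": {"TAGS": "test_tag"}}}
dropped = {"data": {"body": "hello", "systemProperties": {"TAGS": "debug"}}}

forwarded_result = matches_pattern(forwarded, pattern)
dropped_result = matches_pattern(dropped, pattern)
```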

Step 3: Configure transformation

In the Transformation step, specify a data cleansing method. Supported operations include splitting, mapping, enrichment, and dynamic routing.

For more information, see Use Function Compute to perform message cleansing.
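
Two of the operations named above, splitting and mapping, can be sketched as plain functions. This is an illustration of the concepts only: in the service, cleansing logic runs in Function Compute, and the handler contract is described in the linked topic rather than reproduced here. `split_message`, `map_fields`, and the message shape are hypothetical.

```python
import json
from typing import Dict, List


def split_message(message: Dict[str, str]) -> List[Dict[str, str]]:
    """Splitting: turn one message whose body is a JSON array into one message per element."""
    items = json.loads(message["body"])
    return [{**message, "body": json.dumps(item)} for item in items]


def map_fields(message: Dict[str, str], renames: Dict[str, str]) -> Dict[str, str]:
    """Mapping: rename top-level fields of a JSON object body."""
    body = json.loads(message["body"])
    mapped = {renames.get(key, key): value for key, value in body.items()}
    return {**message, "body": json.dumps(mapped)}


incoming = {"tag": "orders", "body": '[{"id": 1}, {"id": 2}]'}

parts = split_message(incoming)
renamed = [map_fields(part, {"id": "order_id"}) for part in parts]
```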

Step 4: Configure the sink

In the Sink step, set Service Type to ApsaraMQ for RocketMQ and configure the following parameters.

| Parameter | Description | Example |
| --- | --- | --- |
| Version | Target instance version. Select RocketMQ 4.x or RocketMQ 5.x. | RocketMQ 5.x |
| Instance ID | The target instance to deliver messages to. | rmq-cn-\*\*\*\* |
| Topic | The topic on the target instance. | topic |
| Message Body | Determines the message body content delivered to the target. Options: Complete Data, Data Extraction, Fixed Value, Template. | Data Extraction. Example extraction path: $.data.body |
| Custom Property | Sets custom properties on the target message. Options: Not Specified, Data Extraction, Template. | Template. Example template parameters: {"userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY"} Example template: {"EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}"} |
| Message Key | Sets the message key on the target. Options: Not Specified, Data Extraction, Fixed Value, Template. | Data Extraction. Example extraction path: $.data.systemProperties.KEYS |
| Message Tag | Sets the message tag on the target. Options: Not Specified, Data Extraction, Fixed Value, Template. | Data Extraction. Example extraction path: $.data.systemProperties.TAGS |
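
The Data Extraction and Template options can be approximated as follows. This sketch assumes that an extraction path is a simple dotted path starting with `$.` and that a template replaces each `${name}` placeholder with the value extracted for that name. The `extract` and `render_template` functions, the sample event, and the `origin` property are hypothetical. Any special handling the service applies to `EB_SYS_EMBED_OBJECT` is not modeled.

```python
import json
import re
from typing import Any, Dict


def extract(event: Dict[str, Any], path: str) -> Any:
    """Resolve a simple dotted path such as $.data.body (no wildcards or indexes)."""
    if not path.startswith("$."):
        raise ValueError("path must start with '$.'")
    value: Any = event
    for key in path[2:].split("."):
        value = value[key]
    return value


def render_template(event: Dict[str, Any], parameters: Dict[str, str], template: str) -> str:
    """Fill ${name} placeholders with values extracted via the parameter paths."""
    values = {name: extract(event, path) for name, path in parameters.items()}

    def substitute(match: "re.Match[str]") -> str:
        value = values[match.group(1)]
        # Strings are inserted as-is; other values are serialized as JSON.
        return value if isinstance(value, str) else json.dumps(value)

    return re.sub(r"\$\{(\w+)\}", substitute, template)


event = {
    "data": {
        "body": "hello",
        "systemProperties": {"UNIQ_KEY": "ABC123", "TAGS": "test_tag", "KEYS": "order-1"},
    }
}

body = extract(event, "$.data.body")
key = extract(event, "$.data.systemProperties.KEYS")
properties = render_template(
    event,
    parameters={"msgId": "$.data.systemProperties.UNIQ_KEY", "tag": "$.data.systemProperties.TAGS"},
    template='{"UNIQ_KEY":"${msgId}", "origin":"${tag}"}',
)
```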

Step 5: Configure task properties

Configure the retry policy for failed event deliveries and the fault handling method.

For more information, see Retry policies and dead-letter queues.
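
The general idea of retrying a failed delivery and then falling back to a dead-letter destination can be sketched as below. The retry count, the exponential delay schedule, and the `deliver_with_retry` function are all hypothetical. The policies the service actually offers are described in the linked topic. The sketch records the delays it would wait instead of sleeping, so it runs instantly.

```python
from typing import Callable, List, Tuple


def deliver_with_retry(
    event: dict,
    send: Callable[[dict], None],
    max_retries: int,
    base_delay: float,
    dead_letters: List[dict],
) -> Tuple[bool, List[float]]:
    """Try to send an event; after max_retries failed retries, dead-letter it.

    Returns (delivered, delays), where delays lists the waits between attempts.
    """
    delays: List[float] = []
    for attempt in range(max_retries + 1):
        try:
            send(event)
            return True, delays
        except Exception:
            if attempt < max_retries:
                # Hypothetical schedule: double the wait after each failure.
                delays.append(base_delay * 2 ** attempt)
    dead_letters.append(event)
    return False, delays


def make_flaky_sender(failures: int) -> Callable[[dict], None]:
    """Build a sender that raises for the first `failures` calls, then succeeds."""
    remaining = [failures]

    def send(event: dict) -> None:
        if remaining[0] > 0:
            remaining[0] -= 1
            raise ConnectionError("target unavailable")

    return send


dead_letter_queue: List[dict] = []
recovered = deliver_with_retry({"id": 1}, make_flaky_sender(2), 3, 1.0, dead_letter_queue)
exhausted = deliver_with_retry({"id": 2}, make_flaky_sender(10), 2, 1.0, dead_letter_queue)
```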

Step 6: Save and verify

Click Save. On the Tasks page, find the task. When the Status column changes from Starting to Running, the task is active and synchronizing messages.

Manage tasks

On the Tasks page, find the task and use the Actions column to perform the following operations:

| Action | Description |
| --- | --- |
| Details | View the Task Details page, which shows basic information, properties, and monitoring metrics for the task. |
| Edit | Open the Edit Task panel to modify the task configuration, including source, sink, and task properties. |
| Enable / Pause | Start or pause the task. In the Note message, click OK. |
| Delete | Permanently delete the task. In the Note message, click OK. |