When you operate multiple ApsaraMQ for RocketMQ instances across regions or environments, you may need to replicate messages between them for disaster recovery, data aggregation, or environment migration. A message inflow task reads messages from a source instance, optionally filters and transforms them, and delivers them to a target instance -- without writing custom consumer-producer bridges.
How it works
A message inflow task runs as a four-stage pipeline:
Source (poll messages) --> Filter (match by event pattern) --> Transform (cleanse and reshape) --> Sink (deliver to target)
The system polls messages from the source instance in configurable batches (1--10,000 messages). Batches are forwarded to the sink at a configurable interval (0--15 seconds). Setting the interval to 0 delivers messages immediately after aggregation.
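The batching rule above can be summarized as: flush a batch when the backlog reaches the configured message count, when the configured interval elapses, or immediately if the interval is 0. A minimal sketch of that flush decision (an illustration of the documented behavior, not the service's actual implementation):

```python
def should_flush(batch_len: int, elapsed_s: float,
                 batch_size: int, interval_s: float) -> bool:
    """Decide whether aggregated messages should be forwarded to the sink.

    Flush when the backlog reaches batch_size (1-10,000), or when
    interval_s seconds (0-15) have elapsed since the last flush.
    interval_s == 0 means deliver immediately after aggregation.
    """
    if batch_len == 0:
        return False                 # nothing to deliver
    if batch_len >= batch_size:
        return True                  # backlog threshold reached
    if interval_s == 0:
        return True                  # immediate-delivery mode
    return elapsed_s >= interval_s   # interval elapsed
```

For example, with `batch_size=100` and `interval_s=3`, a backlog of 10 messages is forwarded only once 3 seconds have passed, while a backlog of 100 is forwarded at once.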
Prerequisites
Before you begin, make sure that you have:
Two ApsaraMQ for RocketMQ instances (source and target), purchased, deployed, and in the Running state. For more information, see Create resources.
Create a message inflow task
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region, such as China (Hangzhou). On the Tasks page, click Create Task.
On the Create Task page, specify a Task Name and Description, then complete the following steps.
Step 1: Configure the source
In the Source step, set Data Provider to ApsaraMQ for RocketMQ and configure the following parameters. Then click Next Step.
| Parameter | Description | Example |
|---|---|---|
| Region | Auto-filled with the region you selected for the message inflow task. | China (Hangzhou) |
| Version | Instance version. Select RocketMQ 4.x or RocketMQ 5.x. | RocketMQ 5.x |
| Instance | The source instance where messages are produced. | rmq-cn-\*\*\*\* |
| Topic | The topic on the source instance to consume messages from. | topic |
| Tag | A tag to filter messages on the source instance. | test\_tag |
| Group ID | The consumer group used to track consumption progress. Options:<br>- Quickly Create (recommended): Auto-creates a group named GID_EVENTBRIDGE_xxx.<br>- Use Existing Group: Select an existing group that is not actively in use. | Quickly Create |
| Consumer Offset | The starting position for message consumption. Options:<br>- Latest Offset: Start from the most recent message.<br>- Earliest Offset: Start from the oldest available message.<br>- Timestamp: Start from a specific point in time. Requires the Consumed At parameter. | Latest Offset |
| Consumed At | The timestamp from which to start consuming. Required only when Consumer Offset is set to Timestamp. | 2024-06-18 15:28:29 |
| Data Format (Body) | Encoding applied to binary message data before delivery. Options:<br>- JSON (default): Encodes binary data as JSON using UTF-8 and places it in the payload.<br>- Text: Encodes binary data as a UTF-8 string and places it in the payload.<br>- Binary: Encodes binary data as a Base64 string and places it in the payload. | JSON |
| Messages | Maximum number of messages delivered to the sink in each batch. A batch is sent when the number of messages in the backlog reaches this value. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | Time interval at which aggregated messages are forwarded to the sink. Valid values: 0 to 15. Set to 0 to deliver immediately after aggregation. | 3 |
Selecting Use Existing Group with a group that is actively in use affects the publishing and subscription of existing messages. Select a group that is not in use, or use Quickly Create instead.
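The three Data Format options described above differ only in how the raw message bytes are encoded before being placed in the delivered payload. A minimal sketch of that distinction (the function name and return shapes are illustrative, not the exact wire format):

```python
import base64
import json

def encode_body(raw: bytes, data_format: str):
    """Encode raw message bytes per the selected Data Format option."""
    if data_format == "JSON":
        # Decode as UTF-8 and parse as JSON before placing in the payload.
        return json.loads(raw.decode("utf-8"))
    if data_format == "Text":
        # Decode as a plain UTF-8 string.
        return raw.decode("utf-8")
    if data_format == "Binary":
        # Base64-encode arbitrary bytes so they survive JSON transport.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unknown data format: {data_format}")
```

Binary is the safe choice when message bodies are not guaranteed to be valid UTF-8, since JSON and Text both fail on undecodable bytes.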
Step 2: Configure filtering
In the Filtering step, define a data pattern to select which messages pass through the pipeline. Only messages that match the pattern are forwarded to the transformation stage.
For more information, see Event patterns.
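Conceptually, an event pattern matches a message when every field in the pattern appears in the message with one of the listed values. A simplified matcher for flat, exact-match patterns (real event patterns also support prefix, suffix, and numeric matching, which this sketch omits):

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True if the event satisfies a flat, exact-match event pattern.

    Each pattern key maps to a list of acceptable values; the event must
    contain every key with one of those values.
    """
    return all(event.get(key) in values for key, values in pattern.items())

# Example: only messages whose tag is test_tag pass the filter.
pattern = {"tag": ["test_tag"]}
```

An empty pattern matches everything, which is why leaving the filter unset forwards all messages to the transformation stage.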
Step 3: Configure transformation
In the Transformation step, specify a data cleansing method. Supported operations include splitting, mapping, enrichment, and dynamic routing.
For more information, see Use Function Compute to perform message cleansing.
Step 4: Configure the sink
In the Sink step, set Service Type to ApsaraMQ for RocketMQ and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Version | Target instance version. Select RocketMQ 4.x or RocketMQ 5.x. | RocketMQ 5.x |
| Instance ID | The target instance to deliver messages to. | rmq-cn-\*\*\*\* |
| Topic | The topic on the target instance. | topic |
| Message Body | Determines the message body content delivered to the target. Options: Complete Data, Data Extraction, Fixed Value, Template.<br>Example extraction path: `$.data.body` | Data Extraction |
| Custom Property | Sets custom properties on the target message. Options: Not Specified, Data Extraction, Template.<br>Example template parameters: `{"userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY"}`<br>Example template: `{"EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}"}` | Template |
| Message Key | Sets the message key on the target. Options: Not Specified, Data Extraction, Fixed Value, Template.<br>Example extraction path: `$.data.systemProperties.KEYS` | Data Extraction |
| Message Tag | Sets the message tag on the target. Options: Not Specified, Data Extraction, Fixed Value, Template.<br>Example extraction path: `$.data.systemProperties.TAGS` | Data Extraction |
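The extraction paths in the table (for example `$.data.body`) are JSONPath-style references into the event envelope. A minimal resolver for the simple dotted paths used here (illustrative only: the service supports richer JSONPath syntax, and the envelope layout below is an assumed example, not the documented wire format):

```python
def extract(event: dict, path: str):
    """Resolve a simple '$.a.b.c' JSONPath against a nested dict."""
    node = event
    for part in path.lstrip("$").strip(".").split("."):
        node = node[part]
    return node

# Assumed envelope shape for a source message event (field names hypothetical):
event = {
    "data": {
        "body": "hello",
        "systemProperties": {"KEYS": "order-1", "TAGS": "test_tag"},
        "userProperties": {"traceId": "abc"},
    }
}
```

With this envelope, `extract(event, "$.data.systemProperties.KEYS")` yields the source message key, which is how the Data Extraction option carries keys and tags through to the target message.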
Step 5: Configure task properties
Configure the retry policy for failed event deliveries and the fault handling method.
For more information, see Retry policies and dead-letter queues.
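Retry policies for failed deliveries are commonly expressed as a backoff schedule: the delay grows with each attempt up to a cap. A sketch of an exponential-backoff schedule (the actual delays and attempt counts come from the policy you select in the console, not from this illustration):

```python
def backoff_delays(base_s: float, factor: float,
                   max_delay_s: float, attempts: int) -> list:
    """Exponential backoff delays, capped at max_delay_s, one per retry."""
    return [min(base_s * factor ** i, max_delay_s) for i in range(attempts)]
```

Messages that exhaust all retries are then handled by the configured fault handling method, such as routing to a dead-letter queue.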
Step 6: Save and verify
Click Save. On the Tasks page, find the task. When the Status column changes from Starting to Running, the task is active and synchronizing messages.
Manage tasks
On the Tasks page, find the task and use the Actions column to perform the following operations:
| Action | Description |
|---|---|
| Details | View the Task Details page, which shows basic information, properties, and monitoring metrics for the task. |
| Edit | Open the Edit Task panel to modify the task configuration, including source, sink, and task properties. |
| Enable / Pause | Start or pause the task. In the Note message, click OK. |
| Delete | Permanently delete the task. In the Note message, click OK. |