This topic describes how to create a message outflow task in the ApsaraMQ for RocketMQ console to synchronize data from ApsaraMQ for RocketMQ to Simple Message Queue (SMQ, formerly MNS). A message outflow task reads messages from a RocketMQ topic, optionally filters and transforms them, and delivers the results to an SMQ queue.
How it works
A message outflow task moves messages through four stages:
Source (RocketMQ) --> Filtering (optional) --> Transformation (optional) --> Sink (SMQ)

| Stage | Purpose |
|---|---|
| Source | Reads messages from an ApsaraMQ for RocketMQ topic. |
| Filtering | Applies an event pattern to forward only matching messages. |
| Transformation | Runs a Function Compute function for data cleansing, such as splitting, mapping, enrichment, or dynamic routing. |
| Sink | Writes processed messages to the target SMQ queue. |
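
The four stages can be pictured as a small in-memory pipeline. The following Python sketch is illustrative only: `run_pipeline`, the dictionary-based message shape, and the `matches` and `transform` callables are hypothetical stand-ins and are not part of the service.

```python
from typing import Callable, Iterable, Optional

Message = dict  # hypothetical in-memory message shape, for illustration only


def run_pipeline(
    source: Iterable[Message],
    sink: list,
    matches: Optional[Callable[[Message], bool]] = None,
    transform: Optional[Callable[[Message], Message]] = None,
) -> int:
    """Move messages from source to sink and return the number delivered."""
    delivered = 0
    for message in source:
        # Filtering is optional: with no predicate, every message passes.
        if matches is not None and not matches(message):
            continue
        # Transformation is optional: with no function, the message is unchanged.
        if transform is not None:
            message = transform(message)
        sink.append(message)
        delivered += 1
    return delivered
```

Passing neither `matches` nor `transform` corresponds to a task that skips both optional stages and forwards every message unchanged.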
Supported regions
SMQ is available as a sink in the following regions and areas:
China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Zhangjiakou), China (Hohhot), China (Shenzhen), China (Chengdu), China (Hong Kong), US (Silicon Valley), US (Virginia), Germany (Frankfurt), Singapore, Japan (Tokyo), Malaysia (Kuala Lumpur), and Indonesia (Jakarta).
Create a message outflow task
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for RocketMQ instance in the Running state. For more information, see Create resources.
An SMQ queue. For more information, see Activate SMQ and authorize RAM users to access SMQ.
Procedure
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region. On the Tasks page, click Create Task.
On the Create Task page, specify a Task Name and Description, then complete each stage described below.
Configure the source
In the Source step, set Data Provider to ApsaraMQ for RocketMQ and configure the following parameters. Click Next Step when finished.
| Parameter | Description | Example |
|---|---|---|
| Region | Auto-filled with the region you selected in the top navigation bar. | China (Hangzhou) |
| Version | The RocketMQ instance version. Valid values: RocketMQ 4.x, RocketMQ 5.x. | RocketMQ 5.x |
| Instance | The instance that produces the messages. | rmq-cn-\*\*\*\* |
| Topic | The topic that produces the messages. | topic |
| Tag | A tag to filter messages at the source. | test\_tag |
| Group ID | The consumer group for the outflow task. Quickly Create (recommended): auto-generates a group named in the format GID_EVENTBRIDGE_xxx. Use Existing Group: select a group that is not in use. Selecting a group that is already in use affects its existing message publishing and subscription. | Quickly Create |
| Consumer Offset | Where to start consuming. Latest Offset: consume from the newest messages. Earliest Offset: consume from the oldest messages. Timestamp: consume from a specific point in time. | Latest Offset |
| Consumed At | Required only when Consumer Offset is set to Timestamp. Specifies the time from which to start consuming. | 2024-06-18 15:28:29 |
| Data Format (Body) | How the binary message body is encoded before it is placed in the payload. JSON (default): the body is encoded as UTF-8 and delivered as JSON-formatted data. Text: the body is encoded as a UTF-8 string. Binary: the body is encoded as a Base64 string. | Json |
| Messages | The maximum number of messages sent in a single function invocation. A request is sent only when the number of backlogged messages reaches this value. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | The interval at which the function is invoked. The system sends the aggregated messages to Function Compute at this interval. Valid values: 0 to 15. A value of 0 sends messages immediately after aggregation. | 3 |
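
The Messages and Interval parameters together describe a batching window. The Python sketch below shows one plausible reading, in which a batch is released when either the message count or the interval is reached. That either/or rule, the `Batcher` class, and the explicit `now` clock argument are all assumptions made for illustration. The service's actual batching logic is not documented here.

```python
from typing import List, Optional


class Batcher:
    """Collects messages and releases a batch on a size or time trigger.

    Assumption: a batch is released when EITHER limit is reached.
    """

    def __init__(self, max_messages: int, interval_seconds: float) -> None:
        # Ranges mirror the valid values listed in the parameter table.
        if not 1 <= max_messages <= 10000:
            raise ValueError("max_messages must be between 1 and 10000")
        if not 0 <= interval_seconds <= 15:
            raise ValueError("interval_seconds must be between 0 and 15")
        self.max_messages = max_messages
        self.interval_seconds = interval_seconds
        self._buffer: List[str] = []
        self._window_start: Optional[float] = None

    def add(self, message: str, now: float) -> Optional[List[str]]:
        """Buffer one message; return a batch if a trigger fired, else None."""
        if not self._buffer:
            self._window_start = now  # the first message opens a new window
        self._buffer.append(message)
        return self.poll(now)

    def poll(self, now: float) -> Optional[List[str]]:
        """Return the buffered batch if the size or time trigger has fired."""
        if not self._buffer:
            return None
        full = len(self._buffer) >= self.max_messages
        expired = now - self._window_start >= self.interval_seconds
        if full or expired:
            batch, self._buffer = self._buffer, []
            self._window_start = None
            return batch
        return None
```

In this sketch an interval of 0 makes the time trigger fire on every `add` call, so each message is released as soon as it arrives.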
Configure filtering
In the Filtering step, define an event pattern to filter data. For more information, see Event patterns.
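
To give a feel for pattern-based filtering, the following Python sketch implements a deliberately simplified matcher. It assumes a pattern is a nested dictionary whose leaf values are lists of accepted values, and it handles exact-value matching only. The `matches_pattern` function and the event shape are hypothetical. See Event patterns for the syntax the service actually supports.

```python
from typing import Any, Dict


def matches_pattern(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if every field named in the pattern matches the event.

    Simplified rules (an assumption, not the service's full syntax):
    - a dict in the pattern must match the nested dict in the event;
    - a list in the pattern lists the accepted values for that field.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Recurse into nested objects.
            if not isinstance(actual, dict) or not matches_pattern(expected, actual):
                return False
        elif actual not in expected:
            # Leaf: the event value must be one of the accepted values.
            return False
    return True


# Hypothetical pattern: forward only messages tagged "test_tag".
pattern = {"data": {"tags": ["test_tag"]}}
```

Fields that the pattern does not mention are ignored, so an empty pattern matches every event in this sketch.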
Configure transformation
In the Transformation step, specify a data cleansing method to process the data, for example by splitting, mapping, enrichment, or dynamic routing. For more information, see Use Function Compute to perform message cleansing.
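
A cleansing function might look like the following Python sketch. The `handler(event, context)` entry point is the usual form for a Python function in Function Compute, but the payload shape is an assumption: the sketch treats the input as a JSON array of objects with a `body` field and returns a JSON array. Check Use Function Compute to perform message cleansing for the real input and output contract.

```python
import json


def handler(event, context):
    """Drop messages with an empty body and trim whitespace from the rest.

    Assumption: `event` is a JSON array (bytes or str) of message objects
    that each carry a string `body` field.
    """
    messages = json.loads(event)
    cleaned = []
    for message in messages:
        body = message.get("body")
        # Filter out messages whose body is missing, non-text, or blank.
        if not isinstance(body, str) or not body.strip():
            continue
        trimmed = body.strip()
        # Mapping and enrichment: normalize the body and add a derived field.
        cleaned.append({**message, "body": trimmed, "length": len(trimmed)})
    return json.dumps(cleaned)
```

The `length` field is only an example of enrichment. Any derived value can be attached the same way.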
Configure the sink
In the Sink step, set Service Type to SMQ and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Queue Name | The SMQ queue to receive messages. | test |
| Enable Base64 Encoding | Whether to Base64-encode message bodies. Keep this enabled. Disabling it causes garbled characters in received messages. | Yes |
| Message Body | The content to deliver. Valid values: Complete Data, Data Extraction, Fixed Value, Template. | Complete Data |
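
When Base64 encoding is enabled, a consumer that reads the raw body may need to decode it before use. The sketch below shows the round trip with the Python standard library. The helper names `encode_body` and `decode_body` are hypothetical, and whether your SMQ client decodes the body for you is not covered here.

```python
import base64


def encode_body(text: str) -> str:
    """Encode a text body as a Base64 string (UTF-8 bytes underneath)."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def decode_body(encoded: str) -> str:
    """Decode a Base64 string back into the original UTF-8 text."""
    return base64.b64decode(encoded).decode("utf-8")
```

Because the encoded form contains only ASCII characters, non-ASCII text in the body survives transport and decodes back to the original string.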
Configure task properties
Configure the retry policy for failed deliveries and the fault handling method. For more information, see Retry policies and dead-letter queues.
Click Save. On the Tasks page, the task status changes from Starting to Running when the task is ready.
Manage a task
On the Tasks page, find your task and use the Actions column to perform the following operations.
| Operation | Steps |
|---|---|
| View details | Click Details to open the Task Details page. View basic information, properties, and monitoring metrics. |
| Edit | Click Edit to open the Edit Task panel. Modify the task configuration and properties. |
| Enable or pause | Click Enable or Pause, then click OK in the confirmation dialog. |
| Delete | Click Delete, then click OK in the confirmation dialog. |
Related topics
Event patterns: Filter messages with event pattern matching.
Retry policies and dead-letter queues: Configure retry behavior and dead-letter handling.