MQTT clients often publish high-volume telemetry data that downstream systems -- analytics pipelines, order processors, or monitoring services -- consume through RocketMQ. A message outflow task bridges these two systems: it continuously forwards messages from an ApsaraMQ for MQTT instance to an ApsaraMQ for RocketMQ instance through EventBridge, with optional filtering and transformation in transit.
How it works
A message outflow task routes data from your MQTT broker to a RocketMQ instance through EventBridge in four stages:
Publish -- MQTT clients publish messages to a topic on your ApsaraMQ for MQTT instance.
Filter -- EventBridge evaluates each message against the event pattern you define and drops non-matching messages.
Transform -- EventBridge reshapes matched messages -- splitting, mapping, enriching, or rerouting fields as needed.
Deliver -- EventBridge extracts the message body and properties with JSONPath expressions and forwards them to the target topic on your ApsaraMQ for RocketMQ instance.
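To make the Deliver stage more concrete, the following Python sketch resolves simple dotted JSONPath expressions against an event. It is an illustration only, not EventBridge's implementation: `resolve_path`, `sample_event`, and all sample values are hypothetical, and only the `$.a.b.c` form of JSONPath is handled. The four paths are the example values shown for the sink parameters in Step 4.

```python
from typing import Any


def resolve_path(event: dict, path: str) -> Any:
    """Resolve a simple dotted JSONPath such as '$.data.body'.

    Only the '$.a.b.c' form is supported. Returns None when a key is missing.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    node: Any = event
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# Hypothetical event: the values are invented for illustration.
sample_event = {
    "data": {
        "body": '{"temperature": 21.5}',
        "userProperties": {"deviceId": "sensor-01"},
        "systemProperties": {"KEYS": "order-1001", "TAGS": "telemetry"},
    }
}

# Sink parameter name -> JSONPath expression (example values from Step 4).
sink_mapping = {
    "Message Body": "$.data.body",
    "Custom Property": "$.data.userProperties",
    "Message Key": "$.data.systemProperties.KEYS",
    "Message Tag": "$.data.systemProperties.TAGS",
}

# Extract each field that would be forwarded to the RocketMQ topic.
extracted = {name: resolve_path(sample_event, path)
             for name, path in sink_mapping.items()}
```

A path that does not exist in the event resolves to `None` in this sketch. How EventBridge itself treats a missing field is not covered here.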
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for MQTT instance that is purchased, deployed, and in the Running state. For more information, see Create resources.
EventBridge activated, with the required permissions granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Create a message outflow task
Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region, such as China (Hangzhou). On the Tasks page, click Create Task.
On the Create Task page, specify a Task Name and Description, then complete the following configuration steps.
Step 1: Configure the source
In the Source step, set Data Provider to Message Queue for MQTT and configure the following parameters. Click Next Step when finished.
| Parameter | Description | Example |
|---|---|---|
| Region | Region of your ApsaraMQ for MQTT instance. Auto-filled based on your current region selection. | China (Hangzhou) |
| MQTT Instance | The ApsaraMQ for MQTT instance that produces the messages to route. | post-cn-jajh8i**** |
| MQTT Topic | The topic that contains the messages to route. | test-topic |
| Data Format | Encoding applied to binary message data before delivery. JSON (default): UTF-8 encoded JSON in the payload. Text: UTF-8 encoded string in the payload. Binary: Base64 encoded string in the payload. | JSON |
| Batch Push | Aggregates multiple events before delivery. A push triggers when either the Messages or the Interval (Unit: Seconds) threshold is reached, whichever comes first. | None |
| Messages | Maximum number of messages sent in each push. A push is sent when the number of backlogged messages reaches this value. Valid values: 1 to 10,000. | 100 |
| Interval (Unit: Seconds) | Maximum time to wait before the aggregated messages are pushed. Valid values: 0 to 15. Set to 0 to push immediately after aggregation. | 3 |

For example, if you set Messages to 100 and Interval (Unit: Seconds) to 15, a push triggers as soon as 100 messages accumulate or 15 seconds elapse, whichever comes first.
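The batch push rule (push when either the Messages count or the Interval is reached first) can be sketched in Python as follows. This is a minimal model of the behavior described above, not the service's implementation: the `BatchPusher` class, its `push` callback, and the injectable `clock` are hypothetical names introduced for illustration.

```python
import time
from typing import Callable, List, Optional


class BatchPusher:
    """Buffers messages and pushes when a count or time threshold is hit."""

    def __init__(self, max_messages: int, interval_seconds: float,
                 push: Callable[[List[str]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_messages = max_messages
        self.interval_seconds = interval_seconds
        self.push = push
        self.clock = clock
        self.buffer: List[str] = []
        self.window_start: Optional[float] = None

    def add(self, message: str) -> None:
        # The interval is measured from the first message in the batch.
        if not self.buffer:
            self.window_start = self.clock()
        self.buffer.append(message)
        self.flush_if_due()

    def flush_if_due(self) -> None:
        """Push the buffer if either threshold is reached, whichever is first."""
        if not self.buffer:
            return
        full = len(self.buffer) >= self.max_messages
        expired = self.clock() - self.window_start >= self.interval_seconds
        if full or expired:
            self.push(self.buffer)
            self.buffer = []
            self.window_start = None
```

In this sketch the caller must invoke `flush_if_due()` periodically (for example from a timer) so that a partially filled batch is pushed once the interval elapses. With `interval_seconds` set to 0, every message is pushed as soon as it is added.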
Step 2: Configure filtering
In the Filtering step, define an event pattern in the Pattern Content code editor to select which messages to forward. For the full pattern syntax, see Event patterns.
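As a rough mental model of filtering, the Python sketch below treats an event pattern as a nested structure in which each leaf is a list of allowed values, and an event matches when every field named in the pattern holds one of those values. This covers exact-value matching only and is an assumption-laden simplification; the full syntax is defined in Event patterns. The `matches` function and the `deviceType` field are hypothetical.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True if every field in the pattern matches the event.

    Leaves of the pattern are lists of allowed values (exact match only).
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the corresponding event object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Hypothetical pattern: forward only messages from thermostats or meters.
pattern = {"data": {"userProperties": {"deviceType": ["thermostat", "meter"]}}}

forwarded = {"data": {"userProperties": {"deviceType": "meter"}, "body": "42"}}
dropped = {"data": {"userProperties": {"deviceType": "camera"}, "body": "7"}}
```

Here `matches(pattern, forwarded)` is true and `matches(pattern, dropped)` is false, mirroring how non-matching messages are dropped before the transformation step.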
Step 3: Configure transformation
In the Transformation step, specify a data cleansing method to reshape messages in transit. Supported operations include splitting, mapping, enrichment, and dynamic routing. For details, see Data cleansing.
Step 4: Configure the sink
In the Sink step, set Service Type to Message Queue for Apache RocketMQ and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Version | Version of the target ApsaraMQ for RocketMQ instance. | RocketMQ 4.x |
| Instance ID | ID of the target ApsaraMQ for RocketMQ instance. | test |
| Topic | Target topic on the RocketMQ instance that receives the forwarded messages. | test |
| Message Body | JSONPath expression that extracts the message body from the event and routes it to the event target. | $.data.body |
| Custom Property | JSONPath expression that extracts user-defined properties from the event and routes them to the event target. | $.data.userProperties |
| Message Key | JSONPath expression that extracts the message key from the event and routes it to the event target. | $.data.systemProperties.KEYS |
| Message Tag | JSONPath expression that extracts the message tag from the event and routes it to the event target. | $.data.systemProperties.TAGS |

Step 5: Configure the task property
Under Task Property, configure the retry policy for failed deliveries and the fault handling method. For details, see Retry policies and dead-letter queues.
Go back to the Tasks page, find the task you created, and click Enable in the Actions column.
In the Note message, click OK.
The task takes 30 to 60 seconds to start. Track the progress in the Status column on the Tasks page.
Manage the task
On the Tasks page, find the task and use the Actions column:
| Action | Description |
|---|---|
| Details | View basic information, properties, and monitoring metrics on the Task Details page. |
| Edit | Modify the task configuration in the Edit Task panel. |
| Enable / Pause | Start or pause the task. Confirm in the Note message. |
| Delete | Remove the task. Confirm in the Note message. |