When MQTT devices produce messages that downstream enterprise systems need to consume through AMQP, you can create a message outflow task to bridge the two protocols. The task uses EventBridge to route messages from an ApsaraMQ for MQTT instance to an ApsaraMQ for RabbitMQ instance, with optional filtering and transformation in between.
How it works
A message outflow task connects your MQTT broker to a RabbitMQ instance through EventBridge:
MQTT clients publish messages to a topic on your ApsaraMQ for MQTT instance.
EventBridge receives the messages, applies optional filtering and transformation rules, and encodes the payload in the specified data format.
EventBridge delivers the processed messages to an exchange or queue on your ApsaraMQ for RabbitMQ instance.
You configure the entire pipeline -- source, filtering, transformation, and sink -- in a single task through the ApsaraMQ for MQTT console.
Prerequisites
Before you begin, make sure that you have:
An ApsaraMQ for MQTT instance in the Running state. For more information, see Create resources.
An ApsaraMQ for RabbitMQ instance in the Running state, with at least one vhost and one exchange or queue created. For more information, see Step 2: Create resources.
EventBridge activated, with the required permissions granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Create a message outflow task
Step 1: Start the task wizard
Log on to the ApsaraMQ for MQTT console.
In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region, such as China (Hangzhou).
On the Tasks page, click Create Task.
On the Create Task page, enter a Task Name and Description, then proceed to configure the task.
Step 2: Configure the source
In the Source step, set Data Provider to Message Queue for MQTT and configure the following parameters. Then click Next Step.
| Parameter | Description | Example |
|---|---|---|
| Region | The region of your ApsaraMQ for MQTT instance. Auto-filled based on the region you selected. | China (Hangzhou) |
| MQTT Instance | The ApsaraMQ for MQTT instance that produces the messages to route. | post-cn-jajh8i\*\*\*\* |
| MQTT Topic | The topic that produces the messages to route. | test-topic |
| Data Format | The encoding format applied to binary data before delivery. See Data format options. | JSON |
| Batch Push | Aggregates multiple events into a single push. A push is triggered as soon as either the Messages threshold or the Interval (Unit: Seconds) threshold is reached. | None |
| Messages | The maximum number of messages per push. Valid values: 1 to 10000. | 100 |
| Interval (Unit: Seconds) | The maximum wait time before a push. Valid values: 0 to 15. A value of 0 sends messages immediately after aggregation. | 3 |
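The interaction between the Messages and Interval thresholds can be sketched in Python. This is only an illustration of the "whichever comes first" rule described in the table: the function name `batch_events`, the use of arrival timestamps instead of a real timer, and the choice to start the interval at the first message of each batch are all assumptions, not the actual EventBridge implementation.

```python
from typing import Iterable, List, Tuple


def batch_events(
    events: Iterable[Tuple[float, str]],
    max_messages: int = 100,
    max_interval: float = 3.0,
) -> List[List[str]]:
    """Group (arrival_time_in_seconds, message) pairs into pushes.

    A push is emitted when the batch holds max_messages items, or when
    max_interval seconds have passed since the first item of the batch
    (assumed semantics), whichever happens first.
    """
    # Valid ranges taken from the parameter table above.
    if not 1 <= max_messages <= 10000:
        raise ValueError("Messages must be between 1 and 10000")
    if not 0 <= max_interval <= 15:
        raise ValueError("Interval must be between 0 and 15 seconds")

    pushes: List[List[str]] = []
    batch: List[str] = []
    batch_start = 0.0
    for arrival, message in events:
        # Flush a batch whose time window expired before this arrival.
        if batch and arrival - batch_start >= max_interval:
            pushes.append(batch)
            batch = []
        if not batch:
            batch_start = arrival
        batch.append(message)
        # Flush as soon as the count threshold is reached.
        if len(batch) >= max_messages:
            pushes.append(batch)
            batch = []
    if batch:
        pushes.append(batch)
    return pushes


sample = [(0.0, "a"), (0.5, "b"), (1.0, "c"), (5.0, "d")]
pushes = batch_events(sample, max_messages=2, max_interval=3)
```

In this sketch, "a" and "b" are pushed together because the count threshold is reached, while "c" is pushed alone because the interval expires before "d" arrives.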
Data format options
| Format | Encoding | When to use |
|---|---|---|
| JSON (default) | Binary data encoded as JSON using UTF-8, placed in the payload. | Works with most downstream consumers. |
| Text | Binary data encoded as a UTF-8 string, placed in the payload. | Consumers expect plain text. |
| Binary | Binary data encoded as a Base64 string, placed in the payload. | Preserving raw binary content. |
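To make the difference between the three formats concrete, the following Python sketch shows one plausible reading of the table. The function name `encode_payload` is hypothetical, and interpreting the JSON format as "decode the bytes as UTF-8 and parse them as JSON" is an assumption based on the description above, not confirmed EventBridge behavior.

```python
import base64
import json
from typing import Any


def encode_payload(raw: bytes, data_format: str = "JSON") -> Any:
    """Return the value that would be placed in the event payload."""
    if data_format == "JSON":
        # Assumed: the MQTT message body is UTF-8 text containing JSON.
        return json.loads(raw.decode("utf-8"))
    if data_format == "Text":
        # The body is kept as a plain UTF-8 string.
        return raw.decode("utf-8")
    if data_format == "Binary":
        # Arbitrary bytes survive unchanged as a Base64 string.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"Unsupported data format: {data_format}")


as_json = encode_payload(b'{"temp": 21}', "JSON")
as_text = encode_payload(b'{"temp": 21}', "Text")
as_binary = encode_payload(b"\x00\xff", "Binary")
```

Binary is the only one of the three that can carry bytes that are not valid UTF-8, which is why it suits raw device payloads.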
Step 3: Configure filtering (optional)
In the Filtering step, define a data pattern in the Pattern Content code editor to filter events before they reach the sink.
For the full pattern syntax and examples, see Event patterns.
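As a rough mental model of how a pattern filters events, the Python sketch below implements only exact-value matching: each leaf in the pattern is a list of allowed values, and an event passes when every listed field holds one of them. The function name `matches` and the event fields (`data`, `topic`) are hypothetical, and the real pattern syntax supports more operators than shown here, so treat Event patterns as the authoritative reference.

```python
from typing import Any, Dict


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if the event satisfies every field in the pattern."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the matching sub-object.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf pattern: a list of allowed values.
            return False
    return True


# Hypothetical pattern and events, for illustration only.
pattern = {"data": {"topic": ["test-topic"]}}
kept = matches(pattern, {"data": {"topic": "test-topic", "body": "hello"}})
dropped = matches(pattern, {"data": {"topic": "other-topic", "body": "hello"}})
```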
Step 4: Configure transformation (optional)
In the Transformation step, define a data cleansing method to split, map, enrich, or dynamically route messages before delivery. For details, see Data cleansing.
Step 5: Configure the sink
In the Sink step, set Service Type to Message Queue for RabbitMQ and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Instance ID | The ApsaraMQ for RabbitMQ instance to receive messages. | amqp-cn-zvp2pny6\*\*\*\* |
| vhost | The virtual host (vhost) on the RabbitMQ instance. | test |
| Destination Type | Where messages are delivered. Exchange: routes messages to one or more queues through an exchange. Queue: sends messages directly to a queue. | Queue |
| Exchange | The exchange to route messages to. Required only when Destination Type is set to Exchange. | exchange |
| Queue | The queue to send messages to. Required only when Destination Type is set to Queue. | queue |
| Message Routing Key | A JSONPath expression that extracts the routing key from the event. Required only when Destination Type is set to Exchange. | $.data.key |
| Message Body | A JSONPath expression that extracts the message body from the event. | $.data.body |
| MessageId | A JSONPath expression that extracts the message ID from the event. | $.data.props.messageId |
| Custom Property | A JSONPath expression that extracts custom properties from the event. | $.data.props |
All JSONPath fields use Data Extraction mode with the $.field.path syntax to extract specific values from the event payload.
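The example expressions in the table can be read as simple dotted lookups into the event. The Python sketch below handles only that `$.field.path` form, not full JSONPath, and the sample event is a hypothetical shape chosen to match the example expressions; the actual event structure produced by the source may differ.

```python
from typing import Any


def extract(event: Any, path: str) -> Any:
    """Resolve a dotted path such as "$.data.key"; return None if absent."""
    if not path.startswith("$"):
        raise ValueError("Path must start with '$'")
    current = event
    for part in path[1:].split("."):
        if not part:
            continue  # Skip the empty segment produced by the leading dot.
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Hypothetical event, shaped to fit the example expressions in the table.
event = {
    "data": {
        "key": "sensor.temperature",
        "body": "21.5",
        "props": {"messageId": "msg-001", "unit": "celsius"},
    }
}

routing_key = extract(event, "$.data.key")
body = extract(event, "$.data.body")
message_id = extract(event, "$.data.props.messageId")
custom_properties = extract(event, "$.data.props")
```

A path that does not exist in the event resolves to `None` in this sketch; how the sink treats a missing value is not specified here.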
Step 6: Configure the retry policy
Configure the retry policy for failed deliveries and the fault handling method. For details, see Retry policies and dead-letter queues.
Step 7: Enable the task
Go back to the Tasks page, find the task you created, and click Enable in the Actions column.
In the Note dialog, click OK.
The task takes 30 to 60 seconds to start. Track progress in the Status column on the Tasks page.
Verify the task
After the task is enabled:
Publish a test message to the MQTT topic you configured as the source.
Check the target exchange or queue on your ApsaraMQ for RabbitMQ instance to confirm the message arrived.
On the Tasks page, click Details in the Actions column to view the monitoring metrics for the task.
If messages do not arrive at the target exchange or queue, check the following:
The MQTT topic name matches the source configuration.
The RabbitMQ instance, vhost, and exchange or queue names match the sink configuration.
The retry policy and dead-letter queue are configured to capture failed deliveries.
Manage existing tasks
On the Tasks page, find the task and use the Actions column to manage it:
| Action | Description |
|---|---|
| Details | View basic information, properties, and monitoring metrics. |
| Edit | Modify the task configuration, including source, sink, and retry settings. |
| Enable / Pause | Start or stop message routing. Click OK in the confirmation dialog. |
| Delete | Permanently remove the task. Click OK in the confirmation dialog. |
Related topics
Event patterns: Write filter patterns to selectively route messages.
Data cleansing: Transform, split, or enrich event data before delivery.
Retry policies and dead-letter queues: Configure retry behavior and dead-letter queues for failed deliveries.