When IoT devices or applications publish messages over MQTT, downstream services often need the same data in Apache Kafka for analytics, storage, or stream processing. A message outflow task connects an ApsaraMQ for MQTT topic to an ApsaraMQ for Kafka topic through EventBridge, so messages flow automatically without custom bridging code.
How it works
A message outflow task moves data through four stages:
Ingest -- MQTT clients publish messages to a topic on the ApsaraMQ for MQTT instance.
Filter -- EventBridge evaluates each message against an event pattern and passes only the messages that match.
Transform -- EventBridge reshapes the payload through splitting, mapping, enrichment, or dynamic routing.
Deliver -- EventBridge writes the processed message to the specified Kafka topic, using JSONPath to extract the message key and value.
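The four stages can be sketched end to end in a few lines. This is a minimal illustration, not the actual EventBridge implementation: the envelope layout, the pattern semantics, and the helper names below are assumptions for demonstration only.

```python
import json

# Hypothetical CloudEvents-style envelope for an ingested MQTT message;
# the real EventBridge event schema may differ.
event = {
    "data": {
        "key": "device-001",
        "value": {"temperature": 23.5, "unit": "C"},
        "topic": "test-topic",
    }
}

def matches_pattern(evt, pattern):
    # Stage 2 (Filter): keep the event only if, for every field in the
    # pattern, the event's value is one of the allowed values.
    return all(evt["data"].get(k) in allowed for k, allowed in pattern.items())

def jsonpath(evt, path):
    # Stage 4 (Deliver): minimal resolver for dot-only JSONPath
    # expressions such as "$.data.key".
    node = evt
    for part in path.lstrip("$.").split("."):
        node = node[part]
    return node

pattern = {"topic": ["test-topic"]}
if matches_pattern(event, pattern):
    kafka_key = jsonpath(event, "$.data.key")
    kafka_value = json.dumps(jsonpath(event, "$.data.value"))
```

A transformation stage (omitted here) would reshape `event["data"]` between the filter and the delivery steps.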
Prerequisites
Before you begin, make sure you have:
An ApsaraMQ for MQTT instance in the Running state. See Create resources.
An ApsaraMQ for Kafka instance in the Running state. See Get started with ApsaraMQ for Kafka.
EventBridge activated and the required permissions granted to a Resource Access Management (RAM) user. See Activate EventBridge and grant permissions to a RAM user.
Create a message outflow task
Log on to the ApsaraMQ for MQTT console. In the left-side navigation pane, choose Message Integration > Tasks.
In the top navigation bar, select a region. On the Tasks page, click Create Task.
On the Create Task page, enter a Task Name and Description, then configure the following steps. After you finish, click Save.
Step 1: Configure the source
Select the MQTT instance and topic that produce the messages you want to route.
In the Source step, set Data Provider to Message Queue for MQTT and configure the following parameters. Then click Next Step.
| Parameter | Description | Example |
|---|---|---|
| Region | Region of the ApsaraMQ for MQTT instance. Auto-filled based on the region you selected during instance creation. | China (Hangzhou) |
| MQTT Instance | The instance that produces the messages to route. | post-cn-jajh8i\*\*\*\* |
| MQTT Topic | The topic that produces the messages to route. | test-topic |
| Data Format | Encoding for the binary payload before delivery. JSON (default): encodes binary data as JSON with UTF-8. Text: encodes binary data as a UTF-8 string. Binary: encodes binary data as a Base64 string. | JSON |
| Batch Push | Aggregates multiple events per delivery. A push triggers when either the Messages threshold or the Interval threshold is reached, whichever comes first. | -- |
| Messages | Maximum number of messages in each batch. A push is triggered as soon as the number of backlogged messages reaches this value. Valid values: 1 to 10,000. | 100 |
| Interval (Unit: Seconds) | Maximum time to wait before the aggregated messages are pushed to the sink. Valid values: 0 to 15. Set to 0 to deliver immediately after aggregation. | 3 |
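The three Data Format options map a raw binary MQTT payload to a deliverable representation. The sketch below shows what each option produces, assuming a simple payload; it mimics the documented behavior rather than calling any EventBridge API.

```python
import base64
import json

payload = b"\x00\x01temperature=23.5"  # raw binary MQTT payload

# Data Format = Binary: the payload is delivered as a Base64 string,
# which is safe for arbitrary (non-UTF-8) bytes.
binary_encoded = base64.b64encode(payload).decode("ascii")

# Data Format = Text: the payload is delivered as a UTF-8 string;
# this only works when the payload is well-formed UTF-8.
text_payload = b"temperature=23.5".decode("utf-8")

# Data Format = JSON: a UTF-8 JSON payload is parsed into a structure.
json_payload = json.loads(b'{"temperature": 23.5}'.decode("utf-8"))
```

Choose Binary whenever devices publish payloads that are not guaranteed to be valid UTF-8, since Text and JSON decoding would fail on such bytes.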
Step 2: Filter messages
Define which messages pass through to the sink.
In the Filtering step, define an event pattern in the Pattern Content editor to select which messages proceed. For pattern syntax, see Event patterns.
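For example, a pattern that passes only messages published to a specific topic might look like the following. The field names here are illustrative; see Event patterns for the exact schema supported by EventBridge.

```json
{
  "data": {
    "topic": ["test-topic"]
  }
}
```

Pattern values are arrays: an event matches when its value for the field equals any entry in the array.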
Step 3: Transform messages
Reshape the payload before delivery.
In the Transformation step, configure a data cleansing rule to split, map, enrich, or dynamically route messages. For details, see Data cleansing.
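A cleansing rule conceptually takes the event in and returns a reshaped event. The sketch below imitates a field-mapping plus enrichment rule in plain Python; the function and field names are assumptions for illustration, not the console's rule syntax (see Data cleansing for that).

```python
import copy

def cleanse(evt):
    # Illustrative rule: rename a field (mapping) and add a static
    # field (enrichment), without mutating the original event.
    out = copy.deepcopy(evt)
    body = out["data"]["value"]
    body["temp_celsius"] = body.pop("temperature")  # field mapping
    body["source"] = "mqtt-bridge"                  # enrichment
    return out

event = {"data": {"value": {"temperature": 23.5}}}
enriched = cleanse(event)
```

Splitting and dynamic routing work the same way conceptually: the rule inspects the incoming payload and emits one or more reshaped events.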
Step 4: Configure the sink
Map the processed messages to a Kafka topic.
In the Sink step, set Service Type to Message Queue for Apache Kafka and configure the following parameters.
| Parameter | Description | Example |
|---|---|---|
| Instance ID | The ApsaraMQ for Kafka instance that receives the messages. | test |
| Topic | The Kafka topic to write messages to. | test |
| Acknowledgment Mode | How the Kafka broker acknowledges receipt of a message, for example no acknowledgment, leader-only acknowledgment, or acknowledgment by all in-sync replicas. | -- |
| Message Value | JSONPath expression that extracts the message body from the event payload. | $.data.value |
| Message Key | JSONPath expression that extracts the message key from the event payload. | $.data.key |
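Given the Message Key and Message Value expressions from the table, the sink resolves each expression against the event payload to build the Kafka record. The sample event below is a hypothetical payload shape used only to demonstrate the extraction; the real event layout may differ.

```python
import json

# Hypothetical event payload reaching the sink.
event = {"data": {"key": "device-001", "value": {"temperature": 23.5}}}

def extract(evt, path):
    # Minimal resolver for dot-only JSONPath expressions ("$.data.key").
    node = evt
    for part in path[2:].split("."):  # drop the leading "$."
        node = node[part]
    return node

record_key = extract(event, "$.data.key")               # Message Key
record_value = json.dumps(extract(event, "$.data.value"))  # Message Value
```

Records with the same extracted key land on the same Kafka partition, so choose a key expression (such as a device ID) that preserves the ordering you need downstream.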
Step 5: Configure task properties
Define how the task handles delivery failures.
Configure the retry policy and fault-handling method for failed deliveries. For details, see Retry policies and dead-letter queues.
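The behavior a retry policy with a dead-letter fallback describes can be sketched as follows. This is a conceptual model only: the actual policy is configured in the console (see Retry policies and dead-letter queues), and the backoff parameters below are illustrative assumptions.

```python
def backoff_schedule(max_retries=3, base=1.0, cap=30.0):
    # Exponential backoff delays (in seconds) between redelivery
    # attempts, capped at `cap`.
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]

def deliver_with_retries(send, message, max_retries=3):
    # Try delivery up to 1 + max_retries times; if all attempts fail,
    # route the message to a dead-letter list instead of dropping it.
    dead_letters = []
    for _ in range(1 + max_retries):
        try:
            send(message)
            return True, dead_letters
        except RuntimeError:
            continue
    dead_letters.append(message)
    return False, dead_letters

# Simulate a sink that is permanently unavailable.
def failing_send(msg):
    raise RuntimeError("broker unavailable")

ok, dlq = deliver_with_retries(failing_send, {"id": 1})
```

A dead-letter queue keeps undeliverable messages inspectable and replayable rather than silently lost, which is why configuring a fault-handling method is worthwhile even for low-volume tasks.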
Enable the task
A newly created task is disabled by default. To activate it:
On the Tasks page, find the task and click Enable in the Actions column.
In the Note dialog box, click OK.
The task takes 30 to 60 seconds to start. Track progress in the Status column.
Manage the task
On the Tasks page, find the task and use the Actions column to manage it.
| Operation | Steps |
|---|---|
| View details | Click Details to view basic information, properties, and monitoring metrics. |
| Edit the task | Click Edit to modify task details and properties in the Edit Task panel. |
| Pause or resume | Click Pause to stop the task or Enable to restart it. Confirm by clicking OK. |
| Delete the task | Click Delete, then click OK to confirm. |
Related topics
Event patterns -- Write filter patterns for fine-grained message selection.
Data cleansing -- Explore transformation options such as payload splitting and enrichment.
Retry policies and dead-letter queues -- Configure error handling and dead-letter queues for failed deliveries.