EventBridge: Create an event stream from ApsaraMQ for MQTT

Last Updated: Mar 11, 2026

When IoT devices or messaging clients publish to MQTT topics, you often need to route those messages to downstream services for processing, storage, or analysis. You can create an event stream in EventBridge to consume messages from an ApsaraMQ for MQTT source and deliver them to a target.

An event stream from ApsaraMQ for MQTT follows this pipeline:

Source (ApsaraMQ for MQTT) --> Filter (optional) --> Transform (optional) --> Target (sink)

Each stage works as follows:

Stage | Purpose
Source | Consumes messages from an MQTT topic on your ApsaraMQ for MQTT instance
Filter | Evaluates each event against a pattern and discards non-matching events
Transform | Reshapes event data before delivery, for example by extracting fields or reformatting payloads
Target | Delivers the processed event to a downstream service
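
The stage order described above can be pictured with a small local simulation. The following Python sketch is illustrative only: `matches`, `run_stream`, and the exact-match pattern format (each key maps to a list of accepted values) are assumptions made for this example, not the EventBridge implementation or its API.

```python
from typing import Any, Callable, Iterable, Optional

Event = dict[str, Any]


def matches(pattern: Event, event: Event) -> bool:
    """Simplified exact-match filter (hypothetical, for illustration).

    Each pattern key maps either to a list of accepted values or to a
    nested dict that is matched recursively against the event.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


def run_stream(
    events: Iterable[Event],
    pattern: Optional[Event],
    transform: Optional[Callable[[Event], Any]],
    sink: Callable[[Any], None],
) -> None:
    """Source -> Filter (optional) -> Transform (optional) -> Target."""
    for event in events:
        # Filter: drop events that do not match the pattern.
        if pattern is not None and not matches(pattern, event):
            continue
        # Transform: reshape the event before delivery.
        payload = transform(event) if transform is not None else event
        # Target: hand the result to the downstream consumer.
        sink(payload)
```

For instance, passing `{"source": ["acs:mqtt"]}` as the pattern and `lambda e: e["data"]["body"]` as the transform would deliver only the message bodies of MQTT events to the sink.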

Prerequisites

Before you begin, make sure that you have:

Create the event stream

  1. Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.

  2. In the top navigation bar, select a region and click Create Event Stream.

  3. On the Create Event Stream page, enter a Task Name and Description, then complete the following sections.

Configure the MQTT source

  1. In the Source step, set Data Provider to Message Queue for MQTT.

  2. Configure the source parameters, then click Next Step.

    Parameter | Description | Example
    MQTT Instance | The ApsaraMQ for MQTT instance on which messages are produced. | test-instance
    MQTT Topic | The topic on the ApsaraMQ for MQTT instance to which messages are published. | test-topic
    Batch Push | Aggregates multiple events into a single push. A batch is pushed as soon as the condition specified by either the Messages parameter or the Batch Push Interval (Unit: Seconds) parameter is met. | Enable
    Messages | The maximum number of messages sent in a single batch. A batch is pushed once the number of backlogged messages reaches this value. Valid values: 1 to 10000. | 100
    Batch Push Interval (Unit: Seconds) | The interval at which the aggregated messages are pushed. Valid values: 0 to 15. A value of 0 means that messages are sent immediately after aggregation. | 3

    For example, if Batch Push is enabled with Messages set to 100 and Batch Push Interval (Unit: Seconds) set to 15, a batch is pushed as soon as 100 messages accumulate, even if only 10 seconds have elapsed.
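
    The "whichever condition is met first" behavior of Batch Push can be sketched in a few lines of Python. This is a local model, not EventBridge code: the `BatchBuffer` class, its method names, and the choice to measure the interval from the first buffered message are assumptions made for illustration.

```python
import time
from typing import Any, Callable, Optional


class BatchBuffer:
    """Hypothetical model of count-or-interval batching."""

    def __init__(
        self,
        max_messages: int,
        interval_seconds: float,
        flush: Callable[[list], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_messages = max_messages
        self._interval = interval_seconds
        self._flush = flush
        self._clock = clock
        self._pending: list[Any] = []
        self._first_at: Optional[float] = None

    def add(self, message: Any) -> None:
        """Buffer a message; push at once when the count threshold is hit."""
        if not self._pending:
            # Assumption: the interval starts with the first buffered message.
            self._first_at = self._clock()
        self._pending.append(message)
        if len(self._pending) >= self._max_messages:
            self._push()

    def tick(self) -> None:
        """Call periodically; push when the interval has elapsed."""
        if self._pending and self._clock() - self._first_at >= self._interval:
            self._push()

    def _push(self) -> None:
        batch, self._pending = self._pending, []
        self._first_at = None
        self._flush(batch)
```

    With `max_messages=100` and `interval_seconds=15`, the hundredth call to `add` pushes a batch immediately, while a smaller backlog is pushed only once `tick` observes that 15 seconds have passed.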

Configure filtering, transformation, and target

  1. In the Filtering, Transformation, and Sink steps, set the event filtering method, transformation rule, and event target. For details on using Function Compute for message transformation, see Use Function Compute to perform message cleansing.

Configure retry and dead-letter queue

  1. In the Task Property step, set the retry policy and dead-letter queue. When delivery to the target fails, EventBridge retries based on the configured policy. Events that remain undeliverable after all retries are routed to the dead-letter queue for inspection and reprocessing. For the full list of retry options, see Retry policies and dead-letter queues.
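
The retry-then-dead-letter flow can be illustrated with a short Python sketch. It is a conceptual model only: the function name `deliver_with_retry`, the retry count, and the exponential backoff delays are assumptions chosen for the example, and the policies that EventBridge actually offers are the ones listed in Retry policies and dead-letter queues.

```python
import time
from typing import Any, Callable


def deliver_with_retry(
    event: Any,
    send: Callable[[Any], None],
    dead_letter: Callable[[Any], None],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try to deliver an event; route it to the dead-letter queue on failure.

    Hypothetical policy: one initial attempt plus `max_retries` retries,
    waiting base_delay * 2**attempt seconds between attempts.
    """
    for attempt in range(max_retries + 1):
        try:
            send(event)
            return True
        except Exception:
            # Wait before the next attempt, unless retries are exhausted.
            if attempt < max_retries:
                sleep(base_delay * 2 ** attempt)
    # All attempts failed: keep the event for inspection and reprocessing.
    dead_letter(event)
    return False
```

Passing `sleep` and `dead_letter` as parameters keeps the sketch easy to exercise locally without real delays or a real queue.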

Save and enable the event stream

  1. Click Save.

  2. Return to the Event Streams page, find the event stream, and click Enable in the Actions column. Enabling takes 30 to 60 seconds. Track the progress in the Status column.

Sample event

When a message is published to the MQTT topic, EventBridge wraps it in a CloudEvents-compliant envelope and delivers it to the target:

{
    "specversion": "1.0",
    "id": "AC1EC0C950650816F27D46F7D7CA****",
    "source": "acs:mqtt",
    "type": "mqtt:Topic:SendMessage",
    "subject": "acs:mq:cn-hangzhou:143998900779****:topic/mqtt-cn-2r42qam****/housekee****",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-22T03:53:47.959Z",
    "aliyunaccountid": "143998900779****",
    "data": {
        "props": {
            "firstTopic": "housekee****",
            "secondTopic": "/testMq4****",
            "clientId": "GID_****"
        },
        "body": "TEST"
    }
}

CloudEvents envelope fields

For information about the parameters defined in the CloudEvents specification, see Event overview.

MQTT message payload (data)

The data object contains the MQTT message payload:

Parameter | Type | Example | Description
props | Map | - | Message attributes
props.firstTopic | String | housekee**** | The parent topic used to send and receive messages
props.secondTopic | String | /testMq4**** | The subtopic
props.clientId | String | GID_**** | The client ID
body | String | TEST | The message body
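
A target that receives this event typically needs to unpack the envelope and read the fields under data. The following Python sketch shows one way to do that, using only the field names from the sample event. The `MqttMessage` class and `parse_mqtt_event` function are hypothetical names introduced here, and joining the parent topic and subtopic into `full_topic` is an assumption about how the two parts combine.

```python
import json
from dataclasses import dataclass


@dataclass
class MqttMessage:
    """Fields of interest taken from the event envelope and its data object."""

    event_id: str
    parent_topic: str
    sub_topic: str
    client_id: str
    body: str

    @property
    def full_topic(self) -> str:
        # Assumption: the subtopic already starts with "/", as in the sample.
        return self.parent_topic + self.sub_topic


def parse_mqtt_event(raw: str) -> MqttMessage:
    """Parse one JSON-encoded event delivered from an MQTT source."""
    event = json.loads(raw)
    if event.get("source") != "acs:mqtt":
        raise ValueError(f"unexpected event source: {event.get('source')!r}")
    data = event["data"]
    props = data["props"]
    return MqttMessage(
        event_id=event["id"],
        parent_topic=props["firstTopic"],
        # Treat a missing subtopic as empty (assumption for robustness).
        sub_topic=props.get("secondTopic", ""),
        client_id=props["clientId"],
        body=data["body"],
    )
```

Checking the source field before reading data is a defensive choice for targets that may also receive events from other streams.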