EventBridge polls messages from an ApsaraMQ for RocketMQ topic, converts them into CloudEvents, and delivers them to a custom event bus for rule-based routing. Once configured, you can analyze events, view event traces, and forward data to downstream targets.
Limits
Each event source requires a dedicated consumer group. Do not reuse a consumer group that serves existing production workloads. Assign a unique group ID to every event source.
Both RocketMQ 4.x and RocketMQ 5.x instances are supported. Select the version that matches your instance.
Prerequisites
Before you begin, make sure that you have:
ApsaraMQ for RocketMQ
An activated ApsaraMQ for RocketMQ service with the required permissions granted to a Resource Access Management (RAM) user. For more information, see Activate and grant permissions on ApsaraMQ for RocketMQ
An ApsaraMQ for RocketMQ instance with at least one topic and one consumer group. For more information, see Create resources
EventBridge
An activated EventBridge service with the required permissions granted to a RAM user. For more information, see Activate EventBridge and grant permissions to a RAM user
A custom event bus. For more information, see Create a custom event bus
Add a RocketMQ event source
Log on to the EventBridge console.
In the left-side navigation pane, click Event Buses.
In the top navigation bar, select a region.
On the Event Buses page, click the name of the target event bus.
In the left-side navigation pane, click Event Sources.
Click Add Event Source.
In the Add Custom Event Source panel, configure the following parameters and click OK.
Parameter Required Description Name Yes Enter a name for the event source. Description Yes Enter a description. Event Provider Yes Select ApsaraMQ for RocketMQ. Region Yes Select the region where your ApsaraMQ for RocketMQ instance resides. Version Yes Select the instance version: RocketMQ 4.x or RocketMQ 5.x. Instance Yes Select the ApsaraMQ for RocketMQ instance. Topic Yes Select the topic on the instance. Tag No Enter a tag to filter messages. Group ID Yes Select the consumer group ID. A group corresponds to a type of business message. Use a dedicated group that is not shared with other applications or event sources. Consumer Offset Yes Select the offset from which to start consuming messages. Default: Latest Offset.
Verify the integration
After you create the event source, publish a test message to the RocketMQ topic and check the event trace in the EventBridge console. If the configuration is correct, the message appears in the event bus as a CloudEvents-formatted event.
Sample event
After the event source is created, messages published to the RocketMQ topic are delivered to the event bus as CloudEvents in the following format:
{
"id": "94ebc15f-f0db-4bbe-acce-56fb72fb****",
"source": "acs:mq",
"specversion": "1.0",
"type": "mq:Topic:SendMessage",
"datacontenttype": "application/json; charset=utf-8",
"subject": "acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
"time": "2021-04-08T06:01:20.766Z",
"aliyunpublishtime": "2021-04-08T06:01:20.725Z",
"aliyuneventbusname": "BusName",
"data": {
"topic": "TopicName",
"systemProperties": {
"MIN_OFFSET": "0",
"TRACE_ON": "true",
"MAX_OFFSET": "8",
"MSG_REGION": "cn-hangzhou",
"KEYS": "systemProperties.KEYS",
"CONSUME_START_TIME": 1628577790396,
"UNIQ_KEY": "AC14C305069E1B28CDFA3181CDA2****",
"TAGS": "systemProperties.TAGS",
"INSTANCE_ID": "MQ_INST_123456789098****_BXhFHryi"
},
"userProperties": {},
"body": "TEST"
}
}For more information about CloudEvents envelope fields (id, source, specversion, type, datacontenttype, subject, time), see Event overview.
Data field reference
The data object contains the following fields.
| Field | Type | Example | Description |
|---|---|---|---|
| topic | String | TopicName | The RocketMQ topic name. |
| systemProperties | Map | -- | System-level message properties. See the fields below. |
| systemProperties.MIN_OFFSET | Int | 0 | The earliest offset in the queue. |
| systemProperties.TRACE_ON | Boolean | true | Whether a message trace exists. Valid values: true, false. |
| systemProperties.MAX_OFFSET | Int | 8 | The latest offset in the queue. |
| systemProperties.MSG_REGION | String | cn-hangzhou | The region from which the message was sent. |
| systemProperties.KEYS | String | systemProperties.KEYS | Keys used to filter the message. |
| systemProperties.CONSUME_START_TIME | Long | 1628577790396 | The start time of message consumption, in milliseconds. |
| systemProperties.UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | The unique message identifier. |
| systemProperties.TAGS | String | systemProperties.TAGS | Tags used to filter the message. |
| systemProperties.INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | The ApsaraMQ for RocketMQ instance ID. |
| userProperties | Map | None | Custom user-defined properties attached to the message. |
| body | String | TEST | The message body. |