EventBridge: ApsaraMQ for RocketMQ

Last Updated: Nov 30, 2023

This topic describes how to create an event stream whose event provider is ApsaraMQ for RocketMQ in the EventBridge console.

Prerequisites

Procedure

  1. Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.

  2. In the top navigation bar, select a region and click Create Event Stream.

  3. On the Create Event Stream page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Message Queue for Apache RocketMQ and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.

        | Parameter | Description | Example |
        | --- | --- | --- |
        | Region | The region where the source ApsaraMQ for RocketMQ instance resides. | China (Hangzhou) |
        | Version | | |
        | Instance | The ApsaraMQ for RocketMQ instance on which messages are produced. | MQ_INST_115964845466****_ByBeUp3p |
        | Topic | The topic in which messages are produced on the ApsaraMQ for RocketMQ instance. | topic |
        | Tag | The tag that is used to filter messages on the ApsaraMQ for RocketMQ instance. | test |
        | Group ID | The name of the consumer group on the ApsaraMQ for RocketMQ instance. You must use a separate consumer group to create the event source. Do not use the same consumer group for ApsaraMQ for RocketMQ and another existing messaging service. Otherwise, the existing messaging service may fail to send and receive messages. | GID_http_1 |
        | Consumer Offset | The offset from which messages are consumed. | Latest Offset |
        | Batch Push | The batch push feature aggregates multiple events into a single push. A push is triggered when the condition specified by either the Messages parameter or the Batch Push Interval (Unit: Seconds) parameter is met. For example, if you set the Messages parameter to 100 and the Batch Push Interval (Unit: Seconds) parameter to 15, the push is executed as soon as the number of messages reaches 100, even if only 10 seconds have elapsed. | Enable |
        | Messages | The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. | 100 |
        | Batch Push Interval (Unit: Seconds) | The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation. | 3 |

      2. In the Filtering, Transformation, and Sink steps, configure the event filtering method, event transformation rule, and event target. For information about event transformation configurations, see Use Function Compute to perform message cleansing.

    • Task Property

      Configure the retry policy and dead-letter queue for the event stream. For more information, see Retry policies and dead-letter queues.

  4. Go back to the Event Streams page and find the event stream that you created. Then, click Enable in the Actions column.

    Enabling an event stream requires 30 to 60 seconds to complete. You can view the progress in the Status column of the event stream on the Event Streams page.
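The Batch Push behavior described in the Source step (push when the Messages threshold is reached or when the Batch Push Interval elapses, whichever comes first) can be illustrated with a small local model. This is a sketch for reasoning about the two parameters, not EventBridge code: the `BatchPusher` class, its method names, and the injectable `clock` are all hypothetical, and the real service may time its windows differently.

```python
import time
from typing import Callable, List, Optional


class BatchPusher:
    """Hypothetical local model of Batch Push; not part of any EventBridge SDK."""

    def __init__(self, max_messages: int, interval_seconds: float,
                 push: Callable[[List[str]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        # Ranges mirror the documented valid values for the two parameters.
        if not 1 <= max_messages <= 10000:
            raise ValueError("max_messages must be between 1 and 10000")
        if not 0 <= interval_seconds <= 15:
            raise ValueError("interval_seconds must be between 0 and 15")
        self.max_messages = max_messages
        self.interval_seconds = interval_seconds
        self.push = push
        self.clock = clock
        self.buffer: List[str] = []
        self.window_start: Optional[float] = None

    def add(self, message: str) -> None:
        """Buffer one message and push the batch if either condition is met."""
        if not self.buffer:
            # Assumption: the interval is measured from the first buffered message.
            self.window_start = self.clock()
        self.buffer.append(message)
        self.flush_if_due()

    def flush_if_due(self) -> None:
        """Push when the count threshold or the interval is reached, whichever is first."""
        if not self.buffer:
            return
        elapsed = self.clock() - self.window_start
        if len(self.buffer) >= self.max_messages or elapsed >= self.interval_seconds:
            batch, self.buffer = self.buffer, []
            self.window_start = None
            self.push(batch)


if __name__ == "__main__":
    pusher = BatchPusher(max_messages=100, interval_seconds=15, push=print)
    for i in range(100):
        pusher.add(f"message-{i}")  # the 100th message triggers the push
```

In this model, an interval of 0 makes every message push immediately, which matches the documented meaning of the value 0. A caller would also need to invoke `flush_if_due()` periodically so that a partially filled batch is pushed once the interval elapses.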

Sample event

{
    "specversion":"1.0",
    "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
    "source":"acs:mq",
    "type":"mq:Topic:SendMessage",
    "subject":"acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
    "datacontenttype":"application/json; charset=utf-8",
    "time":"2021-04-08T06:01:20.766Z",
    "aliyunpublishtime":"2021-04-08T06:01:20.725Z",
    "aliyuneventbusname":"BusName",
    "data":{
        "topic":"TopicName",
        "systemProperties":{
            "MIN_OFFSET":"0",
            "TRACE_ON":"true",
            "MAX_OFFSET":"8",
            "MSG_REGION":"cn-hangzhou",
            "KEYS":"systemProperties.KEYS",
            "CONSUME_START_TIME":1628577790396,
            "UNIQ_KEY":"AC14C305069E1B28CDFA3181CDA2****",
            "TAGS":"systemProperties.TAGS",
            "INSTANCE_ID":"MQ_INST_123456789098****_BXhFHryi"
        },
        "userProperties":{
        },
        "body":"TEST"
    }
}

For information about the parameters defined in the CloudEvents specification, see Overview.
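As an illustration of how an event target might read the envelope of the sample event, the following sketch parses the JSON and splits the `subject` field. The subject layout (`acs:mq:<region>:<account ID>:<instance ID>%<topic>`) is inferred from the sample alone and should be treated as an assumption; the function names and the abridged `SAMPLE_EVENT` constant are hypothetical.

```python
import json

# Abridged copy of the sample event; only the fields used below are kept.
SAMPLE_EVENT = """
{
    "specversion": "1.0",
    "source": "acs:mq",
    "type": "mq:Topic:SendMessage",
    "subject": "acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
    "data": {"topic": "TopicName", "body": "TEST"}
}
"""


def parse_subject(subject: str) -> dict:
    """Split a subject that follows the layout seen in the sample (assumed)."""
    prefix, _, topic = subject.partition("%")
    parts = prefix.split(":", 4)
    if len(parts) != 5 or parts[:2] != ["acs", "mq"]:
        raise ValueError(f"unexpected subject layout: {subject!r}")
    _, _, region, account_id, instance_id = parts
    return {
        "region": region,
        "account_id": account_id,
        "instance_id": instance_id,
        "topic": topic,
    }


def read_envelope(raw: str) -> dict:
    """Check source and type against the sample values, then extract key fields."""
    event = json.loads(raw)
    if event.get("source") != "acs:mq" or event.get("type") != "mq:Topic:SendMessage":
        raise ValueError("not an ApsaraMQ for RocketMQ message event")
    info = parse_subject(event["subject"])
    info["body"] = event["data"]["body"]
    return info


if __name__ == "__main__":
    print(read_envelope(SAMPLE_EVENT))
```

Rejecting events whose `source` or `type` differs from the sample keeps a shared handler from misreading events produced by other providers.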

The following table describes the parameters contained in data.

| Parameter | Type | Example | Description |
| --- | --- | --- | --- |
| topic | String | TopicName | The topic name. |
| systemProperties | Map | | The system properties. |
| MIN_OFFSET | Int | 0 | The minimum offset. |
| TRACE_ON | Boolean | true | Indicates whether a message trace exists. Valid values: true and false. |
| MAX_OFFSET | Int | 8 | The maximum offset. |
| MSG_REGION | String | cn-hangzhou | The region from which the message was sent. |
| KEYS | String | systemProperties.KEYS | The filtering keys. |
| CONSUME_START_TIME | Long | 1628577790396 | The start time of message consumption. Unit: milliseconds. |
| UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | The unique key of the message. |
| TAGS | String | systemProperties.TAGS | The filtering tags. |
| INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | The instance ID. |
| userProperties | Map | | The user properties. |
| body | String | TEST | The message body. |
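The sample event carries MIN_OFFSET, MAX_OFFSET, and TRACE_ON as JSON strings, while the table above lists them as Int and Boolean. A consumer may therefore want to normalize the types before use. The following is a minimal sketch under that assumption; `normalize_data` and its helper functions are hypothetical names, and the output keys are chosen only for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def to_int(value: Any) -> int:
    """Accept both JSON numbers and numeric strings such as "8"."""
    return int(value)


def to_bool(value: Any) -> bool:
    """Accept both JSON booleans and the strings "true" / "false"."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


def millis_to_utc(millis: int) -> datetime:
    """Convert a millisecond timestamp to an aware UTC datetime without float rounding."""
    seconds, remainder = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=remainder)


def normalize_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Map the fields of the event's data object to typed Python values."""
    system = data.get("systemProperties", {})
    return {
        "topic": data["topic"],
        "instance_id": system.get("INSTANCE_ID"),
        "min_offset": to_int(system["MIN_OFFSET"]),
        "max_offset": to_int(system["MAX_OFFSET"]),
        "trace_on": to_bool(system["TRACE_ON"]),
        "consume_start_time": millis_to_utc(to_int(system["CONSUME_START_TIME"])),
        "tags": system.get("TAGS"),
        "keys": system.get("KEYS"),
        "user_properties": dict(data.get("userProperties", {})),
        "body": data["body"],
    }


# The data object from the sample event, abridged to the fields used above.
SAMPLE_DATA = {
    "topic": "TopicName",
    "systemProperties": {
        "MIN_OFFSET": "0",
        "TRACE_ON": "true",
        "MAX_OFFSET": "8",
        "CONSUME_START_TIME": 1628577790396,
        "INSTANCE_ID": "MQ_INST_123456789098****_BXhFHryi",
    },
    "userProperties": {},
    "body": "TEST",
}

if __name__ == "__main__":
    print(normalize_data(SAMPLE_DATA))
```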