This topic describes how to create an event stream whose event provider is ApsaraMQ for Kafka in the EventBridge console.
Prerequisites
EventBridge is activated and the required permissions are granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
An ApsaraMQ for Kafka instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Step 2: Purchase and deploy an instance in the "Procedure" section of the Overview topic.
Procedure
Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.
In the top navigation bar, select a region and click Create Event Stream.
On the Create Event Stream page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following sections describe the parameters:
Task Creation
In the Source step, set the Data Provider parameter to Message Queue for Apache Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
In the Filtering, Transformation, and Sink steps, configure the event filtering method, event transformation rule, and event target. For information about event transformation configurations, see Use Function Compute to perform message cleansing.
Parameter | Description | Example |
--- | --- | --- |
Region | The region where the ApsaraMQ for Kafka instance resides. | China (Beijing) |
Message Queue for Apache Kafka Instance | The ApsaraMQ for Kafka instance on which messages are produced. | MQ_INST_115964845466****_ByBeUp3p |
Topic | The topic on the ApsaraMQ for Kafka instance in which messages are produced. | topic |
Group ID | The name of the consumer group on the ApsaraMQ for Kafka instance. Use a consumer group that is dedicated to this event stream. Do not reuse a consumer group that is already used by an existing messaging service. Otherwise, messages may fail to be sent or received by that service. | GID_http_1 |
Concurrency Quota (Consumers) | The number of consumers on the ApsaraMQ for Kafka instance. | 1 |
Consumer Offset | The offset from which messages are consumed. | Latest Offset |
Network Configuration | The type of network over which you want to route messages. | Basic Network |
VPC | The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Internet. | vpc-bp17fapfdj0dwzjkd**** |
vSwitch | The ID of the vSwitch to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Internet. | vsw-bp1gbjhj53hdjdkg**** |
Security Group | The security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Internet. | alikafka_pre-cn-7mz2**** |
Batch Push | The batch push feature aggregates multiple events into a single push. A push is triggered as soon as the condition specified by the Messages parameter or the Batch Push Interval (Unit: Seconds) parameter is met. For example, if you set the Messages parameter to 100 and the Batch Push Interval (Unit: Seconds) parameter to 15, the push is executed when the number of messages reaches 100, even if only 10 seconds have elapsed. The sketch that follows this table illustrates this behavior. | Enable |
Messages | The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. | 100 |
Batch Push Interval (Unit: Seconds) | The interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation. | 3 |
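The following Python sketch illustrates the either-or trigger condition that the Batch Push, Messages, and Batch Push Interval (Unit: Seconds) parameters describe. The class and method names are hypothetical and do not represent the actual EventBridge implementation; the sketch only models the behavior for clarity.

import time

# Illustrative only: a batch is flushed as soon as either the message-count
# threshold (Messages) or the time-interval threshold (Batch Push Interval)
# is reached, whichever happens first.
class BatchPusher:
    def __init__(self, max_messages=100, max_interval_seconds=3, push=print):
        self.max_messages = max_messages          # corresponds to the Messages parameter
        self.max_interval = max_interval_seconds  # corresponds to Batch Push Interval (Unit: Seconds)
        self.push = push                          # callback that receives one aggregated batch
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, message):
        # Buffer a message and flush when the count threshold is reached.
        self.buffer.append(message)
        if len(self.buffer) >= self.max_messages:
            self.flush()

    def tick(self):
        # Call periodically: flush if the interval has elapsed and messages are buffered.
        if self.buffer and time.monotonic() - self.last_flush >= self.max_interval:
            self.flush()

    def flush(self):
        if self.buffer:
            self.push(self.buffer)
            self.buffer = []
        self.last_flush = time.monotonic()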
Task Property
Configure the retry policy and dead-letter queue for the event stream. For more information, see Retry policies and dead-letter queues.
Go back to the Event Streams page and find the event stream that you created. Then, click Enable in the Actions column.
It takes 30 to 60 seconds to enable an event stream. You can view the progress in the Status column of the event stream on the Event Streams page.
Sample event
{
    "specversion": "1.0",
    "id": "8e215af8-ca18-4249-8645-f96c1026****",
    "source": "acs:alikafka",
    "type": "alikafka:Topic:Message",
    "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-23T02:49:51.589Z",
    "aliyunaccountid": "182572506381****",
    "data": {
        "topic": "****",
        "partition": 7,
        "offset": 25,
        "timestamp": 1655952591589,
        "headers": {
            "headers": [],
            "isReadOnly": false
        },
        "key": "keytest",
        "value": "hello kafka msg"
    }
}
For information about the parameters defined in the CloudEvents specification, see Overview.
The following table describes the parameters contained in data.
Parameter | Type | Example | Description |
--- | --- | --- | --- |
topic | String | TopicName | The name of the topic. |
partition | Int | 1 | The partition on the ApsaraMQ for Kafka instance from which the message was consumed. |
offset | Int | 0 | The message offset on the ApsaraMQ for Kafka instance. |
timestamp | String | 1655952591589 | The timestamp that indicates when message consumption started. |
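If the event target is Function Compute, the handler can read the fields described above from the pushed payload. The following is a minimal Python sketch under these assumptions: the standard Function Compute handler(event, context) entry point is used, and the payload is either a single CloudEvent in the format shown above or a JSON array of such events when batch push is enabled. Treat it as an illustration, not the authoritative integration format.

import json

# Hedged sketch: parse the pushed payload and print the Kafka-related fields
# from the data section of each event.
def handler(event, context):
    payload = json.loads(event)
    # Normalize to a list so single events and batched events are handled alike.
    events = payload if isinstance(payload, list) else [payload]

    for cloud_event in events:
        data = cloud_event.get("data", {})
        print(
            "topic=%s partition=%s offset=%s key=%s value=%s"
            % (
                data.get("topic"),
                data.get("partition"),
                data.get("offset"),
                data.get("key"),
                data.get("value"),
            )
        )
    return "OK"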