This topic describes how to configure ApsaraMQ for Kafka as an event source for an event stream in the EventBridge console.
Prerequisites
An ApsaraMQ for Kafka instance is purchased and deployed, and the instance is in the In Service state. For more information, see Purchase and deploy an instance.
Procedure
Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.
In the top navigation bar, select a region and click Create Event Stream.
In the Create Event Stream panel, set Task Name and Description, configure the following parameters, and then click Save.
Task creation
In the Source configuration wizard, set Data Provider to ApsaraMQ for Kafka, configure the following parameters, and then click Next.
In the Filtering, Transform, and Sink configuration wizards, set the event filtering rule, transformation rule, and event target. For more information about how to configure event transformation, see Use Function Compute for data cleansing.
Parameter | Description | Example |
Region | Select the region where the source ApsaraMQ for Kafka instance resides. | China (Beijing) |
Kafka Instance | Select the source instance that produces ApsaraMQ for Kafka messages. | MQ_INST_115964845466****_ByBeUp3p |
Topic | Select the topic that produces ApsaraMQ for Kafka messages. | topic |
Group ID | Select the name of the consumer group for the source instance. Use a dedicated consumer group to create the event source. Do not share the consumer group with existing services, because sharing can interfere with existing message sending and receiving. | GID_http_1 |
Consumer Offset | Select the offset from which to start message consumption. | Latest offset |
Network Type | Select the network type for message routing. | Default Network |
VPC | Select the VPC ID. This parameter is required if you set Network Type to Internet. | vpc-bp17fapfdj0dwzjkd**** |
VSwitch | Select the vSwitch ID. This parameter is required if you set Network Type to Internet. | vsw-bp1gbjhj53hdjdkg**** |
Security Group | Select the security group. This parameter is required if you set Network Type to Internet. | alikafka_pre-cn-7mz2**** |
Batch Push | Batch push aggregates multiple events into one push. A batch push is triggered as soon as either the Messages value or the Interval (Unit: Seconds) value is reached. For example, if you set Messages to 100 and the interval to 15 seconds, and 100 messages accumulate within 10 seconds, the push is triggered immediately without waiting for the 15-second interval to elapse. | Enable |
Messages | The maximum number of messages that can be sent in a single function call. A request is sent when the number of accumulated messages reaches this value, or earlier if the interval elapses first. Valid values: [1, 10000]. | 100 |
Interval (Unit: Seconds) | The interval at which the function is invoked. The system aggregates messages and sends them to Function Compute at the specified interval. Valid values: [0, 15]. Unit: seconds. A value of 0 means that the system does not wait and delivers messages immediately. | 3 |
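The Batch Push trigger rule (push when either the message count or the interval is reached) can be illustrated with a minimal Python sketch. This is not EventBridge code. The `BatchBuffer` class, its method names, and the assumption that the interval window starts when the first message is buffered are all hypothetical and serve only to show how the two thresholds interact.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class BatchBuffer:
    """Hypothetical model of the Batch Push trigger (not the EventBridge implementation)."""
    max_messages: int = 100        # models the Messages parameter, valid range [1, 10000]
    interval_seconds: float = 15   # models the Interval parameter, valid range [0, 15]
    _items: List[str] = field(default_factory=list)
    _window_start: Optional[float] = None

    def add(self, message: str, now: float) -> Optional[List[str]]:
        """Buffer one message. Return the batch if a threshold is reached, else None."""
        if self._window_start is None:
            # Assumption: the interval is measured from the first buffered message.
            self._window_start = now
        self._items.append(message)
        if len(self._items) >= self.max_messages:
            return self._flush()   # count threshold reached first
        if now - self._window_start >= self.interval_seconds:
            return self._flush()   # interval elapsed (always true when interval is 0)
        return None

    def tick(self, now: float) -> Optional[List[str]]:
        """Timer callback: push a partial batch once the interval has elapsed."""
        if self._items and now - self._window_start >= self.interval_seconds:
            return self._flush()
        return None

    def _flush(self) -> List[str]:
        batch, self._items, self._window_start = self._items, [], None
        return batch


# 100 messages arrive within 10 seconds: the push happens on the 100th message,
# before the 15-second interval elapses.
buf = BatchBuffer(max_messages=100, interval_seconds=15)
pushed = [b for i in range(100) if (b := buf.add(f"msg-{i}", now=i * 0.1)) is not None]
print(len(pushed), len(pushed[0]))
```

In this model, an interval of 0 makes every call to `add` return a one-message batch, which corresponds to the immediate delivery described for the Interval parameter.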
Task properties
Set the retry policy and dead-letter queue for the event stream. For more information, see Retries and dead-letter queues.
Return to the Event Streams page. Find the event stream that you created, and then click Enable in the Actions column.
After you enable the event stream, it may take 30 to 60 seconds to start. You can view the startup progress in the Status column on the Event Streams page.
Sample event
{
    "specversion": "1.0",
    "id": "8e215af8-ca18-4249-8645-f96c1026****",
    "source": "acs:alikafka",
    "type": "alikafka:Topic:Message",
    "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-23T02:49:51.589Z",
    "aliyunaccountid": "182572506381****",
    "data": {
        "topic": "****",
        "partition": 7,
        "offset": 25,
        "timestamp": 1655952591589,
        "headers": {
            "headers": [],
            "isReadOnly": false
        },
        "key": "keytest",
        "value": "hello kafka msg"
    }
}
For information about the parameters that are defined in the CloudEvents specification, see Event overview.
The following table describes the parameters in the data field.
Parameter | Type | Example | Description |
topic | String | TopicName | The name of the topic. |
partition | Int | 1 | The consumer partition of ApsaraMQ for Kafka. |
offset | Int | 0 | The message offset in ApsaraMQ for Kafka. |
timestamp | String | 1655952591589 | The UNIX timestamp, in milliseconds, when message consumption starts. |
headers.headers | List | [header1, header2] | The message header. |
headers.isReadOnly | Boolean | false | This field is reserved and has no practical significance. |
key | String | dataKey | The message key. |
value | String | dataValue | The message value. The content format depends on the data format that is configured for the task. |
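As an illustration of how a downstream consumer might read these fields, the following Python sketch parses the sample event and extracts the contents of the data field. The function name `summarize_kafka_event` and the shape of the returned dictionary are hypothetical. The sketch assumes that timestamp is expressed in milliseconds, which matches the time attribute of the sample event, and it accepts the value as either a number (as in the sample) or a string (as listed in the table).

```python
import json
from datetime import datetime, timedelta, timezone

# The sample event from this topic, with masked values kept as published.
SAMPLE_EVENT = """
{
    "specversion": "1.0",
    "id": "8e215af8-ca18-4249-8645-f96c1026****",
    "source": "acs:alikafka",
    "type": "alikafka:Topic:Message",
    "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-23T02:49:51.589Z",
    "aliyunaccountid": "182572506381****",
    "data": {
        "topic": "****",
        "partition": 7,
        "offset": 25,
        "timestamp": 1655952591589,
        "headers": {"headers": [], "isReadOnly": false},
        "key": "keytest",
        "value": "hello kafka msg"
    }
}
"""


def summarize_kafka_event(event_json: str) -> dict:
    """Hypothetical helper: pull the Kafka-specific fields out of one event."""
    event = json.loads(event_json)
    data = event["data"]

    # int() handles both a JSON number and a numeric string.
    seconds, millis = divmod(int(data["timestamp"]), 1000)
    consumed_at = datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)

    return {
        "event_type": event["type"],
        "position": (data["topic"], data["partition"], data["offset"]),
        "consumed_at": consumed_at,
        "header_count": len(data["headers"]["headers"]),
        "key": data["key"],
        "value": data["value"],
    }


summary = summarize_kafka_event(SAMPLE_EVENT)
print(summary["position"], summary["consumed_at"].isoformat(), summary["value"])
```

The sketch treats value as an opaque string. If the task is configured with a structured data format, an additional decoding step for value would be needed.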