本文介紹如何在事件匯流排EventBridge控制台添加雲訊息佇列 Kafka 版作為事件流中的事件提供方。
前提條件
您已購買並部署雲訊息佇列 Kafka 版執行個體,且執行個體處於服務中狀態。具體步驟,請參見購買和部署執行個體。
操作步驟
- 登入事件匯流排EventBridge控制台,在左側導覽列,單擊事件流。
- 在頂部功能表列,選擇地區,然後單擊建立事件流。
在建立事件流面板,設定任務名稱和描述,配置以下參數,然後單擊儲存。
任務建立
在Source(源)設定精靈,選擇資料提供方為訊息佇列 Kafka 版,設定以下參數,然後單擊下一步。
在Filtering(過濾)、Transform(轉換)及Sink(目標)設定精靈,設定事件過濾、轉換規則及事件目標。事件轉換的配置說明,請參見使用Function Compute實現訊息資料清洗。
參數
說明
樣本
地區
選擇雲訊息佇列 Kafka 版源執行個體所在的地區。
華北2(北京)
kafka 執行個體
選擇生產雲訊息佇列 Kafka 版訊息的源執行個體。
MQ_INST_115964845466****_ByBeUp3p
Topic
選擇生產雲訊息佇列 Kafka 版訊息的Topic。
topic
Group ID
選擇源執行個體的消費組名稱。請使用獨立的消費組來建立事件來源,不要和已有的業務混用消費組,以免影響已有的訊息收發。
GID_http_1
消費位點
選擇開始消費訊息的位點。
最新位點
網路設定
選擇路由訊息的網路類型。
預設網路
Virtual Private Cloud
選擇VPC ID。當網路設定設定為自建公網時需要設定此參數。
vpc-bp17fapfdj0dwzjkd****
交換器
選擇vSwitch ID。當網路設定設定為自建公網時需要設定此參數。
vsw-bp1gbjhj53hdjdkg****
安全性群組
選擇安全性群組。當網路設定設定為自建公網時需要設定此參數。
alikafka_pre-cn-7mz2****
批量推送
批量推送可幫您批量彙總多個事件,當批量推送条数和批量推送间隔(单位:秒)兩者條件達到其一時即會觸發批量推送。
例如:您設定的推送條數為100 條,間隔時間為15 s,在10 s內訊息條數已達到100條,那麼該次推送則不會等15 s後再推送。
開啟
批量推送条数
調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。
100
批量推送间隔(单位:秒)
調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。
3
任務屬性
設定事件流的重試策略及無效信件佇列。更多資訊,請參見重試和死信。
返回事件流頁面,找到建立好的事件流,在其右側操作欄,單擊啟用。
啟用事件流後,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
事件樣本
{
"specversion": "1.0",
"id": "8e215af8-ca18-4249-8645-f96c1026****",
"source": "acs:alikafka",
"type": "alikafka:Topic:Message",
"subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-23T02:49:51.589Z",
"aliyunaccountid": "182572506381****",
"data": {
"topic": "****",
"partition": 7,
"offset": 25,
"timestamp": 1655952591589,
"headers": {
"headers": [],
"isReadOnly": false
},
"key": "keytest",
"value": "hello kafka msg"
}
}CloudEvents規範中定義的參數解釋,請參見事件概述。
data欄位包含的參數解釋如下表所示。
參數 | 類型 | 樣本值 | 描述 |
topic | String | TopicName | Topic的名稱。 |
partition | Int | 1 | 雲訊息佇列 Kafka 版的消費分區資訊。 |
offset | Int | 0 | 雲訊息佇列 Kafka 版的訊息位點。 |
timestamp | String | 1655952591589 | 開始消費時間戳記。 |
headers.headers | List | [header1, header2] | 訊息 header |
headers.isReadOnly | Boolean | false | 此欄位僅保留,無實際意義 |
key | String | dataKey | 訊息 key |
value | String | dataValue | 訊息 value。具體內容格式和任務配置的資料格式相關。
|