本文介紹如何在事件匯流排EventBridge控制台添加Data Transmission Service作為事件流中的事件提供方。
前提條件
在Data Transmission Service控制台建立資料訂閱任務且任務狀態為正常。詳細操作,請參見資料訂閱操作指導。
在建立的資料訂閱任務中新增消費組。
支援地區
支援將事件流中的事件提供方設定為Data Transmission Service的地區有華東1(杭州)、華東2(上海)、華北1(青島)、華北2(北京)、華南1(深圳)、華南3(廣州)、西南1(成都)、中國香港。
操作步驟
事件匯流排EventBridge的事件流僅中轉操作類型為INSERT、DELETE、UPDATE和DDL的DTS資料。
- 登入事件匯流排EventBridge控制台,在左側導覽列,單擊事件流。
- 在頂部功能表列,選擇地區,然後單擊建立事件流。
在建立事件流面板,設定任務名稱和描述,然後配置以下參數,最後單擊儲存。
任務建立:
在Source(源)設定精靈,選擇資料提供方為資料庫傳輸服務 DTS(資料庫),設定以下參數,然後單擊下一步。
參數
說明
樣本
資料訂閱任務
選擇您在Data Transmission Service控制台上建立的資料訂閱任務名稱。
dts8jqe****
接入方式
預設為建立的資料訂閱任務的接入方式且不可更改。
RDS
執行個體ID
預設為建立資料訂閱任務時訂閱的執行個體且不可更改。
rm-bp18mj3q2dzyb****
消費組
在前提條件中建立的用於消費訂閱任務資料的消費組名稱。
說明請確保該消費組沒有在其他用戶端的執行個體上運行,否則可能導致傳入的消費位點失效。
test
帳號
建立消費組時設定的帳號。
test
密碼
建立消費組時設定的密碼。
******
消費位點
期望消費第一條資料的時間戳記。消費位點必須在訂閱執行個體的資料範圍之內。
說明消費位點僅在新消費組第一次運行時生效,若後續任務重啟,則會基於上次消費位點繼續消費。
2022-06-21 00:00:00
批量推送条数
調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。
100
批量推送间隔(单位:秒)
調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。
3
在Filtering(過濾)、Transform(轉換)及Sink(目標)設定精靈,設定事件過濾、轉換規則及事件目標。事件轉換的配置說明,請參見使用Function Compute實現訊息資料清洗。
任務屬性
設定事件流的重試策略及無效信件佇列。更多資訊,請參見重試和死信。
返回事件流頁面,找到建立好的事件流,在其右側操作欄,單擊啟用。
啟用事件流後,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
事件樣本
在DTS資料庫建立資料訂閱任務時,當訂閱的執行個體類型為MySQL執行個體時,事件樣本如下:
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou--test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
104,
101,
108,
108,
111
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
98,
121,
101
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}CloudEvents規範中定義的參數解釋,請參見事件概述。
data欄位包含的參數解釋如下表所示。
參數 | 類型 | 說明 |
id | String | DTS資料ID。 |
topicPartition | Array | Topic的分區資訊。 |
hash | String | DTS底層儲存參數。 |
partition | String | Topic的分區。 |
topic | String | Topic的名稱。 |
offset | Int | DTS資料對應的訊息儲存位點。 |
sourceTimestamp | Int | DTS資料產生時間戳記。 |
operationType | String | DTS資料的操作類型。 |
schema | Array | 資料庫表結構資訊。 |
recordFields | Array | 欄位詳情記錄。 |
fieldName | String | 欄位名稱。 |
rawDataTypeNum | Int | 欄位類型映射值。 該值對應從資料訂閱通道中擷取的增量資料還原序列化後的dataTypeNumber欄位值,詳情請參見使用Kafka用戶端消費訂閱資料。 |
isPrimaryKey | Boolean | 欄位是否是主鍵。 |
isUniqueKey | Boolean | 欄位是否是唯一值。 |
fieldPosition | String | 欄位位置。 |
nameIndex | Array | 命名索引。 |
schemaId | String | 資料庫表結構資訊的ID。 |
databaseName | String | 資料庫名稱。 |
tableName | String | 資料表名稱。 |
primaryIndexInfo | String | 主鍵索引。 |
indexType | String | 主鍵索引類型。 |
indexFields | Array | 主鍵索引欄位內容。 |
cardinality | String | 主鍵基數。 |
nullable | Boolean | 主鍵是否可為null。 |
isFirstUniqueIndex | Boolean | 是否是第一個唯一索引。 |
uniqueIndexInfo | String | 唯一索引。 |
foreignIndexInfo | String | 外鍵索引。 |
normalIndexInfo | String | 普通索引。 |
databaseInfo | Array | 資料庫資訊。 |
databaseType | String | 資料庫類型。 |
version | String | 資料庫版本。 |
totalRows | Int | 資料表的總行數。 |
beforeImage | String | 操作前記錄欄位內容鏡像。 |
values | String | 記錄欄位的值。 |
size | Int | 記錄欄位大小。 |
afterImage | String | 操作後記錄欄位內容鏡像。 |