全部產品
Search
文件中心

EventBridge:Data Transmission Service

更新時間:Dec 27, 2024

本文介紹如何在事件匯流排EventBridge控制台添加Data Transmission Service作為事件流中的事件提供方。

前提條件

支援地區

支援將事件流中的事件提供方設定為Data Transmission Service的地區有華東1(杭州)、華東2(上海)、華北1(青島)、華北2(北京)、華南1(深圳)、華南3(廣州)、西南1(成都)、中國香港。

操作步驟

重要

事件匯流排EventBridge的事件流僅中轉操作類型為INSERT、DELETE、UPDATE和DDL的DTS資料。

  1. 登入事件匯流排EventBridge控制台,在左側導覽列,單擊事件流
  2. 在頂部功能表列,選擇地區,然後單擊建立事件流
  3. 建立事件流面板,設定任務名稱描述,然後配置以下參數,最後單擊儲存

    • 任務建立:

      1. Source(源)設定精靈,選擇資料提供方資料庫傳輸服務 DTS(資料庫),設定以下參數,然後單擊下一步

        參數

        說明

        樣本

        資料訂閱任務

        選擇您在Data Transmission Service控制台上建立的資料訂閱任務名稱。

        dts8jqe****

        接入方式

        預設為建立的資料訂閱任務的接入方式且不可更改。

        RDS

        執行個體ID

        預設為建立資料訂閱任務時訂閱的執行個體且不可更改。

        rm-bp18mj3q2dzyb****

        消費組

        在前提條件中建立的用於消費訂閱任務資料的消費組名稱。

        說明

        請確保該消費組沒有在其他用戶端的執行個體上運行,否則可能導致傳入的消費位點失效。

        test

        帳號

        建立消費組時設定的帳號。

        test

        密碼

        建立消費組時設定的密碼。

        ******

        消費位點

        期望消費第一條資料的時間戳記。消費位點必須在訂閱執行個體的資料範圍之內。

        說明

        消費位點僅在新消費組第一次運行時生效,若後續任務重啟,則會基於上次消費位點繼續消費。

        2022-06-21 00:00:00

        批量推送条数

        調用函數發送的最大批量訊息條數,當積壓的訊息數量到達設定值時才會發送請求,取值範圍為 [1,10000]。

        100

        批量推送间隔(单位:秒)

        調用函數的間隔時間,系統每到間隔時間點會將訊息彙總後發給Function Compute,取值範圍為[0,15],單位為秒。0秒錶示無等待時間,直接投遞。

        3

      2. Filtering(過濾)Transform(轉換)Sink(目標)設定精靈,設定事件過濾、轉換規則及事件目標。事件轉換的配置說明,請參見使用Function Compute實現訊息資料清洗

    • 任務屬性

      設定事件流的重試策略及無效信件佇列。更多資訊,請參見重試和死信

  4. 返回事件流頁面,找到建立好的事件流,在其右側操作欄,單擊啟用

    啟用事件流後,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。

事件樣本

在DTS資料庫建立資料訂閱任務時,當訂閱的執行個體類型為MySQL執行個體時,事件樣本如下:

{
  "data": {
    "id": 321****,
    "topicPartition": {
      "hash": 0,
      "partition": 0,
      "topic": "cn_hangzhou_rm_1234****_test_version2"
    },
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {
      "recordFields": [
        {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      ],
      "nameIndex": {
        "id": {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        "topic": {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      },
      "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
      "databaseName": "hangzhou--test-db",
      "tableName": "message_info",
      "primaryIndexInfo": {
        "indexType": "PrimaryKey",
        "indexFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          }
        ],
        "cardinality": 0,
        "nullable": true,
        "isFirstUniqueIndex": false
      },
      "uniqueIndexInfo": [],
      "foreignIndexInfo": [],
      "normalIndexInfo": [],
      "databaseInfo": {
        "databaseType": "MySQL",
        "version": "5.7.35-log"
      },
      "totalRows": 0
    },
    "beforeImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              104,
              101,
              108,
              108,
              111
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 9,
            "capacity": 9,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 45
    },
    "afterImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              98,
              121,
              101
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 11,
            "capacity": 11,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 47
    }
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-321****",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

CloudEvents規範中定義的參數解釋,請參見事件概述

data欄位包含的參數解釋如下表所示。

參數

類型

說明

id

String

DTS資料ID。

topicPartition

Array

Topic的分區資訊。

hash

String

DTS底層儲存參數。

partition

String

Topic的分區。

topic

String

Topic的名稱。

offset

Int

DTS資料對應的訊息儲存位點。

sourceTimestamp

Int

DTS資料產生時間戳記。

operationType

String

DTS資料的操作類型。

schema

Array

資料庫表結構資訊。

recordFields

Array

欄位詳情記錄。

fieldName

String

欄位名稱。

rawDataTypeNum

Int

欄位類型映射值。

該值對應從資料訂閱通道中擷取的增量資料還原序列化後的dataTypeNumber欄位值,詳情請參見使用Kafka用戶端消費訂閱資料

isPrimaryKey

Boolean

欄位是否是主鍵。

isUniqueKey

Boolean

欄位是否是唯一值。

fieldPosition

String

欄位位置。

nameIndex

Array

命名索引。

schemaId

String

資料庫表結構資訊的ID。

databaseName

String

資料庫名稱。

tableName

String

資料表名稱。

primaryIndexInfo

String

主鍵索引。

indexType

String

主鍵索引類型。

indexFields

Array

主鍵索引欄位內容。

cardinality

String

主鍵基數。

nullable

Boolean

主鍵是否可為null。

isFirstUniqueIndex

Boolean

是否是第一個唯一索引。

uniqueIndexInfo

String

唯一索引。

foreignIndexInfo

String

外鍵索引。

normalIndexInfo

String

普通索引。

databaseInfo

Array

資料庫資訊。

databaseType

String

資料庫類型。

version

String

資料庫版本。

totalRows

Int

資料表的總行數。

beforeImage

String

操作前記錄欄位內容鏡像。

values

String

記錄欄位的值。

size

Int

記錄欄位大小。

afterImage

String

操作後記錄欄位內容鏡像。