すべてのプロダクト
Search
ドキュメントセンター

EventBridge:DTS

最終更新日:Jan 11, 2025

このトピックでは、EventBridge コンソールで、イベントプロバイダーがデータ伝送サービス (DTS) であるイベントストリームを作成する方法について説明します。

前提条件

サポートされているリージョン

DTS は、中国 (杭州)、中国 (上海)、中国 (青島)、中国 (北京)、中国 (深圳)、中国 (広州)、中国 (成都)、中国 (香港) の各リージョンでイベントストリームのイベントプロバイダーとして使用できます。

手順

重要

EventBridge のイベントストリームは、DTS で INSERT、DELETE、UPDATE、および DDL ステートメントを実行することで管理されるデータのみを転送できます。

  1. EventBridge コンソール にログインします。左側のナビゲーションペインで、[イベントストリーム] をクリックします。

  2. 上部のナビゲーションバーで、リージョンを選択し、[イベントストリームの作成] をクリックします。

  3. [イベントストリームの作成] ページで、タスク名 パラメーターと 説明 パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. [ソース] ステップで、データプロバイダー パラメーターを データ伝送サービス (DTS) に設定し、他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        データサブスクリプションタスク

        DTS コンソール で作成した変更トラッキングタスクの ID。

        dts8jqe****

        アクセス方法

        変更トラッキングタスクのソースとなるデータベースインスタンスのアクセス方法。このパラメーターの値は変更できません。

        RDS

        インスタンス ID

        変更トラッキングタスクのソースとなるデータベースインスタンスの ID。このパラメーターの値は変更できません。

        rm-bp18mj3q2dzyb****

        コンシューマーグループ

        変更トラッキングタスクのデータを使用するために作成したコンシューマーグループの名前。

        説明

        コンシューマーグループが 1 つのクライアントでのみ実行されていることを確認してください。そうでない場合、指定された消費チェックポイントは無効になる可能性があります。

        test

        アカウント

        コンシューマーグループを作成するときに指定したアカウント名。

        test

        パスワード

        コンシューマーグループを作成するときに指定したアカウントパスワード。

        ******

        コンシューマーオフセット

        最初のデータエントリが使用される時刻。コンシューマーオフセットで指定されたデータエントリは、変更トラッキングタスクのデータ範囲内にある必要があります。

        説明

        指定したコンシューマーオフセットは、コンシューマーグループが初めてデータを使用するときにのみ有効になります。変更トラッキングタスクが再起動された場合、コンシューマーグループは最後に記録されたコンシューマーオフセットからデータを使用します。

        2022-06-21 00:00:00

        バッチプッシュ

        バッチプッシュ機能を使用すると、一度に複数のイベントを集約できます。この機能は、一括プッシュの件数 パラメーターまたは バッチプッシュ間隔 (単位:秒) パラメーターで指定された条件が満たされた場合にトリガーされます。

        たとえば、メッセージパラメーターを 100 に設定し、間隔 (単位: 秒) パラメーターを 15 に設定した場合、経過時間が 10 秒であっても、メッセージ数が 100 に達するとプッシュが実行されます。

        有効

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。

        3

      2. [フィルタリング][変換][シンク] の各ステップで、イベントフィルタリング方法、イベント変換ルール、イベントターゲットを設定します。イベント変換設定の詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

    • タスクのプロパティ

      イベントストリームの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。

  4. [イベントストリーム] ページに戻り、作成したイベントストリームを見つけます。次に、[アクション] 列の [有効化] をクリックします。

    イベントストリームの有効化には、30 ~ 60 秒かかります。[イベントストリーム] ページのイベントストリームの [ステータス] 列で進行状況を確認できます。

イベントの例

次のコードは、DTS で MySQL データベースの変更トラッキングタスクが作成された後に生成されるイベントの例を示しています。

{
  "data": {
    "id": 321****,
    "topicPartition": {
      "hash": 0,
      "partition": 0,
      "topic": "cn_hangzhou_rm_1234****_test_version2"
    },
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {
      "recordFields": [
        {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      ],
      "nameIndex": {
        "id": {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        "topic": {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      },
      "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
      "databaseName": "hangzhou--test-db",
      "tableName": "message_info",
      "primaryIndexInfo": {
        "indexType": "PrimaryKey",
        "indexFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          }
        ],
        "cardinality": 0,
        "nullable": true,
        "isFirstUniqueIndex": false
      },
      "uniqueIndexInfo": [],
      "foreignIndexInfo": [],
      "normalIndexInfo": [],
      "databaseInfo": {
        "databaseType": "MySQL",
        "version": "5.7.35-log"
      },
      "totalRows": 0
    },
    "beforeImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              104,
              101,
              108,
              108,
              111
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 9,
            "capacity": 9,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 45
    },
    "afterImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              98,
              121,
              101
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 11,
            "capacity": 11,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 47
    }
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-321****",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

CloudEvents 仕様で定義されているパラメーターの詳細については、「概要」をご参照ください。

次の表に、data フィールドに含まれるパラメーターを示します。

パラメーター

タイプ

説明

id

String

DTS データエントリの ID。

topicPartition

Array

イベントがプッシュされるトピックに関するパーティション情報。

hash

String

DTS の基盤となるストレージパラメーター。

partition

String

パーティション。

topic

String

トピック名。

offset

Int

DTS データエントリのオフセット。

sourceTimestamp

Int

DTS データエントリが生成された時刻を示すタイムスタンプ。

operationType

String

DTS データエントリに関連する操作のタイプ。

schema

Array

データベースに関するスキーマ情報。

recordFields

Array

フィールドの詳細。

fieldName

String

フィールド名。

rawDataTypeNum

Int

フィールドタイプのマップされた値。

このパラメーターの値は、変更トラッキングインスタンスからの逆シリアル化された増分データの dataTypeNumber フィールドの値に対応します。詳細については、「Kafka クライアントを使用して追跡データを使用する」をご参照ください。

isPrimaryKey

Boolean

フィールドが主キーフィールドかどうかを示します。

isUniqueKey

Boolean

フィールドに一意のキーがあるかどうかを示します。

fieldPosition

String

フィールドの位置。

nameIndex

Array

フィールド名に基づくフィールドのインデックス情報。

schemaId

String

データベーススキーマの ID。

databaseName

String

データベース名。

tableName

String

テーブル名。

primaryIndexInfo

String

主キーインデックス。

indexType

String

インデックスタイプ。

indexFields

Array

インデックスが作成されるフィールド。

cardinality

String

主キーのカーディナリティ。

nullable

Boolean

主キーを NULL にできるかどうかを示します。

isFirstUniqueIndex

Boolean

インデックスが最初の一意のインデックスかどうかを示します。

uniqueIndexInfo

String

一意のインデックス。

foreignIndexInfo

String

外部キーのインデックス。

normalIndexInfo

String

通常のインデックス。

databaseInfo

Array

データベースに関する情報。

databaseType

String

データベースエンジン。

version

String

データベースエンジンのバージョン。

totalRows

Int

テーブルの合計行数。

beforeImage

String

操作が実行される前のフィールド値を記録するイメージ。

values

String

記録されたフィールド値。

size

Int

記録されたフィールドのサイズ。

afterImage

String

操作が実行された後のフィールド値を記録するイメージ。