このトピックでは、EventBridge コンソールで、イベントプロバイダーがデータ伝送サービス (DTS) であるイベントストリームを作成する方法について説明します。
前提条件
変更トラッキングタスクが DTS コンソール で作成され、通常 状態になっています。詳細については、「変更トラッキングタスクの管理」をご参照ください。
変更トラッキングタスクにコンシューマーグループが作成されています。詳細については、「コンシューマーグループの作成」をご参照ください。
EventBridge がアクティブ化され、必要な権限がResource Access Management (RAM) ユーザーに付与されています。詳細については、「EventBridge のアクティブ化と RAM ユーザーへの権限の付与」をご参照ください。
サポートされているリージョン
DTS は、中国 (杭州)、中国 (上海)、中国 (青島)、中国 (北京)、中国 (深圳)、中国 (広州)、中国 (成都)、中国 (香港) の各リージョンでイベントストリームのイベントプロバイダーとして使用できます。
手順
EventBridge のイベントストリームは、DTS で INSERT、DELETE、UPDATE、および DDL ステートメントを実行することで管理されるデータのみを転送できます。
EventBridge コンソール にログインします。左側のナビゲーションペインで、[イベントストリーム] をクリックします。
上部のナビゲーションバーで、リージョンを選択し、[イベントストリームの作成] をクリックします。
[イベントストリームの作成] ページで、タスク名 パラメーターと 説明 パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。次に、[保存] をクリックします。次のセクションでは、パラメーターについて説明します。
タスクの作成
[ソース] ステップで、データプロバイダー パラメーターを データ伝送サービス (DTS) に設定し、他のパラメーターを設定します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
データサブスクリプションタスク
DTS コンソール で作成した変更トラッキングタスクの ID。
dts8jqe****
アクセス方法
変更トラッキングタスクのソースとなるデータベースインスタンスのアクセス方法。このパラメーターの値は変更できません。
RDS
インスタンス ID
変更トラッキングタスクのソースとなるデータベースインスタンスの ID。このパラメーターの値は変更できません。
rm-bp18mj3q2dzyb****
コンシューマーグループ
変更トラッキングタスクのデータを使用するために作成したコンシューマーグループの名前。
説明コンシューマーグループが 1 つのクライアントでのみ実行されていることを確認してください。そうでない場合、指定された消費チェックポイントは無効になる可能性があります。
test
アカウント
コンシューマーグループを作成するときに指定したアカウント名。
test
パスワード
コンシューマーグループを作成するときに指定したアカウントパスワード。
******
コンシューマーオフセット
最初のデータエントリが使用される時刻。コンシューマーオフセットで指定されたデータエントリは、変更トラッキングタスクのデータ範囲内にある必要があります。
説明指定したコンシューマーオフセットは、コンシューマーグループが初めてデータを使用するときにのみ有効になります。変更トラッキングタスクが再起動された場合、コンシューマーグループは最後に記録されたコンシューマーオフセットからデータを使用します。
2022-06-21 00:00:00
バッチプッシュ
バッチプッシュ機能を使用すると、一度に複数のイベントを集約できます。この機能は、一括プッシュの件数 パラメーターまたは バッチプッシュ間隔 (単位:秒) パラメーターで指定された条件が満たされた場合にトリガーされます。
たとえば、メッセージパラメーターを 100 に設定し、間隔 (単位: 秒) パラメーターを 15 に設定した場合、経過時間が 10 秒であっても、メッセージ数が 100 に達するとプッシュが実行されます。
有効
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。
3
[フィルタリング]、[変換]、[シンク] の各ステップで、イベントフィルタリング方法、イベント変換ルール、イベントターゲットを設定します。イベント変換設定の詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
タスクのプロパティ
イベントストリームの再試行ポリシーとデッドレターキューを設定します。詳細については、「再試行ポリシーとデッドレターキュー」をご参照ください。
[イベントストリーム] ページに戻り、作成したイベントストリームを見つけます。次に、[アクション] 列の [有効化] をクリックします。
イベントストリームの有効化には、30 ~ 60 秒かかります。[イベントストリーム] ページのイベントストリームの [ステータス] 列で進行状況を確認できます。
イベントの例
次のコードは、DTS で MySQL データベースの変更トラッキングタスクが作成された後に生成されるイベントの例を示しています。
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou--test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
104,
101,
108,
108,
111
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
98,
121,
101
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}CloudEvents 仕様で定義されているパラメーターの詳細については、「概要」をご参照ください。
次の表に、data フィールドに含まれるパラメーターを示します。
パラメーター | タイプ | 説明 |
id | String | DTS データエントリの ID。 |
topicPartition | Array | イベントがプッシュされるトピックに関するパーティション情報。 |
hash | String | DTS の基盤となるストレージパラメーター。 |
partition | String | パーティション。 |
topic | String | トピック名。 |
offset | Int | DTS データエントリのオフセット。 |
sourceTimestamp | Int | DTS データエントリが生成された時刻を示すタイムスタンプ。 |
operationType | String | DTS データエントリに関連する操作のタイプ。 |
schema | Array | データベースに関するスキーマ情報。 |
recordFields | Array | フィールドの詳細。 |
fieldName | String | フィールド名。 |
rawDataTypeNum | Int | フィールドタイプのマップされた値。 このパラメーターの値は、変更トラッキングインスタンスからの逆シリアル化された増分データの dataTypeNumber フィールドの値に対応します。詳細については、「Kafka クライアントを使用して追跡データを使用する」をご参照ください。 |
isPrimaryKey | Boolean | フィールドが主キーフィールドかどうかを示します。 |
isUniqueKey | Boolean | フィールドに一意のキーがあるかどうかを示します。 |
fieldPosition | String | フィールドの位置。 |
nameIndex | Array | フィールド名に基づくフィールドのインデックス情報。 |
schemaId | String | データベーススキーマの ID。 |
databaseName | String | データベース名。 |
tableName | String | テーブル名。 |
primaryIndexInfo | String | 主キーインデックス。 |
indexType | String | インデックスタイプ。 |
indexFields | Array | インデックスが作成されるフィールド。 |
cardinality | String | 主キーのカーディナリティ。 |
nullable | Boolean | 主キーを NULL にできるかどうかを示します。 |
isFirstUniqueIndex | Boolean | インデックスが最初の一意のインデックスかどうかを示します。 |
uniqueIndexInfo | String | 一意のインデックス。 |
foreignIndexInfo | String | 外部キーのインデックス。 |
normalIndexInfo | String | 通常のインデックス。 |
databaseInfo | Array | データベースに関する情報。 |
databaseType | String | データベースエンジン。 |
version | String | データベースエンジンのバージョン。 |
totalRows | Int | テーブルの合計行数。 |
beforeImage | String | 操作が実行される前のフィールド値を記録するイメージ。 |
values | String | 記録されたフィールド値。 |
size | Int | 記録されたフィールドのサイズ。 |
afterImage | String | 操作が実行された後のフィールド値を記録するイメージ。 |