Data Transmission Service (DTS) はイベントソースとして機能し、EventBridge を介して Function Compute と統合されます。統合後、DTS トリガーを使用して関連付けられた関数を呼び出すことができます。これにより、DTS の変更追跡タスクから取得したリアルタイムの増分データに対してカスタム処理を実行できます。このトピックでは、Function Compute コンソールで DTS トリガーを作成し、関数の入力パラメーターを設定し、コードを記述してテストする方法について説明します。
概要
Function Compute コンソールでトリガー作成リクエストを送信すると、Function Compute はトリガー設定に基づいて EventBridge にイベントストリームリソースを自動的に作成します。
リソースが作成されると、Function Compute コンソールでトリガー情報を表示できます。また、EventBridge コンソールで自動的に作成されたリソースに関する情報を表示することもできます。DTS の変更追跡タスクがデータベースから増分データをキャプチャすると、関連付けられた関数が呼び出されます。バッチ設定に基づいて、1 つ以上のメッセージイベントが処理のために関数にバッチでプッシュされます。
注意事項
トリガーソースとして機能する DTS の変更追跡タスクは、Function Compute の関数と同じリージョンにある必要があります。
作成されたイベントストリームの数が上限に達した場合、それ以上 DTS トリガーを作成することはできません。イベントストリームの制限の詳細については、「制限事項」をご参照ください。
前提条件
EventBridge
Function Compute
Data Transmission Service (DTS)
ステップ 1:DTS トリガーの作成
Function Compute コンソールにログインします。左側のナビゲーションウィンドウで、 を選択します。
上部のナビゲーションバーでリージョンを選択します。[関数] ページで、対象の関数をクリックします。
関数詳細ページで、トリガー タブをクリックし、トリガーの作成 をクリックします。
[トリガーの作成] パネルでパラメーターを設定し、[OK] をクリックします。
次の表に基本パラメーターを示します。
設定項目
説明
例
トリガータイプ
トリガーのタイプ。サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。
DTS
名前
トリガーのカスタム名。
dts-trigger
バージョンまたはエイリアス
デフォルト値は LATEST です。別のバージョンまたはエイリアスのトリガーを作成する場合は、関数詳細ページの右上隅でそのバージョンまたはエイリアスに切り替えます。バージョンとエイリアスの詳細については、「バージョン管理」および「エイリアス管理」をご参照ください。
LATEST
変更追跡タスク
既存の変更追跡タスクの名前。
dtsqntc2***
コンシューマーグループ
追跡タスクからデータを消費するために使用されるコンシューマーグループの名前。
重要コンシューマーグループが他のクライアントのインスタンスで実行されていないことを確認してください。そうでない場合、指定されたコンシューマオフセットが無効になる可能性があります。
test
アカウント
コンシューマーグループを作成したときに指定したアカウント。
test
パスワード
コンシューマーグループを作成したときに指定したパスワード。
******
コンシューマーオフセット
最初のデータレコードを消費するタイムスタンプ。コンシューマオフセットは、追跡タスクのタイムスタンプ範囲内である必要があります。
説明コンシューマオフセットは、新しいコンシューマーグループが初めて実行されるときにのみ有効になります。後でタスクが再起動した場合、消費は最後のコンシューマオフセットから続行されます。
2022-06-21 00:00:00
呼び出しメソッド
関数の呼び出しメソッドを選択します。
値の説明は次のとおりです。
同期呼び出し:このメソッドは、シーケンシャルな呼び出しシナリオに適しています。単一のイベントまたはイベントのバッチが関数の呼び出しをトリガーします。システムは、次のイベントまたはイベントのバッチが別の関数の呼び出しをトリガーする前に関数が実行され、結果が返されるのを待ちます。同期呼び出しリクエストの最大ペイロードは 32 MB です。詳細については、「同期呼び出し」をご参照ください。
非同期呼び出し:このメソッドを使用すると、イベントを迅速に消費できます。単一のイベントまたはイベントのバッチが関数の呼び出しをトリガーします。Function Compute はすぐにレスポンスを返し、次のイベントまたはイベントのバッチが別の関数の呼び出しをトリガーします。このプロセス中、関数は非同期に実行されます。非同期呼び出しリクエストの最大ペイロードは 128 KB です。詳細については、「概要」をご参照ください。
同期呼び出し
トリガーの有効化のステータス
作成直後にトリガーを有効にするかどうかを指定します。デフォルトでは、トリガーを有効にする が選択されています。これは、トリガーが作成されるとすぐに有効になることを意味します。
有効
プッシュ設定、再試行、デッドレターキューなどの詳細設定については、「詳細機能」をご参照ください。
トリガーが作成されると、[トリガー名] リストに表示されます。トリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ 2:関数の入力パラメーターの設定
DTS イベントソースは、入力パラメーターとして event を関数に渡します。手動で event を関数に渡して、トリガーイベントをシミュレートできます。
関数詳細ページの[コード] タブで、[テスト関数] の横にある
アイコンをクリックし、ドロップダウンリストから [テストパラメーターを設定] を選択します。テストパラメーターの設定 パネルで、新規テストイベントの作成 または 既存のテストイベントの変更 を選択します。次に、イベント名と内容を入力し、[OK] をクリックします。
eventのフォーマットは次のとおりです:[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]次の表に、data フィールドのパラメーターを示します。
パラメーター
タイプ
説明
id
String
DTS データ ID。
topicPartition
Array
トピックパーティションに関する情報。
hash
String
DTS の基盤となるストレージパラメーター。
partition
String
トピックのパーティション。
topic
String
トピックの名前。
offset
Int
DTS データのログブローカーのオフセットを指定します。
sourceTimestamp
Int
DTS データが生成されたタイムスタンプ。
operationType
String
DTS データの操作タイプ。
schema
Array
データベーステーブルスキーマに関する情報。
recordFields
Array
フィールドレコードの詳細。
fieldName
String
フィールドの名前。
rawDataTypeNum
Int
フィールドタイプのマッピング値。
この値は、変更追跡チャネルから取得した増分データが逆シリアル化された後の `dataTypeNumber` フィールド値に対応します。詳細については、「Kafka クライアントを使用したサブスクライブ済みデータの消費」をご参照ください。
isPrimaryKey
Boolean
フィールドがプライマリキーであるかどうかを示します。
isUniqueKey
Boolean
フィールドに一意の値があるかどうかを示します。
fieldPosition
String
フィールドの位置。
nameIndex
Array
名前付きインデックス。
schemaId
String
データベーステーブルスキーマ情報の ID。
databaseName
String
データベースの名前。
tableName
String
データテーブルの名前。
primaryIndexInfo
String
プライマリキーインデックス。
indexType
String
プライマリキーインデックスのタイプ。
indexFields
Array
プライマリキーインデックスフィールドの内容。
cardinality
String
プライマリキーのカーディナリティ。
nullable
Boolean
プライマリキーが null にできるかどうかを示します。
isFirstUniqueIndex
Boolean
これが最初の一意なインデックスであるかどうかを示します。
uniqueIndexInfo
String
一意なインデックス。
foreignIndexInfo
String
外部キーインデックス。
normalIndexInfo
String
通常のインデックス。
databaseInfo
Array
データベースに関する情報。
databaseType
String
データベースのタイプ。
version
String
データベースのバージョン。
totalRows
Int
データテーブルの総行数。
beforeImage
String
操作前のレコードフィールドの内容のイメージ。
values
String
レコードフィールドの値。
size
Int
レコードフィールドのサイズ。
afterImage
String
操作後のレコードフィールドの内容のイメージ。
ステップ 3:関数コードの記述とテスト
トリガーを作成した後、関数コードを記述してテストし、その正しさを検証できます。実際には、DTS の変更追跡タスクがデータベースから増分データをキャプチャすると、トリガーが自動的に関数を呼び出します。
関数詳細ページの コード タブで、コードエディタにコードを記述し、デプロイメントコード をクリックします。
このトピックでは、Node.js コードを例として使用します。
'use strict'; /* 初期化機能を有効にするには、 次のように初期化関数を実装します: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // イベントパラメーターを解析し、イベントを処理します。 callback(null, 'return result'); }関数のテスト をクリックします。
詳細情報
Function Compute コンソールに加えて、次の方法でトリガーを設定することもできます:
Serverless Devs ツールを使用してトリガーを設定します。詳細については、「Serverless Devs の一般的なコマンド」をご参照ください。
SDK を使用してトリガーを設定します。詳細については、「SDK」をご参照ください。
トリガーを変更または削除するには、「トリガーの管理」をご参照ください。