Data Transmission Service (DTS) は、EventBridge を介して Function Compute と統合するイベントソースです。この統合により、DTS トリガーは関連付けられた関数を呼び出して、データベース変更追跡タスクからのリアルタイム増分データを処理できます。このトピックでは、Function Compute コンソールで DTS トリガーを作成し、入力パラメーターを設定し、コードを記述してテストする方法について説明します。
概要
Function Compute コンソールでトリガーを作成すると、Function Compute はトリガーの設定に基づいて イベントストリーム を EventBridge に自動的に作成します。
トリガーが作成されると、Function Compute コンソールでその情報を表示できます。また、EventBridge コンソールで自動的に作成されたリソースを表示することもできます。DTS 変更追跡タスクがデータベースから増分データをキャプチャすると、トリガーは関連付けられた関数を呼び出します。その後、バッチ設定に基づいて、1 つ以上のメッセージイベントが処理のためにバッチで関数にプッシュされます。
注意事項
イベントソースとして使用される DTS 変更追跡タスクは、Function Compute 関数と同じリージョンにある必要があります。
作成されたイベントストリームの数が上限に達した場合、追加の DTS トリガーを作成することはできません。イベントストリームの数の制限に関する詳細については、「制限」をご参照ください。
前提条件
EventBridge
Function Compute
Data Transmission Service (DTS)
ステップ 1: DTS トリガーを作成する
Function Compute コンソールにログインします。左側のナビゲーションウィンドウで、 を選択します。
上部のナビゲーションバーで、リージョンを選択します。[関数] ページで、ターゲット関数をクリックします。
関数詳細ページで、[トリガー] タブをクリックし、[トリガーの作成] をクリックします。
[トリガーの作成] パネルでパラメーターを設定し、[OK] をクリックします。
次の表に、基本的な設定項目を示します。
設定項目
説明
例
トリガータイプ
トリガーのタイプ。サポートされているトリガータイプの詳細については、「トリガーの概要」をご参照ください。
DTS
名前
トリガーのカスタム名。
dts-trigger
バージョンまたはエイリアス
デフォルト値は LATEST です。別のバージョンまたはエイリアスのトリガーを作成するには、まず関数詳細ページの右上隅でそのバージョンまたはエイリアスに切り替えます。バージョンとエイリアスの詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。
LATEST
変更追跡タスク
既存の変更追跡タスクの名前。
dtsqntc2***
使用者グループ
追跡タスクからデータを消費するために作成された使用者グループの名前。
重要使用者グループが他のクライアントインスタンスで実行されていないことを確認してください。そうしないと、指定されたコンシューマオフセットが無効になる可能性があります。
test
アカウント
使用者グループを作成したときに設定したアカウント。
test
パスワード
使用者グループを作成したときに設定したパスワード。
******
コンシューマオフセット
最初のデータレコードを消費する予定のタイムスタンプ。コンシューマオフセットは、サブスクリプションインスタンスのタイムスタンプ範囲内である必要があります。
説明コンシューマオフセットは、新しい使用者グループが初めて実行されるときにのみ有効になります。後でタスクが再起動した場合、消費は最後のコンシューマオフセットから続行されます。
2022-06-21 00:00:00
呼び出しメソッド
関数を呼び出すメソッドを選択します。
有効な値は次のとおりです。
同期呼び出し: このメソッドは、シーケンシャルな呼び出しシナリオに適しています。単一のイベントまたはイベントのバッチが関数呼び出しをトリガーします。システムは、次のイベントまたはイベントのバッチが別の関数呼び出しをトリガーする前に関数が実行されて結果を返すのを待ちます。同期呼び出しリクエストの最大ペイロードは 32 MB です。詳細については、「同期呼び出し」をご参照ください。
非同期呼び出し: このメソッドを使用すると、イベントを迅速に消費できます。単一のイベントまたはイベントのバッチが関数呼び出しをトリガーします。Function Compute はすぐにレスポンスを返し、次のイベントまたはイベントのバッチが別の関数呼び出しをトリガーできます。このプロセス中、関数は非同期で実行されます。非同期呼び出しリクエストの最大ペイロードは 128 KB です。詳細については、「概要」をご参照ください。
同期呼び出し
トリガーの状態
作成直後にトリガーを有効にするかどうかを指定します。デフォルトでは、[トリガーを有効にする] が選択されており、作成直後にトリガーが有効になることを意味します。
有効
プッシュ設定、再試行、デッドレターキューなどの詳細設定項目については、「高度な機能」をご参照ください。
トリガーが作成されると、[トリガー名] リストに表示されます。トリガーを変更または削除するには、「トリガーの管理」をご参照ください。
ステップ 2: 関数の入力パラメーターを設定する
DTS イベントソースは、event 入力パラメーターを関数に渡します。手動で event を関数に渡して、トリガーイベントをシミュレートできます。
関数詳細ページの[コード]タブで、[関数をテスト] の横にある
アイコンをクリックし、ドロップダウンリストから[テストパラメーターを設定]を選択します。[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] または [既存のテストイベントの編集] を選択し、イベント名と内容を入力して、[OK] をクリックします。
eventは次のフォーマットを使用します。[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]CloudEvents 仕様で定義されているパラメーターについては、「イベントの概要」をご参照ください。
次の表に、data フィールドに含まれるパラメーターを示します。
パラメーター
タイプ
説明
id
String
DTS データ入力の ID。
topicPartition
Array
イベントがプッシュされる Topic のパーティション情報。
hash
String
DTS の基盤となるストレージパラメーター。
partition
String
パーティション。
topic
String
Topic 名。
offset
Int
DTS データ入力のオフセット。
sourceTimestamp
Int
DTS データ入力が生成された日時を示すタイムスタンプ。
operationType
String
DTS データ入力に関わる操作のタイプ。
schema
Array
データベースに関するスキーマ情報。
recordFields
Array
フィールドの詳細。
fieldName
String
フィールド名。
rawDataTypeNum
Int
フィールドタイプのマップされた値。
このパラメーターの値は、変更追跡インスタンスから逆シリアル化された増分データの dataTypeNumber フィールドの値に対応します。詳細については、「Kafka クライアントを使用して追跡データを消費する」をご参照ください。
isPrimaryKey
Boolean
フィールドがプライマリキーフィールドであるかどうかを示します。
isUniqueKey
Boolean
フィールドに一意なインデックスがあるかどうかを示します。
fieldPosition
String
フィールドの位置。
nameIndex
Array
フィールド名に基づくフィールドのインデックス情報。
schemaId
String
データベーススキーマの ID。
databaseName
String
データベース名。
tableName
String
テーブル名。
primaryIndexInfo
String
主キーインデックス。
indexType
String
インデックスタイプ。
indexFields
Array
インデックスが作成されるフィールド。
cardinality
String
プライマリキーのカーディナリティ。
nullable
Boolean
プライマリキーが null にできるかどうかを示します。
isFirstUniqueIndex
Boolean
インデックスが最初の一意なインデックスであるかどうかを示します。
uniqueIndexInfo
String
一意なインデックス。
foreignIndexInfo
String
外部キーのインデックス。
normalIndexInfo
String
通常のインデックス。
databaseInfo
Array
データベースに関する情報。
databaseType
String
データベースエンジン。
version
String
データベースエンジンのバージョン。
totalRows
Int
テーブル内の行の総数。
beforeImage
String
操作が実行される前のフィールド値を記録するイメージ。
values
String
記録されたフィールド値。
size
Int
記録されたフィールドのサイズ。
afterImage
String
操作が実行された後のフィールド値を記録するイメージ。
ステップ 3: 関数コードを記述してテストする
トリガーを作成した後、関数コードを記述してテストできます。DTS 変更追跡タスクがデータベースから増分データをキャプチャすると、トリガーは自動的に関数を呼び出します。
関数詳細ページの [コード] タブで、コードエディタにコードを記述し、[コードのデプロイ] をクリックします。
このトピックでは、Node.js コードを例として使用します。
'use strict'; /* 初期化機能を有効にするには、 次のように initializer 関数を実装します: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // event パラメーターを解析し、イベントを処理します。 callback(null, 'return result'); }[関数のテスト] をクリックします。
詳細情報
Function Compute コンソールに加えて、次の方法でトリガーを設定することもできます:
Serverless Devs ツールを使用してトリガーを設定します。詳細については、「Serverless Devs の共通コマンド」をご参照ください。
SDK を使用してトリガーを設定します。詳細については、「SDK」をご参照ください。
トリガーを変更または削除するには、「トリガーの管理」をご参照ください。