Tablestore は、Apsara システム上に構築された分散 NoSQL データストレージサービスです。Tablestore トリガーを作成して、Tablestore を Function Compute のイベントソースとして接続できます。指定されたイベントが発生すると、Function Compute の関数が自動的にトリガーされ、Tablestore データを処理します。
シナリオ
次の図は、Tablestore トリガーの典型的なシナリオを示しています。
データソースはデータをテーブル A に格納します。データの更新は、データをクレンジングし、クレンジングされたデータを直接読み取りするためにテーブル B に格納する関数をトリガーします。プロセス全体は、弾力性とスケーラビリティを備えたサーバーレス Web アプリケーションです。
前提条件
制限事項
Tablestore トリガーは、中国 (北京)、中国 (杭州)、中国 (上海)、中国 (深セン)、日本 (東京)、シンガポール、ドイツ (フランクフルト)、中国 (香港) の各リージョンでサポートされています。
Tablestore テーブルは、Function Compute の関連サービスと同じリージョンに存在する必要があります。
内部ネットワーク経由で Tablestore にアクセスする場合は、Virtual Private Cloud (VPC) エンドポイントを使用できます。形式は {instance}.{region}.vpc.tablestore.aliyuncs.com です。
Tablestore トリガーによって呼び出される関数の処理時間は 1 分を超えることはできません。
注意事項
関数コードを記述する際は、呼び出しループを避けてください。たとえば、テーブル A が関数 B をトリガーしてテーブル A のデータを更新し、それが再び関数 B をトリガーするようなロジックは、呼び出しループを引き起こします。
関数の処理中にエラーが発生した場合、関数は Tablestore のログデータの有効期限が切れるまで再試行を続けます。
説明関数の処理例外は、次のいずれかのシナリオで発生します。
関数インスタンスは開始されましたが、関数コードが想定どおりに動作していません。この場合、インスタンスの料金が発生します。
起動コマンドエラーなどの理由により、関数インスタンスが開始に失敗しました。この場合、インスタンスの料金は発生しません。
関数の処理例外が発生した場合は、データテーブルの Stream 機能を無効にして、関数が無期限に再試行されないようにすることができます。Stream 機能を無効にする前に、他のトリガーがデータテーブルを使用していないことを確認してください。そうでない場合、これらのトリガーは想定どおりに動作しない可能性があります。
手順 1: データテーブルの Stream 機能を有効にする
トリガーを作成する前に、Tablestore コンソールでデータテーブルの Stream 機能を有効にして、関数がテーブルに書き込まれた増分データを処理できるようにする必要があります。
Tablestore コンソール にログインします。
上部のナビゲーションバーで、リージョンを選択します。
[概要] ページで、インスタンスエイリアスをクリックするか、[アクション] 列の [インスタンスの管理] をクリックします。
[インスタンスの詳細] タブで、[データテーブル] タブをクリックします。次に、データテーブルの名前をクリックして [Stream] タブを選択するか、
をクリックして [Stream] を選択します。[Stream] タブで、「Stream 情報」の横にある [有効化] をクリックします。
[Stream の有効化] ダイアログボックスで、「ログの有効期限」パラメーターを設定し、[有効化] をクリックします。
「ログの有効期限」パラメーターの値は、ゼロ以外の整数である必要があります。単位: 時間。最大値: 168。
重要「ログの有効期限」パラメーターは、指定した後は変更できません。注意して進めてください。
手順 2: Tablestore トリガーを作成する
Function Compute コンソール にログインします。左側のナビゲーションウィンドウで、[関数] をクリックします。
上部のナビゲーションバーで、リージョンを選択します。[関数] ページで、管理する関数をクリックします。
「関数の詳細」ページで、[構成] タブをクリックします。左側のナビゲーションウィンドウで、[トリガー] をクリックします。次に、[トリガーの作成] をクリックします。
[トリガーの作成] パネルで、パラメーターを設定し、 [OK] をクリックします。
パラメーター
説明
例
トリガータイプ
トリガーのタイプ。[Tablestore] を選択します。
Tablestore
名前
トリガーの名前。
Tablestore-trigger
バージョンまたはエイリアス
トリガーのバージョンまたはエイリアス。デフォルト値: [LATEST]。別のバージョンまたはエイリアスのトリガーを作成する場合は、「関数の詳細」ページの [バージョンまたはエイリアス] ドロップダウンリストからバージョンまたはエイリアスを選択します。バージョンとエイリアスの詳細については、バージョンの管理 および エイリアスの管理 をご参照ください。
LATEST
インスタンス
既存の Tablestore インスタンスの名前。
d00dd8xm****
テーブル
既存のテーブルの名前。
mytable
ロール名
[AliyunTableStoreStreamNotificationRole] を選択します。
説明上記のパラメーターを設定した後、[OK] をクリックします。このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで [今すぐ承認] をクリックします。
AliyunTableStoreStreamNotificationRole
トリガーが作成されると、[トリガー] タブに表示されます。トリガーを変更または削除するには、トリガーの管理をご参照ください。
手順 3: 関数の入力パラメーターを設定する
関数詳細ページの [コード] タブで、
アイコンをクリックし、[関数をテスト] の隣にあるドロップダウンリストから [テスト パラメーターの構成] を選択します。[テストパラメーターの設定] パネルで、[新しいテストイベントの作成] タブまたは [既存のテストイベントの変更] タブをクリックし、イベント名とイベントコンテンツを入力して、[OK] をクリックします。
Tablestore トリガーは、Concise Binary Object Representation (CBOR) 形式で増分データをエンコードして、Function Compute の関数を呼び出すために使用されるイベントを構築します。次のサンプルコードは、イベントコンテンツの形式の例を示しています。
{ "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }次の表は、イベントのフィールドについて説明しています。
パラメーター
説明
Version
ペイロードのバージョン。例: Sync-v1。値は文字列です。
Records
テーブル内の増分データの行を格納する配列。各要素には、次のパラメーターが含まれています。
Type: 行に対して実行される操作のタイプ。有効な値: PutRow、UpdateRow、および DeleteRow。値は文字列です。
Info: 行に関する情報。Timestamp パラメーターを含みます。これは、行が最後に変更された時刻を指定します。時刻は UTC である必要があります。値は INT64 タイプです。
PrimaryKey
プライマリキー列を格納する配列。各要素には、次のパラメーターが含まれています。
ColumnName: プライマリキー列の名前。値は文字列です。
Value: プライマリキー列の値。値は formated_value タイプで、INTEGER、STRING、または BLOB を使用できます。
Columns
属性列を格納する配列。各要素には、次のパラメーターが含まれています。
Type: 属性列に対して実行される操作のタイプ。有効な値: Put、DeleteOneVersion、および DeleteAllVersions。値は文字列です。
ColumnName: 属性列の名前。値は文字列です。
Value: 属性列の値。値は formatted_value タイプで、INTEGER、BOOLEAN、DOUBLE、STRING、または BLOB を使用できます。
Timestamp: 属性列が最後に変更された時刻。時刻は UTC である必要があります。値は INT64 タイプです。
手順 4: 関数コードを記述してテストする
Tablestore トリガーを作成した後、関数コードを記述し、関数コードをテストして、コードが有効かどうかを確認できます。Tablestore のデータが更新されると、関数が自動的に呼び出されます。
「関数の詳細」ページで、[コード] タブをクリックし、コードエディタに関数コードを入力して、[デプロイ] をクリックします。
この例では、関数コードは Python で記述されています。他のランタイム環境で関数コードを記述する方法の詳細については、Node.js、PHP、Java、および C# ランタイムで Tablestore を使用して Function Compute をトリガーするをご参照ください。
import logging import cbor import json # レコードから属性値を取得する def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] # レコードからプライマリキー値を取得する def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] # イベントハンドラ def handler(event, context): logger = logging.getLogger() logger.info("イベントの処理を開始します") #records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("レコードを処理します: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'[関数のテスト] をクリックします。
関数が処理された後、[コード] タブで結果を表示できます。
FAQ
特定のリージョンで Tablestore トリガーを作成できない場合は、そのリージョンで Tablestore トリガーがサポートされているかどうかを確認してください。詳細については、制限事項をご参照ください。
Tablestore トリガーを作成するときに、作成済みの Tablestore テーブルが見つからない場合は、テーブルが Function Compute の関連サービスと同じリージョンに存在するかどうかを確認してください。
ほとんどの場合、Tablestore トリガーを使用しているときに、クライアントが呼び出しをキャンセルしたことを示すエラーが繰り返し報告される場合は、クライアントで関数の実行に設定されているタイムアウト期間が、実際の実行時間よりも短いことが原因です。この場合は、クライアントのタイムアウト期間を長くすることをお勧めします。詳細については、クライアントが切断され、「クライアントによって呼び出しがキャンセルされました」というメッセージが報告された場合はどうすればよいですか。をご参照ください。
Tablestore テーブルにデータが追加されても、関連付けられている Tablestore トリガーがトリガーされない場合は、次の手順を実行して問題のトラブルシューティングを行うことができます。トリガーの失敗のトラブルシューティング方法の詳細については、トリガーが関数の実行をトリガーできない場合はどうすればよいですか。をご参照ください。
テーブルの Stream 機能が有効になっているかどうかを確認します。詳細については、手順 1: データテーブルの Stream 機能を有効にするをご参照ください。
トリガーを作成したときに正しいロールが設定されているかどうかを確認します。デフォルトのトリガーロール
AliyunTableStoreStreamNotificationRoleを使用できます。詳細については、手順 2: Tablestore トリガーを作成するをご参照ください。関数のランログを確認して、関数が処理に失敗したかどうかを確認します。関数が処理に失敗した場合、関数は Tablestore のログデータの有効期限が切れるまで再試行されます。