Tablestore は、Apsara システム上に構築された分散型 NoSQL データストレージサービスです。Tablestore トリガーは、Function Compute のイベントソースとして Tablestore を接続します。Tablestore のテーブル内のデータが変更されると、このトリガーは自動的にご利用の関数を呼び出して増分データを処理します — ポーリングは不要です。
典型的な利用シーン:データソースがテーブル A に書き込みます。この更新により関数が起動し、データをクリーニングして結果をテーブル B に書き込みます。これにより、テーブル B は直接読み取り可能な状態になります。この一連の処理全体は、弾力的かつスケーラブルなサーバーレスアプリケーションとして実行されます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
制限事項
Tablestore トリガーは、以下のリージョンでサポートされています:中国 (北京)、中国 (杭州)、中国 (上海)、中国 (深セン)、日本 (東京)、シンガポール、ドイツ (フランクフルト)、および中国 (香港)。
Tablestore のテーブルは、関数と同じリージョンに配置されている必要があります。
Tablestore トリガーによって呼び出される関数の実行時間は、1 分を超えてはなりません。
内部ネットワーク経由で Tablestore にアクセスするには、Virtual Private Cloud (VPC) エンドポイントを次の形式で使用します:
{instance}.{region}.vpc.tablestore.aliyuncs.com。
注意事項
呼び出しループを回避してください。 関数が自身を起動したテーブル(例:テーブル A が関数 B を起動し、関数 B が再度テーブル A を更新する)に書き戻す場合、トリガーが再び起動し、無限ループが発生します。そのため、関数の出力先は別のテーブルとなるようデータフローを設計してください。
失敗時の再試行動作。 関数の実行が失敗した場合、トリガーは Tablestore 内のログデータの有効期限が切れるまで再試行を繰り返します。
関数の実行失敗は、以下のいずれかの状況で発生します。
関数インスタンスが起動したものの、コードが期待通りに実行されない場合。この場合、インスタンスに対して課金されます。
関数インスタンスの起動に失敗した場合(例:起動コマンドのエラーなど)。この場合、課金は発生しません。
無限の再試行を停止するには、データテーブルの Stream 機能を無効化してください。Stream 機能を無効化する前に、他のトリガーが同一テーブルを使用していないことを確認してください。
手順 1:データテーブルの Stream 機能を有効化
Tablestore の Stream 機能は、増分データの変更をキャプチャし、それをご利用の関数に配信します。トリガーを作成する前に、データテーブルで Stream 機能を有効化してください。
Tablestore コンソール にログインします。
上部ナビゲーションバーからリージョンを選択します。
概要 ページで、インスタンスのエイリアスをクリックするか、インスタンスの管理 を 操作 列からクリックします。
インスタンスの詳細 タブで、データテーブル タブをクリックします。その後、テーブル名をクリックし、Stream タブを選択するか、
アイコンをクリックして Stream を選択します。Stream タブで、有効化 を Stream 情報の横からクリックします。
Stream の有効化 ダイアログボックスで、ログの有効期限 パラメーターを設定し、有効化 をクリックします。値は 0 以外の整数(単位:時間)である必要があります。最大値は 168 時間です。
ログの有効期限は設定後に変更できません。慎重に選択してください。
手順 2:Tablestore トリガーの作成
Function Compute コンソール にログインします。左側ナビゲーションウィンドウで、関数 をクリックします。
上部ナビゲーションバーからリージョンを選択します。関数 ページで、設定対象の関数をクリックします。
関数の詳細ページで、構成 タブをクリックします。左側ナビゲーションウィンドウで、トリガー をクリックし、トリガーの作成 をクリックします。
トリガーの作成 パネルで、以下のパラメーターを設定し、OK をクリックします。
| パラメーター | 説明 | 例 |
|---|---|---|
| トリガーの種類 | トリガーの種類です。Tablestore を選択します。 | Tablestore |
| 名前 | トリガーの名前です。 | Tablestore-trigger |
| バージョンまたはエイリアス | バインドする関数のバージョンまたはエイリアスです。デフォルト値は LATEST です。異なるバージョンまたはエイリアスをバインドする場合は、関数の詳細ページの バージョンまたはエイリアス ドロップダウンリストから選択してください。詳細については、「バージョンの管理」および「エイリアスの管理」をご参照ください。 | LATEST |
| インスタンス | 既存の Tablestore インスタンスの名前です。 | d00dd8xm\*\*\*\* |
| テーブル | 既存のテーブルの名前です。 | mytable |
| ロール名 | Function Compute が Tablestore Stream から読み取る権限を付与する RAM ロールです。AliyunTableStoreStreamNotificationRole を選択します。 | AliyunTableStoreStreamNotificationRole |
本タイプのトリガーを初めて作成する場合、表示されるダイアログボックスで 今すぐ承認 をクリックしてください。
トリガーが作成された後、トリガー タブに表示されます。トリガーの編集または削除については、「トリガーの管理」をご参照ください。
手順 3:テストパラメーターの設定
Tablestore からのデータ到着前に関数をテストするには、Tablestore トリガーのペイロード形式に一致するテストイベントを設定します。
関数の詳細ページの[コード]タブで、[テスト関数]の横にある
アイコンをクリックし、[テストパラメーターの設定]を選択します。テストパラメーターの設定 パネルで、新しいテストイベントの作成 または 既存のテストイベントの変更 をクリックし、イベント名とイベント内容を入力して、OK をクリックします。
Tablestore トリガーは、増分データを Concise Binary Object Representation (CBOR) 形式でエンコードし、それをイベントとしてご利用の関数に渡します。以下の例では、PutRow、UpdateRow、DeleteRow の 3 種類の RecordType を含むペイロードを示しており、操作ごとの構造の違いを確認できます。
{
"Version": "Sync-v1",
"Records": [
{
"Type": "PutRow",
"Info": {
"Timestamp": 1506416585740836
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_1",
"Value": "2017-09-26 17:03:05.8815909 +0800 CST"
},
{
"ColumnName": "pk_2",
"Value": 1506416585741000
}
],
"Columns": [
{
"Type": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
"Timestamp": 1506416585741
},
{
"Type": "Put",
"ColumnName": "attr_1",
"Value": 1506416585881590900,
"Timestamp": 1506416585741
}
]
},
{
"Type": "UpdateRow",
"Info": {
"Timestamp": 1506416600000000
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
}
],
"Columns": [
{
"Type": "Put",
"ColumnName": "attr_0",
"Value": "updated_value",
"Timestamp": 1506416600000
},
{
"Type": "DeleteOneVersion",
"ColumnName": "attr_1",
"Timestamp": 1506416585741
}
]
},
{
"Type": "DeleteRow",
"Info": {
"Timestamp": 1506416700000000
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
}
],
"Columns": []
}
]
}以下の表では、イベントフィールドについて説明します。
| フィールド | 説明 |
|---|---|
Version | ペイロードのバージョンです。値: Sync-v1(文字列)。 |
Records | 増分データのレコードを格納する配列です。各要素には Type、Info、PrimaryKey、および Columns が含まれます。 |
Type(レコード) | レコードに対する操作の種類です。有効な値: PutRow、UpdateRow、DeleteRow(文字列)。 |
Info | レコードのメタデータです。Timestamp を含み、これはレコードが最後に変更された協定世界時 (UTC)(INT64)です。 |
PrimaryKey | プライマリキー列の配列です。各要素には ColumnName(文字列)および Value(INTEGER、STRING、または BLOB)が含まれます。 |
Columns | 属性列の配列です。各要素には以下の項目が含まれます:Type(列に対する操作: Put、DeleteOneVersion、または DeleteAllVersions)、ColumnName(文字列)、Value(INTEGER、BOOLEAN、DOUBLE、STRING、または BLOB)、および Timestamp(最後に変更された協定世界時 (UTC)、INT64)。 |
手順 4:関数コードの記述およびテスト
トリガーが作成された後、Tablestore のイベントペイロードを処理する関数コードを記述します。テーブル内のデータが変更されると、関数は自動的に呼び出されます。
関数の詳細ページで、コード タブをクリックし、関数コードを入力して、デプロイ をクリックします。以下の Python の例では、Tablestore のイベントレコードを読み取り、プライマリキー列および属性列の値を抽出しています。
import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") #records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'Node.js、PHP、Java、C# ランタイムにおけるサンプルについては、「Node.js、PHP、Java、C# ランタイムで Tablestore を使用して Function Compute をトリガーする方法」をご参照ください。
関数のテスト をクリックします。
実行後、コード タブで結果を確認できます。
よくある質問
特定のリージョンでトリガーの作成に失敗するのはなぜですか?
Tablestore トリガーは、一部のリージョンでのみ利用可能です。ご利用のリージョンが対応しているかどうかを確認するには、「制限事項」セクションをご参照ください。
トリガーの作成時に Tablestore のテーブルが見つからないのはなぜですか?
Tablestore のテーブルは、関数と同じリージョンに配置されている必要があります。異なるリージョンに配置されている場合、トリガーの設定画面には表示されません。
「Invocation canceled by client」というエラーが繰り返し表示されるのはなぜですか?
このエラーは、通常、クライアント側のタイムアウト期間が実際の関数実行時間よりも短いことを意味します。クライアントのタイムアウト期間を延長してください。詳細については、「クライアントが切断され、「Invocation canceled by client」というメッセージが表示された場合の対処方法」をご参照ください。
テーブルへのデータ書き込み後に Tablestore トリガーが起動しないのはなぜですか?
以下の点を確認してください。
テーブルの Stream 機能が有効化されていることを確認します。詳細については、「手順 1:データテーブルの Stream 機能を有効化」をご参照ください。
トリガーの作成時に正しいロール(
AliyunTableStoreStreamNotificationRole)が設定されていることを確認します。詳細については、「手順 2:Tablestore トリガーの作成」をご参照ください。関数の実行ログを確認し、実行が失敗していないかを確認します。関数が繰り返し失敗すると、ログデータの有効期限が切れるまで再試行が継続されます。詳細については、「トリガーが関数の実行を起動できない場合の対処方法」をご参照ください。