すべてのプロダクト
Search
ドキュメントセンター

Tablestore:データクレンジングに Function Compute を使用する

最終更新日:Dec 28, 2024

Tablestore は、高並列書き込みパフォーマンスと低ストレージコストを提供し、IoT データ、ログ、監視データの保存に適しています。Tablestore データテーブルにデータを書き込む際に、Function Compute を使用してデータをクレンジングし、クレンジングされたデータを Tablestore の別のデータテーブルに書き込むことができます。Tablestore の生データまたはクレンジングされたデータにリアルタイムでアクセスできます。

サンプルシナリオ

3 つのフィールドを含むログデータを Tablestore のデータテーブルに書き込むとします。ログを効率的にクエリするには、level フィールドの値が 1 より大きいログを Tablestore の result という名前の別のデータテーブルに書き込む必要があります。次の表は、ログに含まれるフィールドを示しています。

フィールド

タイプ

説明

id

整数

ログの ID。

level

整数

ログのレベル。値が大きいほど、レベルが高いことを示します。

message

文字列

ログの内容。

手順 1: データテーブルの Stream 機能を有効にする

トリガーを作成する前に、Tablestore コンソールでデータテーブルの Stream 機能を有効にして、関数がテーブルに書き込まれた増分データを処理できるようにする必要があります。

  1. Tablestore コンソール にログインします。

  2. 上部のナビゲーションバーで、リージョンを選択します。

  3. 概要ページで、管理するインスタンスの名前をクリックするか、インスタンスの管理列のアクションをクリックします。

  4. インスタンスの詳細タブのテーブルタブで、管理するデータテーブルの名前をクリックし、トンネルタブをクリックします。または、fig_001アイコンをクリックし、トンネルをクリックします。

  5. トンネルタブの「Stream 情報」セクションで、有効化をクリックします。

  6. Stream の有効化ダイアログボックスで、「ログの有効期限」パラメーターを設定し、有効化をクリックします。

    「ログの有効期限」パラメーターの値は、ゼロ以外の整数である必要があります。単位:時間。最大値:168。

    重要

    「ログの有効期限」パラメーターは、指定後に変更することはできません。慎重に行ってください。

ステップ 2: 関数と Tablestore トリガーを作成する

  1. 関数を作成します。

    1. Function Compute コンソール にログインします。

    2. オプション: ページの右上隅にある Function Compute 3.0 に移動 をクリックします。

      説明
      • Function Compute 3.0 は、さまざまな拡張機能を提供します。この例では、Function Compute 3.0 を使用します。

      • ページの右上隅に Function Compute 2.0 に戻る と表示されている場合は、既に Function Compute 3.0 コンソールにいるため、この手順をスキップしてください。

    3. 左側のナビゲーションペインで、関数 をクリックします。

    4. 上部のナビゲーションバーで、リージョンを選択します。関数ページで、関数の作成 をクリックします。

    5. 関数の作成ページで、関数を作成する方法を選択し、次のパラメーターを設定して、作成 をクリックします。

      この例では、Tablestore でのデータ変更をリアルタイムで計算する関数を作成する方法を説明するために、イベント関数 を選択しています。

      説明

      Tablestore のデータを処理する関数を作成する方法として、イベント関数Web 関数、または タスク関数 を選択できます。詳細については、関数を作成する方法の選択 を参照してください。

      • Tablestore でのデータ変更によってデータ処理を自動的にトリガーする場合は、イベント関数 を選択します。詳細については、イベント関数の作成 を参照してください。

      • 特定の HTTP リクエストによってデータ処理を自動的にトリガーする場合は、Web 関数 を選択します。詳細については、Web 関数の作成 を参照してください。

      • データ処理を定期的にまたは非同期にトリガーする場合は、タスク関数 を選択します。詳細については、タスク関数の作成 を参照してください。

      • 基本設定: 関数名 パラメーターを設定します。

      • コード: 関数の実行時およびコード関連の情報を構成します。

        パラメーター

        説明

        ランタイム

        Python、Java、PHP、Node.jsなどのランタイム、またはカスタムコンテナイメージを選択します。

        カスタムコンテナイメージ

        この例では、Python 3.9 が選択されています。

        コードのアップロード方法

        Function Computeへのコードのアップロード方法を指定します。

        • サンプルコードを使用: Function Computeが提供するサンプルコードを選択して、ビジネス要件に基づいて関数を作成できます。これはデフォルトの方法です。

        • ZIPをアップロード: 関数コードを含む.zipファイルを選択してアップロードします。

        • フォルダーをアップロード: 関数コードを含むフォルダーを選択してアップロードします。

        • OSS: オブジェクトストレージサービス (OSS) バケットからコードをアップロードします。この場合、バケット名 パラメーターと オブジェクト名 パラメーターを指定する必要があります。

        この例では、サンプルコードを使用Hello, world! サンプルコードが選択されています。

      • 詳細設定: インスタンス情報と関数の実行タイムアウト期間を構成します。

        パラメーター

        説明

        仕様

        ビジネス要件に基づいて、Vcpu容量メモリ容量などのインスタンス仕様を設定します。リソースの課金についての詳細は、課金概要をご参照ください。

        説明

        vCPU仕様とメモリ容量(GB単位)の比率は、1:1から1:4の範囲内である必要があります。

        0.35 vCPU、512 MB

        一時ディスクのサイズ

        ビジネス要件に基づいて、ファイルを一時的に保存するために使用するディスクのサイズを指定します。

        有効な値:

        • 512 MB:デフォルト値。このサイズのテンポラリディスクの使用には課金されません。Function Computeは512 MBの空きディスク容量を提供します。

        • 10 GB:9.5 GBのディスクサイズに基づいて課金されます。

        説明

        データは一時ディスクの領域を共有し、ディスク内のすべてのディレクトリに書き込むことができます。

        一時ディスクのライフサイクルは、基盤となるインスタンスのライフサイクルと一致します。インスタンスがシステムによってリサイクルされると、ハードディスク上のデータは消去されます。ファイルを永続化するには、File Storage NASまたはOSSを使用できます。詳細については、NASファイルシステムの設定OSSファイルシステムの設定をご参照ください。

        512 MB

        実行タイムアウト期間

        関数のタイムアウト期間を指定します。デフォルトのタイムアウト期間は180秒で、最大タイムアウト期間は86,400秒です。

        180

        ハンドラー

        関数のハンドラーを指定します。Function Computeランタイムは、リクエストを処理するためにハンドラーをロードして呼び出します。Web関数を選択して関数を作成する場合は、このパラメーターをスキップします。

        説明

        コードのアップロード方法パラメーターをサンプルコードを使用に設定する場合は、ハンドラーパラメーターの値を保持します。別のコードアップロード方法を選択する場合は、ビジネス要件に基づいてハンドラーパラメーターの値を変更します。そうしないと、関数が実行されたときにエラーが報告されます。

        index.handler

        タイムゾーン

        関数のタイムゾーンを選択します。関数のタイムゾーンを指定すると、環境変数TZが関数に自動的に追加されます。値は指定したタイムゾーンです。

        UTC

        関数ロール

        関数のResource Access Management (RAM)ロールを指定します。Function Computeはこのロールを使用して、Alibaba Cloudリソースへのアクセスに使用される一時的なAccessKeyペアを生成し、AccessKeyペアをコードに渡します。

        重要

        詳細については、付録:Function ComputeにTablestoreへのアクセス権限を付与するをご参照ください。

        AliyunFCDefaultRole

        VPCへのアクセス

        関数がVPCリソースにアクセスできるようにするかどうかを指定します。詳細については、ネットワーク設定をご参照ください。

        はい

        VPC

        VPCを指定します。VPCへのアクセスはいに設定した場合、このパラメーターは必須です。VPCを作成するか、ドロップダウンリストから関数がアクセスする既存のVPCのIDを選択します。

        fc.auto.create.vpc.1632317****

        vSwitch

        vSwitchを指定します。VPCへのアクセスはいに設定した場合、このパラメーターは必須です。vSwitchを作成するか、ドロップダウンリストから既存のvSwitchのIDを選択します。

        fc.auto.create.vswitch.vpc-bp1p8248****

        セキュリティグループ

        セキュリティグループを指定します。VPCへのアクセスはいに設定した場合、このパラメーターは必須です。セキュリティグループを作成するか、ドロップダウンリストから既存のセキュリティグループを選択します。

        fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****

        デフォルトNICのインターネットアクセスを許可

        デフォルトのネットワークインターフェースコントローラー(NIC)を使用して関数がインターネットにアクセスできるようにするかどうかを指定します。「いいえ」を選択すると、関数はFunction ComputeのデフォルトNICを使用してインターネットにアクセスできません。

        重要

        静的パブリックIPアドレスを使用する場合は、デフォルトNICのインターネットアクセスを許可を「いいえ」に設定する必要があります。そうしないと、設定された静的パブリックIPアドレスは有効になりません。詳細については、静的パブリックIPアドレスの設定をご参照ください。

        はい

        ロギング

        ロギング機能を有効にするかどうかを指定します。有効な値:

        • 有効:Function Computeは関数の実行ログを永続ストレージのためにSimple Log Serviceに送信します。これらのログを使用して、コードのデバッグ、障害の分析、データの分析を行うことができます。

          説明

          ロギング機能を有効にすると、標準出力(stdout)に出力されるログはSimple Log Serviceによって収集されます。その後、これらのログを使用して、コードのデバッグ、障害の分析、データの分析を行うことができます。

          • 詳細については、ロギングの設定をご参照ください。

          • バックグラウンドでFunction Computeによって作成されたSimple Log Serviceリソースに対して課金されます。詳細については、従量課金制の課金対象項目をご参照ください。

        • 無効:Simple Log Serviceを使用して関数の実行ログを保存およびクエリすることはできません。

        有効

      • (オプション) 環境変数: 関数のランタイム環境で環境変数を設定します。詳細については、環境変数の設定 を参照してください。

  2. Tablestore トリガーを作成します。

    1. 関数詳細タブで、構成タブをクリックします。左側のナビゲーションペインで、トリガーをクリックし、次にトリガーの作成をクリックします。

    2. トリガーの作成パネルで、パラメーターを構成し、OK をクリックします。

      パラメーター

      説明

      トリガータイプ

      トリガーのタイプ。 Tablestore を選択します。

      Tablestore

      名前

      トリガーの名前。

      Tablestore-trigger

      バージョンまたはエイリアス

      トリガーのバージョンまたはエイリアス。デフォルト値: LATEST。別のバージョンまたはエイリアスのトリガーを作成する場合は、関数詳細ページの バージョンまたはエイリアス ドロップダウンリストからバージョンまたはエイリアスを選択します。バージョンとエイリアスの詳細については、バージョンの管理 および エイリアスの管理 を参照してください。

      LATEST

      インスタンス

      既存の Tablestore インスタンスの名前。

      d00dd8xm****

      テーブル

      既存のテーブルの名前。

      mytable

      ロール名

      AliyunTableStoreStreamNotificationRole を選択します。

      説明

      上記のパラメーターを構成した後、OK をクリックします。このタイプのトリガーを初めて作成する場合は、表示されるダイアログボックスで 今すぐ承認 をクリックします。

      AliyunTableStoreStreamNotificationRole

      トリガーが作成されると、トリガータブに表示されます。トリガーを変更または削除するには、トリガーの管理 を参照してください。

手順 3: データクレンジングを確認する

トリガーを作成した後、Tablestore にデータを書き込み、データをクエリして、データが期待どおりにクレンジングされているかどうかを確認できます。

  1. 関数の詳細タブで、コードタブをクリックします。コードタブで、コードエディターに関数コードを記述します。

    この例では、関数コードは Python で記述されています。ビジネス要件に基づいて、サンプルコードの INSTANCE_NAME、REGION、ENDPOINT、RESULT_TABLENAME パラメーターの値を変更する必要があります。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import cbor
    import json
    import tablestore as ots
    
    # インスタンス名、リージョン、エンドポイント、結果テーブル名を変更します。
    INSTANCE_NAME = 'distribute-test'
    REGION = 'cn-shanghai'
    ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
    RESULT_TABLENAME = 'result'
    
    
    def get_attrbute_value(record, column):
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']
    
    
    def get_pk_value(record, column):
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    
    
    # 取得した認証情報は、AliyunOTSFullAccess ポリシーが RAM ロールにアタッチされているため、Tablestore へのアクセスに使用できます。
    def get_ots_client(context):
        creds = context.credentials
        client = ots.OTSClient(ENDPOINT, creds.access_key_id, creds.access_key_secret, INSTANCE_NAME,
                               sts_token=creds.security_token)
        return client
    
    
    def save_to_ots(client, record):
        id = int(get_pk_value(record, 'id'))
        level = int(get_attrbute_value(record, 'level'))
        msg = get_attrbute_value(record, 'message')
        pk = [('id', id), ]
        attr = [('level', level), ('message', msg), ]
        row = ots.Row(pk, attr)
        client.put_row(RESULT_TABLENAME, row)
    
    
    def handler(event, context):
        records = cbor.loads(event)
        # records = json.loads(event)
        client = get_ots_client(context)
        for record in records['Records']:
            level = int(get_attrbute_value(record, 'level'))
            if level > 1:
                save_to_ots(client, record)
            else:
                # レベルが 1 以下の場合は無視します。
                print("level <= 1, ignore.")
    
  2. source_data という名前のテーブルにデータを書き込みます。id、level、message フィールドの値を順番に入力し、result という名前のテーブルでクレンジングされたデータをクエリします。

    • level フィールドの値が 1 より大きいデータを source_data という名前のテーブルに書き込むと、データは result という名前のテーブルに同期されます。

    • level フィールドの値が 1 以下のデータを source_data という名前のテーブルに書き込むと、データは result という名前のテーブルに同期されません。

よくあるご質問

  • リージョンで Tablestore トリガーを作成できない場合は、リージョンが Tablestore トリガーをサポートしているかどうかを確認してください。詳細については、使用上の注意 を参照してください。

  • Tablestore トリガーを作成するときに既存の Tablestore データテーブルが見つからない場合は、データテーブルが Function Compute の関連サービスと同じリージョンにあるかどうかを確認してください。

  • ほとんどの場合、Tablestore トリガーを使用するときにクライアントが呼び出しをキャンセルしたことを示すエラーが繰り返し報告される場合は、クライアントで関数の実行に設定されたタイムアウト期間が実際の実行時間よりも短いことが原因です。この場合は、クライアントのタイムアウト期間を長くすることをお勧めします。詳細については、クライアントが切断され、「クライアントによって呼び出しがキャンセルされました」というメッセージが報告された場合はどうすればよいですか? を参照してください。

  • データが Tablestore データテーブルに書き込まれたにもかかわらず、関連付けられた Tablestore トリガーがトリガーされない場合は、次の手順を実行して問題のトラブルシューティングを行うことができます。トリガーの障害のトラブルシューティング方法の詳細については、トリガーが関数の実行をトリガーできない場合はどうすればよいですか? を参照してください。

    • データテーブルで Stream 機能が有効になっていることを確認します。詳細については、手順 1: データテーブルの Stream 機能を有効にする を参照してください。

    • トリガーを作成するときに、ロールが正しく設定されているかどうかを確認します。デフォルトロール AliyunTableStoreStreamNotificationRole を使用できます。詳細については、Tablestore トリガーの作成 を参照してください。

    • 関数の実行ログを表示して、関数が実行に失敗したかどうかを確認します。関数が実行に失敗した場合、Tablestore のログデータの有効期限が切れるまで関数は再試行されます。

  • 関数の実行時に「access_key_id is None or empty.」というエラーメッセージが返された場合は、関数に設定されたロールに Tablestore にアクセスするための権限が付与されているかどうかを確認してください。詳細については、付録: Tablestore にアクセスするための権限を Function Compute に付与する を参照してください。