すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Tablestore シンクコネクタを作成する

最終更新日:Mar 14, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic から Tablestore インスタンスのテーブルにデータを同期するために、Tablestore シンクコネクタを作成する方法について説明します。

前提条件

手順 1:Tablestore テーブルを作成する

ApsaraMQ for Kafka から Tablestore にデータを同期するための Tablestore テーブルを作成します。 詳細については、「手順」をご参照ください。

この例では、ots-sink という名前のインスタンスと ots_sink_table という名前のデータテーブルが作成されます。 データテーブルの作成時に、プライマリキー topicpartition、および offset が指定されます。image

手順 2:Tablestore シンクコネクタを作成して開始する

  1. ApsaraMQ for Kafka コンソール にログインします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、Connector エコシステムの統合 > タスクリスト を選択します。

  3. タスクリスト ページで、タスクの作成 をクリックします。

  4. タスクの作成 ページで、Task Name パラメーターと Description パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。 次に、[保存] をクリックします。 次のセクションでは、パラメーターについて説明します。

    • タスクの作成

      1. Source (ソース) 手順で、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。 次に、[次の手順] をクリックします。 次の表にパラメーターを示します。

        パラメーター

        説明

        [リージョン]

        ソース ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (北京)

        [apsaramq For Kafka インスタンス]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。

        alikafka_post-cn-jte3****

        [topic]

        ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスの Topic。

        demo-topic

        [グループ ID]

        ソース ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。

        • [クイック作成]:システムは、GID_EVENTBRIDGE_xxx 形式で名前が付けられたコンシューマーグループを自動的に作成します。 この値を選択することをお勧めします。

        • [既存のグループを使用]:使用されていない既存のグループの ID を選択します。 使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。

        クイック作成

        [コンシューマーオフセット]

        メッセージが消費されるオフセット。 有効な値:

        • [最新のオフセット]

        • [最も古いオフセット]

        最新のオフセット

        [ネットワーク設定]

        メッセージをルーティングするネットワークのタイプ。 有効な値:

        • [ベーシックネットワーク]

        • [セルフマネージドインターネット]

        ベーシックネットワーク

        [VPC]

        ApsaraMQ for Kafka インスタンスがデプロイされている VPC(Virtual Private Cloud)の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。

        vpc-bp17fapfdj0dwzjkd****

        [vswitch]

        ApsaraMQ for Kafka インスタンスが属する vSwitch の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。

        vsw-bp1gbjhj53hdjdkg****

        [セキュリティグループ]

        ApsaraMQ for Kafka インスタンスが属するセキュリティグループの ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。

        alikafka_pre-cn-7mz2****

        データ形式

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。 複数のデータ形式がサポートされています。 エンコードに関する特別な要件がない場合は、値として Json を指定します。

        • [json]:バイナリデータは、UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。

        • [テキスト]:バイナリデータは、UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。 これはデフォルト値です。

        • [バイナリ]:バイナリデータは、Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        Json

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。 リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。 有効な値:1 ~ 10000。

        100

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。 システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。 有効な値:0 ~ 15。 単位:秒。 値 0 は、集約後すぐにメッセージが送信されることを指定します。

        3

      2. Filtering (フィルタリング) 手順で、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。 詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) 手順で、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。 詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

      4. Sink (ターゲット) 手順で、サービスタイプ パラメーターを [tablestore] に設定し、画面の指示に従って他のパラメーターを設定します。 次の表にパラメーターを示します。

        パラメーター

        説明

        インスタンス名

        作成した Tablestore インスタンスの名前。

        ost-sink

        ターゲットテーブル

        作成した Tablestore データテーブル。

        ost_sink_table

        主キー

        Tablestore でプライマリキーと属性列を生成するために使用するメソッド。 各属性列の内容を抽出するためのルールを JSONPath 構文で定義する必要があります。 データ形式 パラメーターを Source (ソース) 手順で [json] に設定した場合、ApsaraMQ for Kafka から転送されるデータの形式は、次のコードのようになります。

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"ots-sink-k1",
                "value": "ots-sink-v1"
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        たとえば、プライマリキー名として topic を指定し、数値抽出ルールとして $.data.topic を指定できます。

        属性列

        たとえば、属性列名として key を指定し、数値抽出ルールとして $.data.key を指定できます。

        操作モード

        Tablestore にデータを書き込むモード。 有効な値:

        • [put]:2 つのデータエントリのプライマリキーが同じ場合、新しいデータエントリは古いデータエントリを上書きします。

        • [update]:2 つのデータエントリのプライマリキーが同じ場合、新しいデータエントリが行に書き込まれ、古いデータエントリは保持されます。

        • [delete]:指定されたキーが削除されます。

        put

        ネットワーク設定

        • [VPC]:ApsaraMQ for Kafka のメッセージは、VPC(Virtual Private Cloud)内の Tablestore に配信されます。

        • [インターネット]:ApsaraMQ for Kafka のメッセージは、インターネット経由で Tablestore に配信されます。

        VPC

        VPC

        VPC ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        vSwitch ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        vsw-bp1gbjhj53hdjdkg****

        セキュリティグループ

        セキュリティグループ ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        test_group

    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用する再試行ポリシーと、エラーを処理するために使用するメソッドを設定します。 詳細については、「再試行ポリシーと配信不能キュー」をご参照ください。

  5. タスクリスト ページに戻り、作成した Tablestore シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。

  6. ヒント メッセージで、OK をクリックします。

    シンクコネクタが有効になるまで 30 ~ 60 秒かかります。 タスクリスト ページの Status 列で進行状況を確認できます。

手順 3:Tablestore シンクコネクタをテストする

  1. [タスク] ページで、作成した Tablestore シンクコネクタを見つけ、[イベントソース] 列のソース Topic 名をクリックします。

  2. Topic 詳細ページで、[メッセージの送信] をクリックします。
  3. [メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを設定し、[OK] をクリックします。

    image

  4. [タスク] ページで、作成した Tablestore シンクコネクタを見つけ、[イベントターゲット] 列の宛先テーブル名をクリックします。

  5. [データのクエリ] タブの [テーブルの管理] ページで、Tablestore テーブルに格納されているデータを表示します。

    image