このトピックでは、ApsaraMQ for Kafka インスタンスのソース Topic から Tablestore インスタンスのテーブルにデータを同期するために、Tablestore シンクコネクタを作成する方法について説明します。
前提条件
Tablestore が有効化され、インスタンスが作成されていること。 詳細については、「Tablestore を有効化してインスタンスを作成する」をご参照ください。
ApsaraMQ for Kafka インスタンスが購入およびデプロイされ、インスタンスに Topic が作成されていること。 詳細については、「手順 2:インスタンスを購入してデプロイする」および「手順 3:リソースを作成する」をご参照ください。
Tablestore シンクコネクタを作成すると、サービスロールが自動的に生成されます。
AliyunOTSFullAccessポリシーをロールに手動で追加する必要があります。 このポリシーは、サービスロールに Tablestore へのアクセス権限を付与するために使用されます。 詳細については、「方法 1:ロールページで権限の付与をクリックして RAM ロールに権限を付与する」をご参照ください。
手順 1:Tablestore テーブルを作成する
ApsaraMQ for Kafka から Tablestore にデータを同期するための Tablestore テーブルを作成します。 詳細については、「手順」をご参照ください。
この例では、ots-sink という名前のインスタンスと ots_sink_table という名前のデータテーブルが作成されます。 データテーブルの作成時に、プライマリキー topic、partition、および offset が指定されます。
手順 2:Tablestore シンクコネクタを作成して開始する
ApsaraMQ for Kafka コンソール にログインします。 リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、 を選択します。
タスクリスト ページで、タスクの作成 をクリックします。
タスクの作成 ページで、Task Name パラメーターと Description パラメーターを設定し、画面の指示に従って他のパラメーターを設定します。 次に、[保存] をクリックします。 次のセクションでは、パラメーターについて説明します。
タスクの作成
Source (ソース) 手順で、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを設定します。 次に、[次の手順] をクリックします。 次の表にパラメーターを示します。
パラメーター
説明
例
[リージョン]
ソース ApsaraMQ for Kafka インスタンスが存在するリージョン。
中国 (北京)
[apsaramq For Kafka インスタンス]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンス。
alikafka_post-cn-jte3****
[topic]
ルーティングするメッセージが生成される ApsaraMQ for Kafka インスタンスの Topic。
demo-topic
[グループ ID]
ソース ApsaraMQ for Kafka インスタンスのコンシューマーグループの名前。
[クイック作成]:システムは、
GID_EVENTBRIDGE_xxx形式で名前が付けられたコンシューマーグループを自動的に作成します。 この値を選択することをお勧めします。[既存のグループを使用]:使用されていない既存のグループの ID を選択します。 使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。
クイック作成
[コンシューマーオフセット]
メッセージが消費されるオフセット。 有効な値:
[最新のオフセット]
[最も古いオフセット]
最新のオフセット
[ネットワーク設定]
メッセージをルーティングするネットワークのタイプ。 有効な値:
[ベーシックネットワーク]
[セルフマネージドインターネット]
ベーシックネットワーク
[VPC]
ApsaraMQ for Kafka インスタンスがデプロイされている VPC(Virtual Private Cloud)の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
vpc-bp17fapfdj0dwzjkd****
[vswitch]
ApsaraMQ for Kafka インスタンスが属する vSwitch の ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
vsw-bp1gbjhj53hdjdkg****
[セキュリティグループ]
ApsaraMQ for Kafka インスタンスが属するセキュリティグループの ID。 このパラメーターは、[ネットワーク設定] パラメーターを セルフマネージドインターネット に設定した場合にのみ必須です。
alikafka_pre-cn-7mz2****
データ形式
データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。 複数のデータ形式がサポートされています。 エンコードに関する特別な要件がない場合は、値として Json を指定します。
[json]:バイナリデータは、UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。
[テキスト]:バイナリデータは、UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。 これはデフォルト値です。
[バイナリ]:バイナリデータは、Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。
Json
一括プッシュの件数
各関数呼び出しで送信できるメッセージの最大数。 リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。 有効な値:1 ~ 10000。
100
バッチプッシュ間隔 (単位:秒)
関数が呼び出される時間間隔。 システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。 有効な値:0 ~ 15。 単位:秒。 値 0 は、集約後すぐにメッセージが送信されることを指定します。
3
Filtering (フィルタリング) 手順で、パターン内容 コードエディターでデータパターンを定義してデータをフィルタリングします。 詳細については、「イベントパターン」をご参照ください。
Transform (変換) 手順で、データ分割、マッピング、エンリッチメント、およびルーティング機能を実装するためのデータクレンジング方法を指定します。 詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。
Sink (ターゲット) 手順で、サービスタイプ パラメーターを [tablestore] に設定し、画面の指示に従って他のパラメーターを設定します。 次の表にパラメーターを示します。
パラメーター
説明
例
インスタンス名
作成した Tablestore インスタンスの名前。
ost-sink
ターゲットテーブル
作成した Tablestore データテーブル。
ost_sink_table
主キー
Tablestore でプライマリキーと属性列を生成するために使用するメソッド。 各属性列の内容を抽出するためのルールを JSONPath 構文で定義する必要があります。 データ形式 パラメーターを Source (ソース) 手順で [json] に設定した場合、ApsaraMQ for Kafka から転送されるデータの形式は、次のコードのようになります。
{ "data": { "topic": "demo-topic", "partition": 0, "offset": 2, "timestamp": 1739756629123, "headers": { "headers": [], "isReadOnly": false }, "key":"ots-sink-k1", "value": "ots-sink-v1" }, "id": "7702ca16-f944-4b08-***-***-0-2", "source": "acs:alikafka", "specversion": "1.0", "type": "alikafka:Topic:Message", "datacontenttype": "application/json; charset=utf-8", "time": "2025-02-17T01:43:49.123Z", "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic", "aliyunaccountid": "1******6789" }たとえば、プライマリキー名として topic を指定し、数値抽出ルールとして
$.data.topicを指定できます。属性列
たとえば、属性列名として key を指定し、数値抽出ルールとして
$.data.keyを指定できます。操作モード
Tablestore にデータを書き込むモード。 有効な値:
[put]:2 つのデータエントリのプライマリキーが同じ場合、新しいデータエントリは古いデータエントリを上書きします。
[update]:2 つのデータエントリのプライマリキーが同じ場合、新しいデータエントリが行に書き込まれ、古いデータエントリは保持されます。
[delete]:指定されたキーが削除されます。
put
ネットワーク設定
[VPC]:ApsaraMQ for Kafka のメッセージは、VPC(Virtual Private Cloud)内の Tablestore に配信されます。
[インターネット]:ApsaraMQ for Kafka のメッセージは、インターネット経由で Tablestore に配信されます。
VPC
VPC
VPC ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
vpc-bp17fapfdj0dwzjkd****
vSwitch
vSwitch ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
vsw-bp1gbjhj53hdjdkg****
セキュリティグループ
セキュリティグループ ID。 このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。
test_group
タスクのプロパティ
イベントのプッシュに失敗した場合に使用する再試行ポリシーと、エラーを処理するために使用するメソッドを設定します。 詳細については、「再試行ポリシーと配信不能キュー」をご参照ください。
タスクリスト ページに戻り、作成した Tablestore シンクコネクタを見つけ、操作する 列の 有効化する をクリックします。
ヒント メッセージで、OK をクリックします。
シンクコネクタが有効になるまで 30 ~ 60 秒かかります。 タスクリスト ページの Status 列で進行状況を確認できます。
手順 3:Tablestore シンクコネクタをテストする
[タスク] ページで、作成した Tablestore シンクコネクタを見つけ、[イベントソース] 列のソース Topic 名をクリックします。
- Topic 詳細ページで、[メッセージの送信] をクリックします。
[メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを設定し、[OK] をクリックします。

[タスク] ページで、作成した Tablestore シンクコネクタを見つけ、[イベントターゲット] 列の宛先テーブル名をクリックします。
[データのクエリ] タブの [テーブルの管理] ページで、Tablestore テーブルに格納されているデータを表示します。
