すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:Tablestore シンクコネクタを作成する

最終更新日:Feb 13, 2025

このトピックでは、Tablestore シンクコネクタを作成して、ApsaraMQ for Kafka インスタンスのトピックから Tablestore インスタンスのテーブルにデータを同期する方法について説明します。

前提条件

使用上の注意

  • ApsaraMQ for Kafka インスタンスのトピックからデータをエクスポートできるのは、ApsaraMQ for Kafka インスタンスと同じリージョンにある Tablestore インスタンスのテーブルのみです。コネクタの制限については、「制限」をご参照ください。

  • Tablestore シンクコネクタを作成する場合、サービスロールが必要です。

    • サービスロールが作成されていない場合、ApsaraMQ for Kafka によって自動的にロールが作成されます。このようにして、ApsaraMQ for Kafka から Tablestore にデータを想定どおりにエクスポートできます。

    • サービスロールが作成されている場合、ApsaraMQ for Kafka はロールを再度作成しません。

    詳細については、「サービスロール」をご参照ください。

プロセス

このセクションでは、Tablestore シンクコネクタを使用して、ApsaraMQ for Kafka インスタンスのトピックから Tablestore インスタンスのテーブルにデータをエクスポートする方法について説明します。

  1. (オプション) Tablestore シンクコネクタに必要なトピックと グループ を作成します。

    トピックと グループ を手動で作成したくない場合は、この手順をスキップし、次の手順で [リソース作成方法] パラメーターを [自動] に設定します。

    重要

    Tablestore シンクコネクタに必要な特定のトピックでは、ローカル記憶域を使用する必要があります。ApsaraMQ for Kafka インスタンスのメジャーバージョンが 0.10.2 の場合、ローカル記憶域を使用するトピックを手動で作成することはできません。メジャーバージョン 0.10.2 では、これらのトピックは自動的に作成する必要があります。

    1. Tablestore シンクコネクタに必要なトピックを作成する

    2. Tablestore シンクコネクタに必要なグループを作成する

  2. Tablestore シンクコネクタを作成してデプロイする

  3. 結果を確認する

    1. テストメッセージを送信する

    2. テーブル内のデータを表示する

Tablestore シンクコネクタに必要なトピックを作成する

ApsaraMQ for Kafka コンソールで、Tablestore シンクコネクタに必要な以下のトピックを手動で作成できます。タスクオフセットトピック、タスク構成トピック、タスクステータストピック、デッドレターキュー、エラートピック。各トピックに必要なパーティションの数とストレージエンジンはさまざまです。詳細については、「ソースサービスに対して構成されたパラメーター」をご参照ください。

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

    重要

    Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンを跨いで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンス上で実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーション ウィンドウで、トピック管理 をクリックします。

  5. トピック管理 ページで、トピックの作成 をクリックします。

  6. トピックの作成 パネルで、トピックのプロパティを指定し、[OK] をクリックします。

    パラメーター

    説明

    名前

    トピック名。

    demo

    記述

    トピックの説明。

    demo test

    パーティションの数

    トピック内のパーティション数。

    12

    ストレージエンジン

    説明

    Professional Edition インスタンスを使用している場合のみ、ストレージエンジンの種類を指定できます。Standard Edition インスタンスを使用している場合は、デフォルトでクラウドストレージが選択されます。

    トピックにメッセージを格納するために使用されるストレージエンジンの種類。

    ApsaraMQ for Kafka は、以下の種類のストレージエンジンをサポートしています。

    • クラウドストレージ: この値を選択すると、システムはトピックに Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを格納します。このストレージエンジンは、低レイテンシ、高パフォーマンス、長い耐久性、そして高い信頼性を備えています。インスタンスの作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ のみに設定できます。

    • ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを格納します。

    クラウドストレージ

    メッセージタイプ

    トピックのメッセージタイプ。有効な値:

    • 通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序は保持されない可能性があります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。

    • パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。

    通常のメッセージ

    ログリリースポリシー

    トピックで使用されるログクリーンアップポリシー。

    ストレージエンジン パラメーターを ローカルストレージ に設定する場合、ログリリースポリシー パラメーターを構成する必要があります。ストレージエンジン パラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka プロフェッショナル版インスタンスを使用している場合のみです。

    ApsaraMQ for Kafka は、以下のログクリーンアップポリシーを提供します。

    • Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保持されます。ストレージ使用量が 85% を超えると、サービスの可用性を確保するために、システムは最も古いメッセージを削除します。

    • Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログの圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システムの再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムの状態と構成に関する情報をログ圧縮トピックに格納する必要があります。

      重要

      ログ圧縮トピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

    Compact

    タグ

    トピックにアタッチするタグ。

    demo

    トピックが作成されると、トピック管理 ページでトピックを表示できます。

Tablestore シンクコネクタに必要な グループ を作成する

ApsaraMQ for Kafka コンソールで、Tablestore シンクコネクタに必要な グループ を手動で作成できます。グループ 名は、connect-タスク名 形式である必要があります。詳細については、「ソースサービスに対して構成されたパラメータ」をご参照ください。

  1. ApsaraMQ for Kafka コンソール にログオンします。

  2. リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションウィンドウで、Group の管理 をクリックします。

  5. Group の管理 ページで、グループの作成 をクリックします。

  6. グループの作成 パネルで、Group ID フィールドにグループ名を入力し、記述 フィールドにグループの説明を入力し、グループにタグをアタッチして、[OK] をクリックします。

    コンシューマーグループが作成されると、Group の管理 ページでコンシューマーグループを表示できます。

Tablestore シンクコネクタの作成とデプロイ

次の手順を実行して、ApsaraMQ for Kafka から Tablestore にデータをエクスポートするために使用できる Tablestore シンクコネクタを作成およびデプロイします。

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. 左側のナビゲーションウィンドウで、Connector タスクリスト をクリックします。

  4. Connector タスクリスト ページで、インスタンスの選択 ドロップダウンリストからコネクタが属するインスタンスを選択し、Connector の作成 をクリックします。

  5. Connector の作成 ウィザードで、次の手順を実行します。

    1. 基本情報の設定 ステップで、画面の指示に従ってパラメーターを構成し、次へ をクリックします。次の表にパラメーターを示します。

      パラメーター

      説明

      名前

      コネクタの名前。次のルールに基づいてコネクタ名を指定します。

      • コネクタ名は 1 ~ 48 文字である必要があります。数字、小文字、およびハイフン (-) を使用できますが、ハイフン (-) で始めることはできません。

      • 各コネクタ名は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。

      コネクタのデータ同期タスクでは、connect-タスク名 形式で名前が付けられたコンシューマーグループを使用する必要があります。このようなコンシューマーグループを作成していない場合は、Message Queue for Apache Kafka によって自動的に作成されます。

      kafka-ts-sink

      インスタンス

      ApsaraMQ for Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。

      demo alikafka_post-cn-st21p8vj****

    2. ソースサービスの設定 ステップで、ソースサービスとして ApsaraMQ for Kafka を選択し、画面の指示に従ってその他のパラメーターを構成してから、次へ をクリックします。次の表にパラメーターを示します。

      説明

      トピックと グループ を事前に作成している場合は、[リソース作成方法] パラメーターを [手動] に設定し、作成したリソースの名前を入力します。それ以外の場合は、[リソース作成方法] パラメーターを [自動] に設定します。

      表 1. ソースサービス用に構成されたパラメーター

      パラメーター

      説明

      データソース Topic

      データを同期するトピックの名前。

      ts-test-input

      コンシューマースレッドの同時発生数

      ソーストピックからデータを同期するために使用される同時コンシューマースレッドの数。デフォルト値:6。有効な値:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消費の開始位置

      メッセージ消費を開始するオフセット。有効な値:

      • 一番古いオフセット:最も古いオフセットからメッセージを消費します。

      • 一番新しいオフセット:最新のオフセットからメッセージを消費します。

      一番古いオフセット

      VPC ID

      ソースインスタンスがデプロイされている Virtual Private Cloud(VPC)の ID。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した VPC ID が表示されます。このパラメーターを構成する必要はありません。

      vpc-bp1xpdnd3l***

      VSwitch ID

      ソースインスタンスが関連付けられている vSwitch の ID。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。 vSwitch は、ソースの ApsaraMQ for Kafka インスタンスと同じ VPC 内に存在する必要があります。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した vSwitch ID が表示されます。

      vsw-bp1d2jgg81***

      失敗の処理

      メッセージの送信に失敗した後、エラーが発生したパーティションへのサブスクリプションを保持するかどうかを指定します。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。有効な値:

      • サブスクリプションの継続:エラーが発生したパーティションへのサブスクリプションを保持し、ログを出力します。

      • サブスクリプションの停止:エラーが発生したパーティションへのサブスクリプションを停止し、ログを出力します。

      説明
      • コネクタログの表示方法については、「コネクタの管理」をご参照ください。

      • エラーコードに基づいてエラーをトラブルシューティングする方法については、「エラーコード」をご参照ください。

      サブスクリプションの継続

      リソースの作成方法

      Tablestore シンクコネクタに必要なトピックと グループ を作成するために使用される方法。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。有効な値:

      • 自動作成

      • 手動で作成します

      自動作成

      Connector コンシューマーグループ

      コネクタのデータ同期タスクで使用される グループ。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。グループ の名前は、connect-タスク名 形式である必要があります。

      connect-cluster-kafka-ots-sink

      タスクサイトの Topic

      コンシューマーオフセットを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。

      • トピック:トピック名。トピック名は connect-offset で始めることをお勧めします。

      • パーティション:トピック内のパーティションの数。このパラメーターは、1 より大きい値に設定する必要があります。

      • ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。

      • cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。

      connect-offset-kafka-ots-sink

      タスク設定の Topic

      タスク構成を格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。

      • トピック:トピック名。トピック名は connect-config で始めることをお勧めします。

      • パーティション:トピック内のパーティションの数。このパラメーターは 1 に設定する必要があります。

      • ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。

      • cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。

      connect-config-kafka-ots-sink

      タスクステータスの Topic

      タスクステータスを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。

      • トピック:トピック名。トピック名は connect-status で始めることをお勧めします。

      • パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。

      • ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。

      • cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。

      connect-status-kafka-ots-sink

      デッドレターキューの Topic

      Kafka Connect フレームワークのエラーデータを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。トピックリソースを節約するために、トピックを作成し、そのトピックをデッドレターキューのトピックと エラーデータのトピック の両方として使用できます。

      • トピック:トピック名。トピック名は connect-error で始めることをお勧めします。

      • パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。

      • ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージまたはクラウドストレージに設定できます。

      connect-error-kafka-ots-sink

      例外データ Topic

      コネクタのエラーデータを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。トピックリソースを節約するために、トピックを作成し、そのトピックを デッドレターキューのトピック とエラーデータのトピックの両方として使用できます。

      • トピック:トピック名。トピック名は connect-error で始めることをお勧めします。

      • パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。

      • ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージまたはクラウドストレージに設定できます。

      connect-error-kafka-ots-sink

    3. ターゲットサービスの設定 ステップで、宛先サービスとして Tablestore を選択し、画面の指示に従ってその他のパラメーターを構成してから、作成 をクリックします。次の表にパラメーターを示します。

      パラメーター

      説明

      インスタンス名

      Tablestore インスタンスの名前。

      k00eny67****

      宛先テーブルの自動作成

      Tablestore に宛先テーブルを自動的に作成するかどうかを指定します。有効な値:

      • はい:同期データを格納するために Tablestore にテーブルを自動的に作成します。テーブルのカスタム名を指定できます。

      • いいえ:既存のテーブルを使用して同期データを格納します。

      はい

      宛先テーブル名

      同期データを格納するテーブルの名前。宛先テーブルの自動作成 パラメーターを いいえ に設定した場合は、入力したテーブル名が Tablestore インスタンス内の既存のテーブルの名前と同じであることを確認してください。

      kafka_table

      Tablestore

      同期データを格納するテーブルのタイプ。有効な値:

      • ワイドカラムモデル

      • 時系列モデル

      ワイドカラムモデル

      メッセージキー形式

      メッセージキーの形式。有効な値:文字列 および JSON。デフォルト値:JSON。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      • 文字列:メッセージキーは文字列として解析されます。

      • JSON:メッセージキーは JSON 形式である必要があります。

      文字列

      メッセージ値の形式

      メッセージ値の形式。有効な値:文字列 および JSON。デフォルト値:JSON。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      • 文字列:メッセージ値は文字列として解析されます。

      • JSON:メッセージ値は JSON 形式である必要があります。

      文字列

      JSON メッセージフィールド変換

      JSON メッセージのフィールド処理方法。このパラメーターは、メッセージキー形式 または メッセージ値の形式 パラメーターを JSON に設定した場合にのみ表示されます。有効な値:

      • すべて文字列として書き込む:すべてのフィールドは Tablestore で文字列に変換されます。

      • フィールドタイプを自動的に識別する:JSON 形式のメッセージ本文の文字列フィールドとブールフィールドは、それぞれ Tablestore の文字列フィールドとブールフィールドに変換されます。JSON 形式のメッセージ本文の整数型と浮動小数点型のデータは、Tablestore の倍精度浮動小数点型のデータに変換されます。

      すべて文字列として書き込む

      主キーモード

      プライマリキーモード。データテーブルのプライマリキーは、ApsaraMQ for Kafka メッセージレコードのさまざまな部分から抽出できます。これには、メッセージレコードの座標(トピック、パーティション、オフセット)、キー、値が含まれます。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。デフォルト値:kafka。有効な値:

      • kafka:<connect_topic>_<connect_partition> と <connect_offset> をデータテーブルのプライマリキーとして使用します。

      • record_key:メッセージキーのフィールドをデータテーブルのプライマリキーとして使用します。

      • record_value:メッセージ値のフィールドをデータテーブルのプライマリキーとして使用します。

      kafka

      プライマリキー列名の設定

      プライマリキー列の名前と対応するデータ型。列名は、メッセージキーまたはメッセージ値から抽出されたフィールドを指定します。フィールドのデータ型を文字列または整数に設定できます。

      このパラメーターは、メッセージキー形式 パラメーターを JSON に、主キーモード パラメーターを record_key に設定した場合に表示されます。このパラメーターは、メッセージ値の形式 パラメーターを JSON に、主キーモード パラメーターを record_value に設定した場合にも表示されます。

      追加 をクリックして、列名を追加できます。最大 4 つの列名を構成できます。

      なし

      書き込みモード

      書き込みモード。有効な値:put および update。デフォルト値:put。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      • put:新しいデータはテーブル内の元のデータを上書きします。

      • update:新しいデータがテーブルに追加され、元のデータは保持されます。

      put

      削除モード

      ApsaraMQ for Kafka メッセージレコードに空の値が含まれている場合に、行または属性列を削除するかどうかを指定します。このパラメーターは、主キーモード パラメーターを record_key に設定した場合にのみ表示されます。有効な値:

      • なし:行または属性列を削除することはできません。これがデフォルト値です。

      • :行を削除できます。

      • :属性列を削除できます。

      • 行と列:行と属性列の両方を削除できます。

      削除操作は書き込みモードによって異なります。

      • 書き込みモード パラメーターを put に設定した場合は、[削除モード] パラメーターを任意の値に設定します。メッセージレコードに空の値が含まれている場合でも、メッセージレコードのデータは上書きモードで Tablestore テーブルにエクスポートされます。

      • 書き込みモード パラメーターを update に設定した場合は、[削除モード] パラメーターを なし または に設定します。メッセージレコードのすべてのフィールドが空の場合、メッセージレコードはダーティデータとして処理されます。メッセージレコードの特定のフィールドが空の場合、空の値は自動的にスキップされ、その他の空でない値が Tablestore テーブルに追加されます。[削除モード] パラメーターを または 行と列 に設定し、メッセージレコードに空の値が含まれている場合、システムは空の値に対応する属性列、または行と属性列の両方を削除します。その後、残りのデータが Tablestore テーブルに追加されます。

      なし

      メジャー名フィールド

      Tablestore 時系列モデルのメトリック名フィールド(_m_name)としてフィールドをマップします。メトリック名フィールドは、温度や速度など、時系列データによって測定される物理量またはモニタリングメトリックを示します。空の値はサポートされていません。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      measurement

      データソースフィールド

      Tablestore 時系列モデルのデータソースフィールド(_data_source)としてフィールドをマップします。データソースフィールドは、特定の時系列データのソース(サーバー名やデバイス ID など)を示します。空の値はサポートされています。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      source

      ラベルフィールド

      1 つ以上のフィールドを Tablestore の時系列モデルのタグフィールド(_tags)として使用します。各タグは、文字列型のキーと値のペアです。キーは構成されたフィールド名で、値はフィールド値です。タグはタイムラインメタデータの一部です。タイムラインは、メトリック名、データソース、およびタグで構成されます。空の値はサポートされています。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      tag1, tag2

      タイムスタンプフィールド

      Tablestore 時系列モデルのタイムスタンプフィールド(_time)としてフィールドをマップします。タイムスタンプフィールドは、行の時系列データに対応する時点(物理量が生成された時刻など)を示します。データが Tablestore に書き込まれると、タイムスタンプフィールドはマイクロ秒単位の値に変換されます。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。

      time

      タイムスタンプ単位

      タイムスタンプの構成に基づいてこのパラメーターを構成します。このパラメーターは、Tablestore パラメーターを時系列モデルに設定した場合にのみ表示されます。有効な値:

      • ミリ秒

      • マイクロ秒

      • ナノ秒

      ミリ秒

      すべての非主キーフィールドのマッピングの設定を行うかどうか

      すべての非プライマリキーフィールドをデータフィールドとしてマップするかどうかを指定します。プライマリキーフィールドは、メトリック名、データソース、タグ、またはタイムスタンプとしてマップされているフィールドです。このパラメーターは、Tablestore パラメーターを時系列モデルに設定した場合にのみ表示されます。有効な値:

      • はい:フィールドは自動的にマップされ、データ型は自動的に決定されます。数値はすべて倍精度浮動小数点データに変換されます。

      • いいえ:マップするフィールドとデータ型を指定する必要があります。

      はい

      すべての非主キーフィールドのマッピングの設定

      Tablestore 時系列モデルの非プライマリキーフィールドの名前に対応するフィールドタイプ。サポートされているデータ型は、倍精度浮動小数点数、整数、文字列、バイナリ、およびブール値です。このパラメーターは、すべての非主キーフィールドのマッピングの設定を行うかどうか パラメーターを [いいえ] に設定した場合にのみ表示されます。

      文字列

      コネクタを作成した後、Connector タスクリスト ページでコネクタを表示できます。

  6. Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。

  7. [OK] をクリックします。

テストメッセージを送信する

Tablestore シンクコネクタをデプロイした後、ApsaraMQ for Kafka Topic にメッセージを送信して、メッセージが Tablestore テーブルに同期されるかどうかをテストできます。

  1. Connector タスクリスト ページで、管理するコネクタを見つけ、テスト操作 列の をクリックします。

  2. メッセージの送信 パネルで、テストメッセージを送信するためのパラメーターを設定します。

    • 送信方法 パラメーターを コンソール に設定した場合は、次の手順を実行します。

      1. メッセージキー フィールドに、メッセージキーを入力します。例:demo。

      2. メッセージの内容 フィールドに、メッセージコンテンツを入力します。例:{"key": "test"}。

      3. 指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。

        • テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例:0。パーティション ID をクエリする方法については、「パーティションステータスの表示」をご参照ください。

        • テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。

    • 送信方法 パラメーターを [docker] に設定した場合は、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テストメッセージを送信します。

    • 送信方法 パラメーターを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK と、テストメッセージを送受信するためのアクセス方法を選択します。

テーブル内のデータを表示する

ApsaraMQ for Kafka Topic にメッセージを送信した後、Tablestore コンソールでメッセージが受信されているかどうかを確認できます。Tablestore テーブル内のデータを表示するには、次の手順を実行します。

  1. Tablestore コンソール にログオンします。

  2. [概要] ページで、必要なインスタンスの名前をクリックするか、必要なインスタンスの [アクション] 列の [インスタンスの管理] をクリックします。

  3. [インスタンスの詳細] タブの [テーブル] セクションで、管理するテーブルを見つけます。查看数据表

  4. テーブルの名前をクリックします。[テーブルの管理] ページの [データのクエリ] タブで、テーブル内のデータを表示します。查看表数据