このトピックでは、Tablestore シンクコネクタを作成して、ApsaraMQ for Kafka インスタンスのトピックから Tablestore インスタンスのテーブルにデータを同期する方法について説明します。
前提条件
ApsaraMQ for Kafka
ApsaraMQ for Kafka インスタンスでコネクタ機能が有効になっていること。詳細については、「コネクタ機能を有効にする」をご参照ください。
ApsaraMQ for Kafka インスタンスで、データソースとしてトピックが作成されていること。詳細については、「ステップ 1: トピックを作成する」をご参照ください。
Tablestore
Tablestore がアクティブ化され、インスタンスが作成されていること。詳細については、「Tablestore をアクティブ化してインスタンスを作成する」をご参照ください。
使用上の注意
ApsaraMQ for Kafka インスタンスのトピックからデータをエクスポートできるのは、ApsaraMQ for Kafka インスタンスと同じリージョンにある Tablestore インスタンスのテーブルのみです。コネクタの制限については、「制限」をご参照ください。
Tablestore シンクコネクタを作成する場合、サービスロールが必要です。
サービスロールが作成されていない場合、ApsaraMQ for Kafka によって自動的にロールが作成されます。このようにして、ApsaraMQ for Kafka から Tablestore にデータを想定どおりにエクスポートできます。
サービスロールが作成されている場合、ApsaraMQ for Kafka はロールを再度作成しません。
詳細については、「サービスロール」をご参照ください。
プロセス
このセクションでは、Tablestore シンクコネクタを使用して、ApsaraMQ for Kafka インスタンスのトピックから Tablestore インスタンスのテーブルにデータをエクスポートする方法について説明します。
(オプション) Tablestore シンクコネクタに必要なトピックと グループ を作成します。
トピックと グループ を手動で作成したくない場合は、この手順をスキップし、次の手順で [リソース作成方法] パラメーターを [自動] に設定します。
重要Tablestore シンクコネクタに必要な特定のトピックでは、ローカル記憶域を使用する必要があります。ApsaraMQ for Kafka インスタンスのメジャーバージョンが 0.10.2 の場合、ローカル記憶域を使用するトピックを手動で作成することはできません。メジャーバージョン 0.10.2 では、これらのトピックは自動的に作成する必要があります。
結果を確認する
Tablestore シンクコネクタに必要なトピックを作成する
ApsaraMQ for Kafka コンソールで、Tablestore シンクコネクタに必要な以下のトピックを手動で作成できます。タスクオフセットトピック、タスク構成トピック、タスクステータストピック、デッドレターキュー、エラートピック。各トピックに必要なパーティションの数とストレージエンジンはさまざまです。詳細については、「ソースサービスに対して構成されたパラメーター」をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
重要Elastic Compute Service (ECS) インスタンスがデプロイされているリージョンに Topic を作成する必要があります。Topic はリージョンを跨いで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 (北京) リージョンにデプロイされている ECS インスタンス上で実行されている場合、Topic も中国 (北京) リージョンに作成する必要があります。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーション ウィンドウで、トピック管理 をクリックします。
トピック管理 ページで、トピックの作成 をクリックします。
トピックの作成 パネルで、トピックのプロパティを指定し、[OK] をクリックします。
パラメーター
説明
例
名前
トピック名。
demo
記述
トピックの説明。
demo test
パーティションの数
トピック内のパーティション数。
12
ストレージエンジン
説明Professional Edition インスタンスを使用している場合のみ、ストレージエンジンの種類を指定できます。Standard Edition インスタンスを使用している場合は、デフォルトでクラウドストレージが選択されます。
トピックにメッセージを格納するために使用されるストレージエンジンの種類。
ApsaraMQ for Kafka は、以下の種類のストレージエンジンをサポートしています。
クラウドストレージ: この値を選択すると、システムはトピックに Alibaba Cloud ディスクを使用し、分散モードで 3 つのレプリカにデータを格納します。このストレージエンジンは、低レイテンシ、高パフォーマンス、長い耐久性、そして高い信頼性を備えています。インスタンスの作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ のみに設定できます。
ローカルストレージ: この値を選択すると、システムはオープンソースの Apache Kafka の同期レプリカ (ISR) アルゴリズムを使用し、分散モードで 3 つのレプリカにデータを格納します。
クラウドストレージ
メッセージタイプ
トピックのメッセージタイプ。有効な値:
通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序は保持されない可能性があります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。
パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、送信された順序で同じパーティションに格納されます。クラスタ内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。
通常のメッセージ
ログリリースポリシー
トピックで使用されるログクリーンアップポリシー。
ストレージエンジン パラメーターを ローカルストレージ に設定する場合、ログリリースポリシー パラメーターを構成する必要があります。ストレージエンジン パラメーターをローカルストレージに設定できるのは、ApsaraMQ for Kafka プロフェッショナル版インスタンスを使用している場合のみです。
ApsaraMQ for Kafka は、以下のログクリーンアップポリシーを提供します。
Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保持されます。ストレージ使用量が 85% を超えると、サービスの可用性を確保するために、システムは最も古いメッセージを削除します。
Compact: Apache Kafka で使用されるログ圧縮ポリシー。ログの圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システムの再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka Connect または Confluent Schema Registry を使用する場合、システムの状態と構成に関する情報をログ圧縮トピックに格納する必要があります。
重要ログ圧縮トピックは、Kafka Connect や Confluent Schema Registry などの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。
Compact
タグ
トピックにアタッチするタグ。
demo
トピックが作成されると、トピック管理 ページでトピックを表示できます。
Tablestore シンクコネクタに必要な グループ を作成する
ApsaraMQ for Kafka コンソールで、Tablestore シンクコネクタに必要な グループ を手動で作成できます。グループ 名は、connect-タスク名 形式である必要があります。詳細については、「ソースサービスに対して構成されたパラメータ」をご参照ください。
ApsaraMQ for Kafka コンソール にログオンします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
インスタンスリスト ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、Group の管理 をクリックします。
Group の管理 ページで、グループの作成 をクリックします。
グループの作成 パネルで、Group ID フィールドにグループ名を入力し、記述 フィールドにグループの説明を入力し、グループにタグをアタッチして、[OK] をクリックします。
コンシューマーグループが作成されると、Group の管理 ページでコンシューマーグループを表示できます。
Tablestore シンクコネクタの作成とデプロイ
次の手順を実行して、ApsaraMQ for Kafka から Tablestore にデータをエクスポートするために使用できる Tablestore シンクコネクタを作成およびデプロイします。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、Connector タスクリスト をクリックします。
Connector タスクリスト ページで、インスタンスの選択 ドロップダウンリストからコネクタが属するインスタンスを選択し、Connector の作成 をクリックします。
Connector の作成 ウィザードで、次の手順を実行します。
基本情報の設定 ステップで、画面の指示に従ってパラメーターを構成し、次へ をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
名前
コネクタの名前。次のルールに基づいてコネクタ名を指定します。
コネクタ名は 1 ~ 48 文字である必要があります。数字、小文字、およびハイフン (-) を使用できますが、ハイフン (-) で始めることはできません。
各コネクタ名は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。
コネクタのデータ同期タスクでは、connect-タスク名 形式で名前が付けられたコンシューマーグループを使用する必要があります。このようなコンシューマーグループを作成していない場合は、Message Queue for Apache Kafka によって自動的に作成されます。
kafka-ts-sink
インスタンス
ApsaraMQ for Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。
demo alikafka_post-cn-st21p8vj****
ソースサービスの設定 ステップで、ソースサービスとして ApsaraMQ for Kafka を選択し、画面の指示に従ってその他のパラメーターを構成してから、次へ をクリックします。次の表にパラメーターを示します。
説明トピックと グループ を事前に作成している場合は、[リソース作成方法] パラメーターを [手動] に設定し、作成したリソースの名前を入力します。それ以外の場合は、[リソース作成方法] パラメーターを [自動] に設定します。
表 1. ソースサービス用に構成されたパラメーター
パラメーター
説明
例
データソース Topic
データを同期するトピックの名前。
ts-test-input
コンシューマースレッドの同時発生数
ソーストピックからデータを同期するために使用される同時コンシューマースレッドの数。デフォルト値:6。有効な値:
1
2
3
6
12
6
消費の開始位置
メッセージ消費を開始するオフセット。有効な値:
一番古いオフセット:最も古いオフセットからメッセージを消費します。
一番新しいオフセット:最新のオフセットからメッセージを消費します。
一番古いオフセット
VPC ID
ソースインスタンスがデプロイされている Virtual Private Cloud(VPC)の ID。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した VPC ID が表示されます。このパラメーターを構成する必要はありません。
vpc-bp1xpdnd3l***
VSwitch ID
ソースインスタンスが関連付けられている vSwitch の ID。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。 vSwitch は、ソースの ApsaraMQ for Kafka インスタンスと同じ VPC 内に存在する必要があります。デフォルトでは、ソースの ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した vSwitch ID が表示されます。
vsw-bp1d2jgg81***
失敗の処理
メッセージの送信に失敗した後、エラーが発生したパーティションへのサブスクリプションを保持するかどうかを指定します。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。有効な値:
サブスクリプションの継続:エラーが発生したパーティションへのサブスクリプションを保持し、ログを出力します。
サブスクリプションの停止:エラーが発生したパーティションへのサブスクリプションを停止し、ログを出力します。
サブスクリプションの継続
リソースの作成方法
Tablestore シンクコネクタに必要なトピックと グループ を作成するために使用される方法。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。有効な値:
自動作成
手動で作成します
自動作成
Connector コンシューマーグループ
コネクタのデータ同期タスクで使用される グループ。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。グループ の名前は、connect-タスク名 形式である必要があります。
connect-cluster-kafka-ots-sink
タスクサイトの Topic
コンシューマーオフセットを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。
トピック:トピック名。トピック名は connect-offset で始めることをお勧めします。
パーティション:トピック内のパーティションの数。このパラメーターは、1 より大きい値に設定する必要があります。
ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。
cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。
connect-offset-kafka-ots-sink
タスク設定の Topic
タスク構成を格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。
トピック:トピック名。トピック名は connect-config で始めることをお勧めします。
パーティション:トピック内のパーティションの数。このパラメーターは 1 に設定する必要があります。
ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。
cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。
connect-config-kafka-ots-sink
タスクステータスの Topic
タスクステータスを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。
トピック:トピック名。トピック名は connect-status で始めることをお勧めします。
パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージに設定する必要があります。
cleanup.policy:トピックで使用されるログクリーンアップポリシー。このパラメーターは、Compact に設定する必要があります。
connect-status-kafka-ots-sink
デッドレターキューの Topic
Kafka Connect フレームワークのエラーデータを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。トピックリソースを節約するために、トピックを作成し、そのトピックをデッドレターキューのトピックと エラーデータのトピック の両方として使用できます。
トピック:トピック名。トピック名は connect-error で始めることをお勧めします。
パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージまたはクラウドストレージに設定できます。
connect-error-kafka-ots-sink
例外データ Topic
コネクタのエラーデータを格納するために使用されるトピック。このパラメーターは、実行環境の設定 をクリックした後にのみ表示されます。トピックリソースを節約するために、トピックを作成し、そのトピックを デッドレターキューのトピック とエラーデータのトピックの両方として使用できます。
トピック:トピック名。トピック名は connect-error で始めることをお勧めします。
パーティション:トピック内のパーティションの数。このパラメーターは 6 に設定することをお勧めします。
ストレージエンジン:トピックで使用されるストレージエンジン。このパラメーターは、ローカルストレージまたはクラウドストレージに設定できます。
connect-error-kafka-ots-sink
ターゲットサービスの設定 ステップで、宛先サービスとして Tablestore を選択し、画面の指示に従ってその他のパラメーターを構成してから、作成 をクリックします。次の表にパラメーターを示します。
パラメーター
説明
例
インスタンス名
Tablestore インスタンスの名前。
k00eny67****
宛先テーブルの自動作成
Tablestore に宛先テーブルを自動的に作成するかどうかを指定します。有効な値:
はい:同期データを格納するために Tablestore にテーブルを自動的に作成します。テーブルのカスタム名を指定できます。
いいえ:既存のテーブルを使用して同期データを格納します。
はい
宛先テーブル名
同期データを格納するテーブルの名前。宛先テーブルの自動作成 パラメーターを いいえ に設定した場合は、入力したテーブル名が Tablestore インスタンス内の既存のテーブルの名前と同じであることを確認してください。
kafka_table
Tablestore
同期データを格納するテーブルのタイプ。有効な値:
ワイドカラムモデル
時系列モデル
ワイドカラムモデル
メッセージキー形式
メッセージキーの形式。有効な値:文字列 および JSON。デフォルト値:JSON。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
文字列:メッセージキーは文字列として解析されます。
JSON:メッセージキーは JSON 形式である必要があります。
文字列
メッセージ値の形式
メッセージ値の形式。有効な値:文字列 および JSON。デフォルト値:JSON。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
文字列:メッセージ値は文字列として解析されます。
JSON:メッセージ値は JSON 形式である必要があります。
文字列
JSON メッセージフィールド変換
JSON メッセージのフィールド処理方法。このパラメーターは、メッセージキー形式 または メッセージ値の形式 パラメーターを JSON に設定した場合にのみ表示されます。有効な値:
すべて文字列として書き込む:すべてのフィールドは Tablestore で文字列に変換されます。
フィールドタイプを自動的に識別する:JSON 形式のメッセージ本文の文字列フィールドとブールフィールドは、それぞれ Tablestore の文字列フィールドとブールフィールドに変換されます。JSON 形式のメッセージ本文の整数型と浮動小数点型のデータは、Tablestore の倍精度浮動小数点型のデータに変換されます。
すべて文字列として書き込む
主キーモード
プライマリキーモード。データテーブルのプライマリキーは、ApsaraMQ for Kafka メッセージレコードのさまざまな部分から抽出できます。これには、メッセージレコードの座標(トピック、パーティション、オフセット)、キー、値が含まれます。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。デフォルト値:kafka。有効な値:
kafka:<connect_topic>_<connect_partition> と <connect_offset> をデータテーブルのプライマリキーとして使用します。
record_key:メッセージキーのフィールドをデータテーブルのプライマリキーとして使用します。
record_value:メッセージ値のフィールドをデータテーブルのプライマリキーとして使用します。
kafka
プライマリキー列名の設定
プライマリキー列の名前と対応するデータ型。列名は、メッセージキーまたはメッセージ値から抽出されたフィールドを指定します。フィールドのデータ型を文字列または整数に設定できます。
このパラメーターは、メッセージキー形式 パラメーターを JSON に、主キーモード パラメーターを record_key に設定した場合に表示されます。このパラメーターは、メッセージ値の形式 パラメーターを JSON に、主キーモード パラメーターを record_value に設定した場合にも表示されます。
追加 をクリックして、列名を追加できます。最大 4 つの列名を構成できます。
なし
書き込みモード
書き込みモード。有効な値:put および update。デフォルト値:put。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
put:新しいデータはテーブル内の元のデータを上書きします。
update:新しいデータがテーブルに追加され、元のデータは保持されます。
put
削除モード
ApsaraMQ for Kafka メッセージレコードに空の値が含まれている場合に、行または属性列を削除するかどうかを指定します。このパラメーターは、主キーモード パラメーターを record_key に設定した場合にのみ表示されます。有効な値:
なし:行または属性列を削除することはできません。これがデフォルト値です。
行:行を削除できます。
列:属性列を削除できます。
行と列:行と属性列の両方を削除できます。
削除操作は書き込みモードによって異なります。
書き込みモード パラメーターを put に設定した場合は、[削除モード] パラメーターを任意の値に設定します。メッセージレコードに空の値が含まれている場合でも、メッセージレコードのデータは上書きモードで Tablestore テーブルにエクスポートされます。
書き込みモード パラメーターを update に設定した場合は、[削除モード] パラメーターを なし または 行 に設定します。メッセージレコードのすべてのフィールドが空の場合、メッセージレコードはダーティデータとして処理されます。メッセージレコードの特定のフィールドが空の場合、空の値は自動的にスキップされ、その他の空でない値が Tablestore テーブルに追加されます。[削除モード] パラメーターを 列 または 行と列 に設定し、メッセージレコードに空の値が含まれている場合、システムは空の値に対応する属性列、または行と属性列の両方を削除します。その後、残りのデータが Tablestore テーブルに追加されます。
なし
メジャー名フィールド
Tablestore 時系列モデルのメトリック名フィールド(_m_name)としてフィールドをマップします。メトリック名フィールドは、温度や速度など、時系列データによって測定される物理量またはモニタリングメトリックを示します。空の値はサポートされていません。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
measurement
データソースフィールド
Tablestore 時系列モデルのデータソースフィールド(_data_source)としてフィールドをマップします。データソースフィールドは、特定の時系列データのソース(サーバー名やデバイス ID など)を示します。空の値はサポートされています。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
source
ラベルフィールド
1 つ以上のフィールドを Tablestore の時系列モデルのタグフィールド(_tags)として使用します。各タグは、文字列型のキーと値のペアです。キーは構成されたフィールド名で、値はフィールド値です。タグはタイムラインメタデータの一部です。タイムラインは、メトリック名、データソース、およびタグで構成されます。空の値はサポートされています。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
tag1, tag2
タイムスタンプフィールド
Tablestore 時系列モデルのタイムスタンプフィールド(_time)としてフィールドをマップします。タイムスタンプフィールドは、行の時系列データに対応する時点(物理量が生成された時刻など)を示します。データが Tablestore に書き込まれると、タイムスタンプフィールドはマイクロ秒単位の値に変換されます。このパラメーターは、Tablestore パラメーターをワイドカラムモデルに設定した場合にのみ表示されます。
time
タイムスタンプ単位
タイムスタンプの構成に基づいてこのパラメーターを構成します。このパラメーターは、Tablestore パラメーターを時系列モデルに設定した場合にのみ表示されます。有効な値:
秒
ミリ秒
マイクロ秒
ナノ秒
ミリ秒
すべての非主キーフィールドのマッピングの設定を行うかどうか
すべての非プライマリキーフィールドをデータフィールドとしてマップするかどうかを指定します。プライマリキーフィールドは、メトリック名、データソース、タグ、またはタイムスタンプとしてマップされているフィールドです。このパラメーターは、Tablestore パラメーターを時系列モデルに設定した場合にのみ表示されます。有効な値:
はい:フィールドは自動的にマップされ、データ型は自動的に決定されます。数値はすべて倍精度浮動小数点データに変換されます。
いいえ:マップするフィールドとデータ型を指定する必要があります。
はい
すべての非主キーフィールドのマッピングの設定
Tablestore 時系列モデルの非プライマリキーフィールドの名前に対応するフィールドタイプ。サポートされているデータ型は、倍精度浮動小数点数、整数、文字列、バイナリ、およびブール値です。このパラメーターは、すべての非主キーフィールドのマッピングの設定を行うかどうか パラメーターを [いいえ] に設定した場合にのみ表示されます。
文字列
コネクタを作成した後、Connector タスクリスト ページでコネクタを表示できます。
Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。
[OK] をクリックします。
テストメッセージを送信する
Tablestore シンクコネクタをデプロイした後、ApsaraMQ for Kafka Topic にメッセージを送信して、メッセージが Tablestore テーブルに同期されるかどうかをテストできます。
Connector タスクリスト ページで、管理するコネクタを見つけ、テスト操作 列の をクリックします。
メッセージの送信 パネルで、テストメッセージを送信するためのパラメーターを設定します。
送信方法 パラメーターを コンソール に設定した場合は、次の手順を実行します。
メッセージキー フィールドに、メッセージキーを入力します。例:demo。
メッセージの内容 フィールドに、メッセージコンテンツを入力します。例:{"key": "test"}。
指定されたパーティションに送信 パラメーターを設定して、テストメッセージを特定のパーティションに送信するかどうかを指定します。
テストメッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例:0。パーティション ID をクエリする方法については、「パーティションステータスの表示」をご参照ください。
テストメッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。
送信方法 パラメーターを [docker] に設定した場合は、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テストメッセージを送信します。
送信方法 パラメーターを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK と、テストメッセージを送受信するためのアクセス方法を選択します。
テーブル内のデータを表示する
ApsaraMQ for Kafka Topic にメッセージを送信した後、Tablestore コンソールでメッセージが受信されているかどうかを確認できます。Tablestore テーブル内のデータを表示するには、次の手順を実行します。
Tablestore コンソール にログオンします。
[概要] ページで、必要なインスタンスの名前をクリックするか、必要なインスタンスの [アクション] 列の [インスタンスの管理] をクリックします。
[インスタンスの詳細] タブの [テーブル] セクションで、管理するテーブルを見つけます。
テーブルの名前をクリックします。[テーブルの管理] ページの [データのクエリ] タブで、テーブル内のデータを表示します。