ApsaraMQ for Kafka は、ApsaraMQ for Kafka コンソールでオフセットまたは時点別にメッセージを照会するためのメッセージ照会機能を提供します。この機能がお客様のメッセージ照会要件を満たせない場合は、ApsaraMQ for Kafka が提供するメッセージ取得機能を使用できます。メッセージ取得機能を使用すると、パーティション、オフセット範囲、時間範囲、およびメッセージキーと値のキーワードでメッセージを取得できます。このトピックでは、メッセージ取得機能を有効にし、メッセージ取得タスクの検索条件を指定する方法について説明します。また、メッセージ取得タスクを一時停止、再開、および削除する方法についても説明します。
前提条件
ApsaraMQ for Kafka インスタンスでデータソースとしてトピックが作成されていること。詳細については、「ステップ 1:トピックの作成」をご参照ください。
説明Serverless ApsaraMQ for Kafka インスタンスは、メッセージ取得機能をサポートしていません。
メッセージ取得機能は、ApsaraMQ for Kafka インスタンスが存在するリージョンで使用できます。メッセージ取得機能が利用可能なリージョンについては、「サポートされているリージョン」をご参照ください。
背景情報
ApsaraMQ for Kafka のメッセージ取得機能は、ApsaraMQ for Kafka のコネクタ機能と Tablestore のインデックス機能に基づいています。具体的には、ソース トピックのメッセージはコネクタにダンプされ、Tablestore テーブルに転送されます。その後、Tablestore が提供するインデックス機能を使用して、メッセージを取得できます。
メッセージ取得機能を有効にすることで、実際には ApsaraMQ for Kafka から Tablestore にデータを同期するために使用されるコネクタを作成します。コネクタ名は ots-ms-{トピック名}-{6 つのランダムな文字} 形式です。メッセージ取得タスクの詳細を表示する方法については、「メッセージ取得タスクの詳細を表示する」をご参照ください。
メッセージ取得機能を初めて有効にすると、ApsaraMQ for Kafka は Tablestore を自動的にアクティブ化し、同じリージョンに Tablestore インスタンスとテーブルを作成します。メッセージ取得機能が有効になっているトピックごとに、1 つの Tablestore テーブルが作成されます。自動的に作成される Tablestore インスタンスとテーブルの名前は、次の形式です。
インスタンス名:kfk-{ApsaraMQ for Kafka インスタンスの名前の最後の 12 文字}
テーブル名:{トピック名}:kafka_topic_{トピック名}_{6 つのランダムな文字}
メッセージ取得機能が有効になっている各トピックについて、ApsaraMQ for Kafka は対応するインスタンスに 4 つのトピックと 2 つの グループ を自動的に作成します。トピックとグループは、コネクタの構成とステータスを記録するために使用されます。作成されたトピックとグループの名前は、次の形式です。
コネクタのコンシューマー オフセットを記録するために使用されるトピック:connect-offset-{コネクタ名}
コネクタの構成を記録するために使用されるトピック:connect-config-{コネクタ名}
コネクタのステータスを記録するために使用されるトピック:connect-status-{コネクタ名}
デッドレター キューと例外のデータを記録するために使用されるトピック:connect-error-{コネクタ名}
コネクタのコンシューマー グループ:connect-{コネクタ名}
タスク クラスタのグループ:connect-cluster-{タスク名}
課金
ApsaraMQ for Kafka のメッセージ取得機能はパブリック プレビュー中です。ApsaraMQ for Kafka は、パブリック プレビュー中はメッセージ取得機能に対して課金されません。
メッセージ取得機能のパブリック プレビュー中は、Tablestore に自動的に作成されるインスタンスとテーブルに対して課金されません。
Alibaba Cloud は、メッセージ取得機能のサービス レベル契約 (SLA) を提供していません。メッセージ取得機能の使用に必要な他のサービスの SLA と課金については、関連サービスのドキュメントをご参照ください。
使用上の注意
デフォルトでは、ApsaraMQ for Kafka インスタンスの最大 3 つのトピックに対して、同時にメッセージ取得機能を有効にできます。
Tablestore の STRING 型の属性列の値は 2 MB を超えることはできません。サイズが 2 MB を超えるメッセージは同期または取得できません。
Tablestore は、ApsaraMQ for Kafka インスタンスから同期されたメッセージを、ApsaraMQ for Kafka がメッセージを保持するのと同じ期間保持します。保持期間が経過すると、Tablestore は自動的にメッセージをクリアし、関連するインデックスを削除します。ApsaraMQ for Kafka でのメッセージ保持期間の構成と説明については、「メッセージの構成を変更する」をご参照ください。
Tablestore のデータ有効期限ポリシーは、ApsaraMQ for Kafka のデータ有効期限ポリシーとは異なります。そのため、各メッセージ取得タスクによって取得されたメッセージは、指定された検索条件を満たすメッセージとは異なる場合があります。この場合、最終的な取得結果が優先されます。
メッセージ取得の手順
ステップ 1:メッセージ取得機能を有効にする
ApsaraMQ for Kafka インスタンスのトピックに対してメッセージ取得機能を有効にすると、ビジネス要件に基づいてトピック内のメッセージを取得できます。
ApsaraMQ for Kafka インスタンスのトピックに対して初めてメッセージ取得機能を有効にする場合、ApsaraMQ for Kafka は ApsaraMQ for Kafka インスタンスと同じリージョンに存在する Tablestore インスタンスを自動的に作成します。
ApsaraMQ for Kafka インスタンスのトピックに対して初めてメッセージ取得機能を有効にする場合、ApsaraMQ for Kafka は、コネクタ機能を使用できるように、サービスリンクロール AliyunServiceRoleForAlikafkaConnector を自動的に作成します。サービスリンクロールが作成されている場合、ApsaraMQ for Kafka はロールを再度作成しません。詳細については、「サービスリンクロール」をご参照ください。
ApsaraMQ for Kafka コンソール にログインします。
リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。
左側のナビゲーションペインで、Instances をクリックします。
Instances ページで、管理するインスタンスの名前をクリックします。
左側のナビゲーションペインで、メッセージ検索 をクリックします。表示されるページで、メッセージ検索の有効化 をクリックします。
メッセージ検索の有効化 パネルで、パラメータを構成し、[OK] をクリックします。
説明ApsaraMQ for Kafka インスタンスのトピックに対して初めてメッセージ取得機能を有効にする場合、メッセージ検索の有効化 をクリックした後に、インスタンスのコネクタ機能を有効にしていないことを示すメッセージが表示されます。この場合、メッセージ内の [OK] をクリックし、メッセージ検索の有効化 パネルのパラメータを構成します。
メッセージ検索 ページで、メッセージ取得機能を有効にしたトピックを表示できます。
ステップ 2:テスト メッセージを送信する送信メッセージ
ApsaraMQ for Kafka インスタンスのトピックに対してメッセージ取得機能を有効にすると、トピックにメッセージを送信して、メッセージ取得タスクが作成されたかどうかをテストできます。
メッセージ検索 ページで、管理するトピックを見つけ、詳細操作 列の をクリックします。
[タスクの詳細] ページの右上隅にある テスト をクリックします。
メッセージの送信 パネルで、パラメータを構成し、[OK] をクリックします。
メッセージキー フィールドに、メッセージ キーを入力します。例:demo。
メッセージの内容 フィールドに、メッセージの内容を入力します。例:{"key": "test"}。
指定されたパーティションに送信 パラメータを構成して、テスト メッセージを特定のパーティションに送信するかどうかを指定します。
テスト メッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例:0。パーティション ID を照会する方法については、「パーティション ステータスの表示」をご参照ください。
テスト メッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。
ステップ 3: メッセージを取得する
メッセージ検索 ページで、管理するトピックを見つけ、検索操作 列の をクリックします。
検索 パネルで、次の操作を実行して検索条件を指定します。検索条件ドロップダウンリストから項目を選択し、[検索条件を追加] をクリックし、[値] 列に検索条件を指定して、[OK] をクリックします。
説明検索条件を指定する場合、パーティション、オフセットの範囲、時間の範囲、[キー]、[値] を含む、1 つ以上の値を選択できます。
メッセージを送信するときは、一意のビジネス識別子をキー値として使用することをお勧めします。こうすることで、検索中にメッセージを正確に一致させることができます。
[トピック検索] ページで、取得したメッセージを表示します。
説明検索結果には最大 10 件のメッセージが表示されます。正確な一致のために正確な検索条件を指定することをお勧めします。
取得された各メッセージの内容は、ApsaraMQ for Kafka コンソールに最大 1 KB まで表示できます。取得されたメッセージのサイズが 1 KB を超える場合、システムは自動的に内容を切り捨てます。メッセージの内容全体を表示する場合は、メッセージをダウンロードしてください。
関連操作
メッセージ取得タスクの詳細を表示する
トピックのメッセージ取得機能を有効にすると、メッセージ取得タスクが自動的に作成されます。 ApsaraMQ for Kafkaインスタンスに自動的に作成されるトピックやグループ、Tablestoreに自動的に作成されるインスタンスとテーブルの名前など、タスクの詳細を表示できます。また、[タスクの詳細] ページからTablestore テーブルの詳細ページに移動することもできます。
メッセージ検索 ページで、管理するトピックを見つけ、詳細[アクション] 列の をクリックします。
消費の詳細を表示する
トピックに対してメッセージ取得機能を有効にすると、トピックの各パーティションのアクティブな グループ の消費の進捗状況を表示できます。これは、メッセージの消費と累積に関する情報を取得するのに役立ちます。
メッセージ検索 ページで、管理するトピックを見つけ、消費の進捗状況[アクション] 列の をクリックします。
[コンシューマーの詳細] ページで、トピックの各パーティションのトピックを購読しているグループの消費ステータスを表示できます。
メッセージ取得タスクを一時停止する
メッセージ検索 ページで、管理するトピックを見つけ、 [アクション] 列の を選択します。
表示されるメッセージで、[OK] をクリックします。
メッセージ取得タスクを再開する
ビジネス要件に基づいて、一時停止されたメッセージ取得タスクを再開できます。
メッセージ検索 ページで、管理するトピックを見つけ、 [アクション] 列の を選択します。
表示されるメッセージで、[OK] をクリックします。
メッセージ取得タスクを削除する
トピックのメッセージ取得タスクを削除すると、関連する Tablestore テーブルと検索インデックスも削除されます。トピックは、メッセージ取得機能を提供しなくなります。トピックに対してメッセージ取得機能を再度使用する場合は、メッセージ取得タスクを再作成し、データの同期が完了するまで待ちます。
メッセージ検索 ページで、管理するトピックを見つけ、メッセージ取得タスクのステータスに基づいて次のいずれかの操作を実行します。
タスクが [実行中] または [一時停止中] 状態ではない場合、[アクション] 列の 削除 をクリックします。
タスクが [実行中] または [一時停止中] 状態の場合、
[アクション] 列の を選択します。
表示されるメッセージで、[OK] をクリックします。
メッセージ検索 ページには、メッセージ取得タスクが削除されたトピックは表示されなくなります。