すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:MaxCompute シンクコネクタの作成

最終更新日:Jan 11, 2025

このトピックでは、ApsaraMQ for Kafka インスタンスのデータソース トピックから MaxCompute テーブルにデータをエクスポートするための MaxCompute シンクコネクタを作成する方法について説明します。

前提条件

以下の要件を満たしている必要があります。

  • ApsaraMQ for Kafka

    • ApsaraMQ for Kafka インスタンスでコネクタ機能が有効になっていること。詳細については、コネクタ機能の有効化 をご参照ください。

    • ApsaraMQ for Kafka インスタンスにトピックが作成されていること。詳細については、ステップ 1: トピックの作成 をご参照ください。

      この例では、maxcompute-test-input という名前のトピックを使用します。

  • MaxCompute

    • MaxCompute クライアントで MaxCompute テーブルが作成されていること。詳細については、テーブルの作成 をご参照ください。

      この例では、connector_test という名前のプロジェクトに test_kafka という名前の MaxCompute テーブルが作成されています。次のステートメントを実行して、test_kafka という名前の MaxCompute テーブルを作成できます。

      CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);
  • オプション: EventBridge

    説明

    データソース トピックを含むインスタンスが中国 (杭州) または中国 (成都) リージョンにある場合のみ、EventBridge をアクティブ化する必要があります。

注意事項

  • ApsaraMQ for Kafka インスタンスのデータソース トピックから、同じリージョン内の MaxCompute テーブルにのみデータをエクスポートできます。コネクタの制限の詳細については、制限 をご参照ください。

  • データソース トピックを含むインスタンスが中国 (杭州) または中国 (成都) リージョンにある場合、コネクタタスクは EventBridge にパブリッシュされます。

    • 現在、EventBridge は無料です。詳細については、課金 をご参照ください。

    • コネクタを作成すると、EventBridge によって AliyunServiceRoleForEventBridgeSourceKafka サービスリンクロールが自動的に作成されます。

      • サービスリンクロールが使用できない場合、EventBridgeEventBridge から ApsaraMQ for Kafka へのアクセスを許可するために、自動的にサービスリンクロールを作成します。

      • サービスリンクロールが使用可能な場合、EventBridge は新しいサービスリンクロールを作成しません。

      サービスリンクロールの詳細については、サービスリンクロール をご参照ください。

    • EventBridge にパブリッシュされたコネクタタスクの操作ログは表示できません。コネクタタスクが完了したら、データソース トピックを購読しているグループの消費の詳細を表示して、コネクタタスクのステータスを確認できます。詳細については、コンシューマーの詳細の表示 をご参照ください。

手順

MaxCompute シンクコネクタを使用して、ApsaraMQ for Kafka インスタンスのデータソース トピックから MaxCompute テーブルにデータをエクスポートするには、次の手順を実行します。

  1. ApsaraMQ for Kafka に MaxCompute へのアクセス権限を付与します。

  2. オプション: MaxCompute シンクコネクタに必要なトピックとグループを作成します。

    トピックとグループを手動で作成したくない場合は、この手順をスキップし、次の手順で [リソース作成方法] パラメーターを [自動] に設定します。

    重要

    MaxCompute シンクコネクタに必要な特定のトピックでは、ローカルストレージエンジンを使用する必要があります。ApsaraMQ for Kafka インスタンスのメジャーバージョンが 0.10.2 の場合、ローカルストレージエンジンを使用するトピックは手動で作成できず、自動的に作成する必要があります。

    1. MaxCompute シンクコネクタに必要なトピックの作成

    2. MaxCompute シンクコネクタに必要なグループの作成

  3. MaxCompute シンクコネクタの作成とデプロイ

  4. 結果を確認します。

    1. テストメッセージの送信

    2. MaxCompute テーブルのデータの表示

RAM ロールの作成

RAM ロールの作成時に、ApsaraMQ for Kafka を信頼できるサービスとして選択することはできません。そのため、最初に信頼できるサービスとなる任意のサービスを選択してください。次に、RAM ロールの信頼ポリシーを手動で変更します。

  1. RAM コンソール にログインします。

  2. 左側のナビゲーションペインで、[ID] > [ロール] を選択します。

  3. [ロール] ページで、[ロールの作成] をクリックします。 image

  4. [ロールの作成] パネルで、次の操作を実行します。

    1. 信頼できるエンティティとして [alibaba Cloud サービス] を選択し、[次へ] をクリックします。

    2. [ロールタイプ] パラメーターを [標準サービスロール] に設定します。[RAM ロール名] フィールドに、AliyunKafkaMaxComputeUser1 と入力します。[信頼できるサービスの選択] ドロップダウンリストから、[maxcompute] を選択します。次に、[OK] をクリックします。

  5. [ロール] ページで、[aliyunkafkamaxcomputeuser1] を見つけてクリックします。

  6. [aliyunkafkamaxcomputeuser1] ページで、[信頼ポリシーの管理] タブをクリックし、次に [信頼ポリシーの編集] をクリックします。

  7. [信頼ポリシーの編集] パネルで、スクリプト内の [fc]alikafka に置き換え、[OK] をクリックします。

    pg_ram

権限の追加

MaxCompute シンクコネクタを使用して MaxCompute テーブルにメッセージをエクスポートするには、RAM ロールに次の権限を付与する必要があります。

オブジェクト

操作

説明

プロジェクト

CreateInstance

プロジェクトでインスタンスを作成するための権限。

テーブル

Describe

テーブルのメタデータを読み取るための権限。

テーブル

Alter

テーブルのメタデータを変更するための権限、およびパーティションを作成および削除するための権限。

テーブル

Update

テーブルのデータを上書きするための権限、およびテーブルにデータを挿入するための権限。

上記の権限とこれらの権限の付与方法の詳細については、MaxCompute 権限 をご参照ください。

AliyunKafkaMaxComputeUser1 に必要な権限を付与するには、次の手順を実行します。

  1. MaxCompute クライアントにログインします。

  2. 次のコマンドを実行して、AliyunKafkaMaxComputeUser1 RAM ロールを RAM ユーザーとして追加します。

    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`; // <accountid> を Alibaba Cloud アカウント ID に置き換えます。
    説明

    <accountid> を Alibaba Cloud アカウント ID に置き換えます。

  3. RAM ユーザーに、MaxCompute にアクセスするために必要な最小限の権限を付与します。

    1. 次のコマンドを実行して、RAM ユーザーに connector_test プロジェクトに対する権限を付与します。

      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`; // <accountid> を Alibaba Cloud アカウント ID に置き換えます。
      説明

      <accountid> を Alibaba Cloud アカウント ID に置き換えます。

    2. 次のコマンドを実行して、RAM ユーザーに test_kafka テーブルに対する権限を付与します。

      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`; // <accountid> を Alibaba Cloud アカウント ID に置き換えます。
      説明

      <accountid> を Alibaba Cloud アカウント ID に置き換えます。

MaxCompute シンクコネクタに必要なトピックの作成

ApsaraMQ for Kafka コンソールで、MaxCompute シンクコネクタに必要な 5 つのトピックを手動で作成できます。5 つのトピックは、タスクオフセットトピック、タスク設定トピック、タスクステータストピック、デッドレターキュー トピック、およびエラデータ トピックです。5 つのトピックは、パーティション数とストレージエンジンが異なります。詳細については、ソースサービスの設定手順のパラメーター をご参照ください。

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

    重要

    Elastic Compute Service ( ECS ) インスタンスがデプロイされているリージョンにトピックを作成する必要があります。トピックはリージョンをまたいで使用することはできません。たとえば、メッセージのプロデューサーとコンシューマーが中国 ( 北京 ) リージョンにデプロイされている ECS インスタンスで実行されている場合、トピックも中国 ( 北京 ) リージョンに作成する必要があります。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションペインで、トピック管理 をクリックします。

  5. トピック管理 ページで、トピックの作成 をクリックします。

  6. トピックの作成 パネルで、トピックのプロパティを指定し、[OK] をクリックします。

    パラメーター

    説明

    名前

    トピック名。

    demo

    記述

    トピックの説明。

    demo test

    パーティションの数

    トピック内のパーティション数。

    12

    ストレージエンジン

    説明

    Professional Editionインスタンスを使用している場合のみ、ストレージエンジンの種類を指定できます。Standard Editionインスタンスを使用している場合、クラウドストレージがデフォルトで選択されます。

    トピックにメッセージを格納するために使用されるストレージエンジンの種類。

    ApsaraMQ for Kafka は、以下の種類のストレージエンジンをサポートしています。

    • クラウドストレージ: この値を選択すると、システムはトピックにAlibaba Cloudディスクを使用し、分散モードで3つのレプリカにデータを格納します。このストレージエンジンは、低レイテンシ、高パフォーマンス、長期の耐久性、高信頼性を備えています。インスタンスの作成時に 仕様タイプ パラメーターを Standard Edition (High Write) に設定した場合、このパラメーターは クラウドストレージ のみに設定できます。

    • ローカルストレージ: この値を選択すると、システムはオープンソースのApache Kafkaの同期レプリカ(ISR)アルゴリズムを使用し、分散モードで3つのレプリカにデータを格納します。

    クラウドストレージ

    メッセージタイプ

    トピックのメッセージタイプ。有効な値:

    • 通常のメッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合、パーティションに格納されているメッセージの順序は保持されない場合があります。ストレージエンジン パラメーターを クラウドストレージ に設定すると、このパラメーターは自動的に 通常のメッセージ に設定されます。

    • パーティション順位メッセージ: デフォルトでは、同じキーを持つメッセージは、メッセージが送信された順序で同じパーティションに格納されます。クラスター内のブローカーに障害が発生した場合でも、メッセージは送信された順序でパーティションに格納されます。一部のパーティションのメッセージは、パーティションが復元されるまで送信できません。ストレージエンジン パラメーターを ローカルストレージ に設定すると、このパラメーターは自動的に パーティション順位メッセージ に設定されます。

    通常のメッセージ

    ログリリースポリシー

    トピックで使用されるログクリーンアップポリシー。

    ストレージエンジン パラメーターを ローカルストレージ に設定する場合は、ログリリースポリシー パラメーターを設定する必要があります。ApsaraMQ for Kafka Professional Editionインスタンスを使用している場合のみ、ストレージエンジンパラメーターをローカルストレージに設定できます。

    ApsaraMQ for Kafka は、以下のログクリーンアップポリシーを提供します。

    • Delete: デフォルトのログクリーンアップポリシー。システムに十分なストレージ容量がある場合、メッセージは最大保存期間に基づいて保持されます。ストレージ使用量が 85% を超えると、システムはサービスの可用性を確保するために最も古いメッセージを削除します。

    • Compact: Apache Kafkaで使用されるログ圧縮ポリシー。ログの圧縮により、同じキーを持つメッセージの最新の値が保持されます。このポリシーは、障害が発生したシステムの復元や、システムの再起動後のキャッシュの再読み込みなどのシナリオに適しています。たとえば、Kafka ConnectまたはConfluent Schema Registryを使用する場合、システムの状態と構成に関する情報をログ圧縮トピックに格納する必要があります。

      重要

      ログ圧縮トピックは、Kafka ConnectやConfluent Schema Registryなどの特定のクラウドネイティブコンポーネントでのみ使用できます。詳細については、「aliware-kafka-demos」をご参照ください。

    Compact

    タグ

    トピックに添付するタグ。

    demo

    トピックが作成されると、トピック管理 ページでトピックを表示できます。

MaxComputeシンクコネクタに必要なグループを作成する

ApsaraMQ for Kafka コンソールで、MaxComputeシンクコネクタに必要なグループを手動で作成できます。グループ名は、connect-タスク名 形式である必要があります。詳細については、「ソースサービスの構成手順のパラメータ」をご参照ください。

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションペインで、Group の管理 をクリックします。

  5. Group の管理 ページで、グループの作成 をクリックします。

  6. グループの作成 パネルで、Group ID フィールドにグループ名を入力し、記述 フィールドにグループの説明を入力し、グループにタグを添付して、[OK] をクリックします。

    コンシューマーグループが作成されると、Group の管理 ページでコンシューマーグループを表示できます。

MaxComputeシンクコネクタの作成とデプロイ

ApsaraMQ for Kafka から MaxCompute にデータをエクスポートするために使用する MaxComputeシンクコネクタを作成およびデプロイするには、次の手順を実行します。

  1. ApsaraMQ for Kafka コンソール にログインします。

  2. リソースの分布概要 ページの セクションで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  3. インスタンスリスト ページで、管理するインスタンスの名前をクリックします。

  4. 左側のナビゲーションペインで、Connector タスクリスト をクリックします。

  5. Connector タスクリスト ページで、Connector の作成 をクリックします。

  6. Connector の作成 ウィザードで、次の手順を実行します。

    1. 基本情報の設定 手順で、次の表に示すパラメータを設定し、次へ をクリックします。

      パラメータ

      説明

      名前

      コネクタの名前。コネクタ名を指定する際は、次の規則に注意してください。

      • コネクタ名は 1 ~ 48 文字である必要があります。数字、小文字、ハイフン(-)を含めることができますが、ハイフン(-)で始めることはできません。

      • 各コネクタ名は、ApsaraMQ for Kafka インスタンス内で一意である必要があります。

      コネクタタスクで使用されるグループの名前は、connect-タスク名 形式である必要があります。このようなグループをまだ作成していない場合は、Message Queue for Apache Kafka によって自動的に作成されます。

      kafka-maxcompute-sink

      インスタンス

      Message Queue for Apache Kafka インスタンスに関する情報。デフォルトでは、インスタンスの名前と ID が表示されます。

      demo alikafka_post-cn-st21p8vj****

    2. ソースサービスの設定 手順で、ソースサービスとして Message Queue for Apache Kafka を選択し、次の表に示すパラメータを設定して、次へ をクリックします。

      説明

      トピックとコンシューマーグループを事前に作成している場合は、[リソース作成方法] パラメータを [手動] に設定し、作成したリソースの名前を以下のフィールドに入力します。それ以外の場合は、[リソース作成方法] パラメータを [自動] に設定します。

      表 1. [ソースサービスの構成] 手順のパラメータ

      パラメータ

      説明

      データソース Topic

      データをエクスポートするデータソース トピックの名前。

      maxcompute-test-input

      コンシューマースレッドの同時発生数

      データソース トピックからデータをエクスポートするために使用される同時コンシューマースレッドの数。デフォルト値: 6 。有効な値:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      消費の開始位置

      消費が開始されるオフセット。有効な値:

      • 一番古いオフセット:消費は最も早いオフセットから開始されます。

      • 一番新しいオフセット:消費は最新のオフセットから開始されます。

      一番古いオフセット

      VPC ID

      データエクスポート タスクが実行される仮想プライベートクラウド(VPC)の ID。実行環境の設定 をクリックすると、パラメータが表示されます。デフォルト値は、ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した VPC ID です。値を変更する必要はありません。

      vpc-bp1xpdnd3l***

      VSwitch ID

      データエクスポート タスクが実行される vSwitch の ID。実行環境の設定 をクリックすると、パラメータが表示されます。vSwitch は、ApsaraMQ for Kafka インスタンスと同じ VPC にデプロイする必要があります。デフォルト値は、ApsaraMQ for Kafka インスタンスをデプロイしたときに指定した vSwitch ID です。

      vsw-bp1d2jgg81***

      失敗の処理

      メッセージ送信エラーが発生したパーティションへのサブスクリプションを保持するかどうかを指定します。実行環境の設定 をクリックすると、パラメータが表示されます。有効な値:

      • サブスクリプションの継続:エラーが発生したパーティションへのサブスクリプションを保持し、ログを返します。

      • サブスクリプションの停止:エラーが発生したパーティションへのサブスクリプションを停止し、ログを返します。

      説明
      • 詳細については、「コネクタの管理」をご参照ください。

      • エラーコードに基づいてエラーをトラブルシューティングする方法の詳細については、「エラーコード」をご参照ください。

      サブスクリプションの継続

      リソースの作成方法

      MaxCompute シンクコネクタに必要なトピックとグループを作成する方法。実行環境の設定 をクリックすると、パラメータが表示されます。

      • 自動作成

      • 手動で作成します

      自動作成

      Connector コンシューマーグループ

      コネクタのデータエクスポート タスクで使用されるグループ。実行環境の設定 をクリックすると、パラメータが表示されます。グループの名前は、connect-タスク名 形式である必要があります。

      connect-kafka-maxcompute-sink

      タスクサイトの Topic

      コンシューマー オフセットを格納するために使用されるトピック。実行環境の設定 をクリックすると、パラメータが表示されます。

      • トピック:トピック名は connect-offset で始めることをお勧めします。

      • パーティション:トピックのパーティション数は 1 より大きい必要があります。

      • ストレージ エンジン:トピックのストレージ エンジンはローカル ストレージに設定する必要があります。

      • cleanup.policy:トピックのログクリーンアップ ポリシーは Compact に設定する必要があります。

      connect-offset-kafka-maxcompute-sink

      タスク設定の Topic

      タスク構成を格納するために使用されるトピック。実行環境の設定 をクリックすると、パラメータが表示されます。

      • トピック:トピック名は connect-config で始めることをお勧めします。

      • パーティション:トピックには 1 つのパーティションのみを含めることができます。

      • ストレージ エンジン:トピックのストレージ エンジンはローカル ストレージに設定する必要があります。

      • cleanup.policy:トピックのログクリーンアップ ポリシーは Compact に設定する必要があります。

      connect-config-kafka-maxcompute-sink

      タスクステータスの Topic

      タスクステータスを格納するために使用されるトピック。実行環境の設定 をクリックすると、パラメータが表示されます。

      • トピック:トピック名は connect-status で始めることをお勧めします。

      • パーティション:トピックのパーティション数は 6 に設定することをお勧めします。

      • ストレージ エンジン:トピックのストレージ エンジンはローカル ストレージに設定する必要があります。

      • cleanup.policy:トピックのログクリーンアップ ポリシーは Compact に設定する必要があります。

      connect-status-kafka-maxcompute-sink

      デッドレターキューの Topic

      Kafka Connect フレームワークのエラーデータを格納するために使用されるトピック。実行環境の設定 をクリックすると、パラメータが表示されます。トピックリソースを節約するために、配信不能キュー トピックと エラーデータ トピック の両方としてトピックを作成できます。

      • トピック:トピック名は connect-error で始めることをお勧めします。

      • パーティション:トピックのパーティション数は 6 に設定することをお勧めします。

      • ストレージ エンジン:トピックのストレージ エンジンは、ローカル ストレージまたはクラウド ストレージに設定できます。

      connect-error-kafka-maxcompute-sink

      例外データ Topic

      コネクタのエラーデータを格納するために使用されるトピック。実行環境の設定 をクリックすると、パラメータが表示されます。トピックリソースを節約するために、配信不能キュー トピック とエラーデータ トピックの両方としてトピックを作成できます。

      • トピック:トピック名は connect-error で始めることをお勧めします。

      • パーティション:トピックのパーティション数は 6 に設定することをお勧めします。

      • ストレージ エンジン:トピックのストレージ エンジンは、ローカル ストレージまたはクラウド ストレージに設定できます。

      connect-error-kafka-maxcompute-sink

    3. ターゲットサービスの設定 手順で、宛先サービスとして MaxCompute を選択し、次の表に示すパラメータを設定して、作成 をクリックします。

      説明

      データソース トピックを含むインスタンスが中国(杭州)または中国(成都)リージョンにある場合、宛先サービスとして MaxCompute を選択すると、[サービス承認] ダイアログ ボックスが表示されます。[OK][サービス承認] ダイアログ ボックスでクリックし、次の表に示すパラメータを設定して、作成 をクリックします。

      パラメータ

      説明

      接続アドレス

      MaxCompute のエンドポイント。詳細については、「エンドポイント」をご参照ください。

      • VPC エンドポイント:レイテンシが低いため、VPC エンドポイントを使用することをお勧めします。ApsaraMQ for Kafka インスタンスと MaxCompute プロジェクトが同じリージョンにある場合、VPC エンドポイントを使用できます。

      • パブリック エンドポイント:レイテンシが高いため、パブリック エンドポイントを使用しないことをお勧めします。ApsaraMQ for Kafka インスタンスと MaxCompute プロジェクトが異なるリージョンにある場合、パブリック エンドポイントを使用できます。パブリック エンドポイントを使用するには、コネクタのインターネットアクセスを有効にする必要があります。詳細については、「コネクタのインターネット アクセスを有効にする」をご参照ください。

      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api

      ワークスペース

      データをエクスポートする MaxCompute プロジェクトの名前。

      connector_test

      テーブル

      データをエクスポートする MaxCompute テーブルの名前。

      test_kafka

      テーブルリージョン

      MaxCompute テーブルが作成されるリージョン。

      中国(杭州)

      サービスアカウント

      MaxCompute へのアクセスに使用される Alibaba Cloud アカウントの ID。

      188***

      権限が付与されたロール名

      ApsaraMQ for Kafka によって引き受けられる RAM ロールの名前。詳細については、「RAM ロールの作成」をご参照ください。

      AliyunKafkaMaxComputeUser1

      モード

      メッセージが MaxCompute シンクコネクタにエクスポートされるモード。デフォルト値:DEFAULT。有効な値:

      • KEY:メッセージのキーのみが保持され、MaxCompute テーブルの Key 列に書き込まれます。

      • VALUE:メッセージの値のみが保持され、MaxCompute テーブルの Value 列に書き込まれます。

      • DEFAULT:メッセージのキーと値の両方が保持されます。キーは Key 列に書き込まれ、値は MaxCompute テーブルの Value 列に書き込まれます。

        重要

        DEFAULT モードでは、CSV 形式はサポートされていません。TEXT 形式と BINARY 形式のみを選択できます。

      DEFAULT

      フォーマット

      メッセージが MaxCompute シンクコネクタにエクスポートされる形式。デフォルト値:TEXT。有効な値:

      • TEXT:文字列

      • BINARY:バイト配列

      • CSV:カンマ(,)で区切られた文字列

        重要

        パラメータを CSV に設定した場合、DEFAULT モードはサポートされていません。KEY モードと VALUE モードのみがサポートされています。

        • KEY モード:メッセージのキーのみが保持されます。キーはカンマ(,)で区切られ、インデックスの順序で MaxCompute テーブルに書き込まれます。

        • VALUE モード:メッセージの値のみが保持されます。値はカンマ(,)で区切られ、インデックスの順序で MaxCompute テーブルに書き込まれます。

      TEXT

      パーティション

      パーティションが作成される頻度。デフォルト値:HOUR。有効な値:

      • DAY:毎日新しいパーティションにデータを書き込みます。

      • HOUR:毎時新しいパーティションにデータを書き込みます。

      • MINUTE:毎分新しいパーティションにデータを書き込みます。

      HOUR

      タイムゾーン

      データソース トピックにメッセージを送信する ApsaraMQ for Kafka プロデューサー クライアントのタイムゾーン。デフォルト値:GMT 08:00。

      GMT 08:00

      コネクタが作成された後、Connector タスクリスト ページで表示できます。

  7. Connector タスクリスト ページに移動し、作成したコネクタを見つけて、デプロイ操作 列の をクリックします。

テストメッセージを送信する

MaxComputeシンクコネクタをデプロイした後、ApsaraMQ for Kafka のデータソース トピックにメッセージを送信して、メッセージが MaxCompute にエクスポートできるかどうかをテストできます。

  1. Connector タスクリスト ページで、管理するコネクタを見つけ、テスト操作 列の をクリックします。

  2. メッセージの送信 パネルで、テスト メッセージを送信するためのパラメータを設定します。

    • 送信方法 パラメータを コンソール に設定した場合は、次の手順を実行します。

      1. メッセージキー フィールドに、メッセージ キーを入力します。例: demo。

      2. メッセージの内容 フィールドに、メッセージの内容を入力します。例: {"key": "test"}。

      3. 指定されたパーティションに送信 パラメータを設定して、テスト メッセージを特定のパーティションに送信するかどうかを指定します。

        • テスト メッセージを特定のパーティションに送信する場合は、はい をクリックし、パーティション ID フィールドにパーティション ID を入力します。例: 0。パーティション ID を照会する方法については、パーティションの状態を表示する を参照してください。

        • テスト メッセージを特定のパーティションに送信しない場合は、いいえ をクリックします。

    • 送信方法 パラメータを [docker] に設定した場合は、Docker コンテナーを実行してサンプルメッセージを生成する セクションの Docker コマンドを実行して、テスト メッセージを送信します。

    • 送信方法 パラメータを [SDK] に設定した場合は、必要なプログラミング言語またはフレームワークの SDK と、テスト メッセージを送受信するためのアクセス方法を選択します。

MaxComputeテーブルのデータを表示する

ApsaraMQ for Kafka のデータソース トピックにメッセージを送信した後、MaxComputeクライアントにログオンして、メッセージが受信されたかどうかを確認できます。

test_kafkaテーブルを表示するには、次の手順を実行します。

  1. MaxComputeクライアントにログオンします。

  2. 次のコマンドを実行して、テーブルのパーティションを表示します。

    show partitions test_kafka;

    この例では、次の結果が返されます。

    pt=11-17-2020 15
    
    OK
  3. 次のコマンドを実行して、パーティションに格納されているデータを表示します。

    select * from test_kafka where pt ="11-17-2020 14";

    この例では、次の結果が返されます。

    +----------------------+------------+------------+-----+-------+---------------+
    | topic                | partition  | offset     | key | value | pt            |
    +----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input| 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +----------------------+------------+------------+-----+-------+---------------+