すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:MaxCompute シンクコネクタを作成する

最終更新日:Mar 27, 2025

このトピックでは、ApsaraMQ for Kafka Topic から MaxCompute テーブルにデータをエクスポートする MaxCompute シンクコネクタを作成する方法について説明します。

前提条件

前提条件については、「前提条件」をご参照ください。

使用上の注意

MaxCompute のパーティション機能を使用する場合は、テーブルを作成するときに、名前が time で型が string の追加のパーティションキー列を作成する必要があります。

ステップ 1:MaxCompute リソースを作成する

MaxCompute クライアントでテーブルを作成します。詳細については、「テーブルを作成する」をご参照ください。

この例では、kafka_to_maxcompute という名前のテーブルが作成されます。テーブルには 3 つの列が含まれており、パーティション機能が有効になっています。次のコードは、テーブルを作成するために実行されるステートメントを示しています。

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);

次のコードは、パーティション機能が無効になっている場合にテーブルを作成するために実行されるステートメントを示しています。

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);

ステートメントが実行されると、次の結果が表示されます。执行成果

[テーブル] ページで、作成されたテーブルに関する情報を表示します。表

ステップ 2:MaxCompute シンクコネクタを作成して起動する

  1. ApsaraMQ for Kafka コンソール にログインします。リソースの分布 セクションの 概要 ページで、管理する ApsaraMQ for Kafka インスタンスが存在するリージョンを選択します。

  2. 左側のナビゲーションウィンドウで、[コネクタエコシステム統合] > [タスク] を選択します。

  3. [タスク] ページで、[タスクの作成] をクリックします。

  4. タスクの作成 ページで、タスク名 パラメーターと 説明 パラメーターを構成します。次に、画面の指示に従って他のパラメーターを構成します。

    • タスクの作成

      1. Source (ソース) ステップで、データプロバイダー パラメーターを [apsaramq For Kafka] に設定し、画面の指示に従って他のパラメーターを構成します。次に、[次のステップ] をクリックします。次の表にパラメーターを示します。

        パラメーター

        説明

        Region

        ApsaraMQ for Kafka インスタンスが存在するリージョン。

        中国 (杭州)

        Apsaramq For Kafka インスタンス

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスの ID。

        alikafka_post-cn-9hdsbdhd****

        Topic

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのトピック。

        guide-sink-topic

        グループ ID

        ルーティングするデータが生成される ApsaraMQ for Kafka インスタンスのグループの ID。

        • [クイック作成]: システムは、ID が GID_EVENTBRIDGE_xxx 形式のグループを自動的に作成します。

        • [既存のグループを使用]: 使用されていない既存のグループの ID を選択します。使用中の既存のグループを選択すると、既存のメッセージの発行とサブスクライブに影響します。

        既存のグループを使用

        Consumer Offset

        • [最新のオフセット]: メッセージは最新のオフセットから消費されます。

        • [最も古いオフセット]: メッセージは最も古いオフセットから消費されます。

        最新のオフセット

        ネットワーク設定

        国境を越えたデータ転送が必要な場合は、[セルフマネージドインターネット] を選択します。それ以外の場合は、[ベーシックネットワーク] を選択します。

        ベーシックネットワーク

        データ形式

        データ形式機能は、ソースから配信されたバイナリデータを特定のデータ形式にエンコードするために使用されます。複数のデータ形式がサポートされています。エンコーディングに特別な要件がない場合は、値として Json を指定します。

        • Json: バイナリデータは UTF-8 エンコーディングに基づいて JSON 形式のデータにエンコードされ、ペイロードに配置されます。これはデフォルト値です。

        • テキスト: バイナリデータは UTF-8 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        • バイナリ: バイナリデータは Base64 エンコーディングに基づいて文字列にエンコードされ、ペイロードに配置されます。

        Json

        一括プッシュの件数

        各関数呼び出しで送信できるメッセージの最大数。リクエストは、バックログ内のメッセージ数が指定された値に達した場合にのみ送信されます。有効な値: 1 ~ 10000。

        2000

        バッチプッシュ間隔 (単位:秒)

        関数が呼び出される時間間隔。システムは、指定された時間間隔で集約されたメッセージを Function Compute に送信します。有効な値: 0 ~ 15。単位: 秒。値 0 は、集約後すぐにメッセージが送信されることを示します。

        3

      2. Filtering (フィルタリング) ステップで、リクエストをフィルタリングするデータパターンを定義します。詳細については、「イベントパターン」をご参照ください。

      3. Transform (変換) ステップで、分割、マッピング、エンリッチメント、動的ルーティングなどのデータ処理機能を実装するデータクレンジング方法を指定します。詳細については、「Function Compute を使用してメッセージクレンジングを実行する」をご参照ください。

      4. Sink (ターゲット) ステップで、サービスタイプ パラメーターを [maxcompute Acs.maxcompute] に設定し、画面の指示に従って他のパラメーターを構成します。次の表にパラメーターを示します。

        パラメーター

        説明

        AccessKey ID

        Alibaba Cloud アカウントの AccessKey ID。 AccessKey ID は、MaxCompute へのアクセスに使用されます。

        yourAccessKeyID

        AccessKey Secret

        Alibaba Cloud アカウントの AccessKey Secret。

        yourAccessKeySecret

        Maxcompute プロジェクト名

        作成した MaxCompute プロジェクト。

        test_compute

        Maxcompute テーブル名

        作成した MaxCompute テーブル。

        kafka_to_maxcompute

        Maxcompute テーブル入力パラメーター

        MaxCompute テーブルを選択すると、テーブルの列名と型がこのセクションに表示されます。[値抽出ルール] パラメーターのみを構成する必要があります。次のコードは、メッセージの値抽出ルールを構成する方法を示しています。この例では、topic 列の値はメッセージの topic フィールドから抽出されます。したがって、[値抽出ルール] パラメーターは $.topic に設定されます。

        {
          'data': {
            'topic': 't_test',
            'partition': 2,
            'offset': 1,
            'timestamp': 1717048990499,
            'headers': {
              'headers': [],
              'isReadOnly': False
            },
            'key': 'MaxCompute-K1',
            'value': 'MaxCompute-V1'
          },
          'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1',
          'source': 'acs:alikafka',
          'specversion': '1.0',
          'type': 'alikafka:Topic:Message',
          'datacontenttype': 'application/json; charset=utf-8',
          'time': '2024-05-30T06:03:10.499Z',
          'aliyunaccountid': '1413397765616316'
        }

        topic: $.data.topic

        valuename: $.data.value

        valueage: $.data.offset

        パーティションディメンション

        有効な値:

        • [無効]

        • [有効]

          パーティション機能を有効にする場合は、[パーティション値] などのパラメーターも構成する必要があります。 [パーティション値] パラメーターの値は、次のいずれかの形式にすることができます。

          • 時間変数: {yyyy}、{MM}、{dd}、{HH}、{mm}。これらの変数は大文字と小文字が区別され、それぞれ年、月、日、時、分を示します。

          • 定数。

        はい

        {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix

        ネットワーク設定

        • [VPC]: ApsaraMQ for Kafka のメッセージは、VPC (Virtual Private Cloud) 内で MaxCompute に配信されます。

        • [インターネット]: ApsaraMQ for Kafka のメッセージは、インターネット経由で MaxCompute に配信されます。

        インターネット

        VPC

        VPC ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        vSwitch ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        vsw-bp1gbjhj53hdjdkg****

        セキュリティグループ

        セキュリティグループ ID。このパラメーターは、ネットワーク設定 パラメーターを [VPC] に設定した場合にのみ必須です。

        test_group

    • タスクのプロパティ

      イベントのプッシュに失敗した場合に使用するリトライポリシーと、エラーを処理するために使用するメソッドを構成します。詳細については、「リトライポリシーと配信不能キュー」をご参照ください。

  5. [保存] をクリックします。[タスク] ページで、作成した MaxCompute シンクコネクタを見つけます。[ステータス] 列のステータスが [開始中] から [実行中] に変わると、コネクタが作成されます。

ステップ 3:MaxCompute シンクコネクタをテストする

  1. [タスク] ページで、作成した MaxCompute シンクコネクタを見つけ、[イベントソース] 列のソース Topic の名前をクリックします。

  2. トピック詳細ページで、[メッセージの送信] をクリックします。

  3. [メッセージの送受信を開始] パネルで、次の図に基づいてパラメーターを構成し、[OK] をクリックします。

    发送消息

  4. MaxCompute コンソールに移動し、次の SQL ステートメントを実行して、パーティションに関する情報をクエリします。

    show PARTITIONS kafka_to_maxcompute;

    次の結果が返されます。分区

  5. パーティション情報に基づいて次のステートメントを実行して、パーティション内のデータをクエリします。

    SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    次の結果が返されます。分区内数据