すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:,

最終更新日:Feb 27, 2026

MaxCompute と Kafka を統合することで、効率的で信頼性の高いデータ処理および分析機能が提供されます。この統合は、リアルタイム処理、大規模なデータストリーム、および複雑なデータ分析を必要とするシナリオに最適です。このトピックでは、Message Queue for Apache Kafka およびセルフマネージド Kafka インスタンスから MaxCompute にデータを書き込む方法について説明し、セルフマネージド Kafka インスタンスの詳細な例を示します。

Kafka データの MaxCompute への書き込み:Alibaba Cloud フルマネージド Kafka

MaxCompute は Message Queue for Apache Kafka と密接に統合されています。Message Queue for Apache Kafka 用の MaxCompute Sink Connector を使用すると、サードパーティツールやカスタム開発を必要とせずに、指定したトピックから MaxCompute テーブルにデータを継続的にインポートできます。詳細については、「MaxCompute Sink Connector の作成」をご参照ください。

Kafka データの MaxCompute への書き込み:セルフマネージドオープンソース Kafka

前提条件

  • Kafka V2.2 以降をデプロイし、Kafka トピックを作成済みであること。バージョン 3.4.0 を推奨します。

  • MaxCompute プロジェクトとテーブルを作成済みであること。詳細については、「MaxCompute プロジェクトの作成」および「テーブルの作成」をご参照ください。

注意事項

Kafka コネクタは、TEXTCSVJSON、および FLATTEN フォーマットでのデータ書き込みをサポートしています。以下の注意事項は各フォーマットに適用されます。データ型の詳細については、「データ型の説明」をご参照ください。

  • TEXT または JSON フォーマットの Kafka データを MaxCompute に書き込む場合、MaxCompute テーブルは次の要件を満たす必要があります:

    フィールド名

    フィールドの型

    固定フィールド

    topic

    STRING

    はい

    partition

    BIGINT

    はい

    offset

    BIGINT

    はい

    key

    • TEXT フォーマットの Kafka データを書き込む場合、フィールドの型は STRING である必要があります。

    • JSON フォーマットの Kafka データを書き込む場合、フィールドの型は書き込むデータに応じて STRING または JSON になります。

    このフィールドは、Kafka メッセージから MaxCompute テーブルにキーを同期するために固定されています。Kafka メッセージを MaxCompute に同期するモードの詳細については、「mode」をご参照ください。

    value

    • TEXT フォーマットの Kafka データを書き込む場合、フィールドの型は STRING である必要があります。

    • JSON フォーマットの Kafka データを書き込む場合、フィールドの型は書き込むデータに応じて STRING または JSON になります。

    このフィールドは、Kafka メッセージから MaxCompute テーブルに値を同期するために固定されています。Kafka メッセージを MaxCompute に同期するモードの詳細については、「mode」をご参照ください。

    pt

    STRING (パーティションフィールド)

    はい

  • FLATTEN または CSV フォーマットの Kafka データを MaxCompute に書き込む場合、テーブルには次のフィールドとデータ型を含める必要があります。書き込むデータに基づいて他のフィールドを定義できます。

    フィールド名

    フィールドの型

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING (パーティションフィールド)

    • CSV フォーマットの Kafka データを MaxCompute テーブルに書き込む場合、書き込み操作を正常に完了させるために、MaxCompute テーブルのカスタムフィールドの順序とデータ型が Kafka データの列と一致する必要があります。

    • FLATTEN フォーマットの Kafka データを MaxCompute テーブルに書き込む場合、書き込み操作を正常に完了させるために、MaxCompute テーブルのカスタムフィールドの名前が Kafka データのフィールド名と一致する必要があります。

      たとえば、FLATTEN フォーマットの Kafka データが {"A":a,"B":"b","C":{"D":"d","E":"e"}} の場合、MaxCompute テーブルは次のように設定する必要があります。

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

Kafka コネクタサービスの設定と起動

  1. この例では Linux 環境を使用します。コマンドウィンドウで、次のコマンドを実行するか、ダウンロードリンクを使用して kafka-connector-2.0.jar パッケージをダウンロードします。

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    依存関係の競合を防ぐために、$KAFKA_HOME/libs ディレクトリに connector などのサブフォルダを作成し、その中に kafka-connector-2.0.jar パッケージを配置します。

    説明

    kafka-connector-2.0.jar パッケージがご利用の Kafka デプロイメント環境と互換性がない場合は、「Kafka-connector の設定」で Kafka-connector サービスの設定と起動方法に関する詳細をご参照ください。

  2. $KAFKA_HOME/config ディレクトリで、connect-distributed.properties ファイルを設定します。

    connect-distributed.properties ファイルに次の内容を追加します。

    ## 次の内容を追加します
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ## key.converter および value.converter パラメーターの値を更新します
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. $KAFKA_HOME/ ディレクトリで、次のコマンドを実行して Kafka-connector サービスを起動します。

    ## 起動コマンド
    bin/connect-distributed.sh config/connect-distributed.properties &

Kafka コネクタタスクの設定と起動

  1. odps-sink-connector.json 設定ファイルを作成して設定します。次に、odps-sink-connector.json ファイルを任意の場所にアップロードします。

    odps-sink-connector.json 設定ファイルの内容とパラメーターについては、次のセクションで説明します。

    {
      "name": "Kafka コネクタタスク名",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "エンドポイント",
        "tunnel_endpoint": "your_tunnel エンドポイント",
        "project": "プロジェクト",
        "schema":"default",
        "table": "your_table",
        "account_type": "アカウントタイプ (STS または ALIYUN)",
        "access_id": "アクセス ID",
        "access_key": "アクセスキー",
        "account_id": "sts のアカウント ID",
        "sts.endpoint": "sts エンドポイント",
        "region_id": "sts のリージョン ID",
        "role_name": "sts のロール名",
        "client_timeout_ms": "STS トークンの有効期間 (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"実行時エラー発生時の kafka トピック",
        "runtime.error.topic.bootstrap.servers":"エラーキューの kafka ブートストラップサーバー",
        "skip_error":"false"
      }
    }
    • 共通パラメーター

      パラメーター

      必須

      説明

      name

      はい

      タスクの名前。名前は一意である必要があります。

      connector.class

      はい

      Kafka コネクタサービスを起動するためのクラス名。デフォルト値は com.aliyun.odps.kafka.connect.MaxComputeSinkConnector です。

      tasks.max

      はい

      Kafka コネクタ内のコンシューマープロセスの最大数。値は 0 より大きい整数である必要があります。

      topics

      はい

      Kafka トピックの名前。

      endpoint

      はい

      MaxCompute サービスのエンドポイント。

      MaxCompute プロジェクトの作成時に選択したリージョンとネットワーク接続タイプに基づいてエンドポイントを設定する必要があります。各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。

      tunnel_endpoint

      いいえ

      Tunnel サービスのパブリックエンドポイント。

      Tunnel エンドポイントを設定しない場合、トンネルは MaxCompute サービスが配置されているネットワークに対応する Tunnel エンドポイントに自動的にルーティングされます。Tunnel エンドポイントを設定した場合、その設定が優先され、自動ルーティングは無効になります。

      各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。

      project

      はい

      ターゲットの MaxCompute プロジェクトの名前。

      schema

      いいえ

      • このパラメーターは、ターゲットの MaxCompute プロジェクトが3層スキーマモデルで設定されている場合に必須です。デフォルト値は default です。

      • このパラメーターは、ターゲットの MaxCompute プロジェクトが3層スキーマモデルで設定されていない場合は不要です。

      スキーマの詳細については、「スキーマ操作」をご参照ください。

      table

      はい

      ターゲットの MaxCompute プロジェクト内のテーブルの名前。

      format

      いいえ

      書き込むメッセージのフォーマット。有効な値:

      • TEXT (デフォルト):メッセージは文字列です。

      • BINARY:メッセージはバイト配列です。

      • CSV:メッセージはカンマ (,) で区切られた値を持つ文字列です。

      • JSON:メッセージは JSON データ型の文字列です。MaxCompute の JSON 型の詳細については、「JSON データ型」をご参照ください。

      • FLATTEN:メッセージは JSON データ型の文字列です。JSON 文字列のキーと値が解析され、MaxCompute テーブルの対応する列に書き込まれます。JSON データのキーは、MaxCompute テーブルの列名に対応している必要があります。

      さまざまなフォーマットのメッセージをインポートする例については、「使用例」をご参照ください。

      mode

      いいえ

      メッセージを MaxCompute に同期するモード。有効な値:

      • KEY:メッセージキーのみを保持し、キーをターゲットの MaxCompute テーブルに書き込みます。

      • VALUE:メッセージ値のみを保持し、値をターゲットの MaxCompute テーブルに書き込みます。

      • DEFAULT (デフォルト):メッセージキーと値の両方を保持し、それらをターゲットの MaxCompute テーブルに書き込みます。

        DEFAULT モードでは、TEXT および BINARY データフォーマットのみがサポートされます。

      partition_window_type

      いいえ

      システム時刻によってデータをパーティション分割します。有効な値:DAYHOUR (デフォルト)、および MINUTE

      use_streaming

      いいえ

      ストリーミングデータトンネルを使用するかどうかを指定します。有効な値:

      • false (デフォルト):無効。

      • true:有効。

      buffer_size_kb

      いいえ

      odps パーティションライターの内部バッファーのサイズ (KB)。デフォルト値は 65536 KB です。

      sink_pool_size

      いいえ

      マルチスレッド書き込みの最大スレッド数。デフォルト値はシステムの CPU コア数です。

      record_batch_size

      いいえ

      Kafka コネクタタスク内の単一スレッドが一度に並行して送信できるメッセージの最大数。

      skip_error

      いいえ

      不明なエラーを引き起こすレコードをスキップするかどうかを指定します。有効な値:

      • false (デフォルト):レコードをスキップしません。

      • true:レコードをスキップします。

        説明
        • skip_errorfalse に設定され、runtime.error.topic.name パラメーターが設定されていない場合、不明なエラーが発生すると、プロセスはデータ書き込みを停止します。プロセスはブロックされ、ログに例外がスローされます。

        • skip_errortrue に設定され、runtime.error.topic.name が設定されていない場合、データ書き込みプロセスは続行され、異常データは破棄されます。

        • skip_errorfalse に設定され、runtime.error.topic.name が設定されている場合、データ書き込みプロセスは続行され、異常データは runtime.error.topic.name で指定されたトピックに記録されます。

        異常データの処理方法の例については、「異常データの処理例」をご参照ください。

      runtime.error.topic.name

      いいえ

      書き込み操作中に不明なエラーを引き起こしたデータが書き込まれる Kafka トピックの名前。

      runtime.error.topic.bootstrap.servers

      いいえ

      書き込み操作中に不明なエラーを引き起こしたデータが書き込まれる Kafka インスタンスのブートストラップサーバーアドレス。

      account_type

      はい

      ターゲットの MaxCompute サービスにアクセスするメソッド。有効な値は STSALIYUN です。デフォルト値は ALIYUN です。

      アクセスするメソッドが異なれば、必要なアクセス認証情報パラメーターも異なります。詳細については、「ALIYUN メソッドを使用した MaxCompute へのアクセス」および「STS メソッドを使用した MaxCompute へのアクセス」をご参照ください。

    • 共通パラメーターに加えて、次のパラメーターも設定する必要があります。

      パラメーター名

      説明

      access_id

      ご利用の Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      AccessKey ID は [AccessKey 管理] ページで取得できます。

      access_key

      AccessKey ID に対応する AccessKey Secret。

    • 共通パラメーターに加えて、次のパラメーターも設定する必要があります。

      パラメーター

      説明

      account_id

      ターゲットの MaxCompute プロジェクトへのアクセスに使用するアカウントの ID。アカウント ID は [アカウントセンター] で確認できます。

      region_id

      ターゲットの MaxCompute プロジェクトのリージョン ID。各リージョンの ID については、「エンドポイント」をご参照ください。

      role_name

      ターゲットの MaxCompute プロジェクトへのアクセスに使用するロールの名前。ロール名は [ロール] ページで確認できます。

      client_timeout_ms

      Security Token Service (STS) トークンの更新間隔 (ミリ秒)。デフォルト値は 11 ms です。

      sts.endpoint

      一時的なセキュリティトークン (STS) を使用した身分認証に必要な STS サービスのエンドポイント。

      各リージョンとネットワークのエンドポイントについては、「エンドポイント」をご参照ください。

  2. 次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

使用例

TEXT データの書き込み

  1. データの準備

    • ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • Kafka データの作成

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名として topic_text を使用します。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      次のコマンドを実行して Kafka メッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (任意) Kafka-connector サービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。

    説明

    Kafka-connector サービスがすでに実行中の場合は、このステップをスキップできます。

  3. odps-sink-connector.json ファイルを作成して設定します。次に、odps-sink-connector.json ファイルを $KAFKA_HOME/config パスなどの任意の場所にアップロードします。

    次のコードは odps-sink-connector.json ファイルの例です。odps-sink-connector.json ファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. 次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果の検証

    ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    次の出力が返されます:

    # odps-sink-connector.json 設定ファイルの mode パラメーターが VALUE に設定されているため、値の内容のみが保持されます。キーフィールドは NULL です。
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

CSV データの書き込み

  1. データの準備

    • ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • Kafka へのデータの書き込み

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行して topic_csv という名前の Kafka トピックを作成します。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      次のコマンドを実行して Kafka メッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (任意) Kafka-connector サービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。

    説明

    Kafka-connector サービスがすでに実行中の場合は、このステップをスキップできます。

  3. odps-sink-connector.json ファイルを作成して設定し、odps-sink-connector.json ファイルを任意の場所にアップロードします。このトピックでは、例として $KAFKA_HOME/config パスを使用します。

    次のコードは odps-sink-connector.json ファイルの例です。odps-sink-connector.json ファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果の検証

    ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    次の出力が返されます:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

JSON データの書き込み

  1. データの準備

    • ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • Kafka データの作成

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名として topic_json を使用します。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      次のコマンドを実行して、Kafka メッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (任意) Kafka-connector サービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。

    説明

    Kafka-connector サービスがすでに実行中の場合は、このステップをスキップできます。

  3. odps-sink-connector.json ファイルを作成して設定します。次に、odps-sink-connector.json ファイルを $KAFKA_HOME/config パスなどの任意の場所にアップロードします。

    次のコードは odps-sink-connector.json ファイルの例です。odps-sink-connector.json ファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka コネクタのデータ移行タスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果の検証

    ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    次の出力が返されます:

    # JSON データは正常に value フィールドに書き込まれます。
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

FLATTEN データの書き込み

  1. データの準備

    • ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Kafka データの作成

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。この例では、トピック名として topic_flatten を使用します。

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      次のコマンドを実行して Kafka メッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (任意) Kafka-connector サービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。

    説明

    Kafka-connector サービスがすでに実行中の場合は、このステップをスキップできます。

  3. odps-sink-connector.json ファイルを作成して設定し、odps-sink-connector.json ファイルを任意の場所にアップロードします。このトピックでは、例として $KAFKA_HOME/config パスを使用します。

    次のコードは odps-sink-connector.json ファイルの例です。odps-sink-connector.json ファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka コネクタタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果の検証

    ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    次の結果が表示されます:

    # JSON データは解析されて MaxCompute テーブルに書き込まれ、extendinfo はネストをサポートする JSON フィールドになります。
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

異常データの処理例

  1. データの準備

    • ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、ターゲットテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Kafka データの作成

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行して Kafka トピックを作成します。

      • topic_abnormal トピック。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error 例外のメッセージトピック。

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        説明

        データ書き込み操作中にエラーが発生した場合、異常データは runtime_error トピックに書き込まれます。この種のエラーは通常、Kafka データと MaxCompute テーブルスキーマの不一致が原因で発生します。

      次のコマンドを実行して Kafka メッセージを作成します。

      次のコマンドのメッセージの1つが、ターゲットの MaxCompute テーブルのスキーマと一致しません。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (任意) Kafka-connector サービスを起動します。詳細については、「Kafka コネクタサービスの設定と起動」をご参照ください。

    説明

    Kafka-connector サービスがすでに実行中の場合は、このステップをスキップできます。

  3. odps-sink-connector.json ファイルを作成して設定し、odps-sink-connector.json ファイルを任意の場所にアップロードします。このトピックでは、例として $KAFKA_HOME/config パスを使用します。

    次のコードは odps-sink-connector.json ファイルの例です。odps-sink-connector.json ファイルの詳細については、「Kafka コネクタタスクの設定と起動」をご参照ください。

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. 次のコマンドを実行して、Kafka コネクタタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果の検証

    • MaxCompute テーブルデータのクエリ

      ローカルクライアント (odpscmd) または MaxCompute SQL コマンドを実行できる別のツールを使用して MaxCompute に接続し、次のコマンドを実行してデータをクエリし、結果を検証します。

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      次の出力が返されます:

      # 結果からわかるように、ID 101 のデータはテーブルスキーマと一致しなかったため、MaxCompute に書き込まれませんでした。
      # runtime.error.topic.name パラメーターが設定されていたため、プロセスはブロックされず、後続のデータは正常に書き込まれました。
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • runtime_error トピックのメッセージのクエリ

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してメッセージを表示します。

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      次の結果が返されます:

      # 異常データは正常に runtime_error メッセージキューに書き込まれます。
      {"id":101,"name":"json-4","extendinfos":"null"}