すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:オフラインモードまたはリアルタイムモードでMaxComputeにKafkaデータをインポートする

最終更新日:Mar 28, 2025

MaxComputeとKafkaの統合により、効率的で信頼性の高いデータ処理および分析機能が提供されます。 MaxComputeとKafkaの統合は、リアルタイム処理、大規模なデータストリーム、複雑なデータ分析が必要なシナリオに適しています。 このトピックでは、ApsaraMQ for KafkaおよびセルフマネージドKafkaのデータをMaxComputeにインポートする方法について説明します。 このトピックでは、セルフマネージドKafkaのデータをMaxComputeにインポートする方法の例も提供します。

ApsaraMQ for KafkaからMaxComputeへのデータのインポート

MaxComputeはApsaraMQ for Kafkaと密接に統合されています。 ApsaraMQ for Kafkaが提供するMaxComputeシンクコネクタを直接使用して、特定のトピックのデータをMaxComputeテーブルに継続的にインポートできます。 サードパーティのツールを使用したり、カスタム開発を実行したりする必要はありません。 MaxComputeシンクコネクタの作成方法の詳細については、「MaxComputeシンクコネクタの作成」をご参照ください。

セルフマネージドApache KafkaからMaxComputeへのデータのインポート

前提条件

  • V2.2以降のKafkaサービスがデプロイされ、Kafkaトピックが作成されます。 V3.4.0のKafkaサービスをデプロイすることを推奨します。

  • MaxComputeプロジェクトとMaxComputeテーブルが作成されます。 詳細については、「MaxCompute プロジェクトを作成する」および「テーブルの作成」をご参照ください。

注意事項

Kafka-connectorサービスを使用すると、TEXTCSVJSON、またはFLATTENタイプのKafkaデータをMaxComputeに書き込むことができます。 さまざまな種類のKafkaデータを書き込む場合は、次の項目に注意してください。 データ型の詳細については、formatパラメーターの説明をご参照ください。

  • 次の表に、TEXTまたはJSONタイプのKafkaデータが書き込まれるMaxComputeテーブルの要件を示します。

    フィールド名

    データ型

    必須

    topic

    STRING

    はい。

    partition

    BIGINT

    はい。

    offset

    BIGINT

    はい。

    key

    • TEXT型のKafkaデータを記述する場合、フィールドはSTRING型でなければなりません。

    • JSON型のKafkaデータを記述する場合、フィールドは、記述されたデータのデータ型設定に基づいてSTRING型またはJSON型にすることができます。

    このフィールドは、KafkaメッセージのキーをMaxComputeテーブルに同期する必要がある場合に必要です。 KafkaメッセージがMaxComputeに同期されるモードの詳細については、modeパラメーターの説明をご参照ください。

    value

    • TEXT型のKafkaデータを記述する場合、フィールドはSTRING型でなければなりません。

    • JSON型のKafkaデータを記述する場合、フィールドは、記述されたデータのデータ型設定に基づいてSTRING型またはJSON型にすることができます。

    このフィールドは、Kafkaメッセージの値をMaxComputeテーブルに同期する必要がある場合に必要です。 KafkaメッセージがMaxComputeに同期されるモードの詳細については、modeパラメーターの説明をご参照ください。

    pt

    STRING (パーティションフィールド)

    はい。

  • FLATTEN型またはCSV型のKafkaデータをMaxComputeに書き込む場合、次の表に示すフィールドが含まれ、必要なデータ型である必要があります。 書き込まれたデータに基づいてカスタムフィールドを設定することもできます。

    フィールド名

    データ型

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING (パーティションフィールド)

    • CSVタイプのKafkaデータをMaxComputeテーブルに書き込む場合、MaxComputeテーブルのカスタムフィールドシーケンスとフィールドタイプは、Kafkaデータのものと一致している必要があります。 これにより、Kafkaデータを正しく書き込むことができます。

    • FLATTEN型のKafkaデータをMaxComputeテーブルに書き込む場合、MaxComputeテーブルのカスタムフィールド名は、Kafkaデータのフィールド名と一致している必要があります。 これにより、Kafkaデータを正しく書き込むことができます。

      たとえば、書き込むFLATTEN型のKafkaデータが {"A":a,"B":"b","C":{"D":"d","E":"e"}} の場合、次の文を実行してデータを保存するMaxComputeテーブルを作成できます。

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

Kafka-connectorサービスの設定と開始

  1. Linux環境で、CLIで次のコマンドを実行するか、ダウンロードリンクをクリックしてkafka-connector-2.0.jarパッケージをダウンロードします。

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    依存関係の競合を防ぐために、$KAFKA_HOME/libsconnectorなどのサブフォルダーを作成してkafka-connector-2.0.jarパッケージを格納することを推奨します

    説明

    kafka-connector-2.0.jarパッケージの展開環境がKafkaデータの展開環境と同じでない場合は、aliware-Kafka-demosに記載されている手順に従って、kafka-connectorサービスを設定して起動する必要があります。

  2. $KAFKA_HOME/configディレクトリで、connect-distributed.propertiesファイルを設定します。

    connect-distributed.propertiesファイルに次の設定を追加します。

    ## Add the following content:
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ## Update the values of the key.converter and value.converter parameters.
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. $KAFKA_HOME/ ディレクトリで次のコマンドを実行し、Kafka-connectorサービスを開始します。

    ## Run the following command:
    bin/connect-distributed.sh config/connect-distributed.properties &

Kafka-connectorタスクの設定と開始

  1. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。

    次のコードと表は、odps-sink-connector.jsonファイルの内容とパラメーターを示しています。

    {
      "name": "Kafka connector task name",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "account type (STS or ALIYUN)",
        "access_id": "access id",
        "access_key": "access key",
        "account_id": "account id for sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "region id for sts",
        "role_name": "role name for sts",
        "client_timeout_ms": "STS Token valid period (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"kafka topic when runtime errors happens",
        "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue",
        "skip_error":"false"
      }
    }
    • 共通パラメーター

      パラメーター

      必須

      説明

      name

      必須

      タスクの名前。 名前は一意にする必要があります。

      connector.class

      必須

      Kafka-connectorサービスのクラス名。 デフォルト値: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector

      tasks.max

      必須

      Kafka-connectorサービスのコンシューマプロセスの最大数。 値は0より大きい整数でなければなりません。

      トピック

      必須

      Kafkaトピックの名前。

      endpoint

      必須

      MaxComputeのエンドポイント。

      このパラメーターは、MaxComputeプロジェクトの作成時に選択したリージョンとネットワーク接続タイプに基づいて設定する必要があります。 各リージョンの異なるネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。

      tunnel_endpoint

      選択可能

      MaxCompute Tunnelのパブリックエンドポイント。

      このパラメーターを設定しないと、トラフィックはMaxComputeが存在するネットワークに対応するTunnelエンドポイントに自動的にルーティングされます。 このパラメーターを設定すると、トラフィックは指定されたエンドポイントにルーティングされ、自動ルーティングは実行されません。

      各リージョンの異なるネットワークタイプのトンネルエンドポイントの詳細については、「エンドポイント」をご参照ください。

      project

      必須

      アクセスするMaxComputeプロジェクトの名前。

      schema

      選択可能

      • このパラメーターは、ターゲットMaxComputeプロジェクトに3層のスキーマモデルがある場合に必要です。 デフォルト値: Default

      • ターゲットMaxComputeプロジェクトに3層スキーマモデルがない場合、このパラメーターを設定する必要はありません。

      スキーマの詳細については、「スキーマ関連の操作」をご参照ください。

      table

      必須

      ターゲットMaxComputeプロジェクト内のテーブルの名前。

      format

      選択可能

      書かれたメッセージの形式。 有効な値:

      • TEXT: 文字列。 デフォルト値です。

      • BINARY: バイト配列。

      • CSV: コンマ (,) で区切られた文字列のリスト。

      • JSON: JSON文字列。 MaxCompute JSONデータ型の詳細については、「MaxComputeのJSON型を使用する手順 (ベータ版) 」をご参照ください。

      • FLATTEN: JSON文字列。 JSON文字列のキーと値が解析され、指定されたMaxComputeテーブルに書き込まれます。 JSON文字列のキーは、MaxComputeテーブルの列名に対応している必要があります。

      さまざまな形式のメッセージをインポートする方法の詳細については、「」をご参照ください。

      mode

      選択可能

      メッセージがMaxComputeに同期されるモード。 有効な値:

      • KEY: メッセージのキーのみが保持され、宛先MaxComputeテーブルに書き込まれます。

      • VALUE: メッセージの値のみが保持され、宛先MaxComputeテーブルに書き込まれます。

      • DEFAULT: メッセージのキーと値の両方が保持され、宛先MaxComputeテーブルに書き込まれます。 デフォルト値です。

        このパラメーターをDEFAULTに設定すると、TEXT型またはBINARY型のデータのみを書き込むことができます。

      partition_window_type

      選択可能

      データは、システム時間に基づいて分割される。 有効な値: DAYHOURMINUTE。 デフォルト値: HOUR。

      use_streaming

      選択可能

      Streaming Tunnelを使用するかどうかを指定します。 有効な値:

      • false: Streaming Tunnelは使用されません。 デフォルト値です。

      • true: ストリーミングトンネルが使用されます。

      buffer_size_kb

      選択可能

      odpsパーティションライターの内部バッファサイズ。 (単位:KB) デフォルトのサイズは65,536 KBです。

      sink_pool_size

      選択可能

      マルチスレッド書き込み用のスレッドの最大数。 デフォルト値は、システム内のCPUコアの数です。

      record_batch_size

      選択可能

      Kafka-connectorタスクでスレッドが同時に送信できるメッセージの最大数。

      skip_error

      選択可能

      不明なエラーが発生したときに生成されるレコードをスキップするかどうかを指定します。 有効な値:

      • false: レコードはスキップされません。 デフォルト値です。

      • true: レコードはスキップされます。

        説明
        • skip_errorfalseに設定され、runtime.error.topic.nameパラメーターが設定されていない場合、その後のデータ書き込み操作は停止され、プロセスはブロックされ、不明なエラーが発生すると例外がログに記録されます。

        • skip_errortrueに設定されていて、runtime.error.topic.nameパラメーターが設定されていない場合、データの書き込みプロセスはデータの書き込みを続行し、異常なデータは破棄されます。

        • skip_errorパラメーターがfalseに設定され、runtime.error.topic.nameパラメーターが設定されている場合、データ書き込みのプロセスはデータの書き込みを続行し、runtime.error.topic.nameトピックで指定されたトピックに異常なデータが記録されます

        異常データの処理例については、「異常データの処理」をご参照ください。

      runtime.error.topic.name

      選択可能

      不明なエラーが発生したときにデータが書き込まれるKafkaトピックの名前。

      runtime.error.topic.bootstrap.servers

      選択可能

      ブートストラップサーバー構成のアドレス。 アドレスは、未知のエラーが発生したときにデータが書き込まれるKafkaブローカーのアドレスです。

      account_type

      必須

      ターゲットMaxComputeサービスへのアクセスに使用されるメソッド。 有効な値: STSおよびALIYUN。 デフォルト値: ALIYUN

      MaxComputeにアクセスするには、異なる方法に対して異なるアクセス資格情報パラメーターを設定する必要があります。 詳細については、このトピックの「ALIYUNメソッドを使用したMaxComputeへのアクセス」および「STSメソッドを使用したMaxComputeへのアクセス」をご参照ください。

    • ALIYUNメソッドを使用したMaxComputeへのアクセス: 共通パラメーターに加えて、次のパラメーターを設定する必要があります。

      パラメーター

      説明

      access_id

      Alibaba CloudアカウントまたはAlibaba Cloudアカウント内のRAMユーザーのAccessKey ID。

      AccessKeyペアページからAccessKey IDを取得できます。

      access_key

      AccessKey IDに対応するAccessKeyシークレット。

    • STSメソッドを使用したMaxComputeへのアクセス: 共通パラメーターに加えて、次のパラメーターを設定する必要があります。

      パラメーター

      説明

      account_id

      ターゲットMaxComputeプロジェクトへのアクセスに使用されるアカウントのID。 アカウントセンターでアカウントIDを確認できます。

      region_id

      ターゲットMaxComputeプロジェクトが存在するリージョンのID。 各リージョンのIDの詳細については、「エンドポイント」をご参照ください。

      role_name

      ターゲットMaxComputeプロジェクトへのアクセスに使用されるロールの名前。 ロール名は [ロール] ページで確認できます。

      client_timeout_ms

      STSトークンが更新される間隔。 単位:ミリ秒。 デフォルト値: 11。

      sts.endpoint

      ID認証にSTSトークンを使用する場合に必要なSTSサービスエンドポイント。

      各リージョンの異なるネットワークタイプのエンドポイントの詳細については、「エンドポイント」をご参照ください。

  2. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

TEXTタイプの書き込みデータ

  1. データを準備します。

    • MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、MaxComputeテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • Kafkaデータを作成します。

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_textという名前のKafkaトピックが作成されます。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      次のコマンドを実行してKafkaメッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (オプション) Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。

    説明

    Kafka-connectorサービスが開始されている場合は、この手順をスキップします。

  3. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは $KAFKA_HOME/configディレクトリにアップロードされます。

    次のコードは、odps-sink-connector.jsonファイルの内容を示しています。 odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果を確認します。

    MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    次の応答が返されます。

    # The mode value in the odps-sink-connector.json configuration file is VALUE. Therefore, only the value is retained and the key field is NULL.
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

CSVタイプのデータの書き込み

  1. データを準備します。

    • MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • Kafkaデータを作成します。

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_csvという名前のKafkaトピックが作成されます。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      次のコマンドを実行してKafkaメッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (オプション) Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。

    説明

    Kafka-connectorサービスが開始されている場合は、この手順をスキップします。

  3. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは $KAFKA_HOME/configディレクトリにアップロードされます。

    次のコードは、odps-sink-connector.jsonファイルの内容を示しています。 odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果を確認します。

    MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    次の応答が返されます。

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

JSON型のデータの書き込み

  1. データを準備します。

    • MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • Kafkaデータを作成します。

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_jsonという名前のトピックが作成されます。

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      次のコマンドを実行してKafkaメッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (オプション) Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。

    説明

    Kafka-connectorサービスが開始されている場合は、この手順をスキップします。

  3. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは $KAFKA_HOME/configディレクトリにアップロードされます。

    次のコードは、odps-sink-connector.jsonファイルの内容を示しています。 odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果を確認します。

    MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    次の応答が返されます。

    # Write JSON data to the value field.
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

FLATTENタイプの書き込みデータ

  1. データを準備します。

    • MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Kafkaデータを作成します。

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してKafkaトピックを作成します。 この例では、topic_flattenという名前のトピックが作成されます。

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      次のコマンドを実行してKafkaメッセージを作成します。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (オプション) Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。

    説明

    Kafka-connectorサービスが開始されている場合は、この手順をスキップします。

  3. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは $KAFKA_HOME/configディレクトリにアップロードされます。

    次のコードは、odps-sink-connector.jsonファイルの内容を示しています。 odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "<yourAccessKeyId>",
          "access_key": "<yourAccessKeySecret>",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果を確認します。

    MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    次の応答が返されます。

    # JSON data is parsed and written to the MaxCompute table. The exteninfo field in the nested JSON format can be a JSON field.
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

プロセス異常データ

  1. データを準備します。

    • MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールを使用して、ターゲットMaxComputeテーブルを作成します。

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Kafkaデータを作成します。

      $KAFKA_HOME/bin/ ディレクトリで、次のKafkaトピックを作成するコマンドを実行します。

      • トピック_異常

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        説明

        データの書き込み時に不明なエラーが発生した場合、異常なデータはruntime_errorトピックに書き込まれます。 ほとんどの場合、Kafkaデータの形式がMaxComputeテーブルの形式と一致していないため、不明なエラーが発生します。

      次のコマンドを実行してKafkaメッセージを作成します。

      次のメッセージでは、1つのメッセージのデータ形式がMaxComputeテーブルの形式と同じではありません。

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (オプション) Kafka-connectorサービスを開始します。 詳細については、「Kafka-connectorサービスの設定と開始」をご参照ください。

    説明

    Kafka-connectorサービスが開始されている場合は、この手順をスキップします。

  3. odps-sink-connector.jsonファイルを作成して構成し、odps-sink-connector.jsonファイルを任意の場所にアップロードします。 この例では、odps-sink-connector.jsonファイルは $KAFKA_HOME/configディレクトリにアップロードされます。

    次のコードは、odps-sink-connector.jsonファイルの内容を示しています。 odps-sink-connector.jsonファイルの詳細については、「Kafka-connectorタスクの設定と開始」をご参照ください。

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. 次のコマンドを実行して、Kafka-connectorタスクを開始します。

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. 結果を確認します。

    • MaxComputeテーブルのデータを照会します。

      MaxComputeクライアント (odpscmd) またはMaxCompute SQLを実行できる別のツールで次のコマンドを実行して、データ書き込み結果を照会します。

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      次の応答が返されます。

      # The last two records are displayed. This is because the skip_error parameter is set to true. The data with the id of 101 is not written to the MaxCompute table, and subsequent records are not blocked from being written to the MaxCompute table. 
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • runtime_errorトピックのメッセージを照会します。

      $KAFKA_HOME/bin/ ディレクトリで、次のコマンドを実行してメッセージの書き込み結果を表示します。

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      次の応答が返されます。

      # Abnormal data is written to the runtime_error topic.
      {"id":101,"name":"json-4","extendinfos":"null"}