All Products
Search
Document Center

MaxCompute:Import Kafka data to MaxCompute in offline or real-time mode

Last Updated:Jan 02, 2024

The integration of MaxCompute and Kafka provides efficient and reliable data processing and analytics capabilities. The integration of MaxCompute and Kafka is suitable for scenarios that require real-time processing, large-scale data streams, and complex data analytics. This topic describes how to import data of ApsaraMQ for Kafka and self-managed Kafka to MaxCompute. This topic also provides examples on how to import data of self-managed Kafka to MaxCompute.

Import data from ApsaraMQ for Kafka to MaxCompute

MaxCompute is closely integrated with ApsaraMQ for Kafka. You can directly use MaxCompute sink connectors provided by ApsaraMQ for Kafka to continuously import data of specific topics to MaxCompute tables. You do not need to use third-party tools or perform custom development. For more information about how to create MaxCompute sink connectors, see Create MaxCompute sink connectors.

Import data from self-managed Apache Kafka to MaxCompute

Prerequisites

  • A Kafka service of V2.2 or later is deployed, and a Kafka topic is created. We recommend that you deploy a Kafka service of V3.4.0.

  • A MaxCompute project and a MaxCompute table are created. For more information, see Create a MaxCompute project and Create tables.

Precautions

The Kafka-connector service allows you to write Kafka data of the TEXT, CSV, JSON, or FLATTEN type to MaxCompute. Take note of the following items when you write different types of Kafka data. For more information about data types, see the description of the format parameter.

  • The following table describes the requirements for a MaxCompute table to which Kafka data of the TEXT or JSON type is written.

    Field name

    Data type

    Required

    topic

    STRING

    Yes.

    partition

    BIGINT

    Yes.

    offset

    BIGINT

    Yes.

    key

    • If you write Kafka data of the TEXT type, the field must be of the STRING type.

    • If you write Kafka data of the JSON type, the field can be of the STRING or JSON type based on the data type settings of written data.

    This field is required if you need to synchronize the key in a Kafka message to the MaxCompute table. For more information about the mode in which Kafka messages are synchronized to MaxCompute, see the description of the mode parameter.

    value

    • If you write Kafka data of the TEXT type, the field must be of the STRING type.

    • If you write Kafka data of the JSON type, the field can be of the STRING or JSON type based on the data type settings of written data.

    This field is required if you need to synchronize the value in a Kafka message to the MaxCompute table. For more information about the mode in which Kafka messages are synchronized to MaxCompute, see the description of the mode parameter.

    pt

    STRING (partition field)

    Yes.

  • If you write Kafka data of the FLATTEN or CSV type to MaxCompute, the fields listed in the following table must be included and must be of the required data types. You can also configure custom fields based on the written data.

    Field name

    Data type

    topic

    STRING

    partition

    BIGINT

    offset

    BIGINT

    pt

    STRING (partition field)

    • If you write Kafka data of the CSV type to a MaxCompute table, the custom field sequence and field types in the MaxCompute table must be consistent with those of the Kafka data. This ensures that the Kafka data can be correctly written.

    • If you write Kafka data of the FLATTEN type to a MaxCompute table, the custom field names in the MaxCompute table must be consistent with the field names in the Kafka data. This ensures that the Kafka data can be correctly written.

      For example, if the Kafka data of the FLATTEN type that you want to write is {"A":a,"B":"b","C":{"D":"d","E":"e"}}, you can execute the following statement to create a MaxCompute table for storing the data.

      CREATE TABLE IF NOT EXISTS table_flatten(
       topic STRING,
       `partition` BIGINT,
       `offset` BIGINT,
       A BIGINT,
       B STRING,
       C JSON
      ) PARTITIONED BY (pt STRING);

Configure and start the Kafka-connector service

  1. In the Linux environment, run the following command in the CLI or click the download link to download the kafka-connector-2.0.jar package:

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    To prevent dependency conflicts, we recommend that you create a subfolder such as connector in $KAFKA_HOME/libs to store the kafka-connector-2.0.jar package.

    Note

    If the deployment environment of the kafka-connector-2.0.jar package is not the same as the deployment environment of Kafka data, you must configure and start the Kafka-connector service by following the instructions provided in aliware-kafka-demos.

  2. In the $KAFKA_HOME/config directory, configure the connect-distributed.properties file.

    Add the following configurations to the connect-distributed.properties file.

    ## Add the following content:
    plugin.path=<KAFKA_HOME>/libs/connector
    
    ## Update the values of the key.converter and value.converter parameters.
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter  
  3. Run the following command in the $KAFKA_HOME/ directory to start the Kafka-connector service:

    ## Run the following command:
    bin/connect-distributed.sh config/connect-distributed.properties &

Configure and start a Kafka-connector task

  1. Create and configure the odps-sink-connector.json file and upload the odps-sink-connector.json file to any location.

    The following code and tables describe the content and parameters of the odps-sink-connector.json file.

    {
      "name": "Kafka connector task name",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "your_topic",
        "endpoint": "endpoint",
        "tunnel_endpoint": "your_tunnel endpoint",
        "project": "project",
        "schema":"default",
        "table": "your_table",
        "account_type": "account type (STS or ALIYUN)",
        "access_id": "access id",
        "access_key": "access key",
        "account_id": "account id for sts",
        "sts.endpoint": "sts endpoint",
        "region_id": "region id for sts",
        "role_name": "role name for sts",
        "client_timeout_ms": "STS Token valid period (ms)",
        "format": "TEXT",
        "mode": "KEY",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size":"150",
        "record_batch_size":"8000",
        "runtime.error.topic.name":"kafka topic when runtime errors happens",
        "runtime.error.topic.bootstrap.servers":"kafka bootstrap servers of error topic queue",
        "skip_error":"false"
      }
    }
    • Common parameters

      Parameter

      Required

      Description

      name

      Yes

      The name of the task. The name must be unique.

      connector.class

      Yes

      The class name of the Kafka-connector service. Default value: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.

      tasks.max

      Yes

      The maximum number of consumer processes in the Kafka-connector service. The value must be an integer greater than 0.

      topics

      Yes

      The name of the Kafka topic.

      endpoint

      Yes

      The endpoint of MaxCompute.

      You must configure this parameter based on the region and network connection type that you selected when you create the MaxCompute project. For more information about the endpoints of different network types in each region, see Endpoints.

      tunnel_endpoint

      No

      The public endpoint of MaxCompute Tunnel.

      If you do not configure this parameter, traffic is automatically routed to the Tunnel endpoint that corresponds to the network in which MaxCompute resides. If you configure this parameter, traffic is routed to the specified endpoint and automatic routing is not performed.

      For more information about the Tunnel endpoints of different network types in each region, see Endpoints.

      project

      Yes

      The name of the MaxCompute project that you want to access.

      schema

      No

      • This parameter is required if the destination MaxCompute project has a three-layer schema model. Default value: default.

      • If the destination MaxCompute project does not have a three-layer schema model, you do not need to configure this parameter.

      For more information about schemas, see Schema-related operations.

      table

      Yes

      The name of the table in the destination MaxCompute project.

      format

      No

      The format of the written message. Valid values:

      • TEXT: a string. This is the default value.

      • BINARY: a byte array.

      • CSV: a list of strings separated by commas (,).

      • JSON: a JSON string. For more information about MaxCompute JSON data types, see Instructions for using the JSON type of MaxCompute (beta version).

      • FLATTEN: a JSON string. The keys and values in the JSON string are parsed and written to the specified MaxCompute table. The keys in the JSON string must correspond to the column names in the MaxCompute table.

      For more information about how to import messages in different formats, see Examples.

      mode

      No

      The mode in which messages are synchronized to MaxCompute. Valid values:

      • KEY: Only the key of the message is retained and written to the destination MaxCompute table.

      • VALUE: Only the value of the message is retained and written to the destination MaxCompute table.

      • DEFAULT: Both the key and value of the message are retained and written to the destination MaxCompute table. This is the default value.

        If you set this parameter to DEFAULT, only data of the TEXT or BINARY type can be written.

      partition_window_type

      No

      Data is partitioned based on the system time. Valid values: DAY, HOUR, and MINUTE. Default value: HOUR.

      use_streaming

      No

      Specifies whether to use Streaming Tunnel. Valid values:

      • false: Streaming Tunnel is not used. This is the default value.

      • true: Streaming Tunnel is used.

      buffer_size_kb

      No

      The internal buffer size of the odps partition writer. Unit: KB. The default size is 65,536 KB.

      sink_pool_size

      No

      The maximum number of threads for multi-thread writing. The default value is the number of CPU cores in the system.

      record_batch_size

      No

      The maximum number of messages that can be simultaneously sent by a thread in a Kafka-connector task.

      skip_error

      No

      Specifies whether to skip records generated when unknown errors occur. Valid values:

      • false: The records are not skipped. This is the default value.

      • true: The records are skipped.

        Note
        • If skip_error is set to false and the runtime.error.topic.name parameter is not configured, subsequent data write operations are stopped, processes are blocked, and an exception is logged when an unknown error occurs.

        • If the skip_error is set to true and the runtime.error.topic.name parameter is not configured, the process for data writing continues to write data, and abnormal data is discarded.

        • If the skip_error parameter is set to false and the runtime.error.topic.name parameter is configured, the process for data writing continues to write data, and abnormal data is recorded in the topic specified by runtime.error.topic.name topic.

        For more examples on processing abnormal data, see Process abnormal data.

      runtime.error.topic.name

      No

      The name of the Kafka topic to which data is written when an unknown error occurs.

      runtime.error.topic.bootstrap.servers

      No

      The addresses in the bootstrap-servers configuration. The addresses are the addresses of the Kafka brokers to which data is written when an unknown error occurs.

      account_type

      Yes

      The method that is used to access the destination MaxCompute service. Valid values: STS and ALIYUN. Default value: ALIYUN.

      You must configure different access credential parameters for different methods to access MaxCompute. For more information, see Access MaxCompute by using the ALIYUN method and Access MaxCompute by using the STS method in this topic.

    • Access MaxCompute by using the ALIYUN method: You must configure the following parameters in addition to the common parameters.

      Parameter

      Description

      access_id

      The AccessKey ID of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account.

      You can obtain the AccessKey ID from the AccessKey Pair page.

      access_key

      The AccessKey secret that corresponds to the AccessKey ID.

      You can obtain the AccessKey secret from the AccessKey Pair page.

    • Access MaxCompute by using the STS method: You must configure the following parameters in addition to the common parameters.

      Parameter

      Description

      account_id

      The ID of the account that is used to access the destination MaxCompute project. You can view your account ID in the Account Center.

      region_id

      The ID of the region in which the destination MaxCompute project resides. For more information about the ID of each region, see Endpoints.

      role_name

      The name of the role that is used to access the destination MaxCompute project. You can view the role name on the Roles page.

      client_timeout_ms

      The interval at which an STS token is refreshed. Unit: milliseconds. Default value: 11.

      sts.endpoint

      The STS service endpoint that is required when you use an STS token for identity authentication.

      For more information about the endpoints of different network types in each region, see Endpoints.

  2. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @odps-sink-connector.json

Examples

Write data of the TEXT type

  1. Prepare data.

    • Create a MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.

      CREATE TABLE IF NOT EXISTS table_text(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value STRING
      ) PARTITIONED BY (pt STRING);
    • Create Kafka data.

      In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a Kafka topic named topic_text is created.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_text

      Run the following command to create Kafka messages:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_text --property parse.key=true
      >123    abc
      >456    edf
  2. (Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.

    Note

    If the Kafka-connector service is started, skip this step.

  3. Create and configure the odps-sink-connector.json file, and upload the odps-sink-connector.json file to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.

    The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.

    {
        "name": "odps-test-text",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_text",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",
          "schema":"default",
          "table": "table_text",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"TEXT",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
  4. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verify the result.

    Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    The following result is returned:

    # The mode value in the odps-sink-connector.json configuration file is VALUE. Therefore, only the value is retained and the key field is NULL.
    
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | topic_text | 0      | 0          | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0      | 1          | NULL | edf   | 07-13-2023 21:13 |
    +-------+------------+------------+-----+-------+----+

Write data of the CSV type

  1. Prepare data.

    • Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.

      CREATE TABLE IF NOT EXISTS table_csv(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        region STRING
      ) PARTITIONED BY (pt STRING);
    • Create Kafka data.

      In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a Kafka topic named topic_csv is created.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_csv

      Run the following command to create Kafka messages:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_csv --property parse.key=true
      >123	1103,zhangsan,china
      >456	1104,lisi,usa
  2. (Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.

    Note

    If the Kafka-connector service is started, skip this step.

  3. Create and configure the odps-sink-connector.json file, and upload the odps-sink-connector.json file to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.

    The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.

    {
        "name": "odps-test-csv",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_csv",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_csv",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "format":"CSV",
          "mode":"VALUE",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verify the result.

    Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    The following result is returned:

    +-------+------------+------------+------------+------+--------+----+
    | topic | partition  | offset     | id         | name | region | pt |
    +-------+------------+------------+------------+------+--------+----+
    | csv_test | 0       | 0          | 1103       | zhangsan | china  | 07-14-2023 00:10 |
    | csv_test | 0       | 1          | 1104       | lisi | usa    | 07-14-2023 00:10 |
    +-------+------------+------------+------------+------+--------+----+

Write data of the JSON type

  1. Prepare data.

    • Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.

      CREATE TABLE IF NOT EXISTS table_json(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        key STRING,
        value JSON
      ) PARTITIONED BY (pt STRING);
    • Create Kafka data.

      In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a topic named topic_json is created.

      sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_json

      Run the following command to create Kafka messages:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_json --property parse.key=true
      >123    {"id":123,"name":"json-1","region":"beijing"}                         
      >456    {"id":456,"name":"json-2","region":"hangzhou"}
  2. (Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.

    Note

    If the Kafka-connector service is started, skip this step.

  3. Create and configure the odps-sink-connector.json file, and upload the odps-sink-connector.json file to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.

    The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.

    {
        "name": "odps-test-json",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_json",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_json",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"JSON",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verify the result.

    Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    The following result is returned:

    # Write JSON data to the value field.
    +-------+------------+------------+-----+-------+----+
    | topic | partition  | offset     | key | value | pt |
    +-------+------------+------------+-----+-------+----+
    | Topic_json | 0      | 0          | NULL | {"id":123,"name":"json-1","region":"beijing"} | 07-14-2023 00:28 |
    | Topic_json | 0      | 1          | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +-------+------------+------------+-----+-------+----+

Write data of the FLATTEN type

  1. Prepare data.

    • Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Create Kafka data.

      In the $KAFKA_HOME/bin/ directory, run the following command to create a Kafka topic. In this example, a topic named topic_flatten is created.

      ./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_flatten

      Run the following command to create Kafka messages:

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_flatten --property parse.key=true
      >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}                         
      >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}

  2. (Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.

    Note

    If the Kafka-connector service is started, skip this step.

  3. Create and configure the odps-sink-connector.json file, and upload the odps-sink-connector.json file to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.

    The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.

    {
        "name": "odps-test-flatten",
        "config": {
          "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
          "tasks.max": "3",
          "topics": "topic_flatten",
          "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
          "project": "project_name",    
          "schema":"default",
          "table": "table_flatten",
          "account_type": "ALIYUN",
          "access_id": "LTAI5tM2iHkTd4W69nof****",
          "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
          "partition_window_type": "MINUTE",
          "mode":"VALUE",
          "format":"FLATTEN",
          "sink_pool_size":"150",
          "record_batch_size":"9000",
          "buffer_size_kb":"600000"
        }
      }
    
  4. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verify the result.

    Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    The following result is returned:

    # JSON data is parsed and written to the MaxCompute table. The exteninfo field in the nested JSON format can be a JSON field.
    +-------+------------+--------+-----+------+------------+----+
    | topic | partition  | offset | id  | name | extendinfo | pt |
    +-------+------------+--------+-----+------+------------+----+
    | topic_flatten | 0   | 0      | 123 | json-1 | {"sex":"M","region":"beijing"} | 07-14-2023 01:33 |
    | topic_flatten | 0   | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +-------+------------+--------+-----+------+------------+----+

Process abnormal data

  1. Prepare data.

    • Create a destination MaxCompute table by using the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL.

      CREATE TABLE IF NOT EXISTS table_flatten(
        topic STRING,
        `partition` BIGINT,
        `offset` BIGINT,
        id BIGINT,
        name STRING,
        extendinfo JSON
      ) PARTITIONED BY (pt STRING);
    • Create Kafka data.

      In the $KAFKA_HOME/bin/ directory, run commands to create the following Kafka topics:

      • topic_abnormal

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_abnormal
      • runtime_error

        sh kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic runtime_error
        Note

        If an unknown error occurs when data is written, the abnormal data is written to the runtime_error topic. In most cases, an unknown error occurs because the format of Kafka data is not consistent with the format of the MaxCompute table.

      Run the following command to create Kafka messages:

      In the following messages, the data format of one message is not the same as the format of the MaxCompute table.

      sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic flatten_test --property parse.key=true
      
      >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}                         
      >101  {"id":101,"name":"json-4","extendinfos":"null"}
      >102	{"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}} 
  2. (Optional) Start the Kafka-connector service. For more information, see Configure and start the Kafka-connector service.

    Note

    If the Kafka-connector service is started, skip this step.

  3. Create and configure the odps-sink-connector.json file, and upload the odps-sink-connector.json file to any location. In this example, the odps-sink-connector.json file is uploaded to the $KAFKA_HOME/config directory.

    The following code shows the content of the odps-sink-connector.json file. For more information about the odps-sink-connector.json file, see Configure and start a Kafka-connector task.

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema":"default",
        "table": "test_flatten",
        "account_type": "ALIYUN",
        "access_id": "LTAI5tM2iHkTd4W69nof****",
        "access_key": "S0uZvwDYDa56WZ1tjVmA67z1YS****",
        "partition_window_type": "MINUTE",
        "mode":"VALUE",
        "format":"FLATTEN",
        "sink_pool_size":"150",
        "record_batch_size":"9000",
        "buffer_size_kb":"600000",
        "runtime.error.topic.name":"runtime_error",
        "runtime.error.topic.bootstrap.servers":"http://XXXX",
        "skip_error":"false"
      }
    }
    
  4. Run the following command to start the Kafka-connector task:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  5. Verify the result.

    • Query data in the MaxCompute table.

      Run the following commands on the MaxCompute client (odpscmd) or another tool that can run MaxCompute SQL to query the data write result:

      set odps.sql.allow.fullscan=true;
      select * from table_flatten;

      The following result is returned:

      # The last two records are displayed. This is because the skip_error parameter is set to true. The data with the id of 101 is not written to the MaxCompute table, and subsequent records are not blocked from being written to the MaxCompute table. 
      +-------+------------+------------+------------+------+------------+----+
      | topic | partition  | offset     | id         | name | extendinfo | pt |
      +-------+------------+------------+------------+------+------------+----+
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
      | flatten_test | 0          | 0          | 123        | json-1 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 1          | 456        | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 2          | 100        | json-3 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      | flatten_test | 0          | 4          | 102        | json-5 | {"gender":"M","region":"beijing"} | 07-14-2023 13:16 |
      +-------+------------+------------+------------+------+------------+----+
    • Query messages in the runtime_error topic.

      In the $KAFKA_HOME/bin/ directory, run the following command to view the message write result:

      sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic runtime_error --from-beginning

      The following result is returned:

      # Abnormal data is written to the runtime_error topic.
      {"id":101,"name":"json-4","extendinfos":"null"}