Last Updated:Mar 26, 2026

Import data from Apache Kafka to MaxCompute

The MaxCompute Kafka sink connector continuously writes messages from Apache Kafka topics into MaxCompute tables. It runs as a Kafka Connect worker process and requires no custom code.

The connector supports:

  • Four message formats: TEXT, CSV, JSON, and FLATTEN

  • Two authentication methods: AccessKey (ALIYUN) and Security Token Service (STS)

  • Two write modes: batch Tunnel (default) and streaming Tunnel (use_streaming: true)

  • Error routing: redirect schema-mismatched records to a dedicated Kafka error topic instead of blocking the pipeline

Two deployment paths are available:

  • Alibaba Cloud Message Queue for Apache Kafka: Use the built-in MaxCompute Sink Connector — no JAR downloads or manual configuration required. See Create a MaxCompute Sink Connector.

  • Self-managed Apache Kafka: Download the connector JAR, configure a worker, and deploy a sink task. The rest of this document covers this path.

How it works

The connector runs inside a Kafka Connect distributed worker. When a sink task starts, it subscribes to the specified Kafka topics and buffers incoming messages in memory. When the buffer reaches the configured size (buffer_size_kb) or the partition window closes, the connector flushes the buffered records to the target MaxCompute table through the Tunnel service. The pt partition field is set automatically based on the flush time and the partition_window_type setting (DAY, HOUR, or MINUTE).
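
The flush decision and the pt value described above can be pictured with a short Python sketch. It is illustrative only, not the connector's implementation: the function names are hypothetical, and the DAY and HOUR patterns are assumptions extrapolated from the MINUTE pattern that appears in the sample output later in this document.

```python
from datetime import datetime

# strftime patterns for the pt value. Only the MINUTE pattern is visible in
# this document's sample output; the DAY and HOUR patterns are assumptions.
PT_PATTERNS = {
    "DAY": "%m-%d-%Y",
    "HOUR": "%m-%d-%Y %H",
    "MINUTE": "%m-%d-%Y %H:%M",
}


def partition_value(flush_time: datetime, window_type: str) -> str:
    """Format the flush time at the granularity of partition_window_type."""
    return flush_time.strftime(PT_PATTERNS[window_type])


def should_flush(buffered_bytes: int, buffer_size_kb: int,
                 buffer_pt: str, current_pt: str) -> bool:
    """Flush when the buffer is full or the partition window has rolled over."""
    buffer_full = buffered_bytes >= buffer_size_kb * 1024
    window_closed = current_pt != buffer_pt
    return buffer_full or window_closed


if __name__ == "__main__":
    opened = partition_value(datetime(2023, 7, 13, 21, 13, 5), "MINUTE")
    now = partition_value(datetime(2023, 7, 13, 21, 14, 0), "MINUTE")
    print(opened)  # 07-13-2023 21:13
    print(should_flush(1024, 65536, opened, now))  # True: the minute window closed
```

With partition_window_type set to MINUTE, a nearly empty buffer is still flushed once the minute changes, which is why the examples in this document show data in the table quickly even with a large buffer_size_kb.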

If a message cannot be written due to a schema mismatch or type error, behavior depends on the skip_error and runtime.error.topic.name settings — see Error handling.

Prerequisites

Before you begin, make sure you have:

  • Apache Kafka V2.2 or later deployed (V3.4.0 is recommended) and a Kafka topic created

  • A MaxCompute project and table created — see Create a MaxCompute project and Create a table

  • An Alibaba Cloud AccessKey ID and AccessKey Secret with write access to the target MaxCompute project — retrieve them from AccessKey Management

Supported formats and table requirements

The connector supports four message formats: TEXT, CSV, JSON, and FLATTEN.

TEXT and JSON formats

The MaxCompute target table must include the following fixed fields. For TEXT format, key and value must be STRING. For JSON format, key and value can be STRING or JSON, depending on your data.

Field name | Field type | Notes
topic | STRING | Fixed field
partition | BIGINT | Fixed field
offset | BIGINT | Fixed field
key | STRING (TEXT) / STRING or JSON (JSON) | Controlled by the mode parameter
value | STRING (TEXT) / STRING or JSON (JSON) | Controlled by the mode parameter
pt | STRING | Partition field, set automatically by the connector

CSV and FLATTEN formats

The table must include the four fixed fields below. Add additional columns to match your message data.

Field name | Field type
topic | STRING
partition | BIGINT
offset | BIGINT
pt | STRING (partition field)

Additional field requirements by format:

  • CSV: The order and types of custom columns must match the comma-separated values in each Kafka message, left to right.

  • FLATTEN: The names of custom columns must match the JSON keys in each Kafka message. Nested JSON objects are written as a JSON column. For example, if your Kafka messages contain {"A":1,"B":"b","C":{"D":"d","E":"e"}}, create the table as:

    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      A BIGINT,
      B STRING,
      C JSON
    ) PARTITIONED BY (pt STRING);
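
The two mapping rules above can be sketched in Python as follows. This is a simplified model under stated assumptions, not the connector's code: csv_row and flatten_row are hypothetical helpers, no type conversion is performed, and treating an unknown JSON key as an error is an assumption based on the abnormal-data example later in this document.

```python
import json


def csv_row(value: str, custom_columns: list[str]) -> dict:
    """Map comma-separated values to custom columns by position."""
    fields = value.split(",")
    if len(fields) != len(custom_columns):
        raise ValueError("field count does not match the table columns")
    return dict(zip(custom_columns, fields))


def flatten_row(value: str, custom_columns: list[str]) -> dict:
    """Map top-level JSON keys to custom columns by name."""
    message = json.loads(value)
    unknown = set(message) - set(custom_columns)
    if unknown:
        raise ValueError(f"keys without a matching column: {sorted(unknown)}")
    row = {}
    for column in custom_columns:
        field = message.get(column)
        # Nested objects are kept as JSON text for a JSON-typed column.
        if isinstance(field, (dict, list)):
            field = json.dumps(field, separators=(",", ":"))
        row[column] = field
    return row


if __name__ == "__main__":
    print(flatten_row('{"A":1,"B":"b","C":{"D":"d","E":"e"}}', ["A", "B", "C"]))
```

For the sample message, A and B map to scalar columns and C is carried as the JSON text {"D":"d","E":"e"}, matching the table_flatten definition above.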

Set up and start the connector service

This section uses a Linux environment.

  1. In a terminal, download the kafka-connector-2.0.jar package:

    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar

    If the JAR is not compatible with your Kafka version, see Configure Kafka-connector for alternative setup instructions.

    To prevent dependency conflicts, place the JAR in a dedicated subfolder inside $KAFKA_HOME/libs, for example $KAFKA_HOME/libs/connector.

  2. In $KAFKA_HOME/config/connect-distributed.properties, add and update the following properties:

    # Point Kafka Connect to the connector JAR
    plugin.path=<KAFKA_HOME>/libs/connector
    
    # Use string serialization for keys and values
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
  3. Start the Kafka Connect worker in distributed mode:

    bin/connect-distributed.sh config/connect-distributed.properties &
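
Once the worker is running, one way to confirm that it picked up the JAR from plugin.path is to list the loaded plugins through the Kafka Connect REST API (GET /connector-plugins). The Python sketch below assumes the worker listens on localhost:8083, as in the curl commands in this document; the helper names are hypothetical.

```python
import json
from urllib.request import urlopen

CONNECTOR_CLASS = "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector"


def plugin_loaded(plugins: list[dict], connector_class: str = CONNECTOR_CLASS) -> bool:
    """Return True if the class appears in a GET /connector-plugins response."""
    return any(plugin.get("class") == connector_class for plugin in plugins)


def fetch_plugins(base_url: str = "http://localhost:8083") -> list[dict]:
    """Call the Kafka Connect REST API and decode the plugin list."""
    with urlopen(f"{base_url}/connector-plugins", timeout=10) as response:
        return json.load(response)


# Usage (requires a running worker):
#   plugin_loaded(fetch_plugins())
```

If the check returns False, the usual suspects are a plugin.path that does not point at the folder containing the JAR, or a worker that was started before the property was updated.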

Create and start a sink task

  1. Create a file named odps-sink-connector.json and populate it with your configuration. Save the file to any accessible location, for example $KAFKA_HOME/config/. See Connector parameters for a full parameter reference.

    {
      "name": "<task-name>",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "<kafka-topic>",
        "endpoint": "<maxcompute-endpoint>",
        "tunnel_endpoint": "<tunnel-endpoint>",
        "project": "<maxcompute-project>",
        "schema": "default",
        "table": "<maxcompute-table>",
        "account_type": "ALIYUN",
        "access_id": "<your-access-key-id>",
        "access_key": "<your-access-key-secret>",
        "format": "TEXT",
        "mode": "VALUE",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size": "150",
        "record_batch_size": "8000",
        "skip_error": "false"
      }
    }
  2. Submit the task to the Kafka Connect REST API:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
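
After submitting the task, its state can be read back from the Kafka Connect REST API (GET /connectors/<task-name>/status). The following Python sketch, with hypothetical helper names and the same localhost:8083 assumption as the curl command above, extracts the IDs of failed tasks from that response.

```python
import json
from urllib.request import urlopen


def failed_tasks(status: dict) -> list[int]:
    """Return the IDs of tasks whose state is FAILED in a status response."""
    return [task["id"] for task in status.get("tasks", [])
            if task.get("state") == "FAILED"]


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Fetch the status document for the connector with the given name."""
    with urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as response:
        return json.load(response)


# Usage (requires a running worker and a submitted task):
#   failed_tasks(fetch_status("<task-name>"))
```

An empty list means no task reported FAILED; a failed task entry in the status response also carries a trace field with the exception text.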

Connector parameters

Required parameters

Parameter | Description
name | A unique name for the sink task.
connector.class | Fixed value: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector.
tasks.max | Maximum number of consumer threads. Must be a positive integer.
topics | The Kafka topic name to read from.
endpoint | The MaxCompute service endpoint for your region and network type. See Endpoints.
project | The target MaxCompute project name.
table | The target table name within the project.
account_type | Authentication method. Valid values: ALIYUN (AccessKey) or STS (Security Token Service).

Authentication parameters

For account_type ALIYUN, provide your long-term credentials:

Parameter | Description
access_id | Your AccessKey ID. Get it from AccessKey Management.
access_key | The AccessKey secret for the above ID.

For account_type STS, provide Security Token Service (STS) credentials for temporary access:

Parameter | Description
account_id | Your Alibaba Cloud account ID. Find it in Account Center.
region_id | The region ID of the target MaxCompute project. See Endpoints.
role_name | The RAM role name. Find it on the Roles page.
client_timeout_ms | STS token refresh interval in milliseconds. Default: 11 ms.
sts.endpoint | The STS service endpoint. See Endpoints.
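
Before submitting a task, a quick pre-flight check can catch a missing required or authentication parameter. The Python sketch below is a hypothetical helper, not part of the connector. Which STS fields are mandatory is an assumption here; client_timeout_ms is treated as optional because the table above lists a default for it.

```python
REQUIRED = ("connector.class", "tasks.max", "topics", "endpoint",
            "project", "table", "account_type")

# Assumption: every STS field except client_timeout_ms must be supplied.
AUTH_FIELDS = {
    "ALIYUN": ("access_id", "access_key"),
    "STS": ("account_id", "region_id", "role_name", "sts.endpoint"),
}


def missing_parameters(config: dict) -> list[str]:
    """List required keys that are absent or empty in a connector config."""
    missing = [key for key in REQUIRED if not config.get(key)]
    account_type = config.get("account_type")
    if account_type in AUTH_FIELDS:
        missing += [key for key in AUTH_FIELDS[account_type]
                    if not config.get(key)]
    return missing
```

Pass the "config" object from odps-sink-connector.json to missing_parameters; an empty list means every key checked by this sketch is present.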

Optional parameters

Parameter | Default | Description
schema | default | Required if the project uses the three-layer schema model (project.schema.table). See Schema operations.
tunnel_endpoint | Auto-detected | The Tunnel service endpoint. If unset, the connector routes automatically. If set, your value takes precedence.
format | TEXT | Message format. Valid values: TEXT, CSV, JSON, FLATTEN, BINARY.
mode | DEFAULT | Which part of the Kafka message to write. Valid values: KEY (key only), VALUE (value only), DEFAULT (both key and value; TEXT and BINARY formats only).
partition_window_type | HOUR | Time granularity for the pt partition field. Valid values: DAY, HOUR, MINUTE.
use_streaming | false | Write mode. See the comparison table below.
buffer_size_kb | 65536 | Internal write buffer size per partition, in KB.
sink_pool_size | CPU core count | Maximum number of writer threads.
record_batch_size | 8000 | Maximum number of records a single thread sends in one batch.
skip_error | false | Whether to skip records that cause unknown write errors. See Error handling.
runtime.error.topic.name | (none) | Kafka topic to receive records that fail to write.
runtime.error.topic.bootstrap.servers | (none) | Bootstrap server address for the error topic's Kafka instance.

Batch Tunnel vs. streaming Tunnel (use_streaming)

The use_streaming parameter controls which Tunnel write path the connector uses.

Aspect | Batch Tunnel (false, default) | Streaming Tunnel (true)
Latency | Higher: data is buffered until buffer_size_kb is reached or the partition window closes | Lower: records are written immediately after buffering
Best for | High-throughput, non-latency-sensitive pipelines | Near-real-time ingestion
Throughput | Higher: optimized for large batch writes | May vary depending on load

Choose use_streaming: true when end-to-end latency is a priority. Use the default batch mode for maximum throughput.

Error handling

When the connector encounters a record it cannot write (typically due to a schema mismatch between the Kafka message and the MaxCompute table), behavior is controlled by two parameters:

skip_error | runtime.error.topic.name | Behavior
false (default) | Not set | Stops writing. The task blocks and logs an exception.
true | Not set | Continues writing. The failed record is silently discarded.
false | Set | Continues writing. The failed record is written to the error topic.
true | Set | Continues writing. The failed record is written to the error topic.

Set runtime.error.topic.name to preserve failed records for inspection without blocking the pipeline.
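
The decision table above reduces to a small rule: a configured error topic takes effect regardless of skip_error. The Python sketch below restates the table as code; the function name and the returned labels are hypothetical and only mirror the documented behavior.

```python
from typing import Optional


def on_write_failure(skip_error: bool, error_topic: Optional[str]) -> str:
    """Return the action the table above describes for a failed record."""
    if error_topic:
        # An error topic takes effect regardless of skip_error.
        return "route_to_error_topic"
    if skip_error:
        # No error topic: the record is dropped and writing continues.
        return "discard"
    # Default configuration: the task stops and logs an exception.
    return "block"
```

For example, on_write_failure(False, "runtime_error") returns "route_to_error_topic", which corresponds to the third row of the table.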

Examples

Write TEXT data

  1. Create the target MaxCompute table. Run the following SQL in odpscmd or any MaxCompute SQL client:

    CREATE TABLE IF NOT EXISTS table_text(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      key STRING,
      value STRING
    ) PARTITIONED BY (pt STRING);
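  2. Create a Kafka topic and send test messages. Run the following commands in the $KAFKA_HOME/bin/ directory. Because parse.key=true is set, type each message as the key, a tab character, then the value (tab is the console producer's default key.separator):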

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_text
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_text --property parse.key=true
    >123    abc
    >456    edf
  3. (Optional) Start the connector service if it is not already running. See Set up and start the connector service.

  4. Create odps-sink-connector.json and save it to $KAFKA_HOME/config/:

    {
      "name": "odps-test-text",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_text",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_text",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "TEXT",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
  5. Start the sink task:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  6. Verify the result. Run the following SQL in odpscmd or any MaxCompute SQL client:

    set odps.sql.allow.fullscan=true;
    select * from table_text;

    Expected output — because mode is VALUE, the key column is NULL:

    +------------+-----------+--------+------+-------+------------------+
    | topic      | partition | offset | key  | value | pt               |
    +------------+-----------+--------+------+-------+------------------+
    | topic_text | 0         | 0      | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0         | 1      | NULL | edf   | 07-13-2023 21:13 |
    +------------+-----------+--------+------+-------+------------------+
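
The effect of the mode parameter on the key and value columns can be summarized with a small Python sketch. The function is hypothetical and covers only what this document states: VALUE leaves key as NULL (as in the output above) and DEFAULT writes both. That KEY leaves value as NULL is an assumption made by symmetry.

```python
from typing import Optional, Tuple


def select_key_value(mode: str, key: Optional[str],
                     value: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Return the (key, value) pair written to the table for the given mode."""
    if mode == "KEY":
        return key, None      # assumption: value column is left NULL
    if mode == "VALUE":
        return None, value    # matches the sample output: key is NULL
    if mode == "DEFAULT":
        return key, value
    raise ValueError(f"unsupported mode: {mode}")


if __name__ == "__main__":
    print(select_key_value("VALUE", "123", "abc"))  # (None, 'abc')
```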

Write CSV data

  1. Create the target table:

    CREATE TABLE IF NOT EXISTS table_csv(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      region STRING
    ) PARTITIONED BY (pt STRING);
  2. Create a Kafka topic and send test messages:

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_csv
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_csv --property parse.key=true
    >123	1103,zhangsan,china
    >456	1104,lisi,usa
  3. (Optional) Start the connector service if not already running.

  4. Create odps-sink-connector.json:

    {
      "name": "odps-test-csv",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_csv",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_csv",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "format": "CSV",
        "mode": "VALUE",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
  5. Start the sink task:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  6. Verify the result:

    set odps.sql.allow.fullscan=true;
    select * from table_csv;

    Expected output:

    +-----------+-----------+--------+------+----------+--------+------------------+
    | topic     | partition | offset | id   | name     | region | pt               |
    +-----------+-----------+--------+------+----------+--------+------------------+
    | topic_csv | 0         | 0      | 1103 | zhangsan | china  | 07-14-2023 00:10 |
    | topic_csv | 0         | 1      | 1104 | lisi     | usa    | 07-14-2023 00:10 |
    +-----------+-----------+--------+------+----------+--------+------------------+

Write JSON data

  1. Create the target table. The value column uses the MaxCompute JSON type to store structured data:

    CREATE TABLE IF NOT EXISTS table_json(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      key STRING,
      value JSON
    ) PARTITIONED BY (pt STRING);
  2. Create a Kafka topic and send test messages:

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_json
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_json --property parse.key=true
    >123    {"id":123,"name":"json-1","region":"beijing"}
    >456    {"id":456,"name":"json-2","region":"hangzhou"}
  3. (Optional) Start the connector service if not already running.

  4. Create odps-sink-connector.json:

    {
      "name": "odps-test-json",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_json",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_json",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "JSON",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
  5. Start the sink task:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  6. Verify the result:

    set odps.sql.allow.fullscan=true;
    select * from table_json;

    Expected output — the JSON payload is stored in the value column:

    +------------+-----------+--------+------+------------------------------------------------+------------------+
    | topic      | partition | offset | key  | value                                          | pt               |
    +------------+-----------+--------+------+------------------------------------------------+------------------+
    | topic_json | 0         | 0      | NULL | {"id":123,"name":"json-1","region":"beijing"}  | 07-14-2023 00:28 |
    | topic_json | 0         | 1      | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +------------+-----------+--------+------+------------------------------------------------+------------------+

Write FLATTEN data

FLATTEN mode parses each JSON message and maps its top-level keys to matching columns in the MaxCompute table. Nested objects are stored as JSON columns.

  1. Create the target table. Custom columns (id, name, extendinfo) match the JSON keys in the Kafka messages:

    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      extendinfo JSON
    ) PARTITIONED BY (pt STRING);
  2. Create a Kafka topic and send test messages:

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_flatten
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_flatten --property parse.key=true
    >123  {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}
    >456  {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
  3. (Optional) Start the connector service if not already running.

  4. Create odps-sink-connector.json:

    {
      "name": "odps-test-flatten",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_flatten",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "FLATTEN",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
  5. Start the sink task:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  6. Verify the result:

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    Expected output — the JSON payload is flattened into columns, with extendinfo stored as a nested JSON field:

    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+
    | topic         | partition | offset | id  | name   | extendinfo                      | pt               |
    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+
    | topic_flatten | 0         | 0      | 123 | json-1 | {"sex":"M","region":"beijing"}  | 07-14-2023 01:33 |
    | topic_flatten | 0         | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+

Handle abnormal data

This example demonstrates how to route schema-mismatched records to a dedicated error topic instead of blocking the pipeline.

  1. Create the target table and two Kafka topics, one for normal data and one for errors. Records that fail to write are redirected to runtime_error. This usually happens when the Kafka message structure does not match the MaxCompute table schema.

    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      extendinfo JSON
    ) PARTITIONED BY (pt STRING);

    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_abnormal
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic runtime_error
  2. Send test messages. The second message uses extendinfos (note the typo) instead of extendinfo, which does not match the table schema:

    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_abnormal --property parse.key=true
    >100  {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}
    >101  {"id":101,"name":"json-4","extendinfos":"null"}
    >102  {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
  3. (Optional) Start the connector service if not already running.

  4. Create odps-sink-connector.json with runtime.error.topic.name set to redirect failed records:

    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "FLATTEN",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000",
        "runtime.error.topic.name": "runtime_error",
        "runtime.error.topic.bootstrap.servers": "http://XXXX",
        "skip_error": "false"
      }
    }
  5. Start the sink task:

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
  6. Verify the result. Query the MaxCompute table. The record with ID 101 is absent because it failed schema validation, but processing continued for records 100 and 102:

    set odps.sql.allow.fullscan=true;
    select * from table_flatten;

    In the sample output below, the topic column shows flatten_test, and the rows for IDs 123 and 456 come from messages that are not part of this example. The rows to check are IDs 100 and 102:

    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+
    | topic        | partition | offset | id  | name   | extendinfo                         | pt               |
    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+
    | flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 01:33 |
    | flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    | flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    | flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
    | flatten_test | 0         | 2      | 100 | json-3 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    | flatten_test | 0         | 4      | 102 | json-5 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+

    Inspect the failed record in the runtime_error topic:

    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic runtime_error --from-beginning
    {"id":101,"name":"json-4","extendinfos":"null"}

    The mismatched record is written to the error topic, allowing you to investigate and reprocess it.

What's next