Import data from Apache Kafka to MaxCompute
The MaxCompute Kafka sink connector continuously writes messages from Apache Kafka topics into MaxCompute tables. It runs as a Kafka Connect worker process and requires no custom code.
The connector supports:
-
Four message formats: TEXT, CSV, JSON, and FLATTEN
-
Two authentication methods: AccessKey (ALIYUN) and Security Token Service (STS)
-
Two write modes: batch Tunnel (default) and streaming Tunnel (use_streaming: true)
-
Error routing: redirect schema-mismatched records to a dedicated Kafka error topic instead of blocking the pipeline
Two deployment paths are available:
-
Alibaba Cloud Message Queue for Apache Kafka: Use the built-in MaxCompute Sink Connector — no JAR downloads or manual configuration required. See Create a MaxCompute Sink Connector.
-
Self-managed Apache Kafka: Download the connector JAR, configure a worker, and deploy a sink task. The rest of this document covers this path.
How it works
The connector runs inside a Kafka Connect distributed worker. When a sink task starts, it subscribes to the specified Kafka topics and buffers incoming messages in memory. When the buffer reaches the configured size (buffer_size_kb) or the partition window closes, the connector flushes the buffered records to the target MaxCompute table through the Tunnel service. The pt partition field is set automatically based on the flush time and the partition_window_type setting (DAY, HOUR, or MINUTE).
If a message cannot be written due to a schema mismatch or type error, behavior depends on the skip_error and runtime.error.topic.name settings — see Error handling.
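The two flush conditions described above can be sketched as follows. This is a minimal illustration, not the connector's actual implementation: the names `window_start` and `PartitionBuffer` are hypothetical, and the sketch assumes the flush check runs only when a record arrives (the real connector may also flush on a timer).

```python
from datetime import datetime
from typing import List, Optional


def window_start(ts: datetime, window_type: str) -> datetime:
    """Truncate a timestamp to the start of its DAY, HOUR, or MINUTE window."""
    if window_type == "DAY":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if window_type == "HOUR":
        return ts.replace(minute=0, second=0, microsecond=0)
    if window_type == "MINUTE":
        return ts.replace(second=0, microsecond=0)
    raise ValueError(f"unsupported partition_window_type: {window_type}")


class PartitionBuffer:
    """Hypothetical in-memory buffer for one Kafka partition."""

    def __init__(self, buffer_size_kb: int, window_type: str) -> None:
        self.limit_bytes = buffer_size_kb * 1024
        self.window_type = window_type
        self.records: List[bytes] = []
        self.size_bytes = 0
        self.window: Optional[datetime] = None

    def _drain(self) -> List[bytes]:
        # Hand back everything buffered so far and reset the buffer.
        flushed, self.records, self.size_bytes = self.records, [], 0
        return flushed

    def add(self, record: bytes, now: datetime) -> List[bytes]:
        """Buffer a record and return the records to flush (empty if none)."""
        current = window_start(now, self.window_type)
        flushed: List[bytes] = []
        # Condition 1: the partition window closed since the last record.
        if self.window is not None and current != self.window:
            flushed = self._drain()
        self.window = current
        self.records.append(record)
        self.size_bytes += len(record)
        # Condition 2: the buffer reached the configured size.
        if self.size_bytes >= self.limit_bytes:
            flushed += self._drain()
        return flushed
```

With `partition_window_type` set to MINUTE, records that arrive at 21:13:05 and 21:13:50 stay buffered together, and the first record that arrives at 21:14 causes them to be flushed.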
Prerequisites
Before you begin, make sure you have:
-
Apache Kafka V2.2 or later deployed and a Kafka topic created (V3.4.0 is recommended)
-
A MaxCompute project and table created — see Create a MaxCompute project and Create a table
-
An Alibaba Cloud AccessKey ID and AccessKey Secret with write access to the target MaxCompute project — retrieve them from AccessKey Management
Supported formats and table requirements
The connector supports four message formats: TEXT, CSV, JSON, and FLATTEN.
TEXT and JSON formats
The MaxCompute target table must include the following fixed fields. For TEXT format, key and value must be STRING. For JSON format, key and value can be STRING or JSON, depending on your data.
| Field name | Field type | Notes |
|---|---|---|
| topic | STRING | Fixed field |
| partition | BIGINT | Fixed field |
| offset | BIGINT | Fixed field |
| key | STRING (TEXT) / STRING or JSON (JSON) | Controlled by the mode parameter |
| value | STRING (TEXT) / STRING or JSON (JSON) | Controlled by the mode parameter |
| pt | STRING | Partition field, set automatically by the connector |
CSV and FLATTEN formats
The table must include the four fixed fields below. Add additional columns to match your message data.
| Field name | Field type |
|---|---|
| topic | STRING |
| partition | BIGINT |
| offset | BIGINT |
| pt | STRING (partition field) |
Additional field requirements by format:
-
CSV: The order and types of custom columns must match the comma-separated values in each Kafka message, left to right.
-
FLATTEN: The names of custom columns must match the JSON keys in each Kafka message. Nested JSON objects are written to a JSON column. For example, if your Kafka messages contain {"A":1,"B":"b","C":{"D":"d","E":"e"}}, create the table as:
    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      A BIGINT,
      B STRING,
      C JSON
    ) PARTITIONED BY (pt STRING);
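The FLATTEN mapping can be pictured with a short sketch. It is an illustration only, not the connector's code: `flatten_message` is a hypothetical helper, and representing the nested object as compact JSON text is an assumption made for display purposes.

```python
import json
from typing import Any, Dict


def flatten_message(value: str) -> Dict[str, Any]:
    """Map top-level JSON keys to column values; nested objects stay JSON text."""
    parsed = json.loads(value)
    if not isinstance(parsed, dict):
        raise ValueError("FLATTEN expects a JSON object at the top level")
    row: Dict[str, Any] = {}
    for key, val in parsed.items():
        if isinstance(val, dict):
            # A nested object goes to a single JSON column (assumed compact form).
            row[key] = json.dumps(val, separators=(",", ":"))
        else:
            # Scalars map directly to the column with the same name.
            row[key] = val
    return row


row = flatten_message('{"A":1,"B":"b","C":{"D":"d","E":"e"}}')
print(row)  # {'A': 1, 'B': 'b', 'C': '{"D":"d","E":"e"}'}
```

Columns A and B receive scalar values, while C receives the whole nested object, which is why the table declares C as JSON.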
Set up and start the connector service
This section uses a Linux environment.
-
In a terminal, download the kafka-connector-2.0.jar package:
    wget http://maxcompute-repo.oss-cn-hangzhou.aliyuncs.com/kafka/kafka-connector-2.0.jar
If the JAR is not compatible with your Kafka version, see Configure Kafka-connector for alternative setup instructions.
To prevent dependency conflicts, place the JAR in a dedicated subfolder inside $KAFKA_HOME/libs, for example $KAFKA_HOME/libs/connector.
-
In $KAFKA_HOME/config/connect-distributed.properties, add or update the following properties:
    # Point Kafka Connect to the connector JAR
    plugin.path=<KAFKA_HOME>/libs/connector
    # Use string serialization for keys and values
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
-
Start the Kafka Connect worker in distributed mode:
    bin/connect-distributed.sh config/connect-distributed.properties &
Create and start a sink task
-
Create a file named odps-sink-connector.json and populate it with your configuration. Save the file to any accessible location, for example $KAFKA_HOME/config/. See Connector parameters for a full parameter reference.
    {
      "name": "<task-name>",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "<kafka-topic>",
        "endpoint": "<maxcompute-endpoint>",
        "tunnel_endpoint": "<tunnel-endpoint>",
        "project": "<maxcompute-project>",
        "schema": "default",
        "table": "<maxcompute-table>",
        "account_type": "ALIYUN",
        "access_id": "<your-access-key-id>",
        "access_key": "<your-access-key-secret>",
        "format": "TEXT",
        "mode": "VALUE",
        "partition_window_type": "MINUTE",
        "use_streaming": false,
        "buffer_size_kb": 65536,
        "sink_pool_size": "150",
        "record_batch_size": "8000",
        "skip_error": "false"
      }
    }
-
Submit the task to the Kafka Connect REST API:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
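If you prefer to generate and submit the task from a script, the two steps above could look roughly like this. The helper names `build_sink_task` and `submit_task` are hypothetical, the parameter subset is chosen only for illustration, and the sketch assumes the Kafka Connect worker listens on http://localhost:8083 as in the curl command.

```python
import json
import urllib.request
from typing import Dict


def build_sink_task(name: str, topic: str, endpoint: str, project: str,
                    table: str, access_id: str, access_key: str) -> Dict:
    """Assemble a sink task definition with AccessKey (ALIYUN) authentication."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
            "tasks.max": "3",
            "topics": topic,
            "endpoint": endpoint,
            "project": project,
            "table": table,
            "account_type": "ALIYUN",
            "access_id": access_id,
            "access_key": access_key,
            "format": "TEXT",
            "mode": "VALUE",
        },
    }


def submit_task(task: Dict, connect_url: str = "http://localhost:8083") -> int:
    """POST the task to the Kafka Connect REST API and return the HTTP status."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(task).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


task = build_sink_task(
    name="odps-test-text",
    topic="topic_text",
    endpoint="http://service.cn-shanghai.maxcompute.aliyun.com/api",
    project="project_name",
    table="table_text",
    access_id="<yourAccessKeyId>",
    access_key="<yourAccessKeySecret>",
)
# submit_task(task)  # uncomment when a Kafka Connect worker is running
```

Building the definition in code keeps credentials out of files on disk, for example by reading them from environment variables before calling `build_sink_task`.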
Connector parameters
Required parameters
| Parameter | Description |
|---|---|
| name | A unique name for the sink task. |
| connector.class | Fixed value: com.aliyun.odps.kafka.connect.MaxComputeSinkConnector. |
| tasks.max | Maximum number of consumer threads. Must be a positive integer. |
| topics | The Kafka topic name to read from. |
| endpoint | The MaxCompute service endpoint for your region and network type. See Endpoints. |
| project | The target MaxCompute project name. |
| table | The target table name within the project. |
| account_type | Authentication method. Valid values: ALIYUN (AccessKey) or STS (Security Token Service). |
Authentication parameters
For `account_type: ALIYUN` — provide your long-term credentials:
| Parameter | Description |
|---|---|
| access_id | Your AccessKey ID. Get it from AccessKey Management. |
| access_key | The AccessKey secret for the above ID. |
For `account_type: STS` — provide Security Token Service (STS) credentials for temporary access:
| Parameter | Description |
|---|---|
| account_id | Your Alibaba Cloud account ID. Find it in Account Center. |
| region_id | The region ID of the target MaxCompute project. See Endpoints. |
| role_name | The RAM role name. Find it on the Roles page. |
| client_timeout_ms | STS token refresh interval in milliseconds. Default: 11 ms. |
| sts.endpoint | The STS service endpoint. See Endpoints. |
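Before submitting a task, it can help to check that the parameters for the chosen authentication method are present. The following is a rough sketch: `missing_parameters` is a hypothetical helper, and treating account_id, region_id, role_name, and sts.endpoint as mandatory for STS is an assumption (client_timeout_ms is left out because it has a default).

```python
from typing import Dict, List

# Required keys inside the "config" object (the task "name" sits one level above).
REQUIRED_PARAMS = (
    "connector.class", "tasks.max", "topics",
    "endpoint", "project", "table", "account_type",
)

# Assumed per-method credential parameters, taken from the tables above.
AUTH_PARAMS = {
    "ALIYUN": ("access_id", "access_key"),
    "STS": ("account_id", "region_id", "role_name", "sts.endpoint"),
}


def missing_parameters(config: Dict[str, str]) -> List[str]:
    """Return the names of parameters that are absent or empty."""
    missing = [p for p in REQUIRED_PARAMS if not config.get(p)]
    account_type = config.get("account_type")
    if account_type in AUTH_PARAMS:
        missing += [p for p in AUTH_PARAMS[account_type] if not config.get(p)]
    elif account_type:
        raise ValueError(f"unsupported account_type: {account_type}")
    return missing


sts_config = {
    "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
    "tasks.max": "3",
    "topics": "topic_text",
    "endpoint": "<maxcompute-endpoint>",
    "project": "<maxcompute-project>",
    "table": "<maxcompute-table>",
    "account_type": "STS",
    "account_id": "<account-id>",
    "region_id": "<region-id>",
    "sts.endpoint": "<sts-endpoint>",
}
print(missing_parameters(sts_config))  # ['role_name']
```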
Optional parameters
| Parameter | Default | Description |
|---|---|---|
| schema | default | Required if the project uses the three-layer schema model (project.schema.table). See Schema operations. |
| tunnel_endpoint | Auto-detected | The Tunnel service endpoint. If unset, the connector routes automatically. If set, your value takes precedence. |
| format | TEXT | Message format. Valid values: TEXT, CSV, JSON, FLATTEN, BINARY. |
| mode | DEFAULT | Which part of the Kafka message to write. Valid values: KEY (key only), VALUE (value only), DEFAULT (both key and value; TEXT and BINARY formats only). |
| partition_window_type | HOUR | Time granularity for the pt partition field. Valid values: DAY, HOUR, MINUTE. |
| use_streaming | false | Write mode. See the comparison table below. |
| buffer_size_kb | 65536 | Internal write buffer size per partition, in KB. |
| sink_pool_size | CPU core count | Maximum number of writer threads. |
| record_batch_size | 8000 | Maximum number of records a single thread sends in one batch. |
| skip_error | false | Whether to skip records that cause unknown write errors. See Error handling. |
| runtime.error.topic.name | None | Kafka topic to receive records that fail to write. |
| runtime.error.topic.bootstrap.servers | None | Bootstrap server address for the error topic's Kafka instance. |
Batch Tunnel vs. streaming Tunnel (use_streaming)
The use_streaming parameter controls which Tunnel write path the connector uses.
| | Batch Tunnel (false, default) | Streaming Tunnel (true) |
|---|---|---|
| Latency | Higher: data is buffered until buffer_size_kb is reached or the partition window closes | Lower: records are written immediately after buffering |
| Best for | High-throughput, non-latency-sensitive pipelines | Near-real-time ingestion |
| Throughput | Higher: optimized for large batch writes | May vary depending on load |
Choose use_streaming: true when end-to-end latency is a priority. Use the default batch mode for maximum throughput.
Error handling
When the connector encounters a record it cannot write (typically due to a schema mismatch between the Kafka message and the MaxCompute table), behavior is controlled by two parameters:
| skip_error | runtime.error.topic.name | Behavior |
|---|---|---|
| false (default) | Not set | Stops writing. The task blocks and logs an exception. |
| true | Not set | Continues writing. The failed record is silently discarded. |
| false | Set | Continues writing. The failed record is written to the error topic. |
| true | Set | Continues writing. The failed record is written to the error topic. |
Set runtime.error.topic.name to preserve failed records for inspection without blocking the pipeline.
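The decision table above reduces to a small function. This sketch only restates the documented behavior; `error_action` and its return labels are hypothetical names, not part of the connector.

```python
from typing import Optional


def error_action(skip_error: bool, error_topic: Optional[str]) -> str:
    """Return what happens to a record that cannot be written."""
    if error_topic:
        # An error topic takes precedence regardless of skip_error.
        return "route_to_error_topic"
    # Without an error topic, skip_error decides between dropping and blocking.
    return "discard" if skip_error else "block"


print(error_action(False, None))             # block
print(error_action(True, None))              # discard
print(error_action(False, "runtime_error"))  # route_to_error_topic
print(error_action(True, "runtime_error"))   # route_to_error_topic
```

Note that once runtime.error.topic.name is set, the value of skip_error no longer changes the outcome.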
Examples
Write TEXT data
-
Create the target MaxCompute table. Run the following SQL in odpscmd or any MaxCompute SQL client:
    CREATE TABLE IF NOT EXISTS table_text(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      key STRING,
      value STRING
    ) PARTITIONED BY (pt STRING);
-
Create a Kafka topic and send test messages. Run the following commands in the $KAFKA_HOME/bin/ directory. Because parse.key=true is set, each input line is a key followed by a value, separated by a tab character (shown here as whitespace):
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_text
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_text --property parse.key=true
    >123 abc
    >456 edf
-
(Optional) Start the connector service if it is not already running. See Set up and start the connector service.
-
Create odps-sink-connector.json and save it to $KAFKA_HOME/config/:
    {
      "name": "odps-test-text",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_text",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_text",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "TEXT",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
-
Start the sink task:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
-
Verify the result. Run the following SQL in odpscmd or any MaxCompute SQL client:
    set odps.sql.allow.fullscan=true;
    select * from table_text;
Expected output. Because mode is VALUE, the key column is NULL:
    +------------+-----------+--------+------+-------+------------------+
    | topic      | partition | offset | key  | value | pt               |
    +------------+-----------+--------+------+-------+------------------+
    | topic_text | 0         | 0      | NULL | abc   | 07-13-2023 21:13 |
    | topic_text | 0         | 1      | NULL | edf   | 07-13-2023 21:13 |
    +------------+-----------+--------+------+-------+------------------+
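If the table stays empty, checking the task state through the Kafka Connect REST API is a reasonable first step. The sketch below assumes the worker listens on http://localhost:8083; `fetch_status` and `is_healthy` are hypothetical helper names, while the `/connectors/{name}/status` endpoint and its response shape come from the standard Kafka Connect REST API.

```python
import json
import urllib.request
from typing import Dict


def fetch_status(name: str, connect_url: str = "http://localhost:8083") -> Dict:
    """GET /connectors/{name}/status from the Kafka Connect worker."""
    with urllib.request.urlopen(f"{connect_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_healthy(status: Dict) -> bool:
    """True when the connector and at least one task exist and all are RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [t["state"] for t in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)


# Example response shape, written by hand for illustration.
sample = {
    "name": "odps-test-text",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "127.0.0.1:8083"},
    ],
}
print(is_healthy(sample))  # False
# print(is_healthy(fetch_status("odps-test-text")))  # requires a running worker
```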
Write CSV data
-
Create the target table:
    CREATE TABLE IF NOT EXISTS table_csv(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      region STRING
    ) PARTITIONED BY (pt STRING);
-
Create a Kafka topic and send test messages:
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_csv
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_csv --property parse.key=true
    >123 1103,zhangsan,china
    >456 1104,lisi,usa
-
(Optional) Start the connector service if not already running.
-
Create odps-sink-connector.json:
    {
      "name": "odps-test-csv",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_csv",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_csv",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "format": "CSV",
        "mode": "VALUE",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
-
Start the sink task:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
-
Verify the result:
    set odps.sql.allow.fullscan=true;
    select * from table_csv;
Expected output:
    +-----------+-----------+--------+------+----------+--------+------------------+
    | topic     | partition | offset | id   | name     | region | pt               |
    +-----------+-----------+--------+------+----------+--------+------------------+
    | topic_csv | 0         | 0      | 1103 | zhangsan | china  | 07-14-2023 00:10 |
    | topic_csv | 0         | 1      | 1104 | lisi     | usa    | 07-14-2023 00:10 |
    +-----------+-----------+--------+------+----------+--------+------------------+
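The positional mapping used by the CSV format can be sketched as follows. This is an illustration under stated assumptions: `csv_to_row` and `TABLE_CSV_COLUMNS` are hypothetical names, and the sketch splits on plain commas without handling quoted fields, which may differ from the connector's parser.

```python
from typing import Any, Callable, Dict, Sequence, Tuple

Column = Tuple[str, Callable[[str], Any]]

# Custom columns of table_csv, in declaration order (BIGINT -> int, STRING -> str).
TABLE_CSV_COLUMNS: Sequence[Column] = (("id", int), ("name", str), ("region", str))


def csv_to_row(value: str, columns: Sequence[Column]) -> Dict[str, Any]:
    """Assign comma-separated fields to columns by position, left to right."""
    fields = value.split(",")
    if len(fields) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(fields)}")
    return {name: cast(field) for (name, cast), field in zip(columns, fields)}


print(csv_to_row("1103,zhangsan,china", TABLE_CSV_COLUMNS))
# {'id': 1103, 'name': 'zhangsan', 'region': 'china'}
```

Because the mapping is positional, swapping two columns in the table definition would silently place values in the wrong columns or fail on a type conversion.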
Write JSON data
-
Create the target table. The value column uses the MaxCompute JSON type to store structured data:
    CREATE TABLE IF NOT EXISTS table_json(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      key STRING,
      value JSON
    ) PARTITIONED BY (pt STRING);
-
Create a Kafka topic and send test messages:
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_json
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_json --property parse.key=true
    >123 {"id":123,"name":"json-1","region":"beijing"}
    >456 {"id":456,"name":"json-2","region":"hangzhou"}
-
(Optional) Start the connector service if not already running.
-
Create odps-sink-connector.json:
    {
      "name": "odps-test-json",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_json",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_json",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "JSON",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
-
Start the sink task:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
-
Verify the result:
    set odps.sql.allow.fullscan=true;
    select * from table_json;
Expected output. The JSON payload is stored in the value column:
    +------------+-----------+--------+------+------------------------------------------------+------------------+
    | topic      | partition | offset | key  | value                                          | pt               |
    +------------+-----------+--------+------+------------------------------------------------+------------------+
    | topic_json | 0         | 0      | NULL | {"id":123,"name":"json-1","region":"beijing"}  | 07-14-2023 00:28 |
    | topic_json | 0         | 1      | NULL | {"id":456,"name":"json-2","region":"hangzhou"} | 07-14-2023 00:28 |
    +------------+-----------+--------+------+------------------------------------------------+------------------+
Write FLATTEN data
FLATTEN mode parses each JSON message and maps its top-level keys to matching columns in the MaxCompute table. Nested objects are stored as JSON columns.
-
Create the target table. Custom columns (id, name, extendinfo) match the JSON keys in the Kafka messages:
    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      extendinfo JSON
    ) PARTITIONED BY (pt STRING);
-
Create a Kafka topic and send test messages:
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_flatten
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_flatten --property parse.key=true
    >123 {"id":123,"name":"json-1","extendinfo":{"region":"beijing","sex":"M"}}
    >456 {"id":456,"name":"json-2","extendinfo":{"region":"hangzhou","sex":"W"}}
-
(Optional) Start the connector service if not already running.
-
Create odps-sink-connector.json:
    {
      "name": "odps-test-flatten",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_flatten",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "FLATTEN",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000"
      }
    }
-
Start the sink task:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
-
Verify the result:
    set odps.sql.allow.fullscan=true;
    select * from table_flatten;
Expected output. The JSON payload is flattened into columns, with extendinfo stored as a nested JSON field:
    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+
    | topic         | partition | offset | id  | name   | extendinfo                      | pt               |
    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+
    | topic_flatten | 0         | 0      | 123 | json-1 | {"sex":"M","region":"beijing"}  | 07-14-2023 01:33 |
    | topic_flatten | 0         | 1      | 456 | json-2 | {"sex":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    +---------------+-----------+--------+-----+--------+---------------------------------+------------------+
Handle abnormal data
This example demonstrates how to route schema-mismatched records to a dedicated error topic instead of blocking the pipeline.
-
Create the target table and two Kafka topics, one for normal data and one for errors. Records that fail to write are redirected to runtime_error. This usually happens when the Kafka message structure does not match the MaxCompute table schema.
    CREATE TABLE IF NOT EXISTS table_flatten(
      topic STRING,
      `partition` BIGINT,
      `offset` BIGINT,
      id BIGINT,
      name STRING,
      extendinfo JSON
    ) PARTITIONED BY (pt STRING);
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic topic_abnormal
    sh kafka-topics.sh --create --bootstrap-server localhost:9092 \
      --replication-factor 1 --partitions 1 --topic runtime_error
-
Send test messages to the topic that the sink task reads. The second message uses extendinfos (note the typo) instead of extendinfo, which does not match the table schema:
    sh kafka-console-producer.sh --bootstrap-server localhost:9092 \
      --topic topic_abnormal --property parse.key=true
    >100 {"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}
    >101 {"id":101,"name":"json-4","extendinfos":"null"}
    >102 {"id":102,"name":"json-5","extendinfo":{"region":"beijing","gender":"M"}}
-
(Optional) Start the connector service if not already running.
-
Create odps-sink-connector.json with runtime.error.topic.name set to redirect failed records:
    {
      "name": "odps-test-runtime-error",
      "config": {
        "connector.class": "com.aliyun.odps.kafka.connect.MaxComputeSinkConnector",
        "tasks.max": "3",
        "topics": "topic_abnormal",
        "endpoint": "http://service.cn-shanghai.maxcompute.aliyun.com/api",
        "project": "project_name",
        "schema": "default",
        "table": "table_flatten",
        "account_type": "ALIYUN",
        "access_id": "<yourAccessKeyId>",
        "access_key": "<yourAccessKeySecret>",
        "partition_window_type": "MINUTE",
        "mode": "VALUE",
        "format": "FLATTEN",
        "sink_pool_size": "150",
        "record_batch_size": "9000",
        "buffer_size_kb": "600000",
        "runtime.error.topic.name": "runtime_error",
        "runtime.error.topic.bootstrap.servers": "http://XXXX",
        "skip_error": "false"
      }
    }
-
Start the sink task:
    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
      http://localhost:8083/connectors -d @$KAFKA_HOME/config/odps-sink-connector.json
-
Verify the result. Query the MaxCompute table. The record with ID 101 is absent because it failed schema validation, but processing continued for records 100 and 102. In the sample output below, the topic column reads flatten_test and the table also holds rows from earlier test runs (IDs 123 and 456); the rows for IDs 100 and 102 are the ones written in this example:
    set odps.sql.allow.fullscan=true;
    select * from table_flatten;
    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+
    | topic        | partition | offset | id  | name   | extendinfo                         | pt               |
    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+
    | flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 01:33 |
    | flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 01:33 |
    | flatten_test | 0         | 0      | 123 | json-1 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    | flatten_test | 0         | 1      | 456 | json-2 | {"gender":"W","region":"hangzhou"} | 07-14-2023 13:16 |
    | flatten_test | 0         | 2      | 100 | json-3 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    | flatten_test | 0         | 4      | 102 | json-5 | {"gender":"M","region":"beijing"}  | 07-14-2023 13:16 |
    +--------------+-----------+--------+-----+--------+------------------------------------+------------------+
Inspect the failed record in the runtime_error topic:
    sh kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic runtime_error --from-beginning
    {"id":101,"name":"json-4","extendinfos":"null"}
The mismatched record is written to the error topic, allowing you to investigate and reprocess it.
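When investigating records from the error topic, a quick comparison of message keys against the table's custom columns can show what went wrong. The sketch below is illustrative only: `unexpected_keys` and `TABLE_FLATTEN_COLUMNS` are hypothetical names, and it assumes that an unknown top-level key is what caused the record to be rejected.

```python
import json
from typing import Iterable, List

# Custom columns of table_flatten (the fixed fields and pt are excluded).
TABLE_FLATTEN_COLUMNS = ("id", "name", "extendinfo")


def unexpected_keys(value: str, columns: Iterable[str]) -> List[str]:
    """Return top-level JSON keys that have no matching table column."""
    return sorted(set(json.loads(value)) - set(columns))


ok_record = '{"id":100,"name":"json-3","extendinfo":{"region":"beijing","gender":"M"}}'
bad_record = '{"id":101,"name":"json-4","extendinfos":"null"}'

print(unexpected_keys(ok_record, TABLE_FLATTEN_COLUMNS))   # []
print(unexpected_keys(bad_record, TABLE_FLATTEN_COLUMNS))  # ['extendinfos']
```

Once the offending key is corrected, the record can be produced again to the source topic for reprocessing.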
What's next
-
MaxCompute data types — JSON type details relevant to JSON and FLATTEN formats
-
Endpoints — Find the right endpoint for your region and network
-
Schema operations — Configure the three-layer schema model if needed