The Kafka data source enables bidirectional data synchronization between DataWorks and Kafka. You can read from Kafka topics in offline or real-time tasks, and write to Kafka topics from any supported source.
Supported versions
DataWorks supports Alibaba Cloud Kafka and self-managed Kafka clusters running versions 0.10.2 through 3.6.x.
Versions earlier than 0.10.2 are not supported because they do not support retrieving partition offsets, and their data structures may not support timestamps.
Supported sync modes
| Sync mode | Supported |
|---|---|
| Offline read (single table) | Yes |
| Real-time read (single table) | Yes |
| Offline write (single table) | Yes |
| Real-time write (single table) | Yes |
| Real-time write (entire database) | Yes |
Resource groups
The Kafka data source supports Serverless resource groups (recommended) and old versions of exclusive resource groups for Data Integration.
Resource estimation for real-time read
When using a subscription Serverless resource group, estimate resources before starting a task to avoid failures from insufficient capacity.
Base allocation: 1 CU per topic
Traffic-based allocation:
| Kafka data type | CU per 10 MB/s of traffic |
|---|---|
| Uncompressed | 1 CU |
| Compressed | 2 CU |
| Compressed + JSON parsing | 3 CU |
Cluster slot usage limits:
| Failover tolerance | Maximum slot usage |
|---|---|
| High | 80% |
| Low | 70% |
Actual usage depends on data content and format. Adjust based on observed runtime conditions after your initial estimate.
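The sketch below gives a rough illustration of the arithmetic and applies the two tables above. It assumes that the slot usage limit means the estimated task CUs should stay at or below 80% (high failover tolerance) or 70% (low) of the resource group's CUs. That reading is an interpretation, not documented behavior, and the function names are hypothetical.

```python
import math

# CU per 10 MB/s of traffic, taken from the traffic-based allocation table.
CU_PER_10_MBPS = {
    "uncompressed": 1,
    "compressed": 2,
    "compressed_json": 3,  # compressed data that also needs JSON parsing
}

# Maximum slot usage, in percent, by failover tolerance.
MAX_SLOT_USAGE_PERCENT = {"high": 80, "low": 70}


def estimate_task_cu(num_topics: int, traffic_mbps: float, data_type: str) -> float:
    """Base allocation of 1 CU per topic plus the traffic-based allocation."""
    base = num_topics * 1
    traffic = traffic_mbps / 10 * CU_PER_10_MBPS[data_type]
    return base + traffic


def min_group_cu(task_cu: float, failover_tolerance: str) -> int:
    """Smallest whole number of CUs that keeps slot usage within the limit.

    Assumption: the limit applies to the task's CUs as a share of the group.
    """
    return math.ceil(task_cu * 100 / MAX_SLOT_USAGE_PERCENT[failover_tolerance])


if __name__ == "__main__":
    cu = estimate_task_cu(num_topics=2, traffic_mbps=30, data_type="compressed")
    print(cu)                        # 2 + 30/10 * 2 = 8.0
    print(min_group_cu(cu, "high"))  # ceil(8 / 0.8) = 10
    print(min_group_cu(cu, "low"))   # ceil(8 / 0.7) = 12
```

Treat the result as a starting point only. As noted above, adjust it based on observed runtime conditions.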
Limitations
Offline read from a single table
If both parameter.groupId and parameter.kafkaConfig.group.id are configured, parameter.groupId takes precedence.
Real-time write to a single table
Data deduplication is not supported. If a task restarts after an offset reset or a failover, duplicate records may be written.
Real-time write for an entire database
Key assignment by primary key configuration:
| Source table primary key | Key assignment behavior |
|---|---|
| Has primary key | The primary key value is used as the Kafka record key, so records with the same primary key are written to the same partition in order |
| No primary key, sync without primary key | Key is empty; the destination Kafka topic must have exactly one partition to preserve write order |
| No primary key, custom primary key | A combination of one or more non-primary key fields is used as the key |
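The following hypothetical sketch illustrates the three key assignment rules and shows why a shared key keeps records in order. The comma-joined key format is an assumption. zlib.crc32 stands in for Kafka's default partitioner, which hashes keys with murmur2. Only the property being illustrated carries over: equal keys always map to the same partition.

```python
import zlib
from typing import Optional


def record_key(row: dict, pk_fields=None, custom_key_fields=None) -> Optional[str]:
    """Build a Kafka record key for one source row (hypothetical format).

    pk_fields: the source table's primary key columns, if it has any.
    custom_key_fields: non-primary-key columns chosen as a custom key.
    Returns None for "sync without primary key" (empty key).
    """
    fields = pk_fields or custom_key_fields
    if not fields:
        return None
    return ",".join(str(row[f]) for f in fields)


def choose_partition(key: Optional[str], num_partitions: int) -> int:
    """Map a key to a partition so that equal keys always share a partition.

    crc32 is a stand-in hash; Kafka's default partitioner uses murmur2.
    """
    if key is None:
        # Without a key, write order is only preserved on a one-partition topic,
        # so this sketch rejects keyless writes to multi-partition topics.
        if num_partitions != 1:
            raise ValueError("keyless writes need a topic with exactly one partition")
        return 0
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```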
Strict ordering under cluster exceptions:
If the Kafka cluster returns exceptions and you need strict ordering for records with the same primary key, add the following to the extended parameters of the Kafka data source:
{"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}
This configuration significantly degrades replication performance. Evaluate the tradeoff between ordering guarantees and throughput before enabling it.
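A simplified model shows why limiting in-flight requests helps. When several batches are in flight and an earlier one fails and is retried, later batches can be appended first. The sketch below is a toy simulation under stated assumptions: each listed batch fails exactly once, and retries are sent first in the next round. It does not model the idempotent producer's sequence-number handling.

```python
from collections import deque


def broker_append_order(batches, fail_first_attempt, max_in_flight):
    """Toy model of the order in which a broker appends producer batches.

    Assumptions: up to max_in_flight batches are sent per round; each batch in
    fail_first_attempt fails once and is resent at the start of the next
    round, ahead of batches not yet sent. Idempotent sequencing is not modeled.
    """
    pending = deque(batches)
    failing = set(fail_first_attempt)
    log = []
    while pending:
        window = [pending.popleft() for _ in range(min(max_in_flight, len(pending)))]
        retries = []
        for batch in window:
            if batch in failing:
                failing.discard(batch)  # the next attempt succeeds
                retries.append(batch)
            else:
                log.append(batch)
        pending.extendleft(reversed(retries))  # retries go first, in original order
    return log


if __name__ == "__main__":
    print(broker_append_order([1, 2, 3], {1}, max_in_flight=1))  # [1, 2, 3]
    print(broker_append_order([1, 2, 3], {1}, max_in_flight=5))  # [2, 3, 1]
```

With one request in flight, a retried batch is appended before anything that follows it. With a larger window, the same single failure reorders the log. The serialized sending in the first case is also the source of the throughput cost.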
For the format of messages written to Kafka, including heartbeat messages and DDL change records, see Appendix: Message format.
Supported field types
Kafka records contain six data fields: key, value, offset, timestamp, headers, and partition. DataWorks processes these fields as follows when reading from or writing to Kafka.
Read data
DataWorks can parse Kafka data in JSON format. The table below shows how each field is handled.
| Kafka record field | Processed type |
|---|---|
| key | Determined by the keyType parameter in the sync task configuration |
| value | Determined by the valueType parameter in the sync task configuration |
| offset | Long |
| timestamp | Long |
| headers | String |
| partition | Long |
Write data
DataWorks supports writing to Kafka in JSON or text format. The format behavior varies by sync task type.
- In text format, field names are not included; field values are separated by a delimiter.
- In real-time sync tasks, DataWorks uses a built-in JSON format that includes database change messages, business timestamps, and DDL information. See Appendix: Message format for details.
| Sync task type | Value format | Source field type | Write behavior |
|---|---|---|---|
| Offline sync (offline synchronization node in DataStudio) | JSON | String | UTF-8 encoded string |
| | | Boolean | UTF-8 string "true" or "false" |
| | | Time/Date | UTF-8 string in yyyy-MM-dd HH:mm:ss format |
| | | Numeric | UTF-8 encoded numeric string |
| | | Byte stream | Treated as a UTF-8 encoded string |
| | Text | String | UTF-8 encoded string |
| | | Boolean | UTF-8 string "true" or "false" |
| | | Time/Date | UTF-8 string in yyyy-MM-dd HH:mm:ss format |
| | | Numeric | UTF-8 encoded numeric string |
| | | Byte stream | Treated as a UTF-8 encoded string |
| Real-time ETL to Kafka (real-time sync node in DataStudio) | JSON | String | UTF-8 encoded string |
| | | Boolean | JSON Boolean |
| | | Time/Date (millisecond precision or coarser) | 13-digit JSON integer (millisecond timestamp) |
| | | Time/Date (microsecond or nanosecond precision) | JSON floating-point number: 13-digit integer part (milliseconds) plus 6-digit fractional part (nanoseconds) |
| | | Numeric | JSON number |
| | | Byte stream | Base64-encoded, then written as a UTF-8 string |
| | Text | String | UTF-8 encoded string |
| | | Boolean | UTF-8 string "true" or "false" |
| | | Time/Date | UTF-8 string in yyyy-MM-dd HH:mm:ss format |
| | | Numeric | UTF-8 encoded numeric string |
| | | Byte stream | Base64-encoded, then written as a UTF-8 string |
| Real-time entire-database sync to Kafka (incremental data only) | Built-in JSON | String | UTF-8 encoded string |
| | | Boolean | JSON Boolean |
| | | Time/Date | 13-digit millisecond timestamp |
| | | Numeric | JSON number |
| | | Byte stream | Base64-encoded, then written as a UTF-8 string |
| One-click real-time sync to Kafka (full offline + incremental real-time) | Built-in JSON | String | UTF-8 encoded string |
| | | Boolean | JSON Boolean |
| | | Time/Date | 13-digit millisecond timestamp |
| | | Numeric | JSON number |
| | | Byte stream | Base64-encoded, then written as a UTF-8 string |
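The sketch below mirrors a subset of the rules in the table: offline sync, and real-time ETL with the JSON value format. It is an illustration, not DataWorks code. It covers only millisecond-or-coarser time values. Treating naive datetimes as UTC is an assumption, because the table does not state which time zone the 13-digit timestamp uses.

```python
import base64
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def offline_value(v):
    """String written by offline sync (JSON or text value format)."""
    if isinstance(v, bool):  # check bool before numbers: bool is a subclass of int
        return "true" if v else "false"
    if isinstance(v, datetime):
        return v.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(v, (bytes, bytearray)):
        # Byte streams are treated as UTF-8 text (fails on non-UTF-8 bytes).
        return bytes(v).decode("utf-8")
    return str(v)  # strings and numbers


def realtime_json_value(v):
    """JSON-typed value written by real-time ETL with the JSON value format.

    Covers millisecond-or-coarser time values only.
    """
    if isinstance(v, bool):
        return v
    if isinstance(v, datetime):
        if v.tzinfo is None:
            v = v.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
        return (v - EPOCH) // timedelta(milliseconds=1)  # 13-digit ms timestamp
    if isinstance(v, (bytes, bytearray)):
        return base64.b64encode(bytes(v)).decode("ascii")
    return v  # strings stay JSON strings, numbers stay JSON numbers
```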
Add a data source
Before developing a sync task, add the Kafka data source in DataWorks. See Data source management for the procedure. Parameter descriptions are available directly in the DataWorks console when you add a data source.
Develop a data sync task
Configure an offline sync task for a single table
- For the setup procedure, see Configure an offline synchronization task in the codeless UI and Configure an offline synchronization task in the code editor.
- For the full parameter list and a script demo, see Appendix: Script demos and parameter descriptions.
Configure a real-time sync task for a single table or entire database
See Configure a real-time synchronization task in DataStudio, Configure a real-time synchronization task in Data Integration, and Configure a synchronization task for an entire database.
Authentication configuration
DataWorks supports three authentication mechanisms for Kafka: SSL, GSSAPI (Kerberos), and PLAIN. Choose based on your Kafka cluster configuration.
| When to use | Mechanism |
|---|---|
| Kafka cluster uses SSL or SASL_SSL | SSL |
| Kafka cluster uses Kerberos authentication | GSSAPI |
| Kafka cluster uses username/password authentication | PLAIN |
SSL
Set Special Authentication Method to SSL or SASL_SSL to enable SSL authentication. You must upload a client truststore certificate file and enter the truststore passphrase.
Truststore certificate by cluster type:
- Alibaba Cloud Kafka: See SSL certificate algorithm upgrade instructions to download the correct certificate. The truststore passphrase is KafkaOnsClient.
- EMR cluster: See Use SSL encryption for Kafka connections to download the certificate and retrieve the passphrase.
- Self-managed cluster: Upload the correct truststore certificate and enter the corresponding passphrase.
The keystore certificate file, keystore passphrase, and SSL passphrase are only required when bidirectional SSL authentication is enabled (ssl.client.auth=required in the server's server.properties file). See Use SSL encryption for Kafka connections for details.
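For orientation, the console fields correspond roughly to standard Kafka client SSL settings. The sketch below assumes that mapping, including that the SSL passphrase maps to ssl.key.password. DataWorks assembles the actual client configuration itself, and the function name is hypothetical.

```python
def ssl_client_properties(protocol, truststore_path, truststore_password,
                          keystore_path=None, keystore_password=None,
                          key_password=None):
    """Assemble Kafka client SSL settings from the console fields (sketch).

    The keystore settings are only needed for two-way SSL
    (ssl.client.auth=required on the broker).
    """
    if protocol not in ("SSL", "SASL_SSL"):
        raise ValueError("protocol must be SSL or SASL_SSL")
    props = {
        "security.protocol": protocol,
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": truststore_password,
    }
    keystore = (keystore_path, keystore_password, key_password)
    if any(v is not None for v in keystore):
        if any(v is None for v in keystore):
            raise ValueError("two-way SSL needs the keystore file, keystore "
                             "passphrase, and SSL passphrase together")
        props["ssl.keystore.location"] = keystore_path
        props["ssl.keystore.password"] = keystore_password
        props["ssl.key.password"] = key_password  # assumed mapping for "SSL passphrase"
    return props
```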
GSSAPI
Set Sasl Mechanism to GSSAPI when the Kafka cluster uses Kerberos authentication. Upload three files (a JAAS configuration file, a Kerberos configuration file, and a keytab file), and configure DNS/HOST settings for the exclusive resource group.
For a Serverless resource group, configure host addresses using internal DNS resolution. See Internal DNS resolution (PrivateZone).
JAAS configuration file
The file must start with KafkaClient, with all configuration items enclosed in braces {}.
Format rules:
- The first line inside the braces specifies the login module class.
- Each configuration item uses key=value format.
- All items except the last must not end with a semicolon.
- The last item must end with a semicolon, and a semicolon must follow the closing brace }.
If the format is incorrect, the file cannot be parsed.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="<path-to-keytab>"
storeKey=true
serviceName="kafka-server"
principal="<client-principal>@<REALM>";
};
| Configuration item | Description |
|---|---|
| Login module | Must be com.sun.security.auth.module.Krb5LoginModule. |
| useKeyTab | Must be set to true. |
| keyTab | Specify any path. When the task runs, the uploaded keytab file is downloaded automatically and the path is filled in. |
| storeKey | Specifies whether the client saves the key. Either true or false works; this setting does not affect synchronization. |
| serviceName | Corresponds to sasl.kerberos.service.name in the Kafka server's server.properties file. Configure as needed. |
| principal | The Kerberos principal used by the Kafka client. Make sure the uploaded keytab file contains the key for this principal. |
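A malformed JAAS file cannot be parsed, so it can help to check the format rules listed above before uploading. The following is a minimal, hypothetical checker for those rules only. It does not validate option values, and it assumes the opening brace sits on the KafkaClient line.

```python
import re


def check_jaas(text: str) -> list:
    """Return the format-rule violations found in a KafkaClient JAAS file.

    An empty list means the file follows the rules. Simplification: the
    opening brace must be on the same line as KafkaClient.
    """
    lines = [ln.strip() for ln in text.strip().splitlines() if ln.strip()]
    if len(lines) < 3:
        return ["file too short: need 'KafkaClient {', a login module line, and '};'"]
    errors = []
    if not re.fullmatch(r"KafkaClient\s*\{", lines[0]):
        errors.append("must start with 'KafkaClient {'")
    if lines[-1] != "};":
        errors.append("closing brace must be followed by a semicolon: '};'")
    module_line, items = lines[1], lines[2:-1]
    if not items:
        errors.append("no key=value configuration items")
        return errors
    if module_line.endswith(";"):
        errors.append("login module line must not end with ';'")
    for i, item in enumerate(items):
        is_last = i == len(items) - 1
        if "=" not in item:
            errors.append(f"not in key=value format: {item}")
        if is_last and not item.endswith(";"):
            errors.append("last item must end with ';'")
        if not is_last and item.endswith(";"):
            errors.append(f"only the last item may end with ';': {item}")
    return errors
```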
Kerberos configuration file
The file must contain two modules: [libdefaults] and [realms].
- [libdefaults]: Specifies Kerberos authentication parameters in key=value format.
- [realms]: Specifies KDC addresses. Each realm submodule begins with realm name = and its configuration items are enclosed in braces.
[libdefaults]
default_realm = <REALM>
[realms]
<REALM> = {
kdc = <kdc-ip>:<port>
}
| Configuration item | Description |
|---|---|
| [libdefaults].default_realm | The default realm used to access Kafka cluster nodes. Usually the same as the realm of the client principal in the JAAS file. |
| Other [libdefaults] parameters | Optional Kerberos parameters, such as ticket_lifetime. Configure as needed. |
| [realms].realm name | Must match the realm of the client principal in the JAAS file and [libdefaults].default_realm. If those two realms differ, include one realm submodule for each. |
| [realms].realm name.kdc | The KDC address and port in ip:port format, for example, kdc=10.0.0.1:88. If the port is omitted, port 88 is used. |
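The following is a minimal sketch of producing a Kerberos configuration file that satisfies the realm rules in the table. The function is hypothetical. It only enforces that both the client principal's realm and default_realm have a [realms] entry. It leaves KDC addresses as given, because port 88 is used when a port is omitted.

```python
def render_krb5_conf(client_principal: str, default_realm: str, kdcs: dict) -> str:
    """Render a minimal Kerberos configuration file (sketch).

    kdcs maps each realm name to a list of KDC addresses ("ip" or "ip:port").
    """
    principal_realm = client_principal.rsplit("@", 1)[-1]
    missing = sorted({principal_realm, default_realm} - set(kdcs))
    if missing:
        raise ValueError(f"[realms] has no entry for: {', '.join(missing)}")
    lines = ["[libdefaults]", f"  default_realm = {default_realm}", "", "[realms]"]
    for realm, addresses in kdcs.items():
        lines.append(f"  {realm} = {{")
        lines.extend(f"    kdc = {addr}" for addr in addresses)
        lines.append("  }")
    return "\n".join(lines) + "\n"
```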
Keytab file
The keytab file must contain the key for the principal specified in the JAAS file and must be verifiable by the Key Distribution Center (KDC). Run the following command to verify:
klist -ket ./client.keytab
Keytab name: FILE:client.keytab
KVNO Timestamp Principal
---- ------------------- ------------------------------------------------------
7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
DNS and HOST settings for an exclusive resource group
In a Kerberos-enabled Kafka cluster, each node's hostname is part of its principal registered in the KDC. The client resolves hostnames using DNS and HOST settings to obtain credentials from the KDC. Configure these settings correctly when using an exclusive resource group:
- DNS settings: If the VPC attached to the exclusive resource group uses a PrivateZone instance to resolve Kafka node names, add custom routes for IP addresses 100.100.2.136 and 100.100.2.138 to the VPC attachment in the DataWorks console.
- HOST settings: If PrivateZone is not used for Kafka node name resolution, manually add IP-to-hostname mappings for each Kafka cluster node in the Host configuration of the network settings for the exclusive resource group in the DataWorks console.

PLAIN
Set Sasl Mechanism to PLAIN when the Kafka cluster uses username/password authentication. The JAAS file must start with KafkaClient, with all configuration items enclosed in braces {}.
Format rules:
- The first line inside the braces specifies the login module class.
- Each configuration item uses key=value format.
- All items except the last must not end with a semicolon.
- The last item must end with a semicolon, and a semicolon must follow the closing brace }.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<your-username>"
password="<your-password>";
};
| Configuration item | Description |
|---|---|
| Login module | Must be org.apache.kafka.common.security.plain.PlainLoginModule. |
| username | Your Kafka username. |
| password | Your Kafka password. |
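The following hypothetical helper renders a PLAIN JAAS file in the required layout. To keep it simple, it rejects double quotes, backslashes, and line breaks in the credentials instead of escaping them. That restriction belongs to the sketch, not to Kafka.

```python
def render_plain_jaas(username: str, password: str) -> str:
    """Render a KafkaClient JAAS file for SASL/PLAIN (sketch)."""
    for name, value in (("username", username), ("password", password)):
        if any(ch in value for ch in ('"', "\\", "\n", "\r")):
            raise ValueError(f"{name} contains a character this sketch does not escape")
    return (
        "KafkaClient {\n"
        "  org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        f'  username="{username}"\n'   # not the last item: no semicolon
        f'  password="{password}";\n'  # last item ends with a semicolon
        "};\n"                          # semicolon after the closing brace
    )
```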
FAQ
Appendix: Script demos and parameter descriptions
To configure an offline sync task using the code editor, specify parameters in the script using the unified script format. See Configure a task in the code editor for general requirements. The following sections cover the Kafka-specific parameters.
Reader script demo
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "true"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
}
}
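The beginDateTime and endDateTime values in the demo select a six-second window. The start is inclusive and the end is exclusive, and the sketch below expresses that rule. The time zone DataWorks uses to interpret these strings is not stated here, so tz is an explicit parameter (an assumption), with UTC used only as an example.

```python
from datetime import datetime, timezone, tzinfo


def to_epoch_ms(value: str, tz: tzinfo) -> int:
    """Parse a yyyymmddhhmmss string as wall-clock time in tz; return epoch ms."""
    dt = datetime.strptime(value, "%Y%m%d%H%M%S").replace(tzinfo=tz)
    return int(dt.timestamp()) * 1000


def in_window(record_ts_ms: int, begin: str, end: str, tz: tzinfo = timezone.utc) -> bool:
    """beginDateTime is inclusive; endDateTime is exclusive."""
    return to_epoch_ms(begin, tz) <= record_ts_ms < to_epoch_ms(end, tz)
```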
Reader script parameters
| Parameter | Description | Required |
|---|---|---|
| datasource | The data source name. Must match the name of the data source added in DataWorks. | Yes |
| server | The Kafka broker address in ip:port format. You need to specify only one broker, but DataWorks must be able to reach every broker in the cluster. | Yes |
| topic | The Kafka topic to read from. | Yes |
| column | The Kafka data fields to read. Three types are supported: constant columns, which are values enclosed in single quotes, such as ["'abc'", "'123'"]; data columns, which are JSON property names such as ["event_id"] or nested properties such as ["tag.desc"]; and attribute columns: __key__, __value__, __partition__, __headers__, __offset__, and __timestamp__. | Yes |
| keyType | The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. | No |
| valueType | The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. | No |
| beginDateTime | The start of the consumption range, as an inclusive time boundary in yyyymmddhhmmss format. Use together with endDateTime. Supports scheduling parameters. Requires Kafka 0.10.2 or later. | Required if beginOffset is not set |
| endDateTime | The end of the consumption range, as an exclusive time boundary in yyyymmddhhmmss format. Use together with beginDateTime. Supports scheduling parameters. Requires Kafka 0.10.2 or later. | Required if endOffset is not set |
| beginOffset | The offset at which consumption starts. Valid values: a numeric offset; seekToBeginning (start from the earliest offset); seekToLast (start from the offset committed by the consumer group); seekToEnd (start from the latest offset, so no data is read). | Required if beginDateTime is not set |
| endOffset | The offset at which consumption ends. Determines when the task exits. | Required if endDateTime is not set |
| skipExceedRecord | Whether to discard records that a single poll fetches beyond the endOffset or endDateTime boundary. Set to true for Kafka 0.10.2 and later, and false for earlier versions. Default: false. | No |
| partition | The partition to read from. By default, the task reads from all partitions. Specify an integer to read from a single partition only. | No |
| kafkaConfig | Extended KafkaConsumer parameters. See kafkaConfig parameters below. | No |
| encoding | The encoding used when keyType or valueType is STRING. Default: UTF-8. | No |
| waitTIme | Maximum time, in seconds, to wait for a single data pull attempt. Default: 60. | No |
| stopWhenPollEmpty | If true, the task stops when a poll returns no data. If false, the task keeps retrying until data is available. Default: true. | No |
| stopWhenReachEndOffset | Takes effect only when stopWhenPollEmpty is true. If true, the task checks whether all partition data has been read before stopping. If false, the task stops as soon as a poll returns no data, without checking. Default: false. | No |
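A small resolver over one record can illustrate the three column types. This is a hypothetical sketch with several assumptions: the value is a JSON object, data column names are split on dots for nested properties, and missing properties return None. DataWorks may handle missing properties differently.

```python
import json

ATTRIBUTE_COLUMNS = {"__key__", "__value__", "__partition__",
                     "__headers__", "__offset__", "__timestamp__"}


def resolve_columns(columns, record):
    """Resolve reader column specs against one record (sketch).

    record: dict with "key", "value" (JSON text), "partition", "headers",
    "offset", and "timestamp" entries.
    """
    parsed = None
    out = []
    for col in columns:
        if len(col) >= 2 and col.startswith("'") and col.endswith("'"):
            out.append(col[1:-1])               # constant column
        elif col in ATTRIBUTE_COLUMNS:
            out.append(record[col.strip("_")])  # "__key__" -> "key"
        else:                                   # data column, possibly nested
            if parsed is None:
                parsed = json.loads(record["value"])
            node = parsed
            for part in col.split("."):
                node = node.get(part) if isinstance(node, dict) else None
            out.append(node)
    return out
```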
kafkaConfig parameters
Use kafkaConfig to pass extended configuration to the KafkaConsumer client.
| Parameter | Description |
|---|---|
| fetch.min.bytes | Minimum number of bytes the consumer retrieves from the broker per request. The broker waits until this threshold is met before responding. |
| fetch.max.wait.ms | Maximum time the broker waits before returning data. Default: 500 ms. Data is returned as soon as either fetch.min.bytes or fetch.max.wait.ms is satisfied, whichever comes first. |
| max.partition.fetch.bytes | Maximum bytes the broker returns per partition per request. Default: 1 MB. |
| session.timeout.ms | Maximum time the consumer can be disconnected from the server before it loses consumer group membership. Default: 30 seconds. |
| auto.offset.reset | Action taken when no offset exists or the offset is invalid. Default: none (no automatic reset). Set to earliest to read from the beginning of the partition. |
| max.poll.records | Maximum number of records returned by a single poll() call. |
| key.deserializer | Deserialization class for the message key, such as org.apache.kafka.common.serialization.StringDeserializer. |
| value.deserializer | Deserialization class for the message value, such as org.apache.kafka.common.serialization.StringDeserializer. |
| ssl.truststore.location | Path to the SSL root certificate. |
| ssl.truststore.password | Password for the truststore. For Alibaba Cloud Kafka, use KafkaOnsClient. |
| security.protocol | Access protocol. Only SASL_SSL is supported. |
| sasl.mechanism | SASL authentication mechanism. For Alibaba Cloud Kafka, use PLAIN. |
| java.security.auth.login.config | Path to the SASL authentication file. |
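For example, an Alibaba Cloud Kafka reader that authenticates over SASL_SSL might nest the parameters above inside kafkaConfig, as sketched below. The helper, the file paths, and the choice to pass every value as a string are illustrative assumptions. The time window reuses the values from the reader demo.

```python
import json


def reader_parameter(server, topic, group_id, truststore_path, jaas_path):
    """Build a reader "parameter" object for SASL_SSL access (sketch).

    File paths passed in are hypothetical placeholders.
    """
    return {
        "server": server,
        "topic": topic,
        "column": ["__key__", "__value__", "__partition__", "__offset__"],
        "beginDateTime": "20190416000000",
        "endDateTime": "20190416000006",
        "kafkaConfig": {
            "group.id": group_id,
            "security.protocol": "SASL_SSL",  # the only protocol listed above
            "sasl.mechanism": "PLAIN",        # value listed for Alibaba Cloud Kafka
            "ssl.truststore.location": truststore_path,
            "ssl.truststore.password": "KafkaOnsClient",
            "java.security.auth.login.config": jaas_path,
        },
    }


if __name__ == "__main__":
    params = reader_parameter("host:9093", "topicName", "demo_test",
                              "/path/to/truststore.jks",
                              "/path/to/kafka_client_jaas.conf")
    print(json.dumps(params, indent=2))
```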
Writer script demo
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "Kafka",
"parameter": {
"server": "ip:9092",
"keyIndex": 0,
"valueIndex": 1,
"keyType": "Integer",
"valueType": "Short",
"topic": "t08",
"batchSize": 1024
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer script parameters
| Parameter | Description | Required |
|---|---|---|
| datasource | The data source name. Must match the name of the data source added in DataWorks. | Yes |
| server | The Kafka broker address in ip:port format. | Yes |
| topic | The Kafka topic to write to. | Yes |
| keyType | The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. | Yes |
| valueType | The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. | Yes |
| valueIndex | The 0-based index of the source column to use as the Kafka record value. If not set, all columns are combined into the value in the format specified by writeMode. | No |
| writeMode | The format used to combine all columns into the Kafka record value when valueIndex is not set. Valid values: text (default) and JSON. Ignored if valueIndex is set. | No |
| column | The field names of the JSON value when writeMode is JSON and valueIndex is not set. Format: [{"name":"col1","type":"JSON_NUMBER"}, {"name":"col2","type":"JSON_STRING"}]. If the source has more columns than defined fields, the extra columns are dropped. If it has fewer, the missing fields are filled with null or the nullValueFormat value. The default JSON field type is JSON_STRING. | Required when writeMode is JSON and valueIndex is not set |
| partition | The target partition number in the Kafka topic. Must be an integer >= 0. | No |
| keyIndex | The 0-based index (an integer >= 0) of the source column to use as the Kafka record key. Mutually exclusive with keyIndexes. | No |
| keyIndexes | An array of 0-based source column indexes whose values are joined with commas to form the Kafka record key. Mutually exclusive with keyIndex. If neither is set, the key is null and records are distributed across partitions in round-robin order. | No |
| fieldDelimiter | The column separator when writeMode is text and valueIndex is not set. Supports one or more characters, Unicode characters (for example, \u0001), and escape characters (\t, \n). Default: \t. | No |
| nullKeyFormat | The string that replaces null values in the column specified by keyIndex or keyIndexes. If not set, null values are not replaced. | No |
| nullValueFormat | The string that replaces null source column values when the Kafka record value is assembled. If not set, null values are not replaced. | No |
| acks | Producer acknowledgment mode. Valid values: 0 (no acknowledgment), 1 (leader replica only), all (all in-sync replicas). Default: all. | No |
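The key and text-value rules above can be sketched as follows. The helpers are hypothetical. The comma join for keyIndexes follows the parameter description. Rendering an unreplaced null as an empty string is an assumption, because that behavior is not specified here.

```python
from typing import Optional, Sequence


def _render(v, null_format: Optional[str]) -> str:
    if v is None:
        # nullKeyFormat / nullValueFormat replace nulls. How an unreplaced null
        # is rendered is unspecified; an empty string is a hypothetical choice.
        return "" if null_format is None else null_format
    return str(v)


def build_key(row: Sequence, key_index=None, key_indexes=None, null_key_format=None):
    """Record key from keyIndex or keyIndexes; None means a null key."""
    if key_index is not None and key_indexes is not None:
        raise ValueError("keyIndex and keyIndexes are mutually exclusive")
    if key_index is not None:
        return _render(row[key_index], null_key_format)
    if key_indexes:
        return ",".join(_render(row[i], null_key_format) for i in key_indexes)
    return None


def build_text_value(row: Sequence, value_index=None, field_delimiter="\t",
                     null_value_format=None) -> str:
    """Record value for writeMode=text (the default)."""
    if value_index is not None:
        return _render(row[value_index], null_value_format)  # single column only
    return field_delimiter.join(_render(v, null_value_format) for v in row)
```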
Appendix: Definition of message format for writing to Kafka
When a real-time sync task runs, DataWorks first writes all existing data from the source table to the Kafka topic, then continuously writes incremental changes. Both full data and incremental DDL change information are written in JSON format.
For the complete message format—including heartbeat messages and the format of records corresponding to source data changes—see Appendix: Message format.
In the JSON structure of data written by an offline sync task, the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields are set to -1.
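Downstream consumers could use these markers to tell records written by the offline phase apart from incremental ones. The following is a minimal sketch that assumes only the three field paths named above. The serialized type of these fields is not given here, so it accepts both -1 and "-1".

```python
import json


def is_offline_record(message: str) -> bool:
    """True if payload.sequenceId, payload.timestamp.eventTime, and
    payload.timestamp.checkpointTime are all -1 (number or string)."""
    payload = json.loads(message).get("payload", {})
    ts = payload.get("timestamp", {})
    markers = (payload.get("sequenceId"), ts.get("eventTime"), ts.get("checkpointTime"))
    return all(str(m) == "-1" for m in markers)
```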
Appendix: JSON field types
When writeMode is set to JSON, use the type field in the column parameter to specify the JSON data type. If conversion fails, the record is treated as dirty data.
| Type | Description |
|---|---|
| JSON_STRING | Converts the source value to a string. Example: integer 123 → {"col1":"123"} |
| JSON_NUMBER | Converts the source value to a number. Example: string "1.23" → {"col1":1.23} |
| JSON_BOOL | Converts the source value to a Boolean. Example: string "true" → {"col1":true} |
| JSON_ARRAY | Converts the source value to a JSON array. Example: string "[1,2,3]" → {"col1":[1,2,3]} |
| JSON_MAP | Converts the source value to a JSON object. Example: string {"k1":"v1"} → {"col1":{"k1":"v1"}} |
| JSON_BASE64 | Base64-encodes the binary content of the source value. Example: byte array 0x01 0x02 → {"col1":"AQI="} |
| JSON_HEX | Converts the binary content of the source value to a hexadecimal string. Example: byte array 0x01 0x02 → {"col1":"0102"} |
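The conversions in the table can be sketched together with the column, nullValueFormat, and truncation rules from the writer parameters. This is an illustrative approximation. The exact parsing DataWorks applies, such as which strings count as Booleans, is an assumption here, and a raised ValueError stands in for a dirty-data record.

```python
import base64
import json
import math


def _raw_bytes(value) -> bytes:
    if isinstance(value, str):
        return value.encode("utf-8")
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    raise ValueError(f"no binary content: {value!r}")


def convert(value, json_type="JSON_STRING"):
    """Convert one source value to the given JSON type; ValueError = dirty data."""
    if json_type == "JSON_STRING":
        return value if isinstance(value, str) else str(value)
    if json_type == "JSON_NUMBER":
        if isinstance(value, bool):
            raise ValueError("Boolean is not a number")
        if isinstance(value, (int, float)):
            number = value
        else:
            text = str(value).strip()
            try:
                number = int(text)
            except ValueError:
                number = float(text)  # raises ValueError if not numeric
        if isinstance(number, float) and not math.isfinite(number):
            raise ValueError("NaN and infinity are not valid JSON numbers")
        return number
    if json_type == "JSON_BOOL":
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in ("true", "false"):
            return text == "true"
        raise ValueError(f"not a Boolean: {value!r}")
    if json_type in ("JSON_ARRAY", "JSON_MAP"):
        parsed = json.loads(value) if isinstance(value, (str, bytes)) else value
        expected = list if json_type == "JSON_ARRAY" else dict
        if not isinstance(parsed, expected):
            raise ValueError(f"not a {json_type}: {value!r}")
        return parsed
    if json_type == "JSON_BASE64":
        return base64.b64encode(_raw_bytes(value)).decode("ascii")
    if json_type == "JSON_HEX":
        return _raw_bytes(value).hex()
    raise ValueError(f"unknown type: {json_type}")


def to_json_value(row, column, null_value_format=None):
    """Assemble the record value for writeMode=JSON from one source row."""
    doc = {}
    for i, spec in enumerate(column):         # extra source columns are dropped
        v = row[i] if i < len(row) else None  # missing columns become null
        if v is None:
            doc[spec["name"]] = null_value_format  # JSON null if unset
        else:
            doc[spec["name"]] = convert(v, spec.get("type", "JSON_STRING"))
    return json.dumps(doc, separators=(",", ":"))
```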