
DataWorks: Kafka

Last Updated: Mar 27, 2026

The Kafka data source enables bidirectional data synchronization between DataWorks and Kafka—read from Kafka topics in offline or real-time tasks, and write to Kafka topics from any supported source.

Supported versions

DataWorks supports Alibaba Cloud Kafka and self-managed Kafka clusters running versions 0.10.2 through 3.6.x.

Note

Versions earlier than 0.10.2 are not supported. Those versions cannot retrieve partition offsets by timestamp, and their message data structures may be incompatible with timestamp-based reads.

Supported sync modes

Sync mode Supported
Offline read (single table) Yes
Real-time read (single table) Yes
Offline write (single table) Yes
Real-time write (single table) Yes
Real-time write (entire database) Yes

Resource groups

The Kafka data source supports Serverless resource groups (recommended) and older exclusive resource groups for Data Integration.

Resource estimation for real-time read

When using a subscription Serverless resource group, estimate resources before starting a task to avoid failures from insufficient capacity.

Per-topic base allocation: 1 CU per topic

Traffic-based allocation:

Kafka data type CU per 10 MB/s of traffic
Uncompressed 1 CU
Compressed 2 CU
Compressed + JSON parsing 3 CU
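
As a rough sketch of how the base and traffic allocations above combine, the following snippet estimates CUs for a real-time read task. The function name and ceiling-based rounding are assumptions of this illustration, not a DataWorks API:

```python
import math

# Illustrative CU estimator. Per the tables above: 1 CU per topic as a
# base, plus 1/2/3 CU per 10 MB/s depending on the data type.
CU_PER_10MBS = {"uncompressed": 1, "compressed": 2, "compressed_json": 3}

def estimate_cus(num_topics, traffic_mb_s, data_type):
    base = num_topics * 1                      # 1 CU per topic
    traffic = math.ceil(traffic_mb_s / 10) * CU_PER_10MBS[data_type]
    return base + traffic

# Two topics of compressed data at 25 MB/s total:
print(estimate_cus(2, 25, "compressed"))       # 2 + ceil(25/10) * 2 = 8
```

Treat the result as a starting point only; as the note below the tables says, actual usage depends on data content and format.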

Cluster slot usage limits:

Failover tolerance Maximum slot usage
High 80%
Low 70%
Note

Actual usage depends on data content and format. Adjust based on observed runtime conditions after your initial estimate.

Limitations

Offline read from a single table

If both parameter.groupId and parameter.kafkaConfig.group.id are configured, parameter.groupId takes precedence.

Real-time write to a single table

Data deduplication is not supported. If a task restarts after an offset reset or a failover, duplicate records may be written.

Real-time write for an entire database

Key assignment by primary key configuration:

Source table primary key Key assignment behavior
Has primary key Primary key value is used as the Kafka record key—records for the same primary key are written to the same partition in order
No primary key, sync without primary key Key is empty; the destination Kafka topic must have exactly one partition to preserve write order
No primary key, custom primary key A combination of one or more non-primary key fields is used as the key
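
The ordering behavior in the table above can be illustrated with a toy partitioner: records with the same non-empty key always map to the same partition, so their per-key write order is preserved, while keyless records are spread round-robin. This is a sketch only; Kafka's default partitioner actually hashes keys with murmur2, and crc32 here is a stand-in:

```python
import zlib

def partition_for(key, num_partitions, _rr=[0]):
    """Toy partitioner illustrating the key-assignment table above.
    Kafka's default partitioner uses murmur2; crc32 is only a stand-in."""
    if key is None:
        # No key: round-robin, so cross-record ordering is not guaranteed
        # unless the topic has exactly one partition.
        _rr[0] += 1
        return _rr[0] % num_partitions
    # Same key -> same partition -> per-key write order is preserved.
    return zlib.crc32(key) % num_partitions

assert partition_for(b"order-42", 8) == partition_for(b"order-42", 8)
print(partition_for(None, 1), partition_for(None, 1))  # always 0 with one partition
```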

Strict ordering under cluster exceptions:

If the Kafka cluster returns exceptions and you need strict ordering for records with the same primary key, add the following to the extended parameters of the Kafka data source:

{"max.in.flight.requests.per.connection": 1, "buffer.memory": 100554432}
Important

This configuration significantly degrades replication performance. Evaluate the tradeoff between ordering guarantees and throughput before enabling it.

For the format of messages written to Kafka—including heartbeat messages and DDL change records—see Appendix: Message format.

Supported field types

Kafka records contain six data fields: key, value, offset, timestamp, headers, and partition. DataWorks processes these fields as follows when reading from or writing to Kafka.

Read data

DataWorks can parse Kafka data in JSON format. The table below shows how each field is handled.

Kafka record field Processed type
key Determined by the keyType parameter in the sync task configuration
value Determined by the valueType parameter in the sync task configuration
offset Long
timestamp Long
headers String
partition Long

Write data

DataWorks supports writing to Kafka in JSON or text format. The format behavior varies by sync task type.

Important
  • In text format, field names are not included—field values are separated by a delimiter.

  • In real-time sync tasks, DataWorks uses a built-in JSON format that includes database change messages, business timestamps, and DDL information. See Appendix: Message format for details.

Offline sync (offline synchronization node in DataStudio)

  JSON value format:
  • String: UTF-8 encoded string
  • Boolean: UTF-8 string "true" or "false"
  • Time/Date: UTF-8 string in yyyy-MM-dd HH:mm:ss format
  • Numeric: UTF-8 encoded numeric string
  • Byte stream: treated as a UTF-8 encoded string

  Text value format:
  • String: UTF-8 encoded string
  • Boolean: UTF-8 string "true" or "false"
  • Time/Date: UTF-8 string in yyyy-MM-dd HH:mm:ss format
  • Numeric: UTF-8 encoded numeric string
  • Byte stream: treated as a UTF-8 encoded string

Real-time ETL to Kafka (real-time sync node in DataStudio)

  JSON value format:
  • String: UTF-8 encoded string
  • Boolean: JSON Boolean type
  • Time/Date (millisecond precision or coarser): 13-digit JSON integer (millisecond timestamp)
  • Time/Date (microsecond or nanosecond precision): JSON float with a 13-digit integer part (milliseconds) and a 6-digit decimal part (nanoseconds within the millisecond)
  • Numeric: JSON numeric type
  • Byte stream: Base64-encoded, then converted to a UTF-8 string

  Text value format:
  • String: UTF-8 encoded string
  • Boolean: UTF-8 string "true" or "false"
  • Time/Date: UTF-8 string in yyyy-MM-dd HH:mm:ss format
  • Numeric: UTF-8 encoded numeric string
  • Byte stream: Base64-encoded, then converted to a UTF-8 string

Real-time entire database sync to Kafka (incremental data only)

  Built-in JSON value format:
  • String: UTF-8 encoded string
  • Boolean: JSON Boolean type
  • Time/Date: 13-digit millisecond timestamp
  • Numeric: JSON numeric value
  • Byte stream: Base64-encoded, then converted to a UTF-8 string

One-click real-time sync to Kafka (full offline + incremental real-time)

  Built-in JSON value format:
  • String: UTF-8 encoded string
  • Boolean: JSON Boolean type
  • Time/Date: 13-digit millisecond timestamp
  • Numeric: JSON numeric value
  • Byte stream: Base64-encoded, then converted to a UTF-8 string
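
A few of the encodings above can be reproduced with standard-library Python. The concrete date and the nanosecond remainder are made-up illustrative values:

```python
import base64
import datetime

# 13-digit millisecond timestamp (real-time sync, built-in JSON format).
dt = datetime.datetime(2024, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)
ms = int(dt.timestamp() * 1000)
print(ms)                          # 1704164645000 (13 digits)

# Microsecond/nanosecond precision: 13-digit integer part (ms) plus a
# 6-digit decimal part; 123456 is a made-up sub-millisecond remainder.
print(f"{ms}.{123456:06d}")        # 1704164645000.123456

# Byte streams: Base64-encoded, then treated as a UTF-8 string.
print(base64.b64encode(bytes([0x01, 0x02])).decode("utf-8"))  # AQI=
```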

Add a data source

Before developing a sync task, add the Kafka data source in DataWorks. See Data source management for the procedure. Parameter descriptions are available directly in the DataWorks console when you add a data source.

Develop a data sync task

Configure an offline sync task for a single table

Configure a real-time sync task for a single table or entire database

See Configure a real-time synchronization task in DataStudio, Configure a real-time synchronization task in Data Integration, and Configure a synchronization task for an entire database.

Authentication configuration

DataWorks supports three authentication mechanisms for Kafka: SSL, GSSAPI (Kerberos), and PLAIN. Choose based on your Kafka cluster configuration.

When to use Mechanism
Kafka cluster uses SSL or SASL_SSL SSL
Kafka cluster uses Kerberos authentication GSSAPI
Kafka cluster uses username/password authentication PLAIN

SSL

Set Special Authentication Method to SSL or SASL_SSL to enable SSL authentication. You must upload a client truststore certificate file and enter the truststore passphrase.

The truststore certificate file to upload depends on your Kafka cluster type.

The keystore certificate file, keystore passphrase, and SSL passphrase are only required when bidirectional SSL authentication is enabled (ssl.client.auth=required in the server's server.properties file). See Use SSL encryption for Kafka connections for details.

GSSAPI

Set Sasl Mechanism to GSSAPI when the Kafka cluster uses Kerberos authentication. Upload three files—a JAAS configuration file, a Kerberos configuration file, and a Keytab file—and configure DNS/HOST settings for the exclusive resource group.

Note

For a Serverless resource group, configure host addresses using internal DNS resolution. See Internal DNS resolution (PrivateZone).

JAAS configuration file

The file must start with KafkaClient, with all configuration items enclosed in braces {}.

Format rules:

  • The first line inside the braces specifies the login module class.

  • Each configuration item uses key=value format.

  • All items except the last must not end with a semicolon.

  • The last item must end with a semicolon, and a semicolon must follow the closing brace }.

If the format is incorrect, the file cannot be parsed.

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="<path-to-keytab>"
   storeKey=true
   serviceName="kafka-server"
   principal="<client-principal>@<REALM>";
};
Configuration item Description
Login module Must be com.sun.security.auth.module.Krb5LoginModule.
useKeyTab Must be set to true.
keyTab Specify any path. When the task runs, the uploaded keytab file is automatically downloaded and this path is populated.
storeKey Specifies whether the client saves the key. Set to true or false—this does not affect synchronization.
serviceName Corresponds to sasl.kerberos.service.name in the Kafka server's server.properties. Configure as needed.
principal The Kerberos principal for the Kafka client. Make sure the uploaded keytab file contains the key for this principal.
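
The semicolon rules for the JAAS file can be expressed as a quick structural check. This is an illustrative sketch, not the parser DataWorks actually uses:

```python
def check_jaas(text):
    """Minimal check of the JAAS format rules above (illustrative only)."""
    lines = [l.strip() for l in text.strip().splitlines() if l.strip()]
    if not lines[0].startswith("KafkaClient") or "{" not in lines[0]:
        return False                     # must open with KafkaClient {
    if lines[-1] != "};":
        return False                     # a ';' must follow the closing brace
    items = lines[1:-1]
    if not items:
        return False
    # Every item except the last must NOT end with ';'; the last must.
    return all(not l.endswith(";") for l in items[:-1]) and items[-1].endswith(";")

good = """KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   principal="client@EXAMPLE.COM";
};"""
print(check_jaas(good))                                               # True
print(check_jaas(good.replace("useKeyTab=true", "useKeyTab=true;")))  # False
```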

Kerberos configuration file

The file must contain two modules: [libdefaults] and [realms].

  • [libdefaults]: Specifies Kerberos authentication parameters in key=value format.

  • [realms]: Specifies KDC addresses. Each realm submodule begins with realm name= and its configuration items are enclosed in braces.

[libdefaults]
  default_realm = <REALM>

[realms]
  <REALM> = {
    kdc = <kdc-ip>:<port>
  }
Configuration item Description
[libdefaults].default_realm The default realm for accessing Kafka cluster nodes. Usually matches the realm of the client principal in the JAAS file.
Other [libdefaults] parameters Optional Kerberos parameters such as ticket_lifetime. Configure as needed.
[realms].realm name Must match the client principal realm in the JAAS file and [libdefaults].default_realm. If the two differ, include two realm submodules.
[realms].realm name.kdc KDC address and port in ip:port format (for example, kdc=10.0.0.1:88). If the port is omitted, port 88 is used.

Keytab file

The keytab file must contain the key for the principal specified in the JAAS file and must be verifiable by the Key Distribution Center (KDC). Run the following command to verify:

klist -ket ./client.keytab

Keytab name: FILE:client.keytab
KVNO Timestamp           Principal
---- ------------------- ------------------------------------------------------
   7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)

DNS and HOST settings for an exclusive resource group

In a Kerberos-enabled Kafka cluster, each node's hostname is part of its principal registered in the KDC. The client resolves hostnames using DNS and HOST settings to obtain credentials from the KDC. Configure these settings correctly when using an exclusive resource group:

  • DNS settings: If the VPC hosting the exclusive resource group uses a PrivateZone instance for Kafka node name resolution, add a custom route for IP addresses 100.100.2.136 and 100.100.2.138 to the VPC attachment in the DataWorks console.

  • HOST settings: If PrivateZone is not used for Kafka node name resolution, manually add IP-to-hostname mappings for each Kafka cluster node in the Host configuration of the network settings for the exclusive resource group in the DataWorks console.

PLAIN

Set Sasl Mechanism to PLAIN when the Kafka cluster uses username/password authentication. The JAAS file must start with KafkaClient, with all configuration items enclosed in braces {}.

Format rules:

  • The first line inside the braces specifies the login module class.

  • Each configuration item uses key=value format.

  • All items except the last must not end with a semicolon.

  • The last item must end with a semicolon, and a semicolon must follow the closing brace }.

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<your-username>"
  password="<your-password>";
};
Configuration item Description
Login module Must be org.apache.kafka.common.security.plain.PlainLoginModule.
username Your Kafka username.
password Your Kafka password.

FAQ

Appendix: Script demos and parameter descriptions

To configure a batch sync task using the code editor, specify parameters in the script using the unified script format. See Configure a task in the code editor for general requirements. The following sections cover the Kafka-specific parameters.

Reader script demo

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "true"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,
            "concurrent": 1,
            "mbps": "12"
        }
    }
}

Reader script parameters

Parameter Description Required
datasource The data source name. Must match the name of the data source added in DataWorks. Yes
server The Kafka broker address in ip:port format. Only one server is needed, but DataWorks must be able to reach all brokers in the cluster. Yes
topic The Kafka topic to read from. Yes
column The Kafka data fields to read. Three types are supported: Yes
  • Constant columns: values enclosed in single quotes, such as ["'abc'", "'123'"].
  • Data columns: JSON property names such as ["event_id"], or nested properties such as ["tag.desc"].
  • Attribute columns: __key__, __value__, __partition__, __headers__, __offset__, __timestamp__.
keyType The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. No
valueType The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. No
beginDateTime Start offset for consumption as an inclusive time boundary in yyyyMMddHHmmss format. Use with endDateTime. Compatible with scheduling parameters. Supported in Kafka 0.10.2 and later. Required if beginOffset is not set
endDateTime End offset for consumption as an exclusive time boundary in yyyyMMddHHmmss format. Use with beginDateTime. Compatible with scheduling parameters. Supported in Kafka 0.10.2 and later. Required if endOffset is not set
beginOffset Start offset for consumption. Options: a numeric offset value; seekToBeginning (earliest offset); seekToLast (group-committed offset); seekToEnd (latest offset, reads no data). Required if beginDateTime is not set
endOffset End offset for consumption. Controls when the task exits. Required if endDateTime is not set
skipExceedRecord Whether to discard records outside the endOffset or endDateTime boundary fetched in a single poll. Set to true for Kafka 0.10.2 and later; false for earlier versions. Default: false. No
partition The partition to read from. By default, the task reads from all partitions. Specify an integer to restrict to a single partition. No
kafkaConfig Extended KafkaConsumer parameters. See kafkaConfig parameters below. No
encoding The encoding used when keyType or valueType is STRING. Default: UTF-8. No
waitTime Maximum wait time in seconds for a single data pull attempt. Default: 60. No
stopWhenPollEmpty If true, stops the task when a poll returns empty data. If false, retries until data is available. Default: true. No
stopWhenReachEndOffset Takes effect only when stopWhenPollEmpty is true. If true, the task checks whether all partition data has been read before stopping; if false, it stops immediately on an empty poll without checking. Default: false, for compatibility with historical behavior (Kafka versions earlier than 0.10.2 cannot check whether the latest data from all partitions has been read, and some existing tasks may still target those versions). No
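
To make the three column kinds concrete, here is a toy resolver for a JSON record value. The function and its parsing rules are illustrative assumptions; the real reader's behavior is defined by the parameter table above:

```python
import json

record_value = json.loads('{"event_id": "e-1", "tag": {"desc": "hot"}}')

def resolve(col, value):
    """Toy resolver for the column kinds above (a sketch; the actual
    reader's parsing rules may differ)."""
    if col.startswith("'") and col.endswith("'"):
        return col.strip("'")                 # constant column, e.g. "'123'"
    cur = value
    for part in col.split("."):               # data column, dotted for nesting
        cur = cur[part]
    return cur

print(resolve("'123'", record_value))     # 123
print(resolve("event_id", record_value))  # e-1
print(resolve("tag.desc", record_value))  # hot
```

Attribute columns such as __offset__ and __partition__ come from the Kafka record itself rather than the value payload, so they are not part of this sketch.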

kafkaConfig parameters

Use kafkaConfig to pass extended configuration to the KafkaConsumer client.

Parameter Description
fetch.min.bytes Minimum bytes the consumer retrieves from the broker per request. The broker waits until this threshold is met before responding.
fetch.max.wait.ms Maximum wait time for the broker to return data. Default: 500 ms. Data is returned when either fetch.min.bytes or fetch.max.wait.ms is satisfied, whichever comes first.
max.partition.fetch.bytes Maximum bytes the broker returns per partition per request. Default: 1 MB.
session.timeout.ms Maximum time the consumer can be disconnected from the server before losing group membership. Default: 30 seconds.
auto.offset.reset Action when no offset exists or the offset is invalid. Default: none (no automatic reset). Set to earliest to read from the beginning of the partition.
max.poll.records Maximum number of records returned by a single poll() call.
key.deserializer Deserialization class for the message key, such as org.apache.kafka.common.serialization.StringDeserializer.
value.deserializer Deserialization class for the message value, such as org.apache.kafka.common.serialization.StringDeserializer.
ssl.truststore.location Path to the SSL root certificate.
ssl.truststore.password Password for the truststore. For Alibaba Cloud Kafka, use KafkaOnsClient.
security.protocol Access protocol. Only SASL_SSL is supported.
sasl.mechanism SASL authentication method. For Alibaba Cloud Kafka, use PLAIN.
java.security.auth.login.config Path to the SASL authentication file.

Writer script demo

{
  "type": "job",
  "version": "2.0",
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "kafka",
      "parameter": {
          "server": "ip:9092",
          "keyIndex": 0,
          "valueIndex": 1,
          "keyType": "Integer",
          "valueType": "Short",
          "topic": "t08",
          "batchSize": 1024
        },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
      "errorLimit": {
        "record": "0"
      },
      "speed": {
          "throttle": true,
          "concurrent": 1,
          "mbps": "12"
      }
   },
  "order": {
    "hops": [
        {
            "from": "Reader",
            "to": "Writer"
        }
      ]
    }
}

Writer script parameters

Parameter Description Required
datasource The data source name. Must match the name of the data source added in DataWorks. Yes
server The Kafka server address in ip:port format. Yes
topic The Kafka topic to write to. Yes
keyType The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. Yes
valueType The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, SHORT. Yes
valueIndex The column index (0-based) to use as the Kafka record value. If not set, all columns are concatenated using fieldDelimiter. No
writeMode Format for concatenating all columns into the Kafka record value when valueIndex is not set. Options: text (default) or JSON. Ignored if valueIndex is configured. No
column Field names for the JSON value structure when writeMode is JSON and valueIndex is not set. Format: [{"name":"col1","type":"JSON_NUMBER"}, {"name":"col2","type":"JSON_STRING"}]. If the source has more columns than defined fields, data is truncated. If fewer, missing fields are filled with null or the nullValueFormat value. Default JSON field type is JSON_STRING. Required when writeMode=JSON and valueIndex is not set
partition The target partition number in the Kafka topic. Must be an integer >= 0. No
keyIndex Column index (0-based integer >= 0) to use as the Kafka record key. Mutually exclusive with keyIndexes. No
keyIndexes Array of column indexes (0-based) to concatenate as the Kafka record key, separated by commas. Mutually exclusive with keyIndex. If neither is set, the key is null and records are distributed across partitions in round-robin order. No
fieldDelimiter Column separator when writeMode=text and valueIndex is not set. Supports single or multiple characters, Unicode (for example, \u0001), and escape characters (\t, \n). Default: \t. No
nullKeyFormat Replacement string for null values in the column specified by keyIndex or keyIndexes. If not set, null is not replaced. No
nullValueFormat Replacement string for null source column values when assembling the Kafka record value. If not set, null is not replaced. No
acks Producer acknowledgment mode. Options: 0 (no acknowledgment), 1 (primary replica only), all (all replicas). Default: all. No
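
As an illustration of how writeMode=text assembles a record value when valueIndex is not set, the sketch below joins column values with fieldDelimiter and substitutes nullValueFormat for nulls. Writing the literal string "null" when nullValueFormat is unset is an assumption of this sketch, not documented behavior:

```python
def build_text_value(row, field_delimiter="\t", null_value_format=None):
    """Sketch of text-format value assembly: column values joined by
    fieldDelimiter, with nulls optionally replaced by nullValueFormat."""
    parts = []
    for v in row:
        if v is None:
            # Assumption: fall back to the literal "null" when unset.
            parts.append(null_value_format if null_value_format is not None else "null")
        else:
            parts.append(str(v))
    return field_delimiter.join(parts)

print(build_text_value(["a", 1, None], ",", "\\N"))   # a,1,\N
print(build_text_value(["x", "y"]))                   # x<TAB>y
```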

Appendix: Definition of message format for writing to Kafka

When a real-time sync task runs, DataWorks first writes all existing data from the source table to the Kafka topic, then continuously writes incremental changes. Both full data and incremental DDL change information are written in JSON format.

For the complete message format—including heartbeat messages and the format of records corresponding to source data changes—see Appendix: Message format.

Note

In the JSON structure of data written by an offline sync task, the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields are set to -1.

Appendix: JSON field types

When writeMode is set to JSON, use the type field in the column parameter to specify the JSON data type. If conversion fails, the record is treated as dirty data.

Type Description
JSON_STRING Converts the source value to a string. Example: integer 123 → {"col1":"123"}
JSON_NUMBER Converts the source value to a number. Example: string "1.23" → {"col1":1.23}
JSON_BOOL Converts the source value to a Boolean. Example: string "true" → {"col1":true}
JSON_ARRAY Converts the source value to a JSON array. Example: string "[1,2,3]" → {"col1":[1,2,3]}
JSON_MAP Converts the source value to a JSON object. Example: string {"k1":"v1"} → {"col1":{"k1":"v1"}}
JSON_BASE64 Base64-encodes the binary content of the source value. Example: byte array 0x01 0x02 → {"col1":"AQI="}
JSON_HEX Converts the binary content of the source value to a hexadecimal string. Example: byte array 0x01 0x02 → {"col1":"0102"}
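
The conversions in this table can be approximated with standard-library Python. These toy converters are illustrative only; per the note above, a real task routes conversion failures to dirty data:

```python
import base64
import json

def convert(value, json_type):
    """Toy converters mirroring the JSON field type table above."""
    if json_type == "JSON_STRING": return str(value)
    if json_type == "JSON_NUMBER": return float(value)
    if json_type == "JSON_BOOL":   return str(value).lower() == "true"
    if json_type == "JSON_ARRAY":  return json.loads(value)
    if json_type == "JSON_MAP":    return json.loads(value)
    if json_type == "JSON_BASE64": return base64.b64encode(value).decode()
    if json_type == "JSON_HEX":    return value.hex()
    raise ValueError(json_type)

print(convert(123, "JSON_STRING"))            # 123
print(convert("1.23", "JSON_NUMBER"))         # 1.23
print(convert(bytes([1, 2]), "JSON_BASE64"))  # AQI=
print(convert(bytes([1, 2]), "JSON_HEX"))     # 0102
```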