
Configure Kafka input

Last Updated: Jun 20, 2026

Use Kafka as a source for real-time synchronization to continuously read data from a message queue and write it to a destination. This topic shows you how to configure the Kafka Reader component.

Prerequisites

  • Kafka Reader supports Alibaba Cloud Kafka and self-managed Kafka versions 0.10.2 to 2.2.x, inclusive.

  • Kafka versions earlier than 0.10.2 are not recommended. These versions do not support retrieving partition offsets, and their records may not carry timestamps. As a result, latency statistics for the synchronization task can be inaccurate, and offsets cannot be reset correctly.

For details on how to configure a Kafka data source, see Configure a Kafka data source.

Procedure

  1. Log on to the DataWorks console. In the target region, click Data Development and O&M > Data Development in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Development.

  2. Hover over the Create icon and choose Create Node > Data Integration > Real-time Synchronization.

    Alternatively, expand a scheduled workflow, right-click it, and choose Create Node > Data Integration > Real-time Synchronization.

  3. In the Create Node dialog box, set Synchronization Method to Single Table (Topic) to Single Table (Topic) ETL, enter a Name, and select a Path.

  4. Click Confirm.

  5. On the configuration page of the real-time synchronization node, find Kafka in the Input section and drag it to the editing canvas.

  6. Click the Kafka node and configure its parameters in the Node Configuration dialog box.

    Parameter

    Description

    Data Source

    Select a configured Kafka data source. Only Kafka data sources are supported. If no data source is configured, click New data source on the right to go to the Workspace Management > Data Source page to create one. For more information, see Configure a Kafka data source.

    Topic

    The name of the topic from which to read messages.

    A topic is a category used to group related messages within a Kafka cluster.

    Note

    A Kafka Reader can read from only one topic.

    Key Type

    The type of the Kafka key. It determines the key.deserializer setting used to initialize the KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Value Type

    The type of the Kafka value. It determines the value.deserializer setting used to initialize the KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
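
    To show what each Key Type and Value Type implies for the raw bytes in a record, the following minimal sketch decodes bytes the way the standard deserializers in Kafka's org.apache.kafka.common.serialization package do: UTF-8 text for STRING, raw bytes for BYTEARRAY, and big-endian fixed-width encodings for the numeric types. The assumption that DataWorks maps each type to exactly these deserializers, and the helper name deserialize, are hypothetical and not taken from the product.

```python
import struct
from typing import Optional, Union

# Assumed mapping: type name -> (struct format, expected byte width).
# Kafka's numeric deserializers read big-endian, fixed-width values.
_NUMERIC_FORMATS = {
    "SHORT": (">h", 2),
    "INTEGER": (">i", 4),
    "LONG": (">q", 8),
    "FLOAT": (">f", 4),
    "DOUBLE": (">d", 8),
}


def deserialize(type_name: str, data: Optional[bytes]) -> Union[None, str, bytes, int, float]:
    """Hypothetical helper: decode a key or value for the configured type."""
    if data is None:
        # Kafka deserializers return null for a null key or value.
        return None
    if type_name == "STRING":
        # StringDeserializer uses UTF-8 unless another encoding is configured.
        return data.decode("utf-8")
    if type_name == "BYTEARRAY":
        return data
    if type_name in _NUMERIC_FORMATS:
        fmt, width = _NUMERIC_FORMATS[type_name]
        if len(data) != width:
            # Mirrors the size check performed by Kafka's numeric deserializers.
            raise ValueError(f"{type_name} expects {width} bytes, got {len(data)}")
        return struct.unpack(fmt, data)[0]
    raise ValueError(f"unsupported type: {type_name}")
```

    For example, deserialize("INTEGER", b"\x00\x00\x01\x00") returns 256. If producers write values as text but Value Type is set to LONG, decoding typically fails the size check, so the configured types should match how producers serialize keys and values.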

    Output Mode

    Defines how to parse Kafka records.

    • Single-row output: Parses a Kafka record as an unstructured string or a JSON object. One Kafka record generates one output record.

    • Multi-row output: Parses a Kafka record as a JSON array. Each element in the array generates one output record. Therefore, one Kafka record can generate multiple output records.

    Note

    This parameter is available only in some regions. If you do not see it, wait for the feature to be released in your region.

    Path of Array

    When Output Mode is set to Multi-row output, specify the path of the JSON array within the value of the Kafka record. Use the a.a1 format to reference a field in a JSON object, or the a[0].a1 format to reference a field of an object inside a JSON array. If this parameter is left empty, the entire value of the Kafka record is parsed as a JSON array.

    The target JSON array must be an array of objects, such as [{"a":"hello"},{"b":"world"}]. It cannot be an array of primitives like numbers or strings, such as ["a","b"].
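
    The path rules and the array-of-objects requirement above can be illustrated with a minimal sketch. The path grammar (dot-separated names plus [n] indexes) is inferred from the a.a1 and a[0].a1 formats, and the function names parse_path and locate_array are hypothetical; the sketch does not reject malformed paths.

```python
import json
import re
from typing import Any, List

# Matches either a field name or a bracketed array index, so that
# "a[0].a1" becomes ["a", 0, "a1"]. Dots act as separators.
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def parse_path(path: str) -> List[Any]:
    """Split a path such as a[0].a1 into field names and integer indexes."""
    return [int(index) if index else name for name, index in _TOKEN.findall(path)]


def resolve(doc: Any, path: str) -> Any:
    """Follow the path through parsed JSON; raises if a step does not exist."""
    node = doc
    for token in parse_path(path):
        node = node[token]
    return node


def locate_array(value: str, path_of_array: str) -> List[dict]:
    """Return the JSON array that multi-row output would iterate over."""
    doc = json.loads(value)
    # An empty Path of Array means the whole record value is the array.
    array = resolve(doc, path_of_array) if path_of_array else doc
    if not isinstance(array, list) or not all(isinstance(e, dict) for e in array):
        raise ValueError("Path of Array must point to a JSON array of objects")
    return array
```

    With an empty path, locate_array('[{"a":"hello"},{"b":"world"}]', "") returns both objects, while a primitive array such as ["a","b"] is rejected.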

    Configuration parameters

    You can add extended parameters to kafkaConfig to customize how the Kafka consumer client (KafkaConsumer) reads data. For the complete list of parameters supported by your Kafka cluster version, see the official Kafka documentation for that version.

    • Common parameters:

      • bootstrap.servers

      • auto.commit.interval.ms

      • session.timeout.ms

    • group.id

      • Default behavior: By default, a real-time synchronization task assigns a randomly generated string as the group.id of the KafkaConsumer.

      • Manual configuration: You can manually specify a fixed group.id. This allows you to monitor or observe the task's consumption offset in the Kafka cluster by using the specified consumer group.

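    The group.id behavior described above can be pictured as user-supplied kafkaConfig entries merged over task defaults, with a random group ID filled in only when none is given. This is a sketch under assumptions: the function name build_consumer_config and the use of a UUID as the random string are hypothetical; the source only states that the default group.id is randomly generated.

```python
import uuid
from typing import Dict


def build_consumer_config(bootstrap_servers: str, kafka_config: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical merge of extended kafkaConfig parameters into consumer properties."""
    config = {"bootstrap.servers": bootstrap_servers}
    # Extended parameters supplied in kafkaConfig override the defaults.
    config.update(kafka_config)
    if "group.id" not in config:
        # Default behavior: a randomly generated consumer group. Because the
        # name changes per task, it is hard to look up offsets by group.
        config["group.id"] = uuid.uuid4().hex
    return config
```

    Passing {"group.id": "my_sync_group"} (a hypothetical name) keeps the group fixed, which is what makes the task's consumption offsets observable under a known consumer group.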
    Output Field

    Define the fields to extract from each Kafka message.

    • Click Add more fields, enter a Field, and select a Value to add a custom field.

      Assignment Method specifies how a field's value is extracted from a Kafka record. Click the arrow icon on the right to switch between the two extraction methods.

      • Preset methods: Provides six built-in methods to extract values from a Kafka record:

        • value: The message body.

        • key: The message key.

        • partition: The partition number.

        • offset: The message offset.

        • timestamp: The message timestamp in milliseconds.

        • headers: The message headers.

      • JSON parsing: Extracts values from complex JSON by using dot notation (.) to access sub-fields and bracket notation ([]) to access array elements. For backward compatibility, you can also use the legacy double-underscore syntax, such as __value__, to extract metadata or the entire message body of a Kafka record. The following example shows the value of a Kafka record.

        {
          "a": {
            "a1": "hello"
          },
          "b": "world",
          "c": [
            "xxxxxxx",
            "yyyyyyy"
          ],
          "d": [
            {
              "AA": "this",
              "BB": "is_data"
            },
            {
              "AA": "that",
              "BB": "is_also_data"
            }
          ]
        }

        • Assignment methods for common scenarios:

          • To synchronize the value of a Kafka record, enter value.

          • To synchronize the key of a Kafka record, enter key.

          • To synchronize the partition of a Kafka record, enter partition.

          • To synchronize the offset of a Kafka record, enter offset.

          • To synchronize the timestamp of a Kafka record, enter timestamp.

          • To synchronize the headers of a Kafka record, enter headers.

          • To synchronize "hello" from a1, enter a.a1.

          • To synchronize "world" from b, enter b.

          • To synchronize "yyyyyyy" from c, enter c[1].

          • To synchronize "this" from AA, enter d[0].AA.

    • To delete a field, hover over it and click the Delete icon that appears.
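
    The two assignment modes can be summarized with a minimal sketch: a preset name reads record metadata directly, while a JSON path is resolved against the parsed record value. The KafkaRecord class, the extract function, and the mode switch are hypothetical stand-ins for the arrow toggle in the UI. Treating every legacy __name__ expression as the preset of the same name is also an assumption; the text only names __value__.

```python
import json
import re
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class KafkaRecord:
    """Hypothetical in-memory view of one consumed Kafka record."""
    value: str
    key: Optional[str] = None
    partition: int = 0
    offset: int = 0
    timestamp: int = 0  # milliseconds since the epoch
    # Kafka headers are ordered and may repeat keys, hence a list of pairs.
    headers: List[Tuple[str, bytes]] = field(default_factory=list)


PRESETS = ("value", "key", "partition", "offset", "timestamp", "headers")
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def extract(record: KafkaRecord, expr: str, mode: str = "json") -> Any:
    """Return one output field value using a preset method or a JSON path."""
    if mode not in ("preset", "json"):
        raise ValueError(f"unknown mode: {mode}")
    if mode == "preset":
        if expr not in PRESETS:
            raise ValueError(f"unknown preset: {expr}")
        return getattr(record, expr)
    # Assumed legacy syntax: __value__, __key__, and so on map to presets.
    legacy = re.fullmatch(r"__(\w+)__", expr)
    if legacy and legacy.group(1) in PRESETS:
        return getattr(record, legacy.group(1))
    # JSON parsing: "." selects a sub-field and "[n]" selects an array element.
    node: Any = json.loads(record.value)
    for name, index in _TOKEN.findall(expr):
        node = node[int(index)] if index else node[name]
    return node
```

    For a record whose value is the sample JSON above, extract(record, "c[1]") returns "yyyyyyy" and extract(record, "d[0].AA") returns "this".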

    Example scenario: If you set Output Mode to Multi-row output, the system first locates the JSON array by using the specified Path of Array. It then takes each JSON object in the array and builds the output fields from the defined field names and assignment methods. Assignment methods work the same way as in Single-row output mode: use the . syntax to retrieve sub-fields and the [] syntax to retrieve array elements from complex JSON. The Kafka record data is as follows:

    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }

    When Path of Array is set to c.c0 and you define two output fields, AA with the assignment method AA and BB with the assignment method BB, the Kafka record is parsed into two records: one with AA=this and BB=is_data, and the other with AA=that and BB=is_also_data.
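
    The following minimal sketch reproduces this example under stated assumptions: each output field's path is resolved relative to the current array element, and an empty Path of Array selects the whole value. The function name multi_row_output and its signature are hypothetical.

```python
import json
import re
from typing import Any, Dict, List

_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def _resolve(node: Any, path: str) -> Any:
    """Follow a path such as c.c0 or d[0].AA; an empty path returns node."""
    for name, index in _TOKEN.findall(path):
        node = node[int(index)] if index else node[name]
    return node


def multi_row_output(value: str, path_of_array: str, fields: Dict[str, str]) -> List[Dict[str, Any]]:
    """Hypothetical sketch: emit one output record per element of the array."""
    array = _resolve(json.loads(value), path_of_array)
    rows = []
    for element in array:
        # Field paths use the same syntax as single-row output, but are
        # evaluated against the array element rather than the whole value.
        rows.append({name: _resolve(element, expr) for name, expr in fields.items()})
    return rows
```

    Calling multi_row_output on the record above with path "c.c0" and fields {"AA": "AA", "BB": "BB"} yields the two records described in the example.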

  7. Click the Save icon on the toolbar.
