
DataWorks: Configure Kafka Reader

Last Updated: Aug 01, 2023

Kafka Reader uses a Kafka SDK to read data from Kafka in real time.

Background information

Note
  • Kafka Reader can read data from Message Queue for Apache Kafka data sources and self-managed Kafka data sources. However, the version of a self-managed Kafka data source must range from 0.10.2 to 2.2.x.
  • Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support the query of offsets of partition data and do not support timestamps. If you use such a Kafka data source in a data synchronization node, the latency data that is displayed in Operation Center for the node may be incorrect, and the offset from which incremental data starts to be synchronized cannot be reset.
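The timestamp-based offset lookup that the preceding note refers to corresponds to the offsetsForTimes API of the Apache Kafka consumer. The following minimal Java sketch, which is not DataWorks code and uses a placeholder broker address and topic, shows the kind of query that brokers earlier than 0.10.2 cannot serve:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetByTimestampDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
                consumer.assign(Collections.singletonList(tp));
                // Ask the broker for the earliest offset whose timestamp is at or
                // after one hour ago. Brokers earlier than 0.10.2 cannot answer this
                // query, which is why offsets cannot be reset by timestamp.
                long oneHourAgo = System.currentTimeMillis() - 3_600_000L;
                Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
                OffsetAndTimestamp oat = result.get(tp);
                if (oat != null) {
                    consumer.seek(tp, oat.offset()); // reset the start offset by timestamp
                }
            }
        }
    }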
For more information about how to add a Kafka data source, see Add a Kafka data source.

Procedure

  1. Go to the DataStudio page.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. In the top navigation bar, select the region in which the workspace that you want to manage resides. On the Workspaces page, find the workspace and choose Shortcuts > Data Development in the Actions column.

  2. In the Scheduled Workflow pane, move the pointer over the Create a table icon and choose Create Node > Data Integration > Real-time synchronization.

    Alternatively, right-click the required workflow, and then choose Create Node > Data Integration > Real-time synchronization.

  3. In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL and configure the Name and Path parameters.

    Important

    The node name cannot exceed 128 characters in length and can contain letters, digits, underscores (_), and periods (.).

  4. Click Confirm.

  5. On the configuration tab of the real-time synchronization node, drag Kafka in the Input section to the canvas on the right.
  6. Click the Kafka node. In the panel that appears, configure the parameters.
    Kafka
    The following list describes the parameters:
    Data source: The name of the Kafka data source that you added. You can select only a Kafka data source. If no data source is available, click New data source on the right to add one on the Data Source page. For more information, see Add a Kafka data source.
    Topic: The name of the Kafka topic from which you want to read data. Topics are categories in which Kafka maintains feeds of messages.

    Each message that is published to a Kafka cluster is assigned a topic. Each topic contains a group of messages.

    Note Kafka Reader in each data synchronization node can read data from only one topic.
    Key Type: The data type of the keys in the Kafka topic. The value of this parameter determines the setting of key.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.
    Value Type: The data type of the values in the Kafka topic. The value of this parameter determines the setting of value.deserializer that is used to initialize a Kafka consumer. Valid values: String, ByteArray, Double, Float, Integer, Long, and Short.
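    As an illustration, the following minimal Java sketch, which is not DataWorks code, shows how a Key Type of String and a Value Type of Long would translate into the deserializer settings of a plain Apache Kafka consumer; the broker address is a placeholder:

       import java.util.Properties;
       import org.apache.kafka.clients.consumer.KafkaConsumer;

       public class DeserializerDemo {
           public static void main(String[] args) {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092"); // placeholder
               // Key Type: String -> key.deserializer
               props.put("key.deserializer",
                   "org.apache.kafka.common.serialization.StringDeserializer");
               // Value Type: Long -> value.deserializer
               props.put("value.deserializer",
                   "org.apache.kafka.common.serialization.LongDeserializer");
               try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
                   // The consumer now deserializes keys as strings and values as longs.
               }
           }
       }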
    Output Mode: The mode in which Kafka Reader parses messages in the Kafka topic. Valid values:
    • Single-row Output: Kafka Reader parses messages as unstructured strings or JSON objects. One message is parsed into one output record.
    • Multi-row Output: Kafka Reader parses messages as JSON arrays. Each array element is parsed into one output record. Therefore, one message may be parsed into multiple output records.
    Note This parameter is supported only in some regions and will be supported in other regions in the future.
    Path of Array: The path of the JSON array in the value of the Kafka message. This parameter is displayed only if you set the Output Mode parameter to Multi-row Output. If you want to reference the fields in a specific JSON object, you can specify a value for this parameter in the a.a1 format. If you want to reference the fields in a specific JSON array, you can specify a value for this parameter in the a[0].a1 format. If you leave this parameter empty, Kafka Reader parses the value of a message as a JSON array.

    You must make sure that the JSON array to be parsed is an object array such as [{"a":"hello"},{"b":"world"}], instead of a numeric array or string array such as ["a","b"].
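    For example, suppose the value of a message is the following hypothetical JSON:

       {
          "a": {
             "a1": [
                {"AA": "this", "BB": "is_data"},
                {"AA": "that", "BB": "is_also_data"}
             ]
          }
       }

    If you set the Path of Array parameter to a.a1, Kafka Reader extracts the two objects in the array, and each object is parsed into one output record.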

    Configuration parameters: The extended parameters that you can configure when you create a Kafka consumer, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. For more information about the parameters that Kafka clusters of different versions support for Kafka consumers, see the documentation of Apache Kafka. You can configure parameters in KafkaConfig to control the data read behavior of a Kafka consumer.

    A real-time synchronization node that synchronizes data from Kafka uses a random string as the value of the group.id parameter for its Kafka consumer. If you want the synchronization offset to be uploaded to a specific group in the Kafka cluster, manually specify a value for the group.id parameter. Such a node does not manage offsets based on the group information maintained by the Kafka server. Therefore, the setting of the group.id parameter does not affect the synchronization offset after the node is started or restarted, or after a failover is performed on the node.
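    The following hypothetical example shows what you might enter in the Configuration parameters field, assuming JSON key-value pairs; the group name is a placeholder:

       {
          "group.id": "demo_consumer_group",
          "auto.commit.interval.ms": "1000",
          "session.timeout.ms": "30000"
       }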
    Output Fields: The output fields, which can be customized.
    • Click Add more fields. In the fields that appear, enter a field name and select a data type to customize a field.
      DataWorks provides two types of methods that Kafka Reader can use to obtain values for fields from messages. You can click the arrow icon to switch between the two types of methods.
      • Default methods:
        • value: the values of messages
        • key: the keys of messages
        • partition: the IDs of partitions
        • offset: the offsets of messages
        • timestamp: the timestamps of messages, in milliseconds
        • headers: the headers of messages
      • JSON-based parsing: You can use the .Sub-field or [Element in an array] syntax to obtain content from the JSON data. To ensure that the values of fields are compatible with historical logic, you can use a name that starts and ends with two underscores (__), such as __value__, to obtain specific values for fields from messages. The following code shows the data in a sample Kafka message:
        {
           "a": {
              "a1": "hello"
           },
           "b": "world",
           "c": [
              "xxxxxxx",
              "yyyyyyy"
           ],
           "d": [
              {
                 "AA": "this",
                 "BB": "is_data"
              },
              {
                 "AA": "that",
                 "BB": "is_also_data"
              }
           ]
        }
        • You can use one of the following methods based on the preceding code:
          • If you want to read the values of messages, use __value__.
          • If you want to read the keys of messages, use __key__.
          • If you want to read the partitions that store messages, use __partition__.
          • If you want to read the offsets of messages, use __offset__.
          • If you want to read the timestamps of messages, use __timestamp__.
          • If you want to read the headers of messages, use __headers__.
          • If you want to read "hello" in the a1 field, use a.a1.
          • If you want to read "world" in the b field, use b.
          • If you want to read "yyyyyyy" in the c field, use c[1].
          • If you want to read "this" in the AA field, use d[0].AA.
    • To remove a field, move the pointer over the field and click the Remove icon.
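    The .Sub-field and [Element in an array] rules behave like ordinary JSON path navigation. The following minimal Java sketch uses the Jackson library, which is not part of Kafka Reader and is shown only to mirror how the paths in the preceding list resolve against the sample message:

       import com.fasterxml.jackson.databind.JsonNode;
       import com.fasterxml.jackson.databind.ObjectMapper;

       public class JsonPathDemo {
           public static void main(String[] args) throws Exception {
               String message = "{\"a\":{\"a1\":\"hello\"},\"b\":\"world\","
                   + "\"c\":[\"xxxxxxx\",\"yyyyyyy\"],"
                   + "\"d\":[{\"AA\":\"this\",\"BB\":\"is_data\"},"
                   + "{\"AA\":\"that\",\"BB\":\"is_also_data\"}]}";
               JsonNode root = new ObjectMapper().readTree(message);
               // The JSON pointers below mirror the path syntax in the list above.
               System.out.println(root.at("/a/a1").asText());   // a.a1    -> hello
               System.out.println(root.at("/b").asText());      // b       -> world
               System.out.println(root.at("/c/1").asText());    // c[1]    -> yyyyyyy
               System.out.println(root.at("/d/0/AA").asText()); // d[0].AA -> this
           }
       }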
    Sample scenario: If you set the Output Mode parameter to Multi-row Output, Kafka Reader locates a JSON array in each message based on the path that is specified by the Path of Array parameter, extracts each element of the array, and then generates one output record per element based on the defined field names and the specified methods to obtain values. The methods to obtain values for the output fields are the same as those used when the Output Mode parameter is set to Single-row Output: you can use the .Sub-field or [Element in an array] syntax to obtain content from the JSON data. The following code shows the data in a sample Kafka message:
    {
        "c": {
            "c0": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
    }
    If you set the Path of Array parameter to c.c0, define the output field AA whose method to obtain a value is AA, and define the output field BB whose method to obtain a value is BB, the following output records are obtained:

    AA      BB
    this    is_data
    that    is_also_data
  7. Click the Save icon in the top toolbar to save the configurations.