Kafka Reader uses a Kafka SDK to read data from Kafka in real time.

Background information

Note
  • Kafka Reader can read data from Message Queue for Apache Kafka data sources and from self-managed Kafka data sources. The version of a self-managed Kafka data source must be in the range of 0.10.2 to 2.2.x.
  • Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support queries for the offsets of partition data and do not support timestamps. If you use such a data source in a synchronization node, the latency data that Operation Center displays for the node may be inaccurate, and the offset from which incremental synchronization starts cannot be reset.
If you want to use the Simple Authentication and Security Layer (SASL) authentication mode, contact technical support and provide the SSL root certificate and the SASL authentication file. Technical support then configures the certificate and file for the environment in which your synchronization node runs. You must also add the following configurations to the settings of the Kafka data source that is used for your synchronization node:
{ "java.security.auth.login.config": "/home/admin/kafka_client_jaas.conf",
 "ssl.truststore.location": "/home/admin/kafka.client.truststore.jks",
 "ssl.truststore.password": "KafkaOnsClient", "security.protocol": "SASL_SSL",
 "sasl.mechanism": "PLAIN", "ssl.endpoint.identification.algorithm": "" }
For more information about how to add a Kafka data source, see Add a Kafka data source.
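The preceding settings correspond to standard Kafka client security properties. The following Java snippet is a minimal sketch, assuming the open source Kafka client library, of how an equivalent consumer could be configured directly; it is not the exact mechanism that DataWorks uses. The broker address, group ID, and the SASL username and password are placeholders, and the per-client sasl.jaas.config property is used in place of the java.security.auth.login.config system property.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SaslSslConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; replace with your Kafka cluster address.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Security settings that mirror the data source configuration above.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.location", "/home/admin/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "KafkaOnsClient");
        props.put("ssl.endpoint.identification.algorithm", "");
        // Per-client JAAS configuration; username and password are placeholders.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"user\" password=\"pass\";");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The consumer is now ready to subscribe to a topic and poll.
        }
    }
}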

Procedure

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region in which the workspace that you want to manage resides. Find the workspace and click Data Development in the Actions column.
  2. In the Scheduled Workflow pane, move the pointer over the Create icon and choose Data Integration > Real-time synchronization.
    Alternatively, click the required workflow, right-click Data Integration, and then choose Create > Real-time synchronization.
  3. In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL and set the Node Name and Location parameters.
    Notice The node name can be up to 128 characters in length and can contain letters, digits, underscores (_), and periods (.).
  4. Click Commit.
  5. On the configuration tab of the real-time synchronization node, drag Kafka in the Input section to the canvas on the right.
  6. Click the Kafka node. In the panel that appears, configure the parameters.
    Kafka node parameters:
    Kafka Cluster Address: The address of the Kafka broker. Specify the address in the IP address:port number format, for example, 192.168.0.1:9092.
    Topic: The name of the Kafka topic from which you want to read data. Topics are the categories in which Kafka maintains feeds of messages.
    Each message that is published to a Kafka cluster is assigned to a topic, and each topic contains a group of messages.
    Note Kafka Reader in each synchronization node can read data from only one topic.
    Key Type: The data type of the keys in the Kafka topic.
    Value Type: The data type of the values in the Kafka topic.
    Initiation Site: The offset of the first message that Kafka Reader reads when the synchronization node runs. Valid values:
    • EARLIEST: the earliest offset in each partition
    • LATEST: the latest offset in each partition
    • TIMESTAMP: the start offset that is specified in Operation Center
    • GROUP_OFFSETS: the offset that was last consumed by the consumer group specified by the group.id parameter
    Note If you rerun the node, the Initiation Site parameter does not take effect. To start from a different position, specify a new start offset in Operation Center before you rerun the node. If you do not specify a new start offset, Kafka Reader resumes reading from the offset that was last recorded for the node. For an illustration of how these options map to the Kafka consumer API, see the first sketch after this parameter list.
    Output Mode: The mode in which Kafka Reader parses messages in the Kafka topic. Valid values:
    • Single-row Output: Kafka Reader parses messages as unstructured strings or JSON objects. One message is parsed into one output record.
    • Multi-row Output: Kafka Reader parses messages as JSON arrays. Each array element is parsed into one output record. Therefore, one message may be parsed into multiple output records.
    Note This parameter is supported only in some regions and will be supported in other regions in the future.
    Path of Array: The path of the JSON array in the value of the Kafka message. This parameter is displayed only if you set the Output Mode parameter to Multi-row Output. To reference the fields in a JSON object, specify a value in the a.a1 format. To reference the fields in a JSON array, specify a value in the a[0].a1 format. If you leave this parameter empty, Kafka Reader parses the entire value of a message as a JSON array. For an example of the record-splitting behavior, see the second sketch after this parameter list.
    Configuration parameters: The extended parameters that you can configure when Kafka Reader creates a Kafka consumer, such as the bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms parameters. You can use these parameters to fine-tune the read behavior of the consumer. For more information about the parameters that Kafka clusters of different versions support, see the documentation of Apache Kafka. The first sketch after this parameter list shows how such extended parameters are passed to a Kafka consumer.
    Output Fields: The output fields, which you can customize:
    • Click Add more fields. In the fields that appear, enter a field name and select a data type to customize a field.
      DataWorks provides two methods that Kafka Reader can use to obtain the values of fields from messages. You can click the Arrow icon to switch between the two methods.
      • Default methods:
        • value: the values of messages
        • key: the keys of messages
        • partition: the IDs of partitions
        • offset: the offsets of messages
        • timestamp: the timestamps of messages, in milliseconds
        • headers: the headers of messages
      • JSON-based parsing: You can use the .Sub-field syntax or the [Element in an array] syntax to obtain content from JSON-formatted messages. To keep field values compatible with historical logic, you can also use a name that starts and ends with two underscores (_), such as __value__, to obtain specific values from messages. The following code shows the data in a sample Kafka message:
        {
            "a": {
                "a1": "hello"
            },
            "b": "world",
            "c": [
                "xxxxxxx",
                "yyyyyyy"
            ],
            "d": [
                {
                    "AA": "this",
                    "BB": "is_data"
                },
                {
                    "AA": "that",
                    "BB": "is_also_data"
                }
            ]
        }
        • Based on the preceding sample message, you can use the following expressions:
          • If you want to read the values of messages, use __value__.
          • If you want to read the keys of messages, use __key__.
          • If you want to read the partitions that store messages, use __partition__.
          • If you want to read the offsets of messages, use __offset__.
          • If you want to read the timestamps of messages, use __timestamp__.
          • If you want to read the headers of messages, use __headers__.
          • If you want to read "hello" in the a1 field, use a.a1.
          • If you want to read "world" in the b field, use b.
          • If you want to read "yyyyyyy" in the c field, use c[1].
          • If you want to read "this" in the AA field, use d[0].AA.
    • To remove a field, move the pointer over the field and click the Delete icon.
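    The following Java sketch illustrates how the Initiation Site options map to the open source Kafka consumer API. It describes standard Kafka client behavior rather than the internal implementation of Kafka Reader; the broker address, topic, group ID, and timestamp are placeholders, and the extended parameters match the examples given for the Configuration parameters field.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class InitiationSiteSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder address
            props.put("group.id", "demo-group");            // used by GROUP_OFFSETS
            // Example extended parameters from the Configuration parameters field.
            props.put("session.timeout.ms", "30000");
            props.put("auto.commit.interval.ms", "5000");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo info : consumer.partitionsFor("demo-topic")) {
                    partitions.add(new TopicPartition(info.topic(), info.partition()));
                }
                consumer.assign(partitions);

                // In practice, exactly one of the following strategies is chosen;
                // they are shown together here, and the last seek before poll wins.

                // EARLIEST: the earliest offset in each partition.
                consumer.seekToBeginning(partitions);
                // LATEST: the latest offset in each partition.
                // consumer.seekToEnd(partitions);

                // TIMESTAMP: the first offset whose timestamp is at or after a
                // given point in time (a placeholder of one hour ago is used here).
                Map<TopicPartition, Long> query = new HashMap<>();
                long startMillis = System.currentTimeMillis() - 3600_000L;
                for (TopicPartition tp : partitions) {
                    query.put(tp, startMillis);
                }
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> e :
                        consumer.offsetsForTimes(query).entrySet()) {
                    if (e.getValue() != null) {
                        consumer.seek(e.getKey(), e.getValue().offset());
                    }
                }

                // GROUP_OFFSETS: with no explicit seek, polling resumes from the
                // offsets last committed by the consumer group in group.id.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("polled " + records.count() + " records");
            }
        }
    }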
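    The next sketch shows the difference between Single-row Output and Multi-row Output for the d array of the sample message shown earlier. It assumes the Jackson library for JSON parsing; Kafka Reader's actual parser may differ, but the record-splitting behavior is the same: with Path of Array set to d, one message yields one output record per array element.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class OutputModeSketch {
        public static void main(String[] args) throws Exception {
            // A subset of the sample message from the Output Fields description.
            String value = "{\"b\":\"world\",\"d\":[{\"AA\":\"this\",\"BB\":\"is_data\"},"
                    + "{\"AA\":\"that\",\"BB\":\"is_also_data\"}]}";
            JsonNode root = new ObjectMapper().readTree(value);

            // Single-row Output: the whole message value yields one record.
            System.out.println("single row: " + root);

            // Multi-row Output with Path of Array set to d: each element of the
            // d array yields one record, so this message produces two records.
            for (JsonNode element : root.get("d")) {
                System.out.println("row: AA=" + element.get("AA").asText()
                        + ", BB=" + element.get("BB").asText());
            }
        }
    }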
  7. Click the Save icon in the top toolbar.