Kafka Reader reads data from Kafka by using Kafka SDK for Java.

Background information

Apache Kafka is a fast, scalable, high-throughput, and distributed messaging system that supports fault tolerance. This system is used to publish and subscribe to messages. Kafka provides built-in partitions, supports data replicas, and can be used to process a large number of messages.
Notice
  • Kafka Reader can be used to read data from Message Queue for Apache Kafka data sources and self-managed Kafka data sources. However, the versions of self-managed Kafka data sources must range from 0.10.2 to 2.2.x.
  • Self-managed Kafka data sources whose versions are earlier than 0.10.2 do not support querying the offsets of partition data or timestamps. As a result, data synchronization cannot be performed for these data sources.
  • Kafka Reader supports only exclusive resource groups for Data Integration, but not the shared resource group or custom resource groups for Data Integration. For more information, see Create and use an exclusive resource group for Data Integration and Create a custom resource group for Data Integration.

How it works

Kafka Reader reads data from Kafka by using Kafka SDK for Java of the following version:
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>
The following methods of Kafka SDK for Java are used by Kafka Reader. For more information, see Kafka documentation.
  • Use a Kafka consumer as a client that consumes messages.
    org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
  • Query offsets based on the UNIX timestamp.
    Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
  • Seek to the start offset.
    public void seekToBeginning(Collection<TopicPartition> partitions)
  • Seek to the end offset.
    public void seekToEnd(Collection<TopicPartition> partitions)
  • Seek to a specific offset.
    public void seek(TopicPartition partition,long offset)
  • Poll for data from the server.
    public ConsumerRecords<K,V> poll(final Duration timeout)
Note Kafka Reader automatically commits offsets when it consumes data.
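The following Java snippet is a minimal sketch of how these SDK calls can be combined to read a bounded range of a partition: query the offsets that correspond to a time range by using offsetsForTimes, seek to the start offset, and poll until the end offset is reached. It is not the actual Kafka Reader implementation, and the broker address, topic, group ID, and timestamps are placeholders.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class BoundedKafkaRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093"); // placeholder broker address
        props.put("group.id", "demo_test");          // placeholder consumer group ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("topicName", 0); // placeholder topic and partition
            consumer.assign(Collections.singletonList(tp));

            long beginTs = 1555372800000L; // placeholder UNIX timestamps in milliseconds
            long endTs = 1555372806000L;

            // Query the offsets that correspond to the begin and end timestamps.
            Map<TopicPartition, OffsetAndTimestamp> begin =
                consumer.offsetsForTimes(Collections.singletonMap(tp, beginTs));
            Map<TopicPartition, OffsetAndTimestamp> end =
                consumer.offsetsForTimes(Collections.singletonMap(tp, endTs));
            if (begin.get(tp) == null) {
                return; // no records with a timestamp later than or equal to beginTs
            }
            long endOffset = end.get(tp) == null ? Long.MAX_VALUE : end.get(tp).offset();

            // Seek to the start offset and poll until the end offset is reached.
            consumer.seek(tp, begin.get(tp).offset());
            boolean done = false;
            while (!done) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(60));
                if (records.isEmpty()) {
                    break; // stop when a poll returns no data
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    if (record.offset() >= endOffset) {
                        done = true;
                        break;
                    }
                    // Process the record: key, value, partition, offset, timestamp, and headers.
                }
            }
        }
    }
}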

Parameters

Parameter Description Required
datasource The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. Yes
server The address of a Kafka broker in your Kafka cluster, in the format IP address:Port number. You can specify one or more broker addresses in the server parameter. Make sure that DataWorks can use the specified addresses to access the related brokers. Yes
topic The name of the Kafka topic from which you want to read data. Kafka maintains feeds of messages in topics. Yes
column The names of the columns from which you want to read data. Constant columns, data columns, and property columns are supported.
  • Constant column: a column whose name is enclosed in single quotation marks ('). Example: ["'abc'", "'123'"].
  • Data column:
    • If your data is in the JSON format, you can obtain JSON properties. Example: ["event_id"].
    • If your data is in the JSON format, you can obtain the properties of nested objects in the data. Example: ["tag.desc"].
  • Property column:
    • __key__: the key of a Kafka record.
    • __value__: the complete content of a Kafka record.
    • __partition__: the partition where a Kafka record resides.
    • __headers__: the header of a Kafka record.
    • __offset__: the offset of a Kafka record.
    • __timestamp__: the timestamp of a Kafka record.
    The following code provides a configuration example of the column parameter:
    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]
Yes
keyType The data type of the key in the Kafka topic. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
valueType The data type of the value in the Kafka topic. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
beginDateTime The start time of data consumption. This parameter specifies the left boundary of a left-closed, right-open interval. Specify the time in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling time parameters in DataWorks. For more information, see Overview of scheduling parameters.
Note This parameter is supported by Kafka 0.10.2 and later.
You must configure either the beginDateTime or beginOffset parameter.
Note The beginDateTime and endDateTime parameters must be used in pairs.
endDateTime The end time of data consumption. This parameter specifies the right boundary of a left-closed, right-open interval. Specify the time in the yyyyMMddHHmmss format. This parameter can be used together with the scheduling time parameters in DataWorks. For more information, see Overview of scheduling parameters.
Note This parameter is supported by Kafka 0.10.2 and later.
You must configure either the endDateTime or endOffset parameter.
Note The beginDateTime and endDateTime parameters must be used in pairs.
beginOffset The offset from which data consumption starts. The following formats are supported:
  • Numeric string: Data consumption starts from the specified offset, such as 15553274.
  • seekToBeginning: Data is consumed from the start offset.
  • seekToLast: Data is consumed from the offset that was committed for the specified consumer group (group.id) during the previous consumption.
  • seekToEnd: Data is consumed from the end offset. In this case, null may be read.
You must configure either the beginOffset or beginDateTime parameter.
endOffset The offset at which data consumption ends. You must configure either the endOffset or endDateTime parameter.
skipExceedRecord The Kafka consumer uses public ConsumerRecords<K, V> poll(final Duration timeout) to poll for data. However, the data obtained in a poll may be beyond the boundary that is specified by the endOffset or endDateTime parameter. The skipExceedRecord parameter specifies whether to poll for the excess data. Kafka Reader automatically commits offsets for data consumption. Therefore, we recommend that you configure the skipExceedRecord parameter based on the following instructions:
  • If the data source from which you want to read data is of a version earlier than Kafka 0.10.2, set the skipExceedRecord parameter to false.
  • If the data source from which you want to read data is of Kafka 0.10.2 or later, set the skipExceedRecord parameter to true.
No. Default value: false.
partition If a Kafka topic contains multiple partitions, Kafka Reader reads data in a specific offset interval from the partitions. If you want Kafka Reader to read data from a specific partition, you can use the partition parameter to specify the partition. No. This parameter does not have a default value.
kafkaConfig The extended parameters that are specified when you create the Kafka consumer, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. You can configure the parameters in kafkaConfig to manage the data consumption of the Kafka consumer. No
encoding If the keyType or valueType parameter is set to STRING, strings are parsed based on the value of the encoding parameter. No. Default value: UTF-8.
waitTime The maximum duration during which the Kafka consumer waits to poll for data from Kafka each time. Unit: seconds. No. Default value: 60.
stopWhenPollEmpty Valid values: true and false. The Kafka consumer may poll an empty result from Kafka because all data in the Kafka topic has been polled or because an error occurs on the network or Kafka cluster. If you set this parameter to true and the Kafka consumer polls an empty result, the related synchronization node immediately stops. If you set this parameter to false and the Kafka consumer polls an empty result, the Kafka consumer keeps trying to poll for data until data is obtained. No. Default value: true.
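The following snippet is a minimal sketch of how the preceding parameters can be combined in the reader configuration to read a fixed offset range. The data source name, broker address, topic, and offset values are placeholders, and the structure of a complete node configuration is shown in the code editor example later in this topic.
"parameter": {
    "datasource": "kafka_source",
    "server": "host:9093",
    "topic": "topicName",
    "column": ["__key__", "__value__", "__partition__", "__offset__", "__timestamp__"],
    "keyType": "ByteArray",
    "valueType": "ByteArray",
    "beginOffset": "seekToBeginning",
    "endOffset": "15553274",
    "skipExceedRecord": "true",
    "stopWhenPollEmpty": "true",
    "waitTime": "60"
}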
The following table describes the parameters in kafkaConfig.
Parameter Description
fetch.min.bytes The minimum number of bytes that the consumer can obtain from the broker. The broker returns data to the consumer only after the number of bytes reaches the specified value.
fetch.max.wait.ms The maximum duration during which the consumer waits for data from the broker. Default value: 500. Unit: milliseconds. The broker returns data to the consumer when one of the conditions that are specified by the fetch.min.bytes and fetch.max.wait.ms parameters is met.
max.partition.fetch.bytes The maximum number of bytes in each partition that the broker returns to the consumer. Default value: 1. Unit: MB.
session.timeout.ms The timeout period of a connection session between the consumer and the broker. If this limit is exceeded, the consumer can no longer receive data from the broker. Default value: 30. Unit: seconds.
auto.offset.reset The handling method that is used when no offset is available or the offset is invalid. This occurs if the consumer times out or the record with the specified offset expires and is deleted. The default value of this parameter is latest, which indicates that the consumer reads data from the latest record. You can set this parameter to earliest, which indicates that the consumer reads data from the start offset.
max.poll.records The maximum number of Kafka records that can be returned in a single poll.
key.deserializer The method that is used to deserialize the Kafka record key, such as org.apache.kafka.common.serialization.StringDeserializer.
value.deserializer The method that is used to deserialize the Kafka record value, such as org.apache.kafka.common.serialization.StringDeserializer.
ssl.truststore.location The path of the Secure Sockets Layer (SSL) root certificate.
ssl.truststore.password The password of the truststore in the SSL root certificate. If you use Message Queue for Apache Kafka, set this parameter to KafkaOnsClient.
security.protocol The access protocol. Set this parameter to SASL_SSL.
sasl.mechanism The simple authentication and security layer (SASL) authentication mode. If you use Message Queue for Apache Kafka, set this parameter to PLAIN.
java.security.auth.login.config The path of the SASL authentication file.
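For example, a kafkaConfig block that sets some of these parameters might look similar to the following sketch; the values are illustrative only:
"kafkaConfig": {
    "group.id": "demo_test",
    "auto.offset.reset": "earliest",
    "fetch.min.bytes": "1048576",
    "fetch.max.wait.ms": "500",
    "session.timeout.ms": "30000",
    "max.poll.records": "500"
}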

Configure Kafka Reader by using the codeless UI

  1. Configure data sources. Configure Source and Target for the synchronization node.
    Parameter Description
    Connection The name of the data source from which you want to read data. This parameter is equivalent to the datasource parameter that is described in the preceding section.
    Topic The name of the Kafka topic from which you want to read data. This parameter is equivalent to the topic parameter that is described in the preceding section.
    Consumer Group ID The ID of the consumer group to which the Kafka consumer that you want to initialize belongs. This parameter is equivalent to the group.id field in the JSON code of kafkaConfig that is described in the preceding section.

    If you want a data synchronization node in Data Integration to consume data from the correct offset, you must set this parameter to a value that is unique to the data synchronization node. If you do not specify a value for this parameter, a random string that starts with datax_ is automatically generated for group.id each time data is synchronized.

    Kafka Version The Kafka version. Valid values: >=0.10.2 and <0.10.2. The value of this parameter affects the options available for the Read From and Read To parameters.
    Read From The offset from which data consumption starts. This parameter is equivalent to the beginOffset parameter that is described in the preceding section.
    • Specific Offset: The data synchronization node converts the time that you specify in the yyyyMMddHHmmss format into a UNIX timestamp, and starts to read data in each partition of the Kafka topic from the earliest Kafka record whose data timestamp is later than or equal to that UNIX timestamp. For example, if you specify 20210125000000, the effect is the same as setting the beginDateTime parameter to 20210125000000.
    • Earliest Offset: Kafka Reader starts to read data from the first Kafka record that remains in each partition of a Kafka topic.
    • Consumer Group Current Offset: Kafka Reader starts to read data from the offset that is saved based on the consumer group ID that you specify on the configuration page of the data synchronization node. In most cases, the offset is the position at which the previous data read stops. We recommend that you configure only one data synchronization node in Data Integration for the consumer that uses this consumer group ID. If multiple synchronization nodes in Data Integration use the same consumer group ID, data loss may occur. If you want to use the current offset of the consumer group as the start offset, you must specify a consumer group ID. If you do not specify the consumer group ID, a random consumer group ID is generated for the data synchronization node, and Kafka Reader starts to read data from the earliest offset in a partition because no offset is saved based on the generated consumer group ID.
    Note If you set Read From to Earliest Offset or Consumer Group Current Offset, the selected option is written to the beginOffset parameter in the generated node configuration. If you set Read From to Specific Offset, the beginOffset parameter is not used. Instead, the value of the Start Time parameter is written to the beginDateTime parameter, which determines the offset from which data consumption starts.
    Start Time The start time of data consumption. This parameter is equivalent to the beginDateTime parameter that is described in the preceding section. If you set Read From to Specific Offset, the value of this parameter is a time string in the yyyyMMddHHmmss format. This parameter specifies the left boundary of a left-closed, right-open interval, such as 20210513000000. You can use this parameter with the scheduling time parameters in DataWorks. For more information, see Configure scheduling parameters.
    Read To The offset at which data consumption ends. This parameter is equivalent to the endOffset parameter that is described in the preceding section.
    Note If you set Read To to Latest Offset, the selected option is written to the endOffset parameter in the generated node configuration. If you set Read To to Specific Offset, the endOffset parameter is not used. Instead, the value of the End Time parameter is written to the endDateTime parameter, which determines the offset at which data consumption ends.
    End Time The end time of data consumption. This parameter is equivalent to the endDateTime parameter that is described in the preceding section. If you set Read To to Specific Offset, the value of this parameter is a time string in the yyyyMMddHHmmss format. This parameter specifies the right boundary of a left-closed, right-open interval, such as 20210514000000. You can use this parameter with the scheduling time parameters in DataWorks. For more information, see Configure scheduling parameters.
    Time Zone The time zone that corresponds to the interval if you set Read From to Specific Offset and set Read To to Specific Offset.
    Key Type The data type of the key in the Kafka topic. This parameter is equivalent to the keyType parameter that is described in the preceding section. The value of this parameter determines the value of the key.deserializer parameter when the Kafka consumer is initialized. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
    Value Type The data type of the value in the Kafka topic. This parameter is equivalent to the valueType parameter that is described in the preceding section. The value of this parameter determines the value of the value.deserializer parameter when the Kafka consumer is initialized. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
    Encoding The encoding method that is used to read data when the Key Type or Value Type parameter is set to STRING. This parameter is equivalent to the encoding parameter that is described in the preceding section.
    Exit Strategy The condition under which the synchronization node stops.
    • Exit when poll nothing in 1 minute: If the Kafka consumer polls an empty result from Kafka for 1 minute, for example, because all data in the Kafka topic has been polled or because an error occurs on the network or Kafka cluster, the synchronization node immediately stops.
    • Exit when reach configured end offset or time: The synchronization node stops only after the data timestamp or offset of the Kafka record that the node reads reaches the configured end time or end offset. If the Kafka consumer polls an empty result before that, it keeps trying to poll for data until data is obtained.
    Auto Offset Reset The method that is used when no offset is available or the offset is invalid. This parameter is equivalent to the auto.offset.reset field in the JSON code of kafkaConfig that is described in the preceding section. We recommend that you set this parameter to none to ensure that all data is read.
    Read Batch Size The minimum number of bytes that the consumer can obtain from the broker. This parameter is equivalent to the fetch.min.bytes field in the JSON code of kafkaConfig that is described in the preceding section.
    Read Wait Time The maximum duration during which the consumer waits for data from the broker. This parameter is equivalent to the fetch.max.wait.ms field in the JSON code of kafkaConfig that is described in the preceding section.
    Read Timeout The timeout period of a connection session between the consumer and the broker. If this limit is exceeded, the consumer can no longer receive data from the broker. This parameter is equivalent to the session.timeout.ms field in the JSON code of kafkaConfig that is described in the preceding section.
  2. Configure field mapping. This operation is equivalent to specifying a value for the column parameter that is described in the preceding section. Fields in the source on the left side have a one-to-one mapping with fields in the destination on the right side. You can click Add to add a field. To remove a field, move the pointer over the field and click the Remove icon.
    The following table describes the Kafka columns from which you want to read data. The names of these columns start and end with two underscores (__).
    Source table field Description
    __key__ The key of a Kafka record.
    __value__ The value of a Kafka record.
    __partition__ The partition in which a Kafka record resides. The field value is an integer that is greater than or equal to 0.
    __headers__ The JSON string that is obtained after you serialize the header of a Kafka record.
    __offset__ The offset of a Kafka record. The field value is an integer that is greater than or equal to 0.
    __timestamp__ The timestamp of a Kafka record. Unit: milliseconds.
    You can also read data from other columns. Kafka Reader parses a Kafka record as a JSON string, reads the source table field that you specify, and then a writer writes the data to the destination. In the following example, the source table field is name, Kafka Reader reads bob from this field, and a writer writes bob to the destination table.
    {
      "data": {
        "name": "bob",
        "age": 35
      }
    }

    The following figure shows the field mappings between the source table and the destination table.

    Mappings
    Parameter Description
    Map Fields with the Same Name Click Map Fields with the Same Name to establish mappings between fields with the same name. The data types of the fields must match.
    Map Fields in the Same Line Click Map Fields in the Same Line to establish mappings between fields in the same row. The data types of the fields must match.
    Delete All Mappings Click Delete All Mappings to remove the mappings that are established.
    Auto Layout Click Auto Layout. Then, the system automatically sorts the fields based on specific rules.
    Change Fields Click the Change Fields icon. In the Change Fields dialog box, you can manually edit the fields in the source table. Each field occupies a row. The first and the last blank rows are included, whereas other blank rows are ignored.
    Add Click Add to add a field. Take note of the following rules when you add a field:
    • You can enter constants. Each constant must be enclosed in single quotation marks ('), such as 'abc' and '123'.
    • You can use scheduling parameters, such as ${bizdate}.
    • If the field that you entered cannot be parsed, the value of Type for the field is Unidentified.
  3. Configure channel control policies.
    Parameter Description
    Expected Maximum Concurrency The maximum number of parallel threads that the synchronization node uses to read data from the source or write data to the destination. You can configure the parallelism for the synchronization node on the codeless UI.
    Bandwidth Throttling Specifies whether to enable bandwidth throttling. You can enable bandwidth throttling and specify a maximum transmission rate to avoid heavy read workloads on the source. We recommend that you enable bandwidth throttling and set the maximum transmission rate to an appropriate value based on the configurations of the source.
    Dirty Data Records Allowed The maximum number of dirty data records allowed.
    Distributed Execution The distributed execution mode that allows you to split your node into pieces and distribute them to multiple Elastic Compute Service (ECS) instances for parallel execution. This speeds up synchronization. If you use a large number of parallel threads to run your synchronization node in distributed execution mode, excessive access requests are sent to the data sources. Therefore, before you use the distributed execution mode, you must evaluate the access load on the data sources. You can enable this mode only if you use an exclusive resource group for Data Integration. For more information about exclusive resource groups for Data Integration, see Exclusive resource groups for Data Integration and Create and use an exclusive resource group for Data Integration.

Configure Kafka Reader by using the code editor

For more information about how to configure a synchronization node by using the code editor, see Create a synchronization node by using the code editor.

In the following JSON code, a synchronization node is configured to read data from a Kafka topic:
{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "false"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,// Specifies whether to enable bandwidth throttling. The value false indicates that bandwidth throttling is disabled, and the value true indicates that bandwidth throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
            "concurrent": 1,// The maximum number of parallel threads.
            "mbps":"12"// The maximum transmission rate.
        }
    }
}

Use SASL authentication

If you want to use SASL or SSL for authentication, configure the SASL or SSL authentication mode when you configure a Kafka data source. For more information, see Add a Kafka data source.
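For reference, a kafkaConfig block for a SASL_SSL connection to Message Queue for Apache Kafka might look similar to the following sketch. The truststore path and the path of the SASL authentication file are placeholders that depend on your environment:
"kafkaConfig": {
    "group.id": "demo_test",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "ssl.truststore.location": "/path/to/kafka.client.truststore.jks",
    "ssl.truststore.password": "KafkaOnsClient",
    "java.security.auth.login.config": "/path/to/kafka_client_jaas.conf"
}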