Kafka Reader allows you to read data from Kafka by using the Kafka SDK for Java.

Background information

Apache Kafka is a fast, scalable, distributed messaging system that is used to publish and subscribe to messages. It features high throughput, built-in partitioning, data replication, and fault tolerance, which makes it suitable for processing large volumes of messages.
Notice Kafka Reader supports only exclusive resource groups for Data Integration. It does not support the shared resource group or custom resource groups. For more information, see Exclusive resource groups for Data Integration and Create a custom resource group for Data Integration.

How it works

Kafka Reader reads data from Kafka by using the Kafka SDK for Java. The following version of the Kafka SDK for Java is used:
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>
Kafka Reader uses the following methods of the Kafka SDK for Java. For more information, visit the Apache Kafka website.
  • Use the Kafka consumer as the client that consumes messages.
    org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
  • Query offsets based on the UNIX timestamp.
    Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
  • Seek to the start offset.
    public void seekToBeginning(Collection<TopicPartition> partitions)
  • Seek to the end offset.
    public void seekToEnd(Collection<TopicPartition> partitions)
  • Seek to the specified offset.
    public void seek(TopicPartition partition,long offset)
  • Poll for data from the server.
    public ConsumerRecords<K,V> poll(final Duration timeout)
Note Kafka Reader automatically commits offsets when it consumes data.
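Kafka Reader encapsulates this logic internally. The following Java sketch is not the Kafka Reader source code; it only illustrates, with placeholder values for the broker address, topic, group ID, and timestamps, how the preceding methods can be combined to read a left-closed, right-open time interval from all partitions of a topic:
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimeRangeReadSketch {
    public static void main(String[] args) {
        // Placeholder connection settings; replace them with your own values.
        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093");
        props.put("group.id", "demo_test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // 2019-04-16 00:00:00 and 00:00:06 UTC in epoch milliseconds, for illustration only.
        long beginTimestampMs = 1555372800000L;
        long endTimestampMs = 1555372806000L;
        String topic = "topicName";

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions of the topic to this consumer.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                partitions.add(new TopicPartition(topic, info.partition()));
            }
            consumer.assign(partitions);

            // Map the begin timestamp to a start offset for each partition and
            // take a snapshot of the current end offsets as a stop condition.
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            for (TopicPartition tp : partitions) {
                timestampsToSearch.put(tp, beginTimestampMs);
            }
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(timestampsToSearch);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp start = startOffsets.get(tp);
                if (start != null) {
                    // Seek to the first record whose timestamp is at or after the begin timestamp.
                    consumer.seek(tp, start.offset());
                } else {
                    // No record at or after the begin timestamp: nothing to read from this partition.
                    consumer.seekToEnd(Collections.singletonList(tp));
                }
            }

            // Poll until every partition has caught up to its snapshotted end offset.
            Set<TopicPartition> remaining = new HashSet<>(partitions);
            while (!remaining.isEmpty()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Keep only records inside the left-closed, right-open interval.
                    if (record.timestamp() < endTimestampMs) {
                        System.out.printf("partition=%d offset=%d timestamp=%d%n",
                                record.partition(), record.offset(), record.timestamp());
                    }
                }
                remaining.removeIf(tp -> consumer.position(tp) >= endOffsets.get(tp));
            }
        }
    }
}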

Parameters

Parameter Description Required
server The address of the Kafka broker in the format of ip:port. You can specify only one broker address for the server parameter. However, make sure that DataWorks can use the address to access all the brokers in the Kafka cluster. Yes
topic The topic from which data is to be read in Kafka. Kafka maintains feeds of messages in topics. Yes
column The columns to be read by Kafka Reader. Constant, data, and property columns are supported.
  • Constant column: a column that is enclosed in single quotation marks (' '), such as ["'abc'", "'123'"].
  • Data column:
    • If your data is in JSON format, you can obtain JSON properties, such as ["event_id"].
    • If your data is in JSON format, you can obtain nested child JSON properties, such as ["tag.desc"].
  • Property column:
    • __key__: the key of the message.
    • __value__: the complete content of the message.
    • __partition__: the partition where the message resides.
    • __headers__: the header of the message.
    • __offset__: the offset of the message.
    • __timestamp__: the timestamp of the message.
    Example:
    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]
Yes
keyType The type of the key in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
valueType The type of the value in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
beginDateTime The start timestamp of data consumption. This parameter defines the left boundary of the left-closed, right-open interval. The value is a time string in the yyyymmddhhmmss format. It can be used in combination with scheduling parameters. For more information, see Scheduling parameters.
Note Kafka 0.10.2 and later versions support this parameter.
You can use only one of the beginDateTime and beginOffset parameters.
Note The beginDateTime and endDateTime parameters must be used in pairs.
endDateTime The end timestamp of data consumption. This parameter defines the right boundary of the left-closed, right-open interval. The value is a time string in the yyyymmddhhmmss format. It can be used in combination with scheduling parameters. For more information, see Scheduling parameters.
Note Kafka 0.10.2 and later versions support this parameter.
You can use only one of the endDateTime and endOffset parameters.
Note The beginDateTime and endDateTime parameters must be used in pairs.
beginOffset The offset where data consumption starts. The following formats are supported:
  • Numeric string: starts to consume data from the specified offset, such as 15553274.
  • seekToBeginning: starts to consume data from the start offset.
  • seekToLast: starts to consume data from the offset that was last committed for the specified group.id.
  • seekToEnd: starts to consume data from the end offset. In this case, null data is read.
You can use only one of the beginOffset and beginDateTime parameters.
endOffset The offset at which data consumption ends. You can use only one of the endOffset and endDateTime parameters.
skipExceedRecord Kafka Reader uses the public ConsumerRecords<K, V> poll(final Duration timeout) method to consume data. However, a single poll may return data beyond the boundary that is specified by the endOffset or endDateTime parameter. The skipExceedRecord parameter specifies whether to export such excess data to the destination. Kafka Reader automatically commits offsets when it consumes data. Therefore, we recommend that you perform one of the following operations:
  • Set the skipExceedRecord parameter to false for versions earlier than Kafka 0.10.2.
  • Set the skipExceedRecord parameter to true for Kafka 0.10.2 and later versions.
No. Default value: false.
partition A specific topic in Kafka can contain multiple partitions. Generally, Kafka Reader reads data in a specified offset interval from multiple partitions of a topic. If you want Kafka Reader to read data from a specified partition, you can set the partition parameter to a specific value. No. This parameter does not have a default value.
kafkaConfig The extended parameters that are specified when the Kafka consumer is created, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. You can set the parameters in kafkaConfig to manage data consumption of the Kafka consumer. No
The following table describes the parameters in kafkaConfig.
Parameter Description
fetch.min.bytes The minimum number of bytes that the consumer can obtain from the broker. The broker waits until the number of bytes reaches the specified value before it returns data to the consumer.
fetch.max.wait.ms The maximum duration during which the consumer waits for data from the broker. Default value: 500. Unit: milliseconds. The broker returns data to the consumer when one of the conditions that are specified by the fetch.min.bytes and fetch.max.wait.ms parameters is met.
max.partition.fetch.bytes The maximum number of bytes in each partition that the broker returns to the consumer. Default value: 1. Unit: MB.
session.timeout.ms The maximum period during which the consumer can be out of contact with the broker before the consumer is considered failed and no longer receives data. Default value: 30. Unit: seconds.
auto.offset.reset The handling method when no offset is available or the offset is invalid. This occurs if the consumer times out or the record with the specified offset has expired and been deleted. By default, this parameter is set to latest, which specifies that the consumer reads data from the latest record. You can set this parameter to earliest, which specifies that the consumer reads data from the start offset.
max.poll.records The maximum number of messages that can be returned by a single call to poll.
key.deserializer The method that is used to deserialize the message key, such as org.apache.kafka.common.serialization.StringDeserializer.
value.deserializer The method that is used to deserialize the message value, such as org.apache.kafka.common.serialization.StringDeserializer.
ssl.truststore.location The path of the Secure Sockets Layer (SSL) root certificate.
ssl.truststore.password The password of the truststore in the SSL root certificate. If Message Queue for Apache Kafka is used, set this parameter to KafkaOnsClient.
security.protocol The access protocol. Set the value to SASL_SSL.
sasl.mechanism The Simple Authentication and Security Layer (SASL) authentication mode. If Message Queue for Apache Kafka is used, set this parameter to PLAIN.
Example:
{
    "group.id": "demo_test",
    "java.security.auth.login.config": "/home/admin/kafka_client_jaas.conf",
    "ssl.truststore.location": "/home/admin/kafka.client.truststore.jks",
    "ssl.truststore.password": "KafkaOnsClient",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "ssl.endpoint.identification.algorithm": ""
}
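The keys in kafkaConfig are standard Kafka consumer configuration properties that are applied when the consumer is created. As a rough illustration only, the following sketch shows how a standalone client built on the same Kafka SDK for Java would apply an equivalent configuration. The broker address and file paths are placeholders, and in a standalone client the JAAS configuration file referenced above is typically supplied as a JVM system property:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConfigSketch {
    public static void main(String[] args) {
        // In a standalone client, the JAAS file is usually supplied as a JVM system property
        // (placeholder path; how Kafka Reader handles this key internally is not shown here).
        System.setProperty("java.security.auth.login.config", "/home/admin/kafka_client_jaas.conf");

        // Entries taken from the kafkaConfig example above.
        Map<String, String> kafkaConfig = new HashMap<>();
        kafkaConfig.put("group.id", "demo_test");
        kafkaConfig.put("ssl.truststore.location", "/home/admin/kafka.client.truststore.jks");
        kafkaConfig.put("ssl.truststore.password", "KafkaOnsClient");
        kafkaConfig.put("security.protocol", "SASL_SSL");
        kafkaConfig.put("sasl.mechanism", "PLAIN");
        kafkaConfig.put("ssl.endpoint.identification.algorithm", "");

        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093"); // corresponds to the server parameter
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.putAll(kafkaConfig); // extended parameters are applied on top of the basic settings

        // Creating the consumer validates the SSL and SASL settings; this requires the
        // truststore and JAAS files to exist at the paths above.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            System.out.println("Consumer created with extended kafkaConfig properties.");
        }
    }
}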

Sample code in the code editor mode

For more information about the code editor mode, see Create a sync node by using the code editor.

In the following JSON code, a node is configured to read data from a Kafka topic:
{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "false"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": false,
            "concurrent": 1
        }
    }
}