Kafka Reader allows you to read data from Kafka by using the Java SDK for Kafka.

Apache Kafka is a fast, scalable, and distributed messaging system with high throughput and fault tolerance. It is used to publish and subscribe to messages. With its high throughput, built-in partitioning, data replication, and fault tolerance, Kafka is suitable for processing large volumes of messages.

How it works

Kafka Reader reads data from Kafka by using the following version of the Java SDK for Kafka:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>

Kafka Reader uses the following methods of the Java SDK for Kafka. For more information about the features and limits of the SDK, see the official Kafka documentation.

  • Use KafkaConsumer as the client that consumes messages.
    org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
  • Query offsets based on the UNIX timestamp.
    Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
  • Seek to the start offset.
    public void seekToBeginning(Collection<TopicPartition> partitions)
  • Seek to the end offset.
    public void seekToEnd(Collection<TopicPartition> partitions)
  • Seek to the specified offset.
    public void seek(TopicPartition partition, long offset)
  • Poll for data from the server.
    public ConsumerRecords<K,V> poll(final Duration timeout)
Note Kafka Reader automatically commits offsets when consuming data.
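
The following Java sketch is not the Kafka Reader source code; it only illustrates how these SDK calls typically fit together to read a bounded range of messages. The broker address, topic, partition, group ID, and timestamp are placeholder values, and the comments note how the property columns described later in this topic map to the fields of a ConsumerRecord.

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class KafkaReadSketch {
    public static void main(String[] args) {
        // Placeholder connection settings.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("topicName", 0);
            consumer.assign(Collections.singletonList(tp));

            // Query the offset for a UNIX timestamp (in milliseconds) with offsetsForTimes.
            long beginTimestampMs = 1555372800000L; // placeholder start timestamp
            Map<TopicPartition, OffsetAndTimestamp> offsets =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, beginTimestampMs));
            OffsetAndTimestamp begin = offsets.get(tp);
            if (begin != null) {
                consumer.seek(tp, begin.offset());                       // seek to the specified offset
            } else {
                consumer.seekToBeginning(Collections.singletonList(tp)); // fall back to the start offset
            }

            // Poll for data from the server. record.key(), record.value(), record.partition(),
            // record.headers(), record.offset(), and record.timestamp() correspond to the
            // __key__, __value__, __partition__, __headers__, __offset__, and __timestamp__ columns.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("partition=%d offset=%d timestamp=%d%n",
                        record.partition(), record.offset(), record.timestamp());
            }
        }
    }
}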

Parameters

Parameter Description Required
server The broker server address of Kafka in the format of ip:port. Yes
topic The name of the topic from which data is read in Kafka. Kafka maintains feeds of messages in categories called topics. Yes
column The columns to be read by Kafka Reader. Constant, data, and property columns are supported.
  • Constant column: a constant value enclosed in single quotation marks ('), such as ["'abc'", "'123'"].
  • Data column
    • If your data is in JSON format, you can obtain JSON properties, such as ["event_id"].
    • If your data is in JSON format, you can obtain nested child JSON properties, such as ["tag.desc"].
  • Property column
    • __key__: the key of the message.
    • __value__: the complete content of the message.
    • __partition__: the partition where the message resides.
    • __headers__: the headers of the message.
    • __offset__: the offset of the message.
    • __timestamp__: the timestamp of the message.

    Example:

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]
Yes
keyType The data type of the message key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
valueType The data type of the message value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. Yes
beginDateTime The start timestamp for consuming data. This parameter defines the left boundary of the left-closed and right-open interval. The value is a time string in the yyyymmddhhmmss format. It can be used in combination with scheduling properties. For more information, see Scheduling properties. Kafka 0.10.2 and later versions support this parameter. You can set either beginDateTime or beginOffset, but not both.
Note The beginDateTime and endDateTime parameters must be used in pairs.
endDateTime The end timestamp for consuming data. This parameter defines the right boundary of the left-closed and right-open interval. The value is a time string in the yyyymmddhhmmss format. It can be used in combination with scheduling properties. For more information, see Scheduling properties. Kafka 0.10.2 and later versions support this parameter. You can set either endDateTime or endOffset, but not both.
Note The beginDateTime and endDateTime parameters must be used in pairs.
beginOffset The offset where data consumption starts, in one of the following formats:
  • Numeric string: starts to consume data from the specified offset, such as 15553274.
  • seekToBeginning: starts to consume data from the start offset.
  • seekToLast: starts to consume data from the last offset.
  • seekToEnd: starts to consume data from the end offset. In this case, the data that is read is null.
You can set either beginOffset or beginDateTime, but not both.
endOffset The offset where data consumption ends. You can set either endOffset or endDateTime, but not both.
skipExceedRecord Kafka Reader uses the public ConsumerRecords<K, V> poll(final Duration timeout) method to consume data. However, a poll may return data beyond the boundary specified by the endOffset or endDateTime parameter. The skipExceedRecord parameter specifies whether to export such excess data to the destination. Because Kafka Reader automatically commits offsets when it consumes data, we recommend that you set the skipExceedRecord parameter as follows:
  • Set skipExceedRecord to false for Kafka versions earlier than 0.10.2.
  • Set skipExceedRecord to true for Kafka 0.10.2 and later versions.
No. Default value: false.
partition The partition of the topic in Kafka. Generally, Kafka Reader reads data in a specified offset interval from multiple partitions of a topic. If you want Kafka Reader to read data from a specified partition, you can set the partition parameter. No. Default value: none.
kafkaConfig The extended parameters specified when KafkaConsumer is created, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. By setting parameters in kafkaConfig, you can control the data consumption behaviors of KafkaConsumer. No
The following extended parameters can be configured in kafkaConfig:
  • fetch.min.bytes: the minimum amount of data, in bytes, that the broker returns to the consumer for a fetch request. The broker waits until at least this amount of data is available before it returns data to the consumer.
  • fetch.max.wait.ms: the maximum duration for which the consumer waits for data from the broker. Default value: 500 ms. The broker returns data to the consumer as soon as either the fetch.min.bytes or the fetch.max.wait.ms condition is met.
  • max.partition.fetch.bytes: the maximum number of bytes in each partition returned by the broker to the consumer. Default value: 1 MB.
  • session.timeout.ms: the session timeout after which the consumer is considered to have failed and is disconnected from the broker. After that, the consumer no longer receives data from the broker. Default value: 30s.
  • auto.offset.reset: the handling method when no offset is available or the offset is invalid. This occurs if the consumer times out or the record with the specified offset has expired and been deleted. Default value: latest, which specifies that the consumer reads data from the latest record. You can change this value to earliest, which specifies that the consumer reads data from the start offset.
  • max.poll.records: the number of messages that can be returned for a poll.
  • key.deserializer: the method used to deserialize the message key, such as org.apache.kafka.common.serialization.StringDeserializer.
  • value.deserializer: the method used to deserialize the message value, such as org.apache.kafka.common.serialization.StringDeserializer.
  • ssl.truststore.location: the path of the Secure Sockets Layer (SSL) root certificate.
  • ssl.truststore.password: the password of the truststore in the SSL root certificate. If Message Queue for Apache Kafka is used, set this parameter to KafkaOnsClient.
  • security.protocol: the access protocol. Only SASL_SSL (Simple Authentication and Security Layer over SSL) is supported.
  • sasl.mechanism: the SASL authentication mode. If Message Queue for Apache Kafka is used, set this parameter to PLAIN.

Example:

{
    "group.id": "demo_test",
    "java.security.auth.login.config": "/home/admin/kafka_client_jaas.conf",
    "ssl.truststore.location": "/home/admin/kafka.client.truststore.jks",
    "ssl.truststore.password": "KafkaOnsClient",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "ssl.endpoint.identification.algorithm": ""
}
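
For reference, the entries in kafkaConfig are passed to KafkaConsumer as ordinary client configuration properties when the consumer is created. The following minimal Java sketch (an illustration, not the Kafka Reader implementation) shows this mapping, reusing the placeholder group ID, file paths, and broker address from the examples in this topic:

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConfigSketch {
    public static KafkaConsumer<byte[], byte[]> createConsumer() {
        // The JAAS configuration file is typically supplied as a JVM system property.
        System.setProperty("java.security.auth.login.config", "/home/admin/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093");   // the server parameter
        props.put("group.id", "demo_test");
        props.put("ssl.truststore.location", "/home/admin/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "KafkaOnsClient");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.endpoint.identification.algorithm", "");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<>(props);
    }
}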

Configure Kafka Reader by using the code editor

In the following code, a node is configured to read data from a Kafka topic.

{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "false"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": false,
            "concurrent": 1,
            "dmu": 1
        }
    }
}
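
In this example, beginDateTime and endDateTime define the left-closed, right-open interval [20190416000000, 20190416000006). The following sketch shows one way such a 14-digit time string can be converted into the epoch-millisecond value that offsetsForTimes() expects; the time zone used here is an assumption for illustration only.

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BoundaryToTimestamp {
    public static void main(String[] args) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        ZoneId zone = ZoneId.systemDefault(); // assumed time zone
        long beginMs = LocalDateTime.parse("20190416000000", fmt).atZone(zone).toInstant().toEpochMilli();
        long endMs = LocalDateTime.parse("20190416000006", fmt).atZone(zone).toInstant().toEpochMilli();
        // Left-closed, right-open interval: records with beginMs <= timestamp < endMs are read.
        System.out.println(beginMs + " .. " + endMs);
    }
}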