Kafka Reader allows you to read data from Kafka by using Kafka SDK for Java.
Background information
Apache Kafka is a fast, scalable, and distributed message system that is used to publish
and subscribe to messages. Kafka provides high throughput, built-in partitioning, data
replication, and fault tolerance, which makes it suitable for processing large volumes
of messages.
Notice Kafka Reader supports only exclusive resource groups for Data Integration, but not
the shared resource group or custom resource groups. For more information, see Exclusive resource groups for Data Integration and Create a custom resource group for Data Integration.
How it works
Kafka Reader reads data from Kafka by using Kafka SDK for Java. The following version of Kafka SDK for Java is used:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
Kafka Reader uses the following methods of Kafka SDK for Java. For more information,
visit the Kafka website.
- Use the Kafka consumer as the client that consumes messages.
org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
- Query offsets based on the UNIX timestamp.
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
- Seek to the start offset.
public void seekToBeginning(Collection<TopicPartition> partitions)
- Seek to the end offset.
public void seekToEnd(Collection<TopicPartition> partitions)
- Seek to the specified offset.
public void seek(TopicPartition partition,long offset)
- Poll for data from the server.
public ConsumerRecords<K,V> poll(final Duration timeout)
Note Kafka Reader automatically commits offsets when it consumes data.
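For reference, the following minimal sketch shows how these methods can be combined to perform a time-bounded read. It is not the exact implementation of Kafka Reader: the broker address, consumer group, topic, and timestamp are placeholder values, and the error handling and end-boundary checks that Kafka Reader performs are omitted.
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TimeBoundedReadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host:9093");      // placeholder broker address
        props.put("group.id", "demo_test");               // placeholder consumer group
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions of the topic explicitly.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("topicName")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);

            // Query offsets based on a UNIX timestamp (offsetsForTimes),
            // then seek each partition to the resolved offset.
            long beginTimestampMs = 1555344000000L;        // placeholder start timestamp
            Map<TopicPartition, Long> search = new HashMap<>();
            for (TopicPartition tp : partitions) {
                search.put(tp, beginTimestampMs);
            }
            Map<TopicPartition, OffsetAndTimestamp> startOffsets = consumer.offsetsForTimes(search);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : startOffsets.entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());             // seek to the resolved offset
                } else {
                    consumer.seekToEnd(Collections.singletonList(e.getKey()));    // no record after the timestamp
                }
            }

            // Poll for data from the server; a real reader would stop at the configured end boundary.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
            }
        }
    }
}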
Parameters
Parameter | Description | Required |
---|---|---|
server | The address of the Kafka broker, in the format of ip:port. You can specify only one broker address for the server parameter. However, make sure that DataWorks can use this address to access all the brokers in the Kafka cluster. | Yes |
topic | The Kafka topic from which data is read. Kafka maintains feeds of messages in topics. | Yes |
column | The columns to be read by Kafka Reader. Constant columns (for example, "'123'"), data columns (fields in the message body, for example, "event_id"), and property columns (for example, "__key__", "__value__", "__partition__", "__offset__", and "__timestamp__") are supported. | Yes |
keyType | The type of the key in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. | Yes |
valueType | The type of the value in the Kafka topic. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT. | Yes |
beginDateTime | The start timestamp from which data is consumed. This parameter defines the left boundary of the left-closed, right-open interval. The value is a time string in the yyyymmddhhmmss format and can be used in combination with scheduling properties. For more information, see Scheduling parameters. Note: Kafka 0.10.2 and later versions support this parameter. | You can use only one of the beginDateTime and beginOffset parameters. Note: The beginDateTime and endDateTime parameters must be used in pairs. |
endDateTime | The end timestamp at which data consumption stops. This parameter defines the right boundary of the left-closed, right-open interval. The value is a time string in the yyyymmddhhmmss format and can be used in combination with scheduling properties. For more information, see Scheduling parameters. Note: Kafka 0.10.2 and later versions support this parameter. | You can use only one of the endDateTime and endOffset parameters. Note: The beginDateTime and endDateTime parameters must be used in pairs. |
beginOffset | The offset from which data consumption starts. The following formats are supported: a numeric value (for example, 0), which specifies that consumption starts from that offset; seekToBeginning, which specifies that consumption starts from the start offsets; and seekToEnd, which specifies that consumption starts from the end offsets. | You can use only one of the beginOffset and beginDateTime parameters. |
endOffset | The offset at which data consumption ends. | You can use only one of the endOffset and endDateTime parameters. |
skipExceedRecord | Kafka Reader uses public ConsumerRecords<K, V> poll(final Duration timeout) to consume data. However, a single poll may return data beyond the boundary that is specified by the endOffset or endDateTime parameter. The skipExceedRecord parameter specifies whether to write this excess data to the destination table. Kafka automatically commits offsets for the data that is consumed, so the excess data is counted as consumed even if it is not written to the destination. | No. Default value: false. |
partition | A specific topic in Kafka can contain multiple partitions. Generally, Kafka Reader reads data in a specified offset interval from multiple partitions of a topic. If you want Kafka Reader to read data from a specified partition, you can set the partition parameter to a specific value. | No. This parameter does not have a default value. |
kafkaConfig | The extended parameters that are specified when the Kafka consumer is created, such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. You can set the parameters in kafkaConfig to manage data consumption of the Kafka consumer. | No |
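The beginDateTime and endDateTime values are time strings in the yyyymmddhhmmss format, whereas the offsetsForTimes method expects a UNIX timestamp in milliseconds. The following minimal sketch shows one way to convert such a string; the time zone used here is an assumption and is not part of the Kafka Reader specification:
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateTimeToOffsetTimestamp {
    public static void main(String[] args) {
        String beginDateTime = "20190416000000";  // yyyymmddhhmmss, as used by Kafka Reader
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        LocalDateTime dateTime = LocalDateTime.parse(beginDateTime, formatter);
        // The time zone is an assumption for this sketch; use the zone that your cluster uses.
        long epochMillis = dateTime.atZone(ZoneId.of("Asia/Shanghai")).toInstant().toEpochMilli();
        System.out.println(epochMillis);          // the value to pass in the offsetsForTimes search map
    }
}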
The following table describes the parameters in kafkaConfig.
Parameter | Description |
---|---|
fetch.min.bytes | The minimum number of bytes that the consumer can obtain from the broker. The broker waits until the number of bytes reaches the specified value before it returns data to the consumer. |
fetch.max.wait.ms | The maximum duration during which the consumer waits for data from the broker. Default value: 500. Unit: milliseconds. The broker returns data to the consumer when one of the conditions that are specified by the fetch.min.bytes and fetch.max.wait.ms parameters is met. |
max.partition.fetch.bytes | The maximum number of bytes in each partition that the broker returns to the consumer. Default value: 1. Unit: MB. |
session.timeout.ms | The session timeout period of the consumer. If the broker does not receive a heartbeat from the consumer within this period, the consumer is considered disconnected and no longer receives data from the broker. Default value: 30. Unit: seconds. |
auto.offset.reset | The handling method when no offset is available or the offset is invalid. This occurs if the consumer times out or the record with the specified offset has expired and been deleted. By default, this parameter is set to latest, which specifies that the consumer reads data from the latest record. You can set this parameter to earliest, which specifies that the consumer reads data from the start offset. |
max.poll.records | The number of messages that can be returned for a single poll. |
key.deserializer | The method that is used to deserialize the message key, such as org.apache.kafka.common.serialization.StringDeserializer. |
value.deserializer | The method that is used to deserialize the message value, such as org.apache.kafka.common.serialization.StringDeserializer. |
ssl.truststore.location | The path of the Secure Sockets Layer (SSL) root certificate. |
ssl.truststore.password | The password of the truststore in the SSL root certificate. If Message Queue for Apache Kafka is used, set this parameter to KafkaOnsClient. |
security.protocol | The access protocol. Set the value to SASL_SSL. |
sasl.mechanism | The simple authentication and security layer (SASL) authentication mode. If Message Queue for Apache Kafka is used, set this parameter to PLAIN. |
Example:
{
    "group.id": "demo_test",
    "java.security.auth.login.config": "/home/admin/kafka_client_jaas.conf",
    "ssl.truststore.location": "/home/admin/kafka.client.truststore.jks",
    "ssl.truststore.password": "KafkaOnsClient",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "ssl.endpoint.identification.algorithm": ""
}
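For reference, the following minimal sketch shows how kafkaConfig entries such as the ones in the preceding example map onto a Java consumer configuration. Whether Kafka Reader applies the settings in exactly this way is an assumption; the values mirror the example above and are placeholders:
import java.util.Properties;

public class KafkaConfigSketch {
    public static Properties buildConsumerProperties() {
        // The JAAS configuration path is a JVM system property rather than a consumer setting.
        System.setProperty("java.security.auth.login.config", "/home/admin/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("group.id", "demo_test");
        props.put("ssl.truststore.location", "/home/admin/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "KafkaOnsClient");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Disable host name verification, as in the example above.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }
}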
Sample code in the code editor mode
For more information about the code editor mode, see Create a sync node by using the code editor.
In the following JSON code, a node is configured to read data from a Kafka topic:
{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "false"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": false,
            "concurrent": 1
        }
    }
}