Kafka Reader通过Kafka服务的Java SDK从Kafka读取数据。
背景信息
实现原理
Kafka Reader通过Kafka Java SDK读取Kafka中的数据,使用的日志服务Java SDK版本如下所示。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
主要涉及的Kafka SDK调用方法如下,详情请参见Kafka官方文档:
- 使用KafkaConsumer作为消息消费的客户端。
org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
- 根据unix时间戳查询Kafka点位offSet。
Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
- 定位到开始点位offSet。
public void seekToBeginning(Collection<TopicPartition> partitions)
- 定位到结束点位offSet。
public void seekToEnd(Collection<TopicPartition> partitions)
- 定位到指定点位offSet。
public void seek(TopicPartition partition,long offset)
- 客户端从服务端拉取poll数据。
public ConsumerRecords<K,V> poll(final Duration timeout)
说明 Kafka Reader消费数据使用了自动点位提交机制。
参数说明
参数 | 描述 | 是否必选 |
---|---|---|
server | Kafka的broker server地址,格式为ip:port。
您可以只配置一个server,但请务必保证Kafka集群中所有broker的IP地址都可以连通DataWorks。 |
是 |
topic | Kafka的Topic,是Kafka处理资源的消息源(feeds of messages)的聚合。 | 是 |
column | 需要读取的Kafka数据,支持常量列、数据列和属性列:
|
是 |
keyType | Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
valueType | Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。 | 是 |
beginDateTime | 数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss格式的时间字符串,可以配合时间属性使用。详情请参见调度参数。
说明 Kafka 0.10.2及以上的版本支持该功能。
|
需要和beginOffset二选一。
说明 beginDateTime和endDateTime配合使用。
|
endDateTime | 数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss格式的时间字符串,可以配合时间属性使用。详情请参见调度参数。
说明 Kafka 0.10.2及以上的版本支持该功能。
|
需要和endOffset二选一。
说明 endDateTime和beginDateTime配合使用。
|
beginOffset | 数据消费的开始时间位点,您可以配置以下形式:
|
需要和beginDateTime二选一。 |
endOffset | 数据消费的结束位点,用于控制结束数据消费任务退出的时间。 | 需要和endDateTime二选一。 |
skipExceedRecord | Kafka使用public ConsumerRecords<K, V> poll(final Duration timeout) 消费数据,一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用于控制是否写出多余的数据至目的端。由于消费数据使用了自动点位提交,建议您:
|
否,默认值为false。 |
partition | Kafka的一个Topic有多个分区(partition),正常情况下数据同步任务是读取Topic(多个分区)一个点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。 | 否,无默认值。 |
kafkaConfig | 创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.servers、auto.commit.interval.ms、session.timeout.ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。 | 否 |
kafkaConfig参数说明如下。
参数 | 描述 |
---|---|
fetch.min.bytes | 指定消费者从broker获取消息的最小字节数,即有足够的数据时,才将其返回给消费者。 |
fetch.max.wait.ms | 等待broker返回数据的最大时间,默认500毫秒。fetch.min.bytes和fetch.max.wait.ms先满足哪个条件,便按照该方式返回数据。 |
max.partition.fetch.bytes | 指定broker从每个partition中返回给消费者的最大字节数,默认为1 MB。 |
session.timeout.ms | 指定消费者不再接收服务之前,可以与服务器断开连接的时间,默认是30秒。 |
auto.offset.reset | 消费者在读取没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。默认为latest(消费者从最新的记录开始读取数据),您可以更改为earliest(消费者从起始位置读取partition的记录)。 |
max.poll.records | 单次调用poll方法能够返回的消息数量。 |
key.deserializer | 消息Key的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
value.deserializer | 数据Value的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer。 |
ssl.truststore.location | SSL根证书的路径。 |
ssl.truststore.password | 根证书Store的密码。如果是Aliyun Kafka,则配置为KafkaOnsClient。 |
security.protocol | 接入协议,目前支持使用SASL_SSL协议接入。 |
sasl.mechanism | SASL鉴权方式,如果是Aliyun Kafka,使用PLAIN。 |
配置示例如下。
{
"group.id": "demo_test",
"java.security.auth.login.config": "/home/admin/kafka_client_jaas.conf",
"ssl.truststore.location": "/home/admin/kafka.client.truststore.jks",
"ssl.truststore.password": "KafkaOnsClient",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"ssl.endpoint.identification.algorithm": ""
}
脚本开发示例
使用脚本模式开发的详情请参见通过脚本模式配置任务。
从Kafka读取数据的JSON配置,如下所示。
{
"type": "job",
"steps": [
{
"stepType": "kafka",
"parameter": {
"server": "host:9093",
"column": [
"__key__",
"__value__",
"__partition__",
"__offset__",
"__timestamp__",
"'123'",
"event_id",
"tag.desc"
],
"kafkaConfig": {
"group.id": "demo_test"
},
"topic": "topicName",
"keyType": "ByteArray",
"valueType": "ByteArray",
"beginDateTime": "20190416000000",
"endDateTime": "20190416000006",
"skipExceedRecord": "false"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {
"print": false,
"fieldDelimiter": ","
},
"name": "Writer",
"category": "writer"
}
],
"version": "2.0",
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": false,
"concurrent": 1
}
}
}