Kafka Reader通过Kafka服务的Java SDK从Kafka读取数据。

背景信息

Apache Kafka是一个快速、可扩展、高吞吐和可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理的场景中使用。
注意 Kafka Reader仅支持使用新增和使用独享数据集成资源组,不支持使用公共资源组和自定义资源组

实现原理

Kafka Reader通过Kafka Java SDK读取Kafka中的数据,使用的日志服务Java SDK版本如下所示。
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>
主要涉及的Kafka SDK调用方法如下,详情请参见Kafka官方文档
  • 使用KafkaConsumer作为消息消费的客户端。
    org.apache.kafka.clients.consumer.KafkaConsumer<K,V>
  • 根据unix时间戳查询Kafka点位offSet。
    Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
  • 定位到开始点位offSet。
    public void seekToBeginning(Collection<TopicPartition> partitions)
  • 定位到结束点位offSet。
    public void seekToEnd(Collection<TopicPartition> partitions)
  • 定位到指定点位offSet。
    public void seek(TopicPartition partition,long offset)
  • 客户端从服务端拉取poll数据。
    public ConsumerRecords<K,V> poll(final Duration timeout)
说明 Kafka Reader消费数据使用了自动点位提交机制。

参数说明

参数 描述 是否必选
datasource 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须要与添加的数据源名称保持一致。
server Kafka的broker server地址,格式为ip:port

您可以只配置一个server,但请务必保证Kafka集群中所有broker的IP地址都可以连通DataWorks。

topic Kafka的Topic,是Kafka处理资源的消息源(feeds of messages)的聚合。
column 需要读取的Kafka数据,支持常量列、数据列和属性列:
  • 常量列:使用单引号包裹的列为常量列,例如["'abc'", "'123'"]
  • 数据列
    • 如果您的数据是一个JSON,支持获取JSON的属性,例如["event_id"]
    • 如果您的数据是一个JSON,支持获取JSON的嵌套子属性,例如["tag.desc"]
  • 属性列
    • __key__表示消息的key。
    • __value__表示消息的完整内容 。
    • __partition__表示当前消息所在分区。
    • __headers__表示当前消息headers信息。
    • __offset__表示当前消息的偏移量。
    • __timestamp__表示当前消息的时间戳。
    完整示例如下。
    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__offset__",
        "__timestamp__",
        "'123'",
        "event_id",
        "tag.desc"
        ]
keyType Kafka的Key的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
valueType Kafka的Value的类型,包括BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
beginDateTime 数据消费的开始时间位点,为时间范围(左闭右开)的左边界。yyyymmddhhmmss格式的时间字符串,可以配合时间属性使用。详情请参见配置调度参数
说明 Kafka 0.10.2及以上的版本支持该功能。
需要和beginOffset二选一。
说明 beginDateTimeendDateTime配合使用。
endDateTime 数据消费的结束时间位点,为时间范围(左闭右开)的右边界。yyyymmddhhmmss格式的时间字符串,可以配合时间属性使用。详情请参见配置调度参数
说明 Kafka 0.10.2及以上的版本支持该功能。
需要和endOffset二选一。
说明 endDateTimebeginDateTime配合使用。
beginOffset 数据消费的开始时间位点,您可以配置以下形式:
  • 数字形式(例如15553274),表示开始消费的点位。
  • seekToBeginning:表示从开始点位消费数据。
  • seekToLast:表示从上次的偏移位置读取数据。
  • seekToEnd:表示从最后点位消费数据,会读取到空数据。
需要和beginDateTime二选一。
endOffset 数据消费的结束位点,用于控制结束数据消费任务退出的时间。 需要和endDateTime二选一。
skipExceedRecord Kafka使用public ConsumerRecords<K, V> poll(final Duration timeout)消费数据,一次poll调用获取的数据可能在endOffset或者endDateTime之外。skipExceedRecord用于控制是否写出多余的数据至目的端。由于消费数据使用了自动点位提交,建议您:
  • Kafka 0.10.2之前版本:建议配置skipExceedRecord为false。
  • Kafka 0.10.2及以上版本:建议配置skipExceedRecord为true。
否,默认值为false
partition Kafka的一个Topic有多个分区(partition),正常情况下数据同步任务是读取Topic(多个分区)一个点位区间的数据。您也可以指定partition,仅读取一个分区点位区间的数据。 否,无默认值。
kafkaConfig 创建Kafka数据消费客户端KafkaConsumer可以指定扩展参数,例如bootstrap.serversauto.commit.interval.mssession.timeout.ms等,您可以基于kafkaConfig控制KafkaConsumer消费数据的行为。
encoding 当keyType或valueType配置为STRING时,将使用该配置项指定的编码解析字符串。 否,默认为UTF-8。
waitTIme 消费者对象从Kafka拉取一次数据的最大等待时间,单位为秒。 否,默认为60。
stopWhenPollEmpty 该配置项可选值为true/false。当配置为true时,如果消费者从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性问题),则立即停止任务,否则持续重试直到再次读到数据。 否,默认为true。
kafkaConfig参数说明如下。
参数 描述
fetch.min.bytes 指定消费者从broker获取消息的最小字节数,即有足够的数据时,才将其返回给消费者。
fetch.max.wait.ms 等待broker返回数据的最大时间,默认500毫秒。fetch.min.bytesfetch.max.wait.ms先满足哪个条件,便按照该方式返回数据。
max.partition.fetch.bytes 指定broker从每个partition中返回给消费者的最大字节数,默认为1 MB。
session.timeout.ms 指定消费者不再接收服务之前,可以与服务器断开连接的时间,默认是30秒。
auto.offset.reset 消费者在读取没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)的处理方式。默认为latest(消费者从最新的记录开始读取数据),您可以更改为earliest(消费者从起始位置读取partition的记录)。
max.poll.records 单次调用poll方法能够返回的消息数量。
key.deserializer 消息Key的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 数据Value的反序列化方法,例如org.apache.kafka.common.serialization.StringDeserializer
ssl.truststore.location SSL根证书的路径。
ssl.truststore.password 根证书Store的密码。如果是Aliyun Kafka,则配置为KafkaOnsClient。
security.protocol 接入协议,目前支持使用SASL_SSL协议接入。
sasl.mechanism SASL鉴权方式,如果是Aliyun Kafka,使用PLAIN。
java.security.auth.login.config SASL鉴权文件路径。

向导开发介绍

  1. 选择数据源。配置同步任务的数据来源ss
    参数 描述
    数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。
    主题 即上述参数说明中的topic,可以在下拉列表中选择要读取的Kafka主题名称。
    消费群组ID 即上述参数说明中的kafkaConfig的JSON结构中的group.id字段,是初始化Kafka Consumer时的group.id配置。

    为了确保同步时消费位点的正确性,请避免该参数与其他消费进程重复。如果不指定该参数,则每次执行同步自动生成datax_开头的随机字符串作为group.id。

    Kafka版本 Kafka版本可以选择>=0.10.0或者<0.10.0。该参数主要影响读取起始位点和读取结束位点的可选项。
    读取起始位点 即上述参数说明中的beginOffset。当Kafka版本选择>=0.10.0,支持通过使用yyyyMMddHHmmss格式的时间字符串指定读取的起始位点,也可以选择分区起始位点和消费群组ID对应群组的当前位点作为读取起始位点。当Kafka版本选择<0.10.0,则只能选择分区起始位点和消费群组ID对应群组的当前位点作为读取起始位点。
    • 指定时间:数据写入Kafka的时候自动生成一个unixtime时间戳作为该数据的时间记录。同步任务通过获取用户配置的yyyymmddhhmmss数值,将该值转成unixtimestamp后从kafka中读取相应数据。例如,"beginDateTime": "20210125000000"。
    • 分区起始位点:从kafka topic每个分区没有删除的位点最小的数据开始抽取数据。
    • 群组当前位点:从任务配置上面指定的消费群组ID保存的位点开始读取数据,一般是使用这个群组ID读数据的进程上次停止的位点(最好确保使用这个群组ID的进程只有配置的这个数据集成任务,避免共用群组ID造成数据丢失),如果使用群组当前位点,一定要配置消费群组ID,否则数据集成任务会随机生成一个群组ID,而新的群组ID因为没有保存过位点,所以会从分区起始位点开始读取。
    说明 如果选择除了指定时间之外的选项时对应beginOffset。如果选择指定时间,生成任务时将不会填充beginOffset参数,而是采用起始时间表单填充beginDateTime参数来决定读取的起始位点。
    起始时间 即上述参数说明中的beginDateTime,当读取起始位点选择指定时间时,支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,为时间范围(左闭右开)的左边界,例如20210513000000,可以配合时间属性使用。详情请参见调度参数
    读取结束位点 即上述参数说明中的endOffset,当Kafka版本选择>=0.10.0,支持通过使用yyyyMMddHHmmss格式的时间字符串指定读取的结束位点,也可以选择分区结束位点作为读取结束位点。当Kafka版本选择<0.10.0,则只能选择分区结束位点作为读取结束位点。
    说明 如果选择除了指定时间之外的选项时对应endOffset。数据读取结束位点如果选择指定时间,生成任务配置时将不会填充endOffset参数,而是采用结束时间表单填充endDateTime参数来决定读取的结束位点。
    结束时间 即上述参数说明中的endDateTime,当读取起始位点选择指定时间时,支持以yyyyMMddHHmmss格式的时间字符串指定具体时间,为时间范围(左闭右开)的右边界,例如20210514000000,可以配合时间属性使用。详情请参见调度参数
    时区 当读取起始位点或读取结束位点配置指定时间时,决定时间字符串对应时区。
    键类型 即上述参数说明中的keyType,Kafka的Key的类型,决定了初始化KafkaConsumer时的key.deserializer配置,可选值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
    值类型 即上述参数说明中的valueType,Kafka的Value的类型,决定了初始化KafkaConsumer时的value.deserializer配置,可选值包括STRING、BYTEARRAY、DOUBLE、FLOAT、INTEGER、LONG和SHORT。
    编码 即上述参数说明中的encoding,当键类型或者值类型配置为STRING时,决定读取时的编码。
    同步结束策略
    • 当配置为1分钟读取不到新数据时,如果消费者1分钟从Kafka拉取数据返回为空(一般是已经读完主题中的全部数据,也可能是网络或者Kafka集群可用性原因),则立即停止任务,否则持续重试直到再次读到数据。
    • 当配置为到达指定结束位点时,指的是离线任务启动时Kafka中的数据点位。
    位点重置策略 即上述参数说明中的kafkaConfig的JSON结构中的auto.offset.reset字段,通过控制kafka消费者的auto.offset.reset参数控制找不到位点时的重置策略,建议配置为“异常退出”,避免读取丢失数据。
    单次读取大小 即上述参数说明中的kafkaConfig的JSON结构中的fetch.min.bytes字段,通过控制kafka消费者的fetch.min.bytes参数控制单次读取批大小。
    单次读取时间 即上述参数说明中的kafkaConfig的JSON结构中的fetch.max.wait.ms字段,通过控制kafka消费者的fetch.max.wait.ms参数控制单次读取时间。
    读取超时 即上述参数说明中的kafkaConfig的JSON结构中的session.timeout.ms字段,通过控制kafka消费者的session.timeout.ms参数控制读取超时时间。
  2. 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标悬停至需要删除的字段上,即可单击删除图标进行删除。
    当源头表字段配置为如下两个下划线(__)开头的6个字符串时,对应Kafka记录的特定内容。具体如下所示:
    源头表字段 描述
    __key__ Kafka记录的key值。
    __value__ Kafka记录的value值。
    __partition__ Kafka记录所在分区编号,是一个大于等于0的整数。
    __headers__ Kafka记录的headers序列化得到的JSON字符串。
    __offset__ Kafka记录所在分区的偏移量,是一个大于等于0的整数。
    __timestamp__ Kafka记录的毫秒时间戳。
    源头表字段也可配置为上述6个字符串之外的字符串,此时将Kafka记录作为JSON字符串进行解析,将源头表字段配置的字符串作为JSON路径读取对应内容作为字段值写入对应的目标表字段,例如以下示例为Kafka记录的value值,当源头表字段配置为data.name时,将会读取"bob"作为这个字段的值并写入对应目标表。
    {
      "data": {
        "name": "bob",
        "age": 35
      }
    }

    源头表和目标表的映射关系如下所示:

    字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
    添加一行
    • 可以输入常量,输入的值需要使用英文单引号。例如,'abc’'123’等。
    • 可以配合调度参数使用。例如,${bizdate}等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发示例

使用脚本模式开发的详情请参见通过脚本模式配置任务

从Kafka读取数据的JSON配置,如下所示。
{
    "type": "job",
    "steps": [
        {
            "stepType": "kafka",
            "parameter": {
                "server": "host:9093",
                "column": [
                    "__key__",
                    "__value__",
                    "__partition__",
                    "__offset__",
                    "__timestamp__",
                    "'123'",
                    "event_id",
                    "tag.desc"
                ],
                "kafkaConfig": {
                    "group.id": "demo_test"
                },
                "topic": "topicName",
                "keyType": "ByteArray",
                "valueType": "ByteArray",
                "beginDateTime": "20190416000000",
                "endDateTime": "20190416000006",
                "skipExceedRecord": "false"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {
                "print": false,
                "fieldDelimiter": ","
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent": 1,//并发数
            "mbps":"12"//限流
        }
    }
}

使用SASL鉴权

如果需要使用SASL鉴权或SSL鉴权,请在定义kafka数据源时进行相关配置,详情请参考:配置Kafka数据源