启动Tablestore Sink Connector时,您需要通过键值映射向Kafka Connect进程传递参数。通过本文您可以结合配置样例和配置参数说明了解Tablestore Sink Connector的相关配置。

配置示例

不同工作模式下相应配置文件的示例如下:

  • .properties格式配置文件的示例,适用于standalone模式。
    # 设置连接器名称。
    name=tablestore-sink
    # 指定连接器类。
    connector.class=TableStoreSinkConnector
    # 设置最大任务数。
    tasks.max=1
    # 指定导出数据的Kafka的Topic列表。
    topics=test
    # 以下为Tablestore连接参数的配置。
    # Tablestore实例的Endpoint。 
    tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
    # 填写AccessKey ID和AccessKey Secret。
    tablestore.access.key.id=xxx
    tablestore.access.key.secret=xxx
    # Tablestore实例名称。
    tablestore.instance.name=xxx
    
    # 以下为数据映射相关的配置。
    # 指定Kafka Record的解析器。
    # 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
    event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
    
    # 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
    # topics.assign.tables配置的优先级更高,如果配置了topics.assign.tables,则忽略table.name.format的配置。
    # 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
    table.name.format=<topic>
    # 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
    # 如果缺省,则采取table.name.format的配置。
    # topics.assign.tables=test:test_kafka
    
    # 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
    # kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
    # record_key表示以Record Key中的字段作为数据表的主键。
    # record_value表示以Record Value中的字段作为数据表的主键。
    primarykey.mode=kafka
    
    # 定义导入数据表的主键列名和数据类型。
    # 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
    # 其中<tablename>为数据表名称的占位符。
    # 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
    # 当主键模式为record_key或record_value时,必须配置以下两个属性。
    # tablestore.test.primarykey.name=A,B
    # tablestore.test.primarykey.type=string,integer
    
    # 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
    # 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
    # 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
    # 其中<tablename>为数据表名称的占位符。
    # tablestore.test.columns.whitelist.name=A,B
    # tablestore.test.columns.whitelist.type=string,integer
    
    # 以下为写入Tablestore相关的配置。
    # 指定写入模式,可选值包括put和update,默认值为put。
    # put表示覆盖写。
    # update表示更新写。
    insert.mode=put
    # 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
    insert.order.enable=true
    # 是否自动创建目标表,默认值为false。
    auto.create=false
    
    # 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
    # none表示不允许进行任何删除。
    # row表示允许删除行。
    # column表示允许删除属性列。
    # row_and_column表示允许删除行和属性列。
    delete.mode=none
    
    # 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
    buffer.size=1024
    # 写入数据表时的回调线程数,默认值为核数+1。
    # max.thread.count=
    # 写入数据表时的最大请求并发数,默认值为10。
    max.concurrency=10
    # 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
    bucket.count=3
    # 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
    flush.Interval=10000
    
    # 以下为脏数据处理相关配置。
    # 在解析Kafka Record或者写入数据表时可能发生错误,您可以可通过以下配置进行处理。
    # 指定容错能力,可选值包括none和all,默认值为none。
    # none表示任何错误都将导致Sink Task立即失败。
    # all表示跳过产生错误的Record,并记录该Record。
    runtime.error.tolerance=none
    # 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
    # ignore表示忽略所有错误。
    # kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
    # tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
    runtime.error.mode=ignore
    
    # 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
    # runtime.error.bootstrap.servers=localhost:9092
    # runtime.error.topic.name=errors
    
    # 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
    # runtime.error.table.name=errors
  • .json格式配置文件的示例,适用于distributed模式。
    {
      "name": "tablestore-sink",
      "config": {
        // 指定连接器类。
        "connector.class":"TableStoreSinkConnector",
        // 设置最大任务数。
        "tasks.max":"3",
        // 指定导出数据的Kafka的Topic列表。
        "topics":"test",
        // 以下为Tablestore连接参数的配置。
        // Tablestore实例的Endpoint。
        "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
        // 填写AccessKey ID和AccessKey Secret。
        "tablestore.access.key.id":"xxx",
        "tablestore.access.key.secret":"xxx",
        // Tablestore实例名称。
        "tablestore.instance.name":"xxx",
    
        // 以下为数据映射相关的配置。
        // 指定Kafka Record的解析器。
        // 默认的DefaulteEventParser已支持Struct和Map类型,您也可以使用自定义的EventParser。
        "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
    
        // 定义目标表名称的格式字符串,字符串中可包含<topic>作为原始Topic的占位符。
        // topics.assign.tables配置的优先级更高。如果配置了topics.assign.tables,则忽略table.name.format的配置。
        // 例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则将映射到Tablestore的表名为kafka_test。
        "table.name.format":"<topic>",
        // 指定Topic与目标表的映射关系,以"<topic>:<tablename>"格式映射Topic和表名,Topic和表名之间的分隔符为英文冒号(:),不同映射之间分隔符为英文逗号(,)。
        // 如果缺省,则采取table.name.format的配置。
        // "topics.assign.tables":"test:test_kafka",
    
        // 指定主键模式,可选值包括kafka、record_key和record_value,默认值为kafka。
        // kafka表示以<connect_topic>_<connect_partition>和<connect_offset>作为数据表的主键。
        // record_key表示以Record Key中的字段作为数据表的主键。
        // record_value表示以Record Value中的字段作为数据表的主键。
        "primarykey.mode":"kafka",
    
        // 定义导入数据表的主键列名和数据类型。
        // 属性名格式为tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type。
        // 其中<tablename>为数据表名称的占位符。
        // 当主键模式为kafka时,无需配置该属性,默认主键列名为{"topic_partition","offset"},默认主键列数据类型为{string, integer}。
        // 当主键模式为record_key或record_value时,必须配置以下两个属性。
        // "tablestore.test.primarykey.name":"A,B",
        // "tablestore.test.primarykey.type":"string,integer",
    
        // 定义属性列白名单,用于过滤Record Value中的字段获取所需属性列。
        // 默认值为空,使用Record Value中的所有字段作为数据表的属性列。
        // 属性名格式为tablestore.<tablename>.columns.whitelist.name和tablestore.<tablename>.columns.whitelist.type。
        // 其中<tablename>为数据表名称的占位符。
        // "tablestore.test.columns.whitelist.name":"A,B",
        // "tablestore.test.columns.whitelist.type":"string,integer",
    
        // 以下为写入Tablestore相关的配置。
        // 指定写入模式,可选值包括put和update,默认值为put。
        // put表示覆盖写。
        // update表示更新写。
        "insert.mode":"put",
        // 是否需要保序,默认值为true。如果关闭保序模式,则有助于提升写入效率。
        "insert.order.enable":"true",
        // 是否自动创建目标表,默认值为false。
        "auto.create":"false",
    
        // 指定删除模式,可选值包括none、row、column和row_and_column,默认值为none。
        // none表示不允许进行任何删除。
        // row表示允许删除行。
        // column表示允许删除属性列。
        // row_and_column表示允许删除行和属性列。
        "delete.mode":"none",
    
        // 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须为2的指数。
        "buffer.size":"1024",
        // 写入数据表时的回调线程数,默认值为核数+1。
        // "max.thread.count":
        // 写入数据表时的最大请求并发数,默认值为10。
        "max.concurrency":"10",
        // 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
        "bucket.count":"3",
        // 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
        "flush.Interval":"10000",
    
        // 以下为脏数据处理相关配置。
        // 在解析Kafka Record或者写入数据表时可能发生错误,您可以通过以下配置进行处理。
        // 指定容错能力,可选值包括none和all,默认值为none。
        // none表示任何错误都将导致Sink Task立即失败。
        // all表示跳过产生错误的Record,并记录该Record。
        "runtime.error.tolerance":"none",
        // 指定脏数据记录模式,可选值包括ignore、kafka和tablestore,默认值为ignore。
        // ignore表示忽略所有错误。
        // kafka表示将产生错误的Record和错误信息存储在Kafka的另一个Topic中。
        // tablestore表示将产生错误的Record和错误信息存储在Tablestore另一张数据表中。
        "runtime.error.mode":"ignore"
    
        // 当脏数据记录模式为kafka时,需要配置Kafka集群地址和Topic。
        // "runtime.error.bootstrap.servers":"localhost:9092",
        // "runtime.error.topic.name":"errors",
    
        // 当脏数据记录模式为tablestore时,需要配置Tablestore中数据表名称。
        // "runtime.error.table.name":"errors",
      }

配置项说明

配置文件中的配置项说明请参见下表。

分类 配置项 类型 是否必选 示例值 描述
Kafka Connect常见配置 name string tablestore-sink 连接器(Connector)名称。连接器名称必须唯一。
connector.class class TableStoreSinkConnector 连接器的Java类。
如果您要使用该连接器,请在connector.class配置项中指定Connector类的名称,支持配置为Connector类的全名(com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector)或别名(TableStoreSinkConnector)。
connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector
tasks.max integer 3 连接器支持创建的最大任务数。

如果连接器无法达到此并行度级别,则可能会创建较少的任务。

key.converter string org.apache.kafka.connect.json.JsonConverter 覆盖worker设置的默认key转换器。
value.converter string org.apache.kafka.connect.json.JsonConverter 覆盖worker设置的默认value转换器。
topics list test 连接器输入的Kafka Topic列表,多个Topic之间以英文逗号(,)分隔。

您必须为连接器设置topics或topics.regex来控制连接器输入的Topic。

topics.regex string ^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$ 连接器输入的Kafka Topic的Java正则表达式。

您必须为连接器设置topics或topics.regex来控制连接器输入的Topic。

连接器Connection配置 tablestore.endpoint string https://xxx.xxx.ots.aliyuncs.com Tablestore实例的服务地址。更多信息,请参见服务地址
tablestore.access.key.id string LTAn******************** 登录账号的AccessKey ID和AccessKey Secret,获取方式请参见获取AccessKey
tablestore.access.key.secret string zbnK**************************
tablestore.instance.name string myotstest Tablestore实例的名称。
连接器Data Mapping配置 event.parse.class class DefaultEventParser 消息解析器的Java类,默认值为DefaultEventParser。解析器用于从Kafka Record中解析出数据表的主键列和属性列。
注意 Tablestore对列值大小有限制。string类型和binary类型的主键列列值限制均为1 KB,属性列列值限制均为2 MB。更多信息,请参见通用限制

如果数据类型转换后列值超出对应限制,则将该Kafka Record作为脏数据处理。

如果使用默认的DefaultEventParser解析器,Kafka Record的Key或Value必须为Kafka Connect的Struct或Map类型。Struct中选择的字段必须为支持的数据类型,字段会根据数据类型映射表转换为Tablestore数据类型写入数据表。Map中的值类型也必须为支持的数据类型,支持的数据类型同Struct,最终会被转换为binary类型写入数据表。Kafka和Tablestore的数据类型映射关系请参见附录:Kafka和Tablestore数据类型映射

如果Kafka Record为不兼容的数据格式,则您可以通过实现com.aliyun.tablestore.kafka.connect.parsers.EventParser定义的接口来自定义解析器。

table.name.format string kafka_<topic> 目标数据表名称的格式字符串,默认值为<topic>。字符串中可包含<topic>作为原始Topic的占位符。例如当设置table.name.format为kafka_<topic>时,如果kafka中主题名称为test,则映射到Tablestore的表名为kafka_test。

此配置项的优先级低于topics.assign.tables配置项,如果配置了topics.assign.tables,则忽略table.name.format的配置。

topics.assign.tables list test:destTable 指定topic与Tablestore表之间的映射关系,格式为<topic_1>:<tablename_1>,<topic_2>:<tablename_2>。多个映射关系之间以英文逗号(,)分隔,例如test:destTable表示将Topic名为test的消息记录写入数据表destTable中。

此配置项的优先级高于table.name.format配置项,如果配置了topics.assign.tables,则忽略table.name.format的配置。

primarykey.mode string kafka 数据表的主键模式。取值范围如下:
  • kafka:以<connect_topic>_<connect_partition>(Kafka主题和分区,用下划线"_"分隔)和<connect_offset>(该消息记录在分区中的偏移量)作为数据表的主键。
  • record_key:以Record Key中的字段(Struct类型)或者键(Map类型)作为数据表的主键。
  • record_value:以Record Value中的字段(Struct类型)或者键(Map类型)作为数据表的主键。

请配合tablestore.<tablename>.primarykey.name和tablestore.<tablename>.primarykey.type使用。此配置项不区分大小写。

tablestore.<tablename>.primarykey.name list A,B 数据表的主键列名,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以英文逗号(,)分隔。
主键模式不同时,主键列名的配置不同。
  • 当设置主键模式为kafka时,以topic_partition,offset作为数据表主键列名称。在该主键模式下,您可以不配置此主键列名。如果配置了主键列名,则不会覆盖默认主键列名。
  • 当设置主键模式为record_key时,从Record Key中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。
  • 当设置主键模式为record_value:时,从Record Value中提取与配置的主键列名相同的字段(Struct类型)或者键(Map类型)作为数据表的主键。在该主键模式下主键列名必须配置。

Tablestore数据表的主键列是有顺序的,此属性的配置应注意主键列名顺序,例如PRIMARY KEY(A, B, C)与PRIMARY KEY(A, C, B)是不同的两个主键结构。

tablestore.<tablename>.primarykey.type list string, integer 数据表的主键列数据类型,其中<tablename>为数据表名称的占位符,包含1~4个主键列,以英文逗号(,)分隔,顺序必须与tablestore.<tablename>.primarykey.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary和auto_increment。
主键模式不同时,主键数据类型的配置不同。
  • 当主键模式为kafka时,以string, integer作为数据表主键数据类型。

    在该主键模式下,您可以不配置此主键列数据类型。如果配置了主键列数据类型,则不会覆盖默认主键列数据类型。

  • 当主键模式为record_key或者record_value时,指定相应主键列的数据类型。在该主键模式下主键列数据类型必须配置。

    如果指定的数据类型与Kafka Schema中定义的数据类型发生冲突,则会产生解析错误,您可以配置Runtime Error相关属性来应对解析错误。

    当配置此配置项为auto_increment时,表示主键自增列,此时Kafka Record中可以缺省该字段,写入数据表时会自动插入自增列。

tablestore.<tablename>.columns.whitelist.name list A,B 数据表的属性列白名单中属性列名称,其中<tablename>为数据表名称的占位符,以英文逗号(,)分隔。

如果配置为空,则使用Record Value中的所有字段(Struct类型)或者键(Map类型)作为数据表的属性列,否则用于过滤得到所需属性列。

tablestore.<tablename>.columns.whitelist.type list string, integer 数据表的属性列白名单中属性列数据类型,其中<tablename>为数据表名称的占位符,以英文逗号(,)分隔,顺序必须与tablestore.<tablename>.columns.whitelist.name相对应。此属性配置不区分大小写。数据类型的可选值包括integer、string、binary、boolean和double。
连接器Write配置 insert.mode string put 写入模式。取值范围如下:
  • put(默认):对应Tablestore的PutRow操作,即新写入一行数据会覆盖原数据。
  • update:对应Tablestore的UpdateRow操作,即更新一行数据,可以增加一行中的属性列,或者更新已存在的属性列的值。

此属性配置不区分大小写。

insert.order.enable boolean true 写入数据表是否需要保序。取值范围如下:
  • true(默认):写入时保持Kafka消息记录的顺序。
  • false:写入顺序无保证,但写入效率会提升。
auto.create boolean false 是否需要自动创建目标数据表。取值范围如下:
  • true:自动创建目标数据表。
  • false(默认):不自动创建目标数据表。
delete.mode string none 删除模式,仅当主键模式为record_key时才有效。取值范围如下:
  • none(默认):不允许进行任何删除。
  • row:允许删除行。当Record Value为空时会删除行。
  • column:允许删除属性列。当Record Value中字段值(Struct类型)或者键值(Map类型)为空时会删除属性列。
  • row_and_column:允许删除行和属性列。

此属性配置不区分大小写。

删除操作与insert.mode的配置相关。更多信息,请参见附录:删除语义

buffer.size integer 1024 写入数据表时内存中缓冲队列的大小,默认值为1024,单位为行数。此配置项的值必须是2的指数。
max.thread.count integer 3 写入数据表时的回调线程数,默认值为CPU核数+1
max.concurrency integer 10 写入数据表时的最大请求并发数。
bucket.count integer 3 写入数据表时的分桶数,默认值为3。适当调大此配置项的值可提升并发写入能力,但不应大于最大请求并发数。
flush.Interval integer 10000 写入数据表时对缓冲区的刷新时间间隔,默认值为10000,单位为毫秒。
连接器Runtime Error配置 runtime.error.tolerance string none 解析Kafka Record或者写入数据表时产生错误的处理策略。取值范围如下:
  • none(默认):任何错误都将导致Sink Task立即失败。
  • all:跳过产生错误的Record,并记录该Record。

此属性配置不区分大小写。

runtime.error.mode string ignore 解析Kafka Record或者写入数据表时产生错误,对错误的Record的处理策略。取值范围如下:
  • ignore(默认):忽略所有错误。
  • kafka:将产生错误的Record和错误信息存储在Kafka的另一个Topic中,此时需要配置runtime.error.bootstrap.servers和runtime.error.topic.name。记录运行错误的Kafka Record的Key和Value与原Record一致,Header中增加ErrorInfo字段来记录运行错误信息。
  • tablestore:将产生错误的Record和错误信息存储在Tablestore另一张数据表中,此时需要配置runtime.error.table.name。记录运行错误的数据表主键列为topic_partition(string类型), offset(integer类型),并且属性列为key(bytes类型)、value(bytes类型)和error_info(string类型)。

kafka模式下需要对Kafka Record的Header、Key和Value进行序列化转换,tablestore模式下需要对Kafka Record的Key和Value进行序列化转换,此处默认使用org.apache.kafka.connect.json.JsonConverter,并且配置schemas.enable为true,您可以通过JsonConverter反序列化得到原始数据。关于Converter的更多信息,请参见Kafka Converter

runtime.error.bootstrap.servers string localhost:9092 用于记录运行错误的Kafka集群地址。
runtime.error.topic.name string errors 用于记录运行错误的Kafka Topic名称。
runtime.error.table.name string errors 用于记录运行错误的Tablestore数据表名称。

附录:Kafka和Tablestore数据类型映射

Kafka和Tablestore数据类型映射关系请参见下表。

Kafka Schema Type Tablestore数据类型
STRING STRING
INT8、INT16、INT32、INT64 INTEGER
FLOAT32、FLOAT64 DOUBLE
BOOLEAN BOOLEAN
BYTES BINARY

附录:删除语义

当value中存在空值时,根据写入模式(insert.mode)和删除模式(delete.mode)的不同设置,数据写入到表格存储数据表中的处理方式不同,详细说明请参见下表。

insert.mode put update
delete.mode none row column row_and_column none row column row_and_column
value为空值 覆盖写 删行 覆盖写 删行 脏数据 删行 脏数据 删行
value所有字段值均为空值 覆盖写 覆盖写 覆盖写 覆盖写 脏数据 脏数据 删列 删列
value部分字段值为空值 覆盖写 覆盖写 覆盖写 覆盖写 忽略空值 忽略空值 删列 删列