Before you start Tablestore Sink Connector, you must pass parameters to the Kafka Connect process as key-value pairs. This topic provides configuration examples and parameter descriptions to show how to configure Tablestore Sink Connector.

Configuration examples

The configuration items vary depending on whether data is synchronized from Kafka to a data table or to a time series table in Tablestore, and the format of the configuration file depends on the working mode of Kafka Connect. The following examples show how to configure data synchronization from Kafka to a data table in Tablestore. To synchronize data to a time series table, add the configuration items that are specific to time series tables, as described in the Parameters section.

  • The following sample code shows how to configure the configuration file in the .properties format for Tablestore Sink Connector in standalone mode:
    # Specify the connector name. 
    name=tablestore-sink
    # Specify the connector class. 
    connector.class=TableStoreSinkConnector
    # Specify the maximum number of tasks. 
    tasks.max=1
    # Specify the list of Kafka topics from which data is exported. 
    topics=test
    # Specify values for the following Tablestore connection parameters: 
    # The endpoint of the Tablestore instance.  
    tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
    # The AccessKey pair which consists of an AccessKey ID and an AccessKey secret. 
    tablestore.access.key.id=xxx
    tablestore.access.key.secret=xxx
    # The name of the Tablestore instance. 
    tablestore.instance.name=xxx
    
    # Specify the following data mapping parameters: 
    # Specify the parser that is used to parse Kafka message records. 
    # The DefaultEventParser of Tablestore Sink Connector supports the Struct and Map classes of Kafka Connect. You can also use a custom EventParser. 
    event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
    
    # Specify the format string for the name of the destination Tablestore table. <topic> can be used in the string as a placeholder for the topic from which you want to export data. 
    # topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored. 
    # For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore. 
    table.name.format=<topic>
    # Specify the mapping between the Kafka topic and the destination Tablestore table. The value must be in the <topic>:<tablename> format. The topic name and table name are separated with a colon (:). If you want to specify multiple mappings, separate them with commas (,). 
    # If the mapping is not specified, the configuration of table.name.format is used. 
    # topics.assign.tables=test:test_kafka
    
    # Specify the primary key mode. Valid values: kafka, record_key, and record_value. Default value: kafka. 
    # kafka: <connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table. 
    # record_key: Fields in the record keys are used as the primary key of the data table. 
    # record_value: Fields in the record values are used as the primary key of the data table. 
    primarykey.mode=kafka
    
    # Specify the name and data type of the primary key column in the destination Tablestore data table. 
    # The format of the primary key column name is tablestore.<tablename>.primarykey.name. The format of the data type of the primary key column is tablestore.<tablename>.primarykey.type. 
    # <tablename> is a placeholder for the data table name. 
    # If the primary key mode is kafka, you do not need to specify the name and data type of the primary key column. The default primary key column names {"topic_partition","offset"} and the default data types {string, integer} of the primary key columns are used. 
    # If the primary key mode is record_key or record_value, you must specify the name and data type of the primary key column. 
    # tablestore.test.primarykey.name=A,B
    # tablestore.test.primarykey.type=string,integer
    
    # Specify an attribute column whitelist to filter the fields in the record values to obtain the required attribute columns. 
    # By default, the attribute column whitelist is empty. All fields in the record values are used as the attribute columns of the data table. 
    # The format of the attribute column name is tablestore.<tablename>.columns.whitelist.name. The format of the data type of the attribute column is tablestore.<tablename>.columns.whitelist.type. 
    # <tablename> is a placeholder for the data table name. 
    # tablestore.test.columns.whitelist.name=A,B
    # tablestore.test.columns.whitelist.type=string,integer
    
    # Specify how to write Kafka message records to the destination Tablestore table: 
    # Specify the write mode. Valid values: put and update. Default value: put. 
    # put: Data in the destination table is overwritten by Kafka message records. 
    # update: Data in the destination table is updated by Kafka message records. 
    insert.mode=put
    # Specify whether to write data in the sequence that data is read. Default value: true. You can disable this option to improve the write performance. 
    insert.order.enable=true
    # Specify whether to automatically create a destination table. Default value: false. 
    auto.create=false
    
    # Specify the delete mode. Valid values: none, row, column, and row_and_column. Default value: none. 
    # none: No delete operations can be performed. 
    # row: Rows can be deleted. 
    # column: Attribute columns can be deleted. 
    # row_and_column: Rows and attribute columns can be deleted. 
    delete.mode=none
    
    # Specify the maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be a power of 2. 
    buffer.size=1024
    # Specify the number of callback threads that are used when data is written to the data table. Default value = Number of vCPUs + 1. 
    # max.thread.count=
    # Specify the maximum number of concurrent write requests that can be sent to write data to the data table. Default value: 10. 
    max.concurrency=10
    # Specify the number of buckets to which data is written. Default value: 3. A larger value increases write concurrency, but the value cannot exceed the maximum number of concurrent write requests that is specified by max.concurrency. 
    bucket.count=3
    # Specify the interval at which the buffer queue is refreshed when data is written to the data table. Unit: milliseconds. Default value: 10000. 
    flush.Interval=10000
    
    # Specify how to process dirty data: 
    # An error may occur when the Kafka message records are parsed or written to the data table. You can specify the following two parameters to determine how such errors are handled: 
    # Specify the fault tolerance capability. Valid values: none and all. Default value: none. 
    # none: An error causes the data import task that uses Tablestore Sink Connector to fail. 
    # all: The message records for which errors are reported are skipped and logged. 
    runtime.error.tolerance=none
    # Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore. 
    # ignore: All errors are ignored. 
    # kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic. 
    # tablestore: The message records for which errors are reported and the error messages are stored in a different Tablestore data table. 
    runtime.error.mode=ignore
    
    # If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic. 
    # runtime.error.bootstrap.servers=localhost:9092
    # runtime.error.topic.name=errors
    
    # If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table. 
    # runtime.error.table.name=errors
  • The following sample code shows how to configure the configuration file in the .json format for Tablestore Sink Connector in distributed mode:
    {
      "name": "tablestore-sink",
      "config": {
        // Specify the connector class. 
        "connector.class":"TableStoreSinkConnector",
        // Specify the maximum number of tasks. 
        "tasks.max":"3",
        // Specify the list of Kafka topics from which you want to export data. 
        "topics":"test",
        // Specify values for the following Tablestore connection parameters: 
        // The endpoint of the Tablestore instance. 
        "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
        // The AccessKey pair which consists of an AccessKey ID and an AccessKey secret. 
        "tablestore.access.key.id":"xxx",
        "tablestore.access.key.secret":"xxx",
        // The name of the Tablestore instance. 
        "tablestore.instance.name":"xxx",
    
        // Specify the following data mapping parameters: 
        // Specify the parser that is used to parse Kafka message records. 
        // The DefaultEventParser of Tablestore Sink Connector supports the Struct and Map classes of Kafka Connect. You can also use a custom EventParser. 
        "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
    
        // Specify the format string for the name of the destination Tablestore table. <topic> can be used in the string as a placeholder for the topic from which you want to export data. 
        // topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored. 
        // For example, if table.name.format is set to kafka_<topic> and the name of the Kafka topic from which you want to export data is test, Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore. 
        "table.name.format":"<topic>",
        // Specify the mapping between the Kafka topic and the destination Tablestore table. The value must be in the <topic>:<tablename> format. The topic name and table name are separated with a colon (:). If you want to specify multiple mappings, separate them with commas (,). 
        // If the mapping is not specified, the configuration of table.name.format is used. 
        // "topics.assign.tables":"test:test_kafka",
    
        // Specify the primary key mode. Valid values: kafka, record_key, and record_value. Default value: kafka. 
        // kafka: <connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table. 
        // record_key: Fields in the record keys are used as the primary key of the data table. 
        // record_value: Fields in the record values are used as the primary key of the data table. 
        "primarykey.mode":"kafka",
    
        // Specify the name and data type of the primary key column in the destination Tablestore data table. 
        // The format of the primary key column name is tablestore.<tablename>.primarykey.name. The format of the data type of the primary key column is tablestore.<tablename>.primarykey.type. 
        // <tablename> is a placeholder for the data table name. 
        // If the primary key mode is kafka, you do not need to specify the name and data type of the primary key column. The default primary key column names {"topic_partition","offset"} and the default data types {string, integer} of the primary key columns are used. 
        // If the primary key mode is record_key or record_value, you must specify the name and data type of the primary key column. 
        // "tablestore.test.primarykey.name":"A,B",
        // "tablestore.test.primarykey.type":"string,integer",
    
        // Specify an attribute column whitelist to filter the fields in the record values to obtain the required attribute columns. 
        // By default, the attribute column whitelist is empty. All fields in the record values are used as the attribute columns of the data table. 
        // The format of the attribute column name is tablestore.<tablename>.columns.whitelist.name. The format of the data type of the attribute column is tablestore.<tablename>.columns.whitelist.type. 
        // <tablename> is a placeholder for the data table name. 
        // "tablestore.test.columns.whitelist.name":"A,B",
        // "tablestore.test.columns.whitelist.type":"string,integer",
    
        // Specify how to write Kafka message records to the destination Tablestore table: 
        // Specify the write mode. Valid values: put and update. Default value: put. 
        // put: Data in the table is overwritten by Kafka message records. 
        // update: Data in the table is updated by Kafka message records. 
        "insert.mode":"put",
        // Specify whether to write data in the sequence that data is read. Default value: true. You can disable this option to improve the write performance. 
        "insert.order.enable":"true",
        // Specify whether to automatically create a destination table. Default value: false. 
        "auto.create":"false",
    
        // Specify the delete mode. Valid values: none, row, column, and row_and_column. Default value: none. 
        // none: No delete operations can be performed. 
        // row: Rows can be deleted. 
        // column: Attribute columns can be deleted. 
        // row_and_column: Rows and attribute columns can be deleted. 
        "delete.mode":"none",
    
        // Specify the maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be a power of 2. 
        "buffer.size":"1024",
        // Specify the number of callback threads that are used when data is written to the data table. Default value = Number of vCPUs + 1. 
        // "max.thread.count":
        // Specify the maximum number of concurrent write requests that can be sent to write data to the data table. Default value: 10. 
        "max.concurrency":"10",
        // Specify the number of buckets to which data is written. Default value: 3. A larger value increases write concurrency, but the value cannot exceed the maximum number of concurrent write requests that is specified by max.concurrency. 
        "bucket.count":"3",
        // Specify the interval at which the buffer queue is refreshed when data is written to the data table. Unit: milliseconds. Default value: 10000. 
        "flush.Interval":"10000",
    
        // Specify how to process dirty data: 
        // An error may occur when the Kafka message records are parsed or written to the data table. You can specify the following two parameters to determine how such errors are handled: 
        // Specify the fault tolerance capability. Valid values: none and all. Default value: none. 
        // none: An error causes the data import task that uses Tablestore Sink Connector to fail. 
        // all: The message records for which errors are reported are skipped and logged. 
        "runtime.error.tolerance":"none",
        // Specify how dirty data is logged. Valid values: ignore, kafka, and tablestore. Default value: ignore. 
        // ignore: All errors are ignored. 
        // kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic. 
        // tablestore: The message records for which errors are reported and the error messages are stored in a different Tablestore data table. 
        "runtime.error.mode":"ignore"
    
        // If you set runtime.error.mode to kafka, you must specify the Kafka cluster address and the topic. 
        // "runtime.error.bootstrap.servers":"localhost:9092",
        // "runtime.error.topic.name":"errors",
    
        // If you set runtime.error.mode to tablestore, you must specify the name of the Tablestore data table. 
        // "runtime.error.table.name":"errors",
      }
    }
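
After you create the configuration file, load it into Kafka Connect. The following commands are a minimal sketch, assuming that the files are saved as connect-tablestore-sink.properties and connect-tablestore-sink.json, that the worker configuration file is config/connect-standalone.properties, and that the Kafka Connect REST API listens on localhost:8083. Adjust these values to match your environment.

    # Standalone mode: pass the worker configuration file and the connector .properties file.
    bin/connect-standalone.sh config/connect-standalone.properties connect-tablestore-sink.properties

    # Distributed mode: submit the .json configuration to the Kafka Connect REST API.
    curl -i -X POST -H "Content-Type: application/json" \
      -d @connect-tablestore-sink.json http://localhost:8083/connectors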

Parameters

The following table describes the parameters in the configuration file. You need to configure time series-related parameters only when you synchronize data from Kafka to a time series table in Tablestore.

Category Parameter Type Required Example Description
Kafka Connect parameters name string Yes tablestore-sink The name of the connector. The connector name must be unique.
connector.class class Yes TableStoreSinkConnector The Java class of the connector.
Specify the connector class by setting connector.class to the full name or the alias of the connector class. The full name of the connector class is com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector and the alias is TableStoreSinkConnector.
connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector
tasks.max integer Yes 3 The maximum number of tasks that can be created for the connector.

If this number of tasks cannot be created, fewer tasks may be created.

key.converter string No org.apache.kafka.connect.json.JsonConverter The key converter that is used to replace the default key converter that is specified in the worker configuration file.
value.converter string No org.apache.kafka.connect.json.JsonConverter The value converter that is used to replace the default value converter that is specified in the worker configuration file.
topics list Yes test The list of Kafka topics that can be specified for the connector. Separate multiple Kafka topics with commas (,).

You must use the topics parameter to specify the topics from which the connector consumes data.

Connector connection parameters tablestore.endpoint string Yes https://xxx.xxx.ots.aliyuncs.com The endpoint of the Tablestore instance. For more information, see Endpoint.
tablestore.mode string Yes timeseries The type of the destination table. Default value: normal. Valid values:
  • normal: a data table in Tablestore.
  • timeseries: a time series table in Tablestore.
tablestore.access.key.id string Yes LTAn******************** The AccessKey ID and AccessKey secret of your account. For more information about how to obtain the AccessKey ID and the AccessKey secret, see Obtain an AccessKey pair.
tablestore.access.key.secret string Yes zbnK**************************
tablestore.auth.mode string Yes aksk The authentication mode. Default value: aksk. Valid values:
  • aksk: uses the AccessKey ID and the AccessKey secret of an Alibaba Cloud account or a RAM user for authentication. In this topic, tablestore.auth.mode is set to aksk.
  • sts: uses the temporary access credentials that are obtained from Security Token Service (STS) for authentication. If Tablestore is connected to Message Queue for Apache Kafka, set tablestore.auth.mode to sts.
tablestore.instance.name string Yes myotstest The name of the Tablestore instance.
Data mapping parameters of the connector event.parse.class class Yes DefaultEventParser The Java class of the EventParser. Default value: DefaultEventParser. The parser parses Kafka message records to obtain the primary key column and attribute column of the data table.
Notice Tablestore imposes limits on the size of column values. The values of primary key columns of the string or binary type cannot exceed 1 KB in size, and the values of attribute columns cannot exceed 2 MB in size. For more information, see General limits.

If the column values exceed the limits after the data types are converted, the Kafka message records are processed as dirty data.

To use DefaultEventParser, the keys or values of the Kafka message records must be of the Struct or Map class of Kafka Connect. The selected fields in Struct must be of data types that are supported by Tablestore Sink Connector. The fields are converted to data of the Tablestore data types based on the data type mapping table and then written to the data table. The data types of the values in Map must be the data types that are supported by Tablestore Sink Connector. Tablestore Sink Connector supports the same data types in Struct and Map. The values in Map are converted to data of the binary type and then written to the data table. For more information about the data type mappings between Kafka and Tablestore, see Appendix: Data type mappings between Kafka and Tablestore.

If the data types of Kafka message records are incompatible with Tablestore Sink Connector, you can implement a custom parser based on the interface that is defined by com.aliyun.tablestore.kafka.connect.parsers.EventParser.

table.name.format string No kafka_<topic> The format string for the name of the destination Tablestore data table. Default value: <topic>. <topic> can be used in the string as a placeholder for the topic from which you want to export data. For example, if table.name.format is set to kafka_<topic>, and the name of the Kafka topic from which you want to export data is test, the Kafka message records from the test topic are mapped to the table named kafka_test in Tablestore.

topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored.

topics.assign.tables list Yes test:destTable Specifies the mapping between the topic and the destination Tablestore table in the <topic_1>:<tablename_1>,<topic_2>:<tablename_2> format. Separate multiple mappings with commas (,). For example, test:destTable specifies that the message records from the topic named test are written to the data table named destTable.

topics.assign.tables takes precedence over table.name.format. If topics.assign.tables is specified, the configuration of table.name.format is ignored.

primarykey.mode string No kafka The primary key mode of the data table. Valid values:
  • kafka: <connect_topic>_<connect_partition> and <connect_offset> are used as the primary key of the data table. The Kafka topic <connect_topic> and partition <connect_partition> are separated with an underscore (_), and <connect_offset> specifies the offset of the message record in the partition.
  • record_key: The fields of the Struct class or the keys of the Map class in the record keys are used as the primary key of the data table.
  • record_value: The fields of the Struct class or the keys of the Map class in the record values are used as the primary key of the data table.

Configure this parameter together with tablestore.<tablename>.primarykey.name and tablestore.<tablename>.primarykey.type. The value of this parameter is not case-sensitive.

tablestore.<tablename>.primarykey.name list No A,B The primary key column name of the data table. <tablename> is a placeholder for the data table name. The value of this parameter contains one to four primary key column names that are separated with commas (,).
The primary key column name varies with the primary key mode.
  • If the primary key mode is set to kafka, the default value of this parameter is topic_partition,offset. In the kafka primary key mode, you do not need to specify the primary key column names. Even if the primary key column names are specified, the default primary key column names take precedence.
  • If the primary key mode is set to record_key, the fields of the Struct class or the keys of the Map class that have the same names as the specified primary key column names are extracted from the record keys as the primary key of the data table. In the record_key primary key mode, you must specify the primary key column names.
  • If the primary key mode is set to record_value, the fields of the Struct class or the keys of the Map class that have the same names as the specified primary key column names are extracted from the record values as the primary key of the data table. In the record_value primary key mode, you must specify the primary key column names.

The primary key columns of the Tablestore data table are sequential. You need to take note of the sequence of primary key columns when you define tablestore.<tablename>.primarykey.name. For example, PRIMARY KEY (A, B, C) and PRIMARY KEY (A, C, B) have different schemas.

tablestore.<tablename>.primarykey.type list No string, integer The data type of the primary key column in the data table. <tablename> is a placeholder for the name of the data table. The value of this parameter contains one to four data types of the primary key columns. Separate data types of the primary key columns with commas (,). The sequence of data types of the primary key columns must correspond to the sequence of the primary key column names that are specified by tablestore.<tablename>.primarykey.name. The value of this parameter is not case-sensitive. Valid values: integer, string, binary, and auto_increment.
The data type of the primary key column varies with the primary key mode.
  • If the primary key mode is set to kafka, the default value of this parameter is string, integer.

    In the kafka primary key mode, you do not need to specify the data types of the primary key columns. Even if the data types of the primary key columns are specified, the default data types of the primary key columns take precedence.

  • If the primary key mode is set to record_key or record_value, you must specify the data types of the primary key columns.

    If the specified data type of the primary key column conflicts with the data type defined in the Kafka schema, a parse error occurs. In this case, you can configure Runtime Error parameters to fix the error.

    If this parameter is set to auto_increment, the corresponding field is inserted into the data table as an auto-increment primary key column when data is written to the data table.

tablestore.<tablename>.columns.whitelist.name list No A,B The name of the attribute column in the attribute column whitelist. <tablename> is a placeholder for the data table name. Separate attribute column names with commas (,).

If you do not configure this parameter, all fields of the Struct class or all keys of the Map class in the record values are used as the attribute columns of the data table. If you configure this parameter, the fields in the record values are filtered based on the specified attribute column whitelist to obtain the required attribute columns.

tablestore.<tablename>.columns.whitelist.type list No string, integer The data type of the attribute column in the attribute column whitelist. <tablename> is a placeholder for the data table name. Separate data types of the attribute columns with commas (,). The sequence of data types of the attribute columns must correspond to the sequence of the attribute column names that are specified by tablestore.<tablename>.columns.whitelist.name. The value of this parameter is not case-sensitive. Valid values: integer, string, binary, boolean, and double.
Connector write parameters insert.mode string No put The write mode. Default value: put. Valid values:
  • put: Existing data is overwritten by a row of data that you write to the table. This value corresponds to the PutRow operation of Tablestore.
  • update: When you update a row of data, attribute columns are added to the row or the values of existing attribute columns are updated. This value corresponds to the UpdateRow operation of Tablestore.

The value of this parameter is not case-sensitive.

insert.order.enable boolean No true Specifies whether data is written to the data table in the sequence that data is read. Default value: true. Valid values:
  • true: Kafka message records are written to the data table in the sequence that message records are read.
  • false: Kafka message records are written to the data table without a specific sequence. This improves the write performance.
auto.create boolean No false Specifies whether to automatically create a destination table. A data table or a time series table can be automatically created. Default value: false. Valid values:
  • true: The system automatically creates a destination Tablestore table.
  • false: The system does not automatically create a destination Tablestore table.
delete.mode string No none The delete mode. The configuration of this parameter takes effect only when data is synchronized to a data table and the primary key mode is set to record_key. Default value: none. Valid values:
  • none: No delete operations can be performed.
  • row: Rows can be deleted. If a record value is empty, the corresponding row is deleted.
  • column: Attribute columns can be deleted. If a field value of the Struct class or a key value of the Map class in the record values is empty, the corresponding attribute column is deleted.
  • row_and_column: Rows and attribute columns can be deleted.

The value of this parameter is not case-sensitive.

Configure this parameter together with the insert.mode parameter. For more information, see Appendix: Delete syntax.

buffer.size integer No 1024 The maximum number of rows that can be included in the buffer queue in the memory when data is written to the data table. Default value: 1024. The value of this parameter must be a power of 2.
max.thread.count integer No 3 The number of callback threads that are used when data is written to the data table. Default value = Number of vCPUs + 1.
max.concurrency integer No 10 The maximum number of concurrent write requests that can be sent to write data to the data table.
bucket.count integer No 3 The number of buckets to which data is written. Default value: 3. A larger value increases write concurrency, but the value cannot exceed the maximum number of concurrent write requests that is specified by max.concurrency.
flush.Interval integer No 10000 The interval at which the buffer queue is refreshed when data is written to the data table. Unit: milliseconds. Default value: 10000.
Connector Runtime Error parameters runtime.error.tolerance string No none The error handling policy that is used if an error occurs when the Kafka message records are parsed or written to the table. Default value: none. Valid values:
  • none: An error causes the data import task that uses Tablestore Sink Connector to fail.
  • all: The message records for which errors are reported are skipped and logged.

The value of this parameter is not case-sensitive.

runtime.error.mode string No ignore Specifies how to process the message records for which errors are reported when Kafka message records are parsed or written to the table. Default value: ignore. Valid values:
  • ignore: All errors are ignored.
  • kafka: The message records for which errors are reported and the error messages are stored in a different Kafka topic. In this case, you need to specify runtime.error.bootstrap.servers and runtime.error.topic.name. The keys and values of the Kafka message records for which errors are reported in the new topic are the same as those of the message records in the topic from which you want to export data. The ErrorInfo field is included in the header to log the error messages.
  • tablestore: The message records for which errors are reported and the error messages are stored in a different Tablestore data table. In this case, you need to specify runtime.error.table.name. The primary key columns of the data table that is used to log message records for which errors are reported and the error messages are topic_partition (string type) and offset (integer type). The attribute columns of the data table are key (bytes type), value (bytes type), and error_info (string type).

If runtime.error.mode is set to kafka, you need to serialize the headers, keys, and values of the Kafka message records. If runtime.error.mode is set to tablestore, you need to serialize the keys and values of the Kafka message records. By default, org.apache.kafka.connect.json.JsonConverter is used to serialize data and schemas.enable is set to true. You can use JsonConverter to deserialize data to obtain the original data. For more information about Converter, see Kafka Converter.

runtime.error.bootstrap.servers string No localhost:9092 The address of the Kafka cluster where the message records for which errors are reported and the error messages are stored.
runtime.error.topic.name string No errors The name of the Kafka topic that stores the message records for which errors are reported and the error messages.
runtime.error.table.name string No errors The name of the Tablestore table that stores the message records for which errors are reported and the error messages.
Time series-related parameters tablestore.timeseries.<tablename>.measurement string Yes mName Specifies that the values that correspond to the specified key in JSON formatted data are written to the time series table as the values of the _m_name field.

If tablestore.timeseries.<tablename>.measurement is set to <topic>, the topic names of the Kafka message records are written to the time series table as the values of the _m_name field.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements. For example, if the name of the time series table is test, the parameter name is tablestore.timeseries.test.measurement.

tablestore.timeseries.<tablename>.dataSource string Yes ds Specifies that the values that correspond to the ds key in JSON formatted data are written to the time series table as the values of the _data_source field.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.<tablename>.tags list Yes region,level Specifies that the values that correspond to the region and level keys in JSON formatted data are written to the time series table as the values of the tags field.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.<tablename>.time string Yes timestamp Specifies that the values that correspond to the timestamp key in JSON formatted data are written to the time series table as the values of the _time field.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.<tablename>.time.unit string Yes MILLISECONDS The unit of the values of the tablestore.timeseries.<tablename>.time parameter. Valid values: SECONDS, MILLISECONDS, MICROSECONDS, and NANOSECONDS.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.<tablename>.field.name list No cpu,io Specifies that the cpu and io keys in JSON formatted data are written to the time series table as the names of _field_name and the values that correspond to the cpu and io keys in JSON formatted data are written to the time series table as the values of _field_name.

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.<tablename>.field.type string No double,integer The data type of the field that is specified by tablestore.timeseries.<tablename>.field.name. Valid values: double, integer, string, binary, and boolean. Separate multiple data types with commas (,).

<tablename> in the parameter is a placeholder for the name of the time series table. Modify the parameter name based on your business requirements.

tablestore.timeseries.mapAll boolean No false Specifies whether fields other than the primary key fields and time fields in JSON formatted data are written to the time series table as fields.

If tablestore.timeseries.mapAll is set to false, you must configure the tablestore.timeseries.<tablename>.field.name and tablestore.timeseries.<tablename>.field.type parameters.

tablestore.timeseries.toLowerCase boolean No true Specifies whether the keys in the fields are converted to lowercase letters before the data is written to the time series table. The keys in the fields are the keys of non-primary key and non-time fields, or the keys that are specified in tablestore.timeseries.<tablename>.field.name.
tablestore.timeseries.rowsPerBatch integer No 50 The maximum number of rows that can be written to Tablestore in a request. The maximum and default values are 200.
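
For reference, the following fragment is a minimal sketch of the configuration items that you add to synchronize data to a time series table. The table name test and the key names mName, ds, region, level, timestamp, cpu, and io are assumptions taken from the examples in the preceding table; adjust them to match the keys in your JSON data.

    # Write data to a time series table instead of a data table.
    tablestore.mode=timeseries
    # Map keys in the JSON data to the _m_name, _data_source, tags, and _time fields.
    tablestore.timeseries.test.measurement=mName
    tablestore.timeseries.test.dataSource=ds
    tablestore.timeseries.test.tags=region,level
    tablestore.timeseries.test.time=timestamp
    tablestore.timeseries.test.time.unit=MILLISECONDS
    # mapAll is false, so the fields and their data types must be listed explicitly.
    tablestore.timeseries.mapAll=false
    tablestore.timeseries.test.field.name=cpu,io
    tablestore.timeseries.test.field.type=double,integer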

Appendix: Data type mappings between Kafka and Tablestore

The following table describes the mappings between the data types of Kafka and Tablestore.

| Kafka schema type | Tablestore data type |
| --- | --- |
| STRING | STRING |
| INT8, INT16, INT32, and INT64 | INTEGER |
| FLOAT32 and FLOAT64 | DOUBLE |
| BOOLEAN | BOOLEAN |
| BYTES | BINARY |
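
For illustration, the following message value, serialized by org.apache.kafka.connect.json.JsonConverter with schemas.enable=true, declares a Struct whose fields map to Tablestore data types according to the preceding table. The field names A, B, C, and D and their values are hypothetical.

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "field": "A", "type": "string",  "optional": false },
          { "field": "B", "type": "int64",   "optional": false },
          { "field": "C", "type": "double",  "optional": false },
          { "field": "D", "type": "boolean", "optional": false }
        ],
        "optional": false
      },
      "payload": { "A": "hello", "B": 1, "C": 0.5, "D": true }
    }

With DefaultEventParser, A is written as a STRING column, B as an INTEGER column, C as a DOUBLE column, and D as a BOOLEAN column.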

Appendix: Delete syntax

Note This feature is supported only when data is synchronized from Kafka to a data table in Tablestore.

The following table describes how data is written to a Tablestore data table based on the configurations of the write mode (insert.mode) and the delete mode (delete.mode) when message records contain empty values and data is synchronized from Kafka to a data table in Tablestore.

| insert.mode | delete.mode | Empty values | All empty fields in values | Some empty fields in values |
| --- | --- | --- | --- | --- |
| put | none | Overwrite | Overwrite | Overwrite |
| put | row | Delete rows | Overwrite | Overwrite |
| put | column | Overwrite | Overwrite | Overwrite |
| put | row_and_column | Delete rows | Overwrite | Overwrite |
| update | none | Dirty data | Dirty data | Ignore empty values |
| update | row | Delete rows | Dirty data | Ignore empty values |
| update | column | Dirty data | Delete columns | Delete columns |
| update | row_and_column | Delete rows | Delete columns | Delete columns |
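
As a concrete example, the following fragment is a minimal sketch of a configuration that allows tombstone records, which are Kafka message records with empty values, to delete the corresponding rows. The table name test and the primary key columns A and B are assumptions. delete.mode takes effect only when data is synchronized to a data table and primarykey.mode is set to record_key.

    insert.mode=update
    delete.mode=row
    primarykey.mode=record_key
    tablestore.test.primarykey.name=A,B
    tablestore.test.primarykey.type=string,integer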