This topic describes how to create a Tablestore sink connector to export data from a data source topic in a Message Queue for Apache Kafka instance to Tablestore.

Prerequisites

  • Message Queue for Apache Kafka
    • The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
    • A topic is created as the data source in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
  • Tablestore
    • Tablestore is activated, and a Tablestore instance is created.

Precautions

  • You can export data from a data source topic in a Message Queue for Apache Kafka instance only to a Tablestore instance in the same region as the Message Queue for Apache Kafka instance. For more information about the limits on connectors, see Limits.
  • When you create a connector, Message Queue for Apache Kafka automatically creates a service-linked role for you.
    • If no service-linked role is available, Message Queue for Apache Kafka automatically creates a role for you to export data from Message Queue for Apache Kafka to Tablestore.
    • If the service-linked role is available, Message Queue for Apache Kafka does not create a new one.
    For more information about service-linked roles, see Service-linked roles.

Procedure

This section describes how to export data from a data source topic in the Message Queue for Apache Kafka instance to Tablestore by using a Tablestore sink connector.

  1. Optional: Create the topics and group that are required by a Tablestore sink connector.

    If you do not want to customize the topics and group, skip the steps of manually creating the topics and group, and set the Resource Creation Method parameter to Auto in the step of creating the sink connector.

    Notice Some topics that are required by a Tablestore sink connector must use a local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, you cannot manually create topics that use a local storage engine. In major version 0.10.2, these topics must be automatically created.
    1. Create the topics that are required by a Tablestore sink connector.
    2. Create the group that is required by a Tablestore sink connector.
  2. Create and deploy a Tablestore sink connector.
  3. Verify the results.
    1. Send a test message.
    2. View data in the Tablestore table.

Create the topics that are required by a Tablestore sink connector

In the Message Queue for Apache Kafka console, you can manually create the five topics that are required by a Tablestore sink connector: the task offset topic, the task configuration topic, the task status topic, the dead-letter queue topic, and the error data topic. These topics differ in the number of partitions and the storage engine that they require. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. To do this, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances that reside in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    Parameter Description Example
    Name The name of the topic. demo
    Description The description of the topic. demo test
    Partitions The number of partitions in the topic. 12
    Storage Engine The storage engine of the topic.

    Message Queue for Apache Kafka supports the following storage engines:

    • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, durability, and high reliability. If the Instance Edition of your instance is Standard (High Write), you can select only Cloud Storage.
    • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    Cloud Storage
    Message Type The message type of the topic.
    • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the order of the messages may not be preserved in the affected partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the order of the messages is preserved in the affected partitions. However, messages in the affected partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Normal Message
    Log Cleanup Policy The log cleanup policy for the topic.

    If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

    Message Queue for Apache Kafka supports the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If the system has sufficient disk space, messages are retained for the maximum retention period. The system considers disk space to be insufficient when the disk usage exceeds 85%. When disk space is insufficient, the system deletes messages starting from the earliest stored message to ensure service availability.
    • Compact: The Apache Kafka log compaction policy is used. If multiple messages have the same key, only the message that carries the most recent value for that key is retained. This policy applies to scenarios in which the system recovers from a failure, or a cache is reloaded after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store system status information or configuration information in a log-compacted topic (see the sketch after this table).
      Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    Compact
    Tag The tags to be attached to the topic. demo
    After the topic is created, it is displayed on the Topics page.
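
For reference, the following minimal Java sketch shows what the settings above correspond to in open source Apache Kafka terms: a topic with 12 partitions and the compact log cleanup policy, declared through the standard AdminClient. This is only an illustration, not a replacement for the console procedure: the bootstrap address is a placeholder, and in Message Queue for Apache Kafka you typically create topics in the console as described above rather than through the Kafka API.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateCompactedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder address; replace with your instance's bootstrap servers.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 12 partitions and replication factor 3, with cleanup.policy=compact,
                // mirroring the Partitions and Log Cleanup Policy settings in the table above.
                NewTopic topic = new NewTopic("demo", 12, (short) 3)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }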

Create the group that is required by a Tablestore sink connector

In the Message Queue for Apache Kafka console, you can manually create the group that is required by a Tablestore sink connector. The group name must be in the connect-<task name> format. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, it is displayed on the Groups page.

Create and deploy a Tablestore sink connector

Create and deploy a Tablestore sink connector that you can use to export data from Message Queue for Apache Kafka to Tablestore.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Connectors.
  5. On the Connectors page, click Create Connector.
  6. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, set parameters that are shown in the following table and click Next.
      Parameter Description Example
      Name The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • Each connector name must be unique within a Message Queue for Apache Kafka instance.

      The name of the group that is used by the connector task must be in the connect-<task name> format. If no such group exists, Message Queue for Apache Kafka automatically creates one for you.

      kafka-ts-sink
      Instance The information about the Message Queue for Apache Kafka instance. By default, the name and ID of the instance are displayed. demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are shown in the following table, and then click Next.
      Note If you have created the required topics and group, set the Resource Creation Method parameter to Manual, and enter the names of the resources in the fields below. Otherwise, set the Resource Creation Method parameter to Auto.
      Table 1. Parameters in the Configure Source Service step
      Parameter Description Example
      Data Source Topic The name of the data source topic from which data is to be exported. ts-test-input
      Consumer Thread Concurrency The number of concurrent consumer threads used to export data from the data source topic. Default value: 6. Valid values:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      Consumer Offset The offset where consumption starts. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
      Earliest Offset
      VPC ID The ID of the virtual private cloud (VPC) where the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The default parameter value that is displayed is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. vpc-bp1xpdnd3l***
      vSwitch ID The ID of the vSwitch where the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default parameter value that is displayed is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. vsw-bp1d2jgg81***
      Failure Handling Policy Specifies whether to retain the subscription to the partition to which a message fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription to the partition where an error occurs and returns the logs.
      • Stop Subscription: stops the subscription to the partition where an error occurs and returns the logs.
      Note
      • For more information about how to view the connector logs, see View connector logs.
      • For more information about how to troubleshoot errors based on error codes, see Error codes.
      • To resume the subscription to the partition where an error occurs, submit a ticket to the technical support of Message Queue for Apache Kafka.
      Continue Subscription
      Resource Creation Method The method to create the topics and group that are required by the Tablestore sink connector. Click Configure Runtime Environment to display the parameter.
      • Auto
      • Manual
      Auto
      Connector Consumer Group The group that is required by the data synchronization task of the connector. Click Configure Runtime Environment to display the parameter. The name of this group must be in the connect-<task name> format. connect-cluster-kafka-ots-sink
      Task Offset Topic The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: the number of partitions in the topic. Set this parameter to a value greater than 1.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to Compact.
      connect-offset-kafka-ots-sink
      Task Configuration Topic The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: the number of partitions in the topic. Set the value to 1.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to Compact.
      connect-config-kafka-ots-sink
      Task Status Topic The topic that is used to store task status. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to Compact.
      connect-status-kafka-ots-sink
      Dead-letter Queue Topic The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. You can merge this topic with the error data topic to reduce resource usage.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. You can set the value to Local Storage or Cloud Storage.
      connect-error-kafka-ots-sink
      Error Data Topic The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. You can merge this topic with the dead-letter queue topic to reduce resource usage.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. You can set the value to Local Storage or Cloud Storage.
      connect-error-kafka-ots-sink
    3. In the Configure Destination Service step, select Tablestore as the destination service, set the parameters that are shown in the following table, and then click Create.
      Parameter Description Example
      Instance Name The name of the Tablestore instance. k00eny67****
      Automatically Create Destination Table Specifies whether to automatically create a destination table in Tablestore.
      • Yes: A table is automatically created in Tablestore to store synchronized data. You can customize the table name.
      • No: An existing table is used to store synchronized data.
      Yes
      Destination Table Name The name of the table that stores the synchronized data. If you set Automatically Create Destination Table to No, ensure that the table name you enter is the same as that of an existing table in the Tablestore instance. kafka_table
      Message Key Format The format of the message key in Message Queue for Apache Kafka. You can set this parameter to String or JSON. The default value is JSON.
      • String: The message key is parsed as a string.
      • JSON: The message key must be in the JSON format.
      string
      Message Value Format The format of the message value in Message Queue for Apache Kafka. You can set this parameter to String or JSON. The default value is JSON.
      • String: The message value is parsed as a string.
      • JSON: The message value must be in the JSON format.
      string
      Primary Key Mode Specifies the primary key mode. The primary keys for a data table can be extracted from different parts of Message Queue for Apache Kafka message records, including the Coordinates (Topic, Partition, Offset), Key, and Value of the message records. The default value is kafka.
      • kafka: uses <connect_topic>_<connect_partition> and <connect_offset> as the primary keys of the data table.
      • record_key: uses the fields in the message key as the primary keys of the data table.
      • record_value: uses the fields in the message value as the primary keys of the data table.
      kafka
      Primary Key Column Names The names of the primary key columns and the corresponding data types. A column name specifies the field extracted from the message key or message value. You can set the data type of the field to String or Integer.

      This parameter is displayed only if you set Message Key Format to JSON and Primary Key Mode to record_key, or if you set Message Value Format to JSON and Primary Key Mode to record_value.

      Click Create to add a column name. You can configure a maximum of four column names.

      N/A
      Write Mode Specifies the write mode. You can set this parameter to put or update. The default value is put.
      • put: new data overwrites the original data in the table.
      • update: new data is added to the table and the original data is retained.
      put
      Delete Mode Specifies whether you can delete rows or attribute columns when the Message Queue for Apache Kafka message records contain empty values. This parameter is displayed when you set Primary Key Mode to record_key. Valid values:
      • none: The default value. Deletion is not allowed.
      • row: allows you to delete rows.
      • column: allows you to delete attribute columns.
      • row_and_column: allows you to delete rows and attribute columns.
      The effect of the deletion operation depends on the write mode.
      • When Write Mode is set to put, you can set Delete Mode to any value. Even if the message record contains empty values, the data of the message record is written to the Tablestore table in overwrite mode.
      • When Write Mode is set to update, you can set Delete Mode to none or row. If all fields in the message record are empty, the message record is regarded as dirty data. If only some fields in the message record are empty, the empty values are skipped and the non-empty values are written to the Tablestore table. If you set Delete Mode to column or row_and_column and the message record contains empty values, the system deletes the attribute columns, or both the rows and the attribute columns, that correspond to the empty values. The remaining data is then written to the Tablestore table.
      N/A
      After the connector is created, you can view it on the Connectors page.
  7. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
  8. Click OK.
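
For example, suppose that you set Message Key Format to JSON and Primary Key Mode to record_key, and a message arrives whose key is {"user_id": 123} and whose value is {"name": "test"}. Based on the parameter descriptions above, the connector uses user_id as the primary key column of the Tablestore row and writes name as an attribute column. If you set Primary Key Mode to kafka instead, the same record is keyed by <connect_topic>_<connect_partition> (for example, ts-test-input_0) and <connect_offset>. The field names in this example are illustrative only.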

Send a test message

After you deploy a Tablestore sink connector, you can send a message to the data source topic in Message Queue for Apache Kafka to test whether the message can be synchronized to Tablestore.

  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, configure the parameters as prompted and send a test message by using one of the following methods:
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
        • If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
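
If you prefer to produce the test message from your own client instead of the console, the following minimal Java sketch sends the same key and JSON value with the open source Kafka producer. It assumes a client that runs in the same VPC as the instance over the default PLAINTEXT endpoint; the bootstrap address is a placeholder, and the topic name ts-test-input matches the example used earlier in this topic.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendTestMessage {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder VPC endpoint; replace with your instance's default endpoint.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "alikafka-xxx-vpc.alikafka.aliyuncs.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Key "demo" and a JSON value, matching the console example above.
                producer.send(new ProducerRecord<>("ts-test-input", "demo", "{\"key\": \"test\"}")).get();
            }
        }
    }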

View data in the Tablestore table

After you send a message to the data source topic in Message Queue for Apache Kafka, you can check whether the message is received in the Tablestore console. Perform the following steps to view the data in the Tablestore table:
  1. Log on to the Tablestore console.
  2. On the Overview page, click the name of the instance that you want to manage, or click Manage Instance in the Actions column of the instance.
  3. On the Instance Details tab, find the table that you want to manage in the Tables section.
  4. Click the table name. On the Query Data tab of the Manage Table page, view the data in the table.
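
You can also check the synchronized data programmatically. The following minimal sketch uses the Tablestore Java SDK to scan the first page of rows in the destination table. The endpoint, credentials, table name, and the primary key column names (shown here as topic_partition and offset, assuming the kafka primary key mode) are placeholders that you must adjust to your own configuration.

    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.*;

    public class ScanKafkaTable {
        public static void main(String[] args) {
            // Placeholder endpoint, credentials, and instance name.
            SyncClient client = new SyncClient(
                "https://<instance>.cn-hangzhou.ots.aliyuncs.com",
                "<accessKeyId>", "<accessKeySecret>", "<instance>");
            try {
                RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("kafka_table");
                // Assumed primary key columns for the kafka primary key mode.
                PrimaryKey start = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("topic_partition", PrimaryKeyValue.INF_MIN)
                    .addPrimaryKeyColumn("offset", PrimaryKeyValue.INF_MIN)
                    .build();
                PrimaryKey end = PrimaryKeyBuilder.createPrimaryKeyBuilder()
                    .addPrimaryKeyColumn("topic_partition", PrimaryKeyValue.INF_MAX)
                    .addPrimaryKeyColumn("offset", PrimaryKeyValue.INF_MAX)
                    .build();
                criteria.setInclusiveStartPrimaryKey(start);
                criteria.setExclusiveEndPrimaryKey(end);
                criteria.setMaxVersions(1);
                GetRangeResponse response = client.getRange(new GetRangeRequest(criteria));
                for (Row row : response.getRows()) {
                    System.out.println(row); // Print each synchronized row.
                }
            } finally {
                client.shutdown();
            }
        }
    }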