
ApsaraMQ for Kafka:Create Tablestore sink connectors

Last Updated:Mar 13, 2025

This topic describes how to create a Tablestore sink connector to synchronize data from a source topic on an ApsaraMQ for Kafka instance to a table on a Tablestore instance.

Prerequisites

Step 1: Create a Tablestore table

Create a Tablestore table to synchronize data from ApsaraMQ for Kafka to Tablestore. For more information, see Procedure.

In this example, an instance named ots-sink and a data table named ots_sink_table are created. The primary keys topic, partition, and offset are specified when the data table is created.

Step 2: Create and start a Tablestore sink connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to ApsaraMQ for Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.

        Region

        The region where the source ApsaraMQ for Kafka instance resides. Example: China (Beijing).

        ApsaraMQ for Kafka Instance

        The ApsaraMQ for Kafka instance in which the messages that you want to route are produced. Example: alikafka_post-cn-jte3****.

        Topic

        The topic on the ApsaraMQ for Kafka instance in which the messages that you want to route are produced. Example: demo-topic.

        Group ID

        The consumer group on the source ApsaraMQ for Kafka instance. Valid values:

        • Quickly Create: The system automatically creates a consumer group whose name is in the GID_EVENTBRIDGE_xxx format. We recommend that you select this value.

        • Use Existing Group: Select the ID of an existing consumer group that is not in use. If you select a group that is in use, the publishing and subscription of existing messages are affected.

        Example: Quickly Create.

        Consumer Offset

        The offset from which messages are consumed. Valid values:

        • Latest Offset

        • Earliest Offset

        Example: Latest Offset.

        Network Configuration

        The type of network over which the messages are routed. Valid values:

        • Basic Network

        • Self-managed Internet

        Example: Basic Network.

        VPC

        The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: vpc-bp17fapfdj0dwzjkd****.

        vSwitch

        The ID of the vSwitch to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: vsw-bp1gbjhj53hdjdkg****.

        Security Group

        The ID of the security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet. Example: alikafka_pre-cn-7mz2****.

        Data Format

        The format into which binary data delivered from the source is encoded. If you do not have special requirements on encoding, select Json. Valid values:

        • Json: Binary data is decoded based on UTF-8 encoding, parsed as JSON, and then put into the payload.

        • Text: Binary data is decoded into a string based on UTF-8 encoding and then put into the payload. This is the default value.

        • Binary: Binary data is encoded into a string based on Base64 encoding and then put into the payload.

        Example: Json.

        Messages

        The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. Example: 100.

        Interval (Unit: Seconds)

        The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified interval. Valid values: 0 to 15. Unit: seconds. A value of 0 specifies that messages are sent immediately after aggregation. Example: 3.
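The effect of each Data Format value can be sketched with the Python standard library. The function name encode_payload and the sample bytes below are illustrative; this is not the connector's actual implementation:

```python
import base64
import json

def encode_payload(raw: bytes, data_format: str):
    """Illustrates how each Data Format option turns the raw message
    bytes into the payload that is forwarded downstream."""
    if data_format == "Json":
        # Bytes are decoded as UTF-8 and parsed as JSON.
        return json.loads(raw.decode("utf-8"))
    if data_format == "Text":
        # Bytes are decoded into a UTF-8 string.
        return raw.decode("utf-8")
    if data_format == "Binary":
        # Bytes are Base64-encoded into an ASCII string.
        return base64.b64encode(raw).decode("ascii")
    raise ValueError(f"unknown data format: {data_format}")

raw = b'{"key": "ots-sink-k1", "value": "ots-sink-v1"}'
print(encode_payload(raw, "Json"))    # a parsed dict
print(encode_payload(raw, "Text"))    # the same bytes as a UTF-8 string
print(encode_payload(raw, "Binary"))  # a Base64 string
```

Json is the practical choice here because the Tablestore sink extracts fields from the payload with JSONPath rules, which requires parsed JSON.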

      2. In the Filtering step, define a data pattern in the Pattern Content code editor to filter data. For more information, see Event patterns.

      3. In the Transformation step, specify a data cleansing method to implement data splitting, mapping, enrichment, and routing capabilities. For more information, see Use Function Compute to perform message cleansing.

      4. In the Sink step, set the Service Type parameter to Tablestore and follow the on-screen instructions to configure other parameters. The following table describes the parameters.

        Instance Name

        The name of the Tablestore instance that you created. Example: ots-sink.

        Destination Table

        The Tablestore data table that you created. Example: ots_sink_table.

        Primary Key

        The method that you want to use to generate primary keys in Tablestore. You must define a rule in JSONPath syntax to extract the content of each column. If you set the Data Format parameter to Json in the Source step, the data forwarded from ApsaraMQ for Kafka is in the following format:

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"ots-sink-k1",
                "value": "ots-sink-v1"
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        For example, you can specify topic as the primary key name and $.data.topic as the value extraction rule.

        Attribute Column

        The method that you want to use to generate attribute columns in Tablestore, also defined as a JSONPath extraction rule. For example, you can specify key as the attribute column name and $.data.key as the value extraction rule.

        Operation Mode

        The mode in which data is written to Tablestore. Valid values:

        • put: If the primary keys of two data entries are the same, the new data entry overwrites the old one.

        • update: If the primary keys of two data entries are the same, the specified attribute columns are updated in the existing row, and the other existing columns are retained.

        • delete: The row that matches the specified primary keys is deleted.

        Example: put.

        Network Configuration

        The type of network over which the messages are delivered. Valid values:

        • VPC: Messages in ApsaraMQ for Kafka are delivered to Tablestore in a virtual private cloud (VPC).

        • Internet: Messages in ApsaraMQ for Kafka are delivered to Tablestore over the Internet.

        Example: VPC.

        VPC

        The VPC ID. This parameter is required only if you set the Network Configuration parameter to VPC. Example: vpc-bp17fapfdj0dwzjkd****.

        vSwitch

        The vSwitch ID. This parameter is required only if you set the Network Configuration parameter to VPC. Example: vsw-bp1gbjhj53hdjdkg****.

        Security Group

        The security group ID. This parameter is required only if you set the Network Configuration parameter to VPC. Example: test_group.
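How the extraction rules map an event to a Tablestore row can be sketched in Python. The extract helper below resolves only the simple dot-notation subset of JSONPath used in this example; it is illustrative, not the connector's actual implementation:

```python
def extract(event: dict, path: str):
    """Resolves a simple dot-notation JSONPath such as $.data.topic.
    Only the subset needed for this example is implemented."""
    value = event
    for part in path.lstrip("$.").split("."):
        value = value[part]
    return value

# An abbreviated copy of the forwarded event shown above.
event = {
    "data": {
        "topic": "demo-topic",
        "partition": 0,
        "offset": 2,
        "key": "ots-sink-k1",
        "value": "ots-sink-v1",
    }
}

# Primary key and attribute-column rules as configured in the console.
primary_key_rules = {
    "topic": "$.data.topic",
    "partition": "$.data.partition",
    "offset": "$.data.offset",
}
attribute_rules = {"key": "$.data.key", "value": "$.data.value"}

primary_key = {name: extract(event, rule) for name, rule in primary_key_rules.items()}
attributes = {name: extract(event, rule) for name, rule in attribute_rules.items()}
print(primary_key)  # {'topic': 'demo-topic', 'partition': 0, 'offset': 2}
print(attributes)   # {'key': 'ots-sink-k1', 'value': 'ots-sink-v1'}
```

Because topic, partition, and offset uniquely identify a Kafka message, using them as the primary key makes a put write idempotent: redelivered messages overwrite the same row instead of creating duplicates.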

    • Task Property

      Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.
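As a rough illustration of such a policy, the following Python sketch retries delivery a fixed number of times and then falls back to a dead-letter destination. All names and values here are illustrative; this is not EventBridge's implementation:

```python
def deliver_with_retry(send, event, max_retries: int = 3):
    """Tries to push an event to the sink. After max_retries failed
    retries, hands the event to a dead-letter destination instead of
    dropping it (illustrative sketch of a retry policy)."""
    for attempt in range(max_retries + 1):
        try:
            return send(event)
        except Exception:
            # A real policy would wait here with exponential backoff,
            # e.g. time.sleep(min(cap, base * 2 ** attempt)).
            continue
    return "dead-letter"

print(deliver_with_retry(lambda event: "delivered", {"id": 1}))  # delivered
```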

  5. Go back to the Tasks page, find the Tablestore sink connector that you created, and then click Enable in the Actions column.

  6. In the Note message, click OK.

    The sink connector requires 30 to 60 seconds to be enabled. You can view the progress in the Status column on the Tasks page.

Step 3: Test the Tablestore sink connector

  1. On the Tasks page, find the Tablestore sink connector that you created and click the name of the source topic in the Event Source column.

  2. On the Topic Details page, click Send Message.
  3. In the Start to Send and Consume Message panel, configure the message parameters and click OK.

  4. On the Tasks page, find the Tablestore sink connector that you created and click the name of the destination table in the Event Target column.

  5. On the Query Data tab of the Manage Table page, view the data that is stored in the Tablestore table.
