Tablestore Sink Connector pulls message records from the subscribed topics in Apache Kafka in poll mode, parses the message records, and then batch imports the data to a data table in Tablestore.

Prerequisites

  • Apache Kafka is installed and started, and ZooKeeper is started. For more information, see Kafka documentation.
  • The Tablestore service is activated, and an instance and a data table are created. For more information, see Use the Wide Column model.
    Note You can also use Tablestore Sink Connector to automatically create a destination data table. To create this data table, set auto.create to true.
  • An AccessKey pair is obtained. For more information, see Obtain an AccessKey pair.

Step 1: Deploy Tablestore Sink Connector

  1. Obtain the Tablestore Sink Connector package by using one of the following methods:
    • Download the source code from Tablestore Sink Connector source code on GitHub and compile it.
      1. Run the following command to download the source code of Tablestore Sink Connector by using the Git tool:
        git clone https://github.com/aliyun/kafka-connect-tablestore.git
      2. Go to the directory where the source code that you downloaded is stored, and run the following command to package the source code by using Maven:
        mvn clean package -DskipTests

        After the compilation is complete, the generated package is stored in the target directory. The kafka-connect-tablestore-1.0.jar package is used as an example.

    • Download the precompiled kafka-connect-tablestore package.
  2. Copy the package to the $KAFKA_HOME/libs directory on each node.

Step 2: Start Tablestore Sink Connector

Tablestore Sink Connector can work in the standalone or distributed mode. You can select a mode based on your business requirements.

To use Tablestore Sink Connector in the standalone mode, perform the following steps:

  1. Modify the worker configuration file connect-standalone.properties and the connector configuration file connect-tablestore-sink-quickstart.properties based on your requirements.
    • Example on how to modify the worker configuration file connect-standalone.properties

      The worker configuration file contains configuration items such as the Kafka connection parameters, the serialization format, and the frequency at which offsets are committed. The following sample code is an example that is provided by Apache Kafka on how to modify the worker configuration file. For more information, see Kafka Connect.

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      
      # These are defaults. This file just demonstrates how to override some settings.
      bootstrap.servers=localhost:9092
      
      # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
      # need to configure these based on the format they want their data in when loaded from or stored into Kafka
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
      # it to
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
      offset.storage.file.filename=/tmp/connect.offsets
      # Flush much faster than normal, which is useful for testing/debugging
      offset.flush.interval.ms=10000
      
      # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
      # (connectors, converters, transformations). The list should consist of top level directories that include 
      # any combination of: 
      # a) directories immediately containing jars with plugins and their dependencies
      # b) uber-jars with plugins and their dependencies
      # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
      # Note: symlinks will be followed to discover dependencies or plugins.
      # Examples: 
      # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
      #plugin.path=
    • Example on how to modify the connector configuration file connect-tablestore-sink-quickstart.properties

      The connector configuration file contains configuration items. These items include the connector class, Tablestore connection parameters, and data mapping. For more information, see Configuration description.

      # Specify the connector name. 
      name=tablestore-sink
      # Specify the connector class. 
      connector.class=TableStoreSinkConnector
      # Specify the maximum number of tasks. 
      tasks.max=1
      # Specify the list of Kafka topics from which data is exported. 
      topics=test
      
      # Specify values for the following Tablestore connection parameters: 
      # The endpoint of the Tablestore instance. 
      tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
      # The AccessKey pair which consists of an AccessKey ID and an AccessKey secret. 
      tablestore.access.key.id=xxx
      tablestore.access.key.secret=xxx
      # The name of the Tablestore instance. 
      tablestore.instance.name=xxx
      
      # Specify the format string for the name of the destination table in Tablestore. <topic> is a placeholder for the topic from which you want to export data. Default value: <topic>. 
      # Examples:
      # If table.name.format=kafka_<topic> is specified, the message records from the topic named test are written to the data table named kafka_test. 
      # table.name.format=
      
      # Specify the primary key mode. Default value: kafka. 
      # If the primary key mode is set to kafka, <topic>_<partition> and <offset> are used as the primary key of the Tablestore data table. <topic>_<partition> specifies the Kafka topic and partition, which are separated by an underscore (_). <offset> specifies the offset of the message record in the partition. 
      # primarykey.mode=
      
      # Specify whether to automatically create a destination table. Default value: false. 
      auto.create=true
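As a sketch of the kafka primary key mode described in the comments above, the following Python snippet composes the two primary key values for one message record. The function name and dictionary keys are illustrative only and are not taken from the connector's source code:

```python
def kafka_primary_key(topic, partition, offset):
    """Compose the two primary key values used when primarykey.mode=kafka:
    <topic>_<partition> identifies the source partition, <offset> the record."""
    return {
        "topic_partition": f"{topic}_{partition}",  # <topic>_<partition>
        "offset": offset,                           # <offset>
    }

pk = kafka_primary_key("test", 0, 42)
print(pk)  # {'topic_partition': 'test_0', 'offset': 42}
```

Because the offset is unique within a partition, this combination uniquely identifies every message record written to the destination table.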
  2. Go to the $KAFKA_HOME directory and run the following command to start Kafka Connect in the standalone mode:
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
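To check that the worker loaded the connector, you can query the Kafka Connect REST interface, which listens on port 8083 by default. The following Python sketch builds the request with the standard library; the commented-out call assumes the worker is running on localhost:

```python
import urllib.request

# List the connectors that the running worker has loaded; a successful
# deployment of the sample configuration returns ["tablestore-sink"].
req = urllib.request.Request("http://localhost:8083/connectors")

# Uncomment when the worker is running:
# print(urllib.request.urlopen(req).read())
```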

To use Tablestore Sink Connector in the distributed mode, perform the following steps:

  1. Modify the worker configuration file connect-distributed.properties based on your business requirements.
    The worker configuration file contains configuration items such as the Kafka connection parameters, the serialization format, the frequency at which offsets are committed, and the topics that store connector information. We recommend that you create these topics in advance. The following sample code is an example that is provided by Apache Kafka on how to modify the worker configuration file. For more information, see Kafka Connect.
    • offset.storage.topic: specifies the compacted topic where connector offsets are stored.
    • config.storage.topic: specifies the compacted topic where connector and task configurations are stored. The number of partitions for this topic must be set to 1.
    • status.storage.topic: specifies the compacted topic where the status information about Kafka Connect is stored.
    ##
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ##
    
    # This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
    # to be used with the examples, and some settings may differ from those used in a production system, especially
    # the `bootstrap.servers` and those specifying replication factors.
    
    # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
    bootstrap.servers=localhost:9092
    
    # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
    # it to
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    
    # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    #offset.storage.partitions=25
    
    # Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
    # and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    
    # Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    status.storage.topic=connect-status
    status.storage.replication.factor=1
    #status.storage.partitions=5
    
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    # These are provided to inform the user about the presence of the REST host and port configs 
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    #rest.host.name=
    #rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=
    #rest.advertised.port=
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples: 
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    #plugin.path=
  2. Go to the $KAFKA_HOME directory and run the following command to start Kafka Connect in the distributed mode:
    Notice You need to start the worker process on each node.
    bin/connect-distributed.sh config/connect-distributed.properties
  3. Manage connectors by using the REST API. For more information, see REST API.
    1. Create a file named connect-tablestore-sink-quickstart.json in the config directory. The following sample code provides an example of the content that you need to add to the file.
      The connector configuration file specifies the key-value pairs for the configuration items by using strings in the JSON format. These items include the connector class, Tablestore connection parameters, and data mapping. For more information, see Configuration description.
      {
        "name": "tablestore-sink",
        "config": {
          "connector.class":"TableStoreSinkConnector",
          "tasks.max":"1",
          "topics":"test",
          "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
          "tablestore.access.key.id":"xxx",
          "tablestore.access.key.secret":"xxx",
          "tablestore.instance.name":"xxx",
          "table.name.format":"<topic>",
          "primarykey.mode":"kafka",
          "auto.create":"true"
        }
      }
    2. Run the following command to start a Tablestore Sink Connector client:
      curl -i -k  -H "Content-type: application/json" -X POST -d @config/connect-tablestore-sink-quickstart.json http://localhost:8083/connectors

      In the preceding command, http://localhost:8083/connectors is the address of the Kafka REST service. Modify the address based on your business requirements.
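The same request can also be issued from Python's standard library. The following sketch embeds the connector configuration shown above (all Tablestore values are placeholders) and builds the POST request; the commented-out call assumes the worker is running on localhost:

```python
import json
import urllib.request

# Same connector configuration as the JSON file above (values are placeholders).
connector = {
    "name": "tablestore-sink",
    "config": {
        "connector.class": "TableStoreSinkConnector",
        "tasks.max": "1",
        "topics": "test",
        "tablestore.endpoint": "https://xxx.xxx.ots.aliyuncs.com",
        "tablestore.access.key.id": "xxx",
        "tablestore.access.key.secret": "xxx",
        "tablestore.instance.name": "xxx",
        "table.name.format": "<topic>",
        "primarykey.mode": "kafka",
        "auto.create": "true",
    },
}

# Build the same POST request as the curl command above.
req = urllib.request.Request(
    "http://localhost:8083/connectors",  # Kafka Connect REST address
    data=json.dumps(connector).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Uncomment when the distributed worker is running:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```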

Step 3: Generate message records

  1. Go to the $KAFKA_HOME directory and run the following command to start a console producer client:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    The following parameters are used to start a console producer client:

    • --broker-list: the address and port of the broker in the Kafka cluster. Example: localhost:9092.
    • --topic: the name of the topic. Example: test. By default, a topic is automatically created when you start Tablestore Sink Connector. You can also manually create a topic.
  2. Write messages to the topic named test.
    • Messages in a Struct
      {
          "schema":{
              "type":"struct",
              "fields":[
                  {
                      "type":"int32",
                      "optional":false,
                      "field":"id"
                  },
                  {
                      "type":"string",
                      "optional":false,
                      "field":"product"
                  },
                  {
                      "type":"int64",
                      "optional":false,
                      "field":"quantity"
                  },
                  {
                      "type":"double",
                      "optional":false,
                      "field":"price"
                  }
              ],
              "optional":false,
              "name":"record"
          },
          "payload":{
              "id":1,
              "product":"foo",
              "quantity":100,
              "price":50
          }
      }
    • Messages in a Map
      {
          "schema":{
              "type":"map",
              "keys":{
                  "type":"string",
                  "optional":false
              },
              "values":{
                  "type":"int32",
                  "optional":false
              },
              "optional":false
          },
          "payload":{
              "id":1
          }
      }
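Note that kafka-console-producer.sh treats each input line as one message, so the pretty-printed JSON examples above must be collapsed to a single line before they are sent. The following Python sketch serializes the Map-style message without line breaks:

```python
import json

# The Map-style message shown above, as a Python dict.
message = {
    "schema": {
        "type": "map",
        "keys": {"type": "string", "optional": False},
        "values": {"type": "int32", "optional": False},
        "optional": False,
    },
    "payload": {"id": 1},
}

# Serialize the record without line breaks so that the console producer
# reads it as a single message.
line = json.dumps(message, separators=(",", ":"))
assert "\n" not in line
print(line)
```

The resulting one-line string can be pasted into the console producer prompt or piped into the command.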
  3. Log on to the Tablestore console to view data.
    A data table named test is automatically created in the Tablestore instance. The following figure shows the data in the data table. The data in the first row is imported from the messages in a Map, and the data in the second row is imported from the messages in a Struct.