Tablestore: Synchronize data to a data table

Last Updated: Jul 02, 2025

Tablestore Sink Connector polls Apache Kafka for message records from the subscribed topics, parses the records, and imports the data to a data table in Tablestore in batches.

Prerequisites

  • Apache Kafka is installed and started, and ZooKeeper is started. For more information, see the Kafka documentation.

  • Tablestore is activated, and an instance and a data table are created. For more information, see Get started with the Wide Column model.

    Note

    You can also have Tablestore Sink Connector create the destination data table automatically. To do so, set auto.create to true.

  • An AccessKey pair is obtained. For more information, see Create an AccessKey pair.

Step 1: Deploy Tablestore Sink Connector

  1. Obtain the Tablestore Sink Connector package by using one of the following methods:

    • Download the source code from the Tablestore Sink Connector source code repository on GitHub and compile it.

      1. Run the following command to download the Tablestore Sink Connector source code by using Git:

        git clone https://github.com/aliyun/kafka-connect-tablestore.git
      2. Go to the directory where the source code is stored, and run the following command to package the code by using Maven:

        mvn clean package -DskipTests

        After the compilation is complete, the generated package is stored in the target directory. This topic uses the kafka-connect-tablestore-1.0.jar package as an example.

    • Download the compiled kafka-connect-tablestore package.

  2. Copy the package to the $KAFKA_HOME/libs directory on each node.

Step 2: Start Tablestore Sink Connector

Tablestore Sink Connector can run in standalone or distributed mode. Select a mode based on your business requirements.

To use Tablestore Sink Connector in standalone mode, perform the following steps:

  1. Modify the worker configuration file connect-standalone.properties and the connector configuration file connect-tablestore-sink-quickstart.properties based on your requirements.

    • Example of how to modify the worker configuration file connect-standalone.properties

      The worker configuration file contains items such as the Kafka connection parameters, the serialization format, and the frequency at which offsets are committed. The following sample, which is provided by Apache Kafka, shows how to modify the worker configuration file. For more information, see Kafka Connect.

      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #    http://www.apache.org/licenses/LICENSE-2.0
      #
      # Unless required by applicable law or agreed to in writing, software
      # distributed under the License is distributed on an "AS IS" BASIS,
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      # See the License for the specific language governing permissions and
      # limitations under the License.
      
      # These are defaults. This file just demonstrates how to override some settings.
      bootstrap.servers=localhost:9092
      
      # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
      # need to configure these based on the format they want their data in when loaded from or stored into Kafka
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
      # it to
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
      offset.storage.file.filename=/tmp/connect.offsets
      # Flush much faster than normal, which is useful for testing/debugging
      offset.flush.interval.ms=10000
      
      # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
      # (connectors, converters, transformations). The list should consist of top level directories that include 
      # any combination of: 
      # a) directories immediately containing jars with plugins and their dependencies
      # b) uber-jars with plugins and their dependencies
      # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
      # Note: symlinks will be followed to discover dependencies or plugins.
      # Examples: 
      # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
      #plugin.path=
    • Example of how to modify the connector configuration file connect-tablestore-sink-quickstart.properties

      The connector configuration file contains items such as the connector class, the Tablestore connection parameters, and the data mapping. For more information, see Configuration description.

      # Specify the connector name. 
      name=tablestore-sink
      # Specify the connector class. 
      connector.class=TableStoreSinkConnector
      # Specify the maximum number of tasks. 
      tasks.max=1
      # Specify the list of Kafka topics from which data is exported. 
      topics=test
      
      # Specify values for the following Tablestore connection parameters: 
      # The endpoint of the Tablestore instance. 
      tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
      # The AccessKey pair which consists of an AccessKey ID and an AccessKey secret. 
      tablestore.access.key.id=xxx
      tablestore.access.key.secret=xxx
      # The name of the Tablestore instance. 
      tablestore.instance.name=xxx
      
      # Specify the format string for the name of the destination table in Tablestore. <topic> is a placeholder for the topic from which you want to export data. Default value: <topic>. 
      # Examples:
      # If table.name.format=kafka_<topic> is specified, the message records from the topic named test are written to the data table named kafka_test. 
      # table.name.format=
      
      # Specify the primary key mode. Default value: kafka. 
      # If the primary key mode is set to kafka, <topic>_<partition> and <offset> are used as the primary key of the Tablestore data table. <topic>_<partition> specifies the Kafka topic and partition, which are separated by an underscore (_). <offset> specifies the offset of the message record in the partition. 
      # primarykey.mode=
      
      # Specify whether to automatically create a destination table. Default value: false. 
      auto.create=true
  2. Go to the $KAFKA_HOME directory and run the following command to start the connector in standalone mode:

    bin/connect-standalone.sh config/connect-standalone.properties config/connect-tablestore-sink-quickstart.properties
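
The comments in the connector configuration file describe how table.name.format and primarykey.mode=kafka behave. The following Python sketch illustrates those two rules only. It is not the connector's own code, and the function names resolve_table_name and kafka_primary_key are hypothetical names chosen for this illustration.

```python
# Illustrative sketch only; this is not the connector's source code.
# Both function names are hypothetical.


def resolve_table_name(name_format: str, topic: str) -> str:
    """Replace the <topic> placeholder in table.name.format with the topic name."""
    return name_format.replace("<topic>", topic)


def kafka_primary_key(topic: str, partition: int, offset: int) -> tuple:
    """Build the two primary key values described for primarykey.mode=kafka:
    "<topic>_<partition>" and the offset of the record in that partition."""
    return (f"{topic}_{partition}", offset)


if __name__ == "__main__":
    # With table.name.format=kafka_<topic>, records from topic "test"
    # go to the data table named kafka_test.
    print(resolve_table_name("kafka_<topic>", "test"))
    # A record at offset 42 in partition 0 of topic "test".
    print(kafka_primary_key("test", 0, 42))
```

With the default format string <topic>, resolve_table_name returns the topic name unchanged, which matches the table named test that is used later in this topic.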

To use Tablestore Sink Connector in distributed mode, perform the following steps:

  1. Modify the worker configuration file connect-distributed.properties based on your business requirements.

    The worker configuration file contains items such as the Kafka connection parameters, the serialization format, the frequency at which offsets are committed, and the topics that store connector information. We recommend that you create these topics in advance. The following sample, which is provided by Apache Kafka, shows how to modify the worker configuration file. For more information, see Kafka Connect.

    • offset.storage.topic: the compacted topic in which connector offsets are stored.

    • config.storage.topic: the compacted topic in which connector and task configurations are stored. The number of partitions for this topic must be set to 1.

    • status.storage.topic: the compacted topic in which Kafka Connect status information is stored.

    ##
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    ##
    
    # This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
    # to be used with the examples, and some settings may differ from those used in a production system, especially
    # the `bootstrap.servers` and those specifying replication factors.
    
    # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
    bootstrap.servers=localhost:9092
    
    # unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
    group.id=connect-cluster
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
    # it to
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    
    # Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    #offset.storage.partitions=25
    
    # Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
    # and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    
    # Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
    # Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
    # the topic before starting Kafka Connect if a specific topic configuration is needed.
    # Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
    # Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
    # to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
    status.storage.topic=connect-status
    status.storage.replication.factor=1
    #status.storage.partitions=5
    
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    # These are provided to inform the user about the presence of the REST host and port configs 
    # Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
    #rest.host.name=
    #rest.port=8083
    
    # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
    #rest.advertised.host.name=
    #rest.advertised.port=
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Examples: 
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    #plugin.path=
  2. Go to the $KAFKA_HOME directory and run the following command to start the connector in distributed mode:

    Important

    You must start the worker process on each node.

    bin/connect-distributed.sh config/connect-distributed.properties
  3. Manage the connector by using the REST API. For more information, see REST API.

    1. Create a file named connect-tablestore-sink-quickstart.json in the config path. The following sample shows the content to add to the file.

      The connector configuration file specifies key-value pairs for the configuration items as a JSON string. The items include the connector class, the Tablestore connection parameters, and the data mapping. For more information, see Configuration description.

      {
        "name": "tablestore-sink",
        "config": {
          "connector.class":"TableStoreSinkConnector",
          "tasks.max":"1",
          "topics":"test",
          "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
          "tablestore.access.key.id":"xxx",
          "tablestore.access.key.secret":"xxx",
          "tablestore.instance.name":"xxx",
          "table.name.format":"<topic>",
          "primarykey.mode":"kafka",
          "auto.create":"true"
        }
      }
    2. Run the following command to start the Tablestore Sink Connector client:

      curl -i -k  -H "Content-type: application/json" -X POST -d @config/connect-tablestore-sink-quickstart.json http://localhost:8083/connectors

      In the preceding command, http://localhost:8083/connectors is the address of the Kafka Connect REST service. Change the address based on your business requirements.
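
The curl command in the preceding step posts the JSON configuration to the connectors endpoint. As a rough equivalent, the following Python sketch builds the same POST request with the standard library. The function name build_create_connector_request is hypothetical, the endpoint and configuration values are the placeholders from this topic, and the request is only constructed here; sending it assumes a Kafka Connect worker is listening at that address.

```python
import json
import urllib.request


def build_create_connector_request(rest_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a POST request whose JSON body has the same shape as
    connect-tablestore-sink-quickstart.json."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        rest_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_create_connector_request(
        "http://localhost:8083/connectors",  # placeholder address from this topic
        "tablestore-sink",
        {
            "connector.class": "TableStoreSinkConnector",
            "tasks.max": "1",
            "topics": "test",
            "tablestore.endpoint": "https://xxx.xxx.ots.aliyuncs.com",
            "tablestore.access.key.id": "xxx",
            "tablestore.access.key.secret": "xxx",
            "tablestore.instance.name": "xxx",
            "auto.create": "true",
        },
    )
    print(request.get_method(), request.full_url)
    # To send the request, a running Kafka Connect worker is required:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode("utf-8"))
```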

Step 3: Produce message records

  1. Go to the $KAFKA_HOME directory and run the following command to start a console producer client:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

    The following table describes the parameters that you need to configure to start the console producer client.

    Parameter        Example           Description
    --broker-list    localhost:9092    The addresses and ports of the brokers in the Kafka cluster.
    --topic          test              The name of the topic. By default, the topic is automatically created when you start Tablestore Sink Connector. You can also create the topic manually.

  2. Write messages to the topic named test.

    • Message of the Struct type

      {
          "schema":{
              "type":"struct",
              "fields":[
                  {
                      "type":"int32",
                      "optional":false,
                      "field":"id"
                  },
                  {
                      "type":"string",
                      "optional":false,
                      "field":"product"
                  },
                  {
                      "type":"int64",
                      "optional":false,
                      "field":"quantity"
                  },
                  {
                      "type":"double",
                      "optional":false,
                      "field":"price"
                  }
              ],
              "optional":false,
              "name":"record"
          },
          "payload":{
              "id":1,
              "product":"foo",
              "quantity":100,
              "price":50
          }
      }
    • Message of the Map type

      {
          "schema":{
              "type":"map",
              "keys":{
                  "type":"string",
                  "optional":false
              },
              "values":{
                  "type":"int32",
                  "optional":false
              },
              "optional":false
          },
          "payload":{
              "id":1
          }
      }
  3. Log on to the Tablestore console to view the data.

    A data table named test is automatically created in the Tablestore instance. The following figure shows the data in the data table. The data in the first row is the result of importing the Map message, and the data in the second row is the result of importing the Struct message.

    [Figure: fig_datanew]
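
The sample messages in step 2 are shown across several lines for readability. With its default settings, the console producer sends each input line as a separate record, so a message would normally be entered as a single line. The following Python sketch, with the hypothetical helper name to_connect_json_line, wraps a schema and a payload in the schema/payload envelope used by the samples and serializes it as one compact line. It assumes the worker keeps schemas.enable set to true, as in the sample worker configuration.

```python
import json


def to_connect_json_line(schema: dict, payload) -> str:
    """Wrap a schema and a payload in the {"schema": ..., "payload": ...}
    envelope and serialize it as compact JSON on a single line."""
    envelope = {"schema": schema, "payload": payload}
    return json.dumps(envelope, separators=(",", ":"))


if __name__ == "__main__":
    # Schema of the Map message from step 2.
    map_schema = {
        "type": "map",
        "keys": {"type": "string", "optional": False},
        "values": {"type": "int32", "optional": False},
        "optional": False,
    }
    # Paste the printed line into the console producer prompt.
    print(to_connect_json_line(map_schema, {"id": 1}))
```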