全部产品
Search
文档中心

Tablestore:Deskripsi Konfigurasi

更新时间:Jul 06, 2025

Sebelum memulai Tablestore Sink Connector, Anda perlu menentukan pasangan nilai kunci untuk melewatkan parameter ke proses Kafka Connect. Topik ini memberikan contoh konfigurasi dan deskripsi parameter untuk mengonfigurasi Tablestore Sink Connector.

Contoh Konfigurasi

Item konfigurasi bervariasi tergantung pada apakah data disinkronkan dari Kafka ke tabel data atau tabel seri waktu di Tablestore. Contoh konfigurasi file berbeda berdasarkan mode kerja. Bagian ini menyediakan contoh konfigurasi sinkronisasi data dari Kafka ke tabel data di Tablestore. Untuk menyinkronkan data ke tabel seri waktu, tambahkan item konfigurasi spesifik untuk sinkronisasi data dari Kafka ke tabel seri waktu di Tablestore.

Mode Mandiri

Kode sampel berikut menunjukkan cara mengonfigurasi file konfigurasi dalam format .properties untuk Tablestore Sink Connector dalam mode mandiri:

# Tentukan nama konektor.
name=tablestore-sink
# Tentukan kelas konektor.
connector.class=TableStoreSinkConnector
# Tentukan jumlah maksimum tugas.
tasks.max=1
# Tentukan daftar topik Kafka dari mana data diekspor.
topics=test
# Tentukan nilai untuk parameter koneksi Tablestore berikut:
# Titik akhir instance Tablestore.
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# Pasangan AccessKey yang terdiri dari ID AccessKey dan rahasia AccessKey.
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Nama instance Tablestore.
tablestore.instance.name=xxx

# Tentukan parameter pemetaan data berikut:
# Tentukan parser yang digunakan untuk mengurai catatan pesan Kafka.
# DefaultEventParser dari Tablestore Sink Connector mendukung kelas Struct dan Map dari Kafka Connect. Anda juga dapat menggunakan EventParser kustom.
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser

# Tentukan string format untuk nama tabel Tablestore tujuan. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data.
# topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.
# Misalnya, jika table.name.format diatur ke kafka_<topic> dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore.
table.name.format=<topic>
# Tentukan pemetaan antara topik Kafka dan tabel Tablestore tujuan. Nilainya harus dalam format <topic>:<tablename>. Nama topik dan nama tabel dipisahkan dengan titik dua (:). Jika Anda ingin menentukan beberapa pemetaan, pisahkan mereka dengan koma (,).
# Jika pemetaan tidak ditentukan, konfigurasi table.name.format digunakan.
# topics.assign.tables=test:test_kafka

# Tentukan mode primary key. Nilai valid: kafka, record_key, dan record_value. Nilai default: kafka.
# kafka: <connect_topic>_<connect_partition> dan <connect_offset> digunakan sebagai primary key dari tabel data.
# record_key: Bidang dalam kunci rekaman digunakan sebagai primary key dari tabel data.
# record_value: Bidang dalam nilai rekaman digunakan sebagai primary key dari tabel data.
primarykey.mode=kafka

# Tentukan nama dan tipe data kolom primary key di tabel data Tablestore tujuan.
# Format nama kolom primary key adalah tablestore.<tablename>.primarykey.name. Format tipe data kolom primary key adalah tablestore.<tablename>.primarykey.type.
# <tablename> adalah placeholder untuk nama tabel data.
# Jika mode primary key adalah kafka, Anda tidak perlu menentukan nama dan tipe data kolom primary key. Nama kolom primary key default {"topic_partition","offset"} dan tipe data default {string, integer} dari kolom primary key digunakan.
# Jika mode primary key adalah record_key atau record_value, Anda harus menentukan nama dan tipe data kolom primary key.
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer

# Tentukan daftar putih kolom atribut untuk memfilter bidang dalam nilai rekaman untuk mendapatkan kolom atribut yang diperlukan.
# Secara default, daftar putih kolom atribut kosong. Semua bidang dalam nilai rekaman digunakan sebagai kolom atribut tabel data.
# Format nama kolom atribut adalah tablestore.<tablename>.columns.whitelist.name. Format tipe data kolom atribut adalah tablestore.<tablename>.columns.whitelist.type.
# <tablename> adalah placeholder untuk nama tabel data.
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer

# Tentukan cara menulis catatan pesan Kafka ke tabel Tablestore tujuan:
# Tentukan mode tulis. Nilai valid: put dan update. Nilai default: put.
# put: Data dalam tabel tujuan ditimpa oleh catatan pesan Kafka.
# update: Data dalam tabel tujuan diperbarui oleh catatan pesan Kafka.
insert.mode=put
# Tentukan apakah data ditulis dalam urutan data dibaca. Nilai default: true. Anda dapat menonaktifkan opsi ini untuk meningkatkan kinerja tulis.
insert.order.enable=true
# Tentukan apakah tabel tujuan dibuat secara otomatis. Nilai default: false.
auto.create=false

# Tentukan mode penghapusan. Nilai valid: none, row, column, dan row_and_column. Nilai default: none.
# none: Tidak ada operasi penghapusan yang dapat dilakukan.
# row: Baris dapat dihapus.
# column: Kolom atribut dapat dihapus.
# row_and_column: Baris dan kolom atribut dapat dihapus.
delete.mode=none

# Tentukan jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2.
buffer.size=1024
# Tentukan jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = Jumlah vCPU + 1.
# max.thread.count=
# Tentukan jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10.
max.concurrency=10
# Tentukan jumlah bucket ke mana data ditulis. Nilai default: 3. Jika Anda meningkatkan nilai parameter ini, kemampuan tulis konkuren dapat ditingkatkan. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan.
bucket.count=3
# Tentukan interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000.
flush.Interval=10000

# Tentukan cara memproses data kotor:
# Kesalahan mungkin terjadi saat catatan pesan Kafka diurai atau ditulis ke tabel data. Anda dapat menentukan dua parameter berikut untuk menentukan cara memperbaiki kesalahan:
# Tentukan kemampuan toleransi kesalahan. Nilai valid: none dan all. Nilai default: none.
# none: Kesalahan menyebabkan tugas impor data yang menggunakan Tablestore Sink Connector gagal.
# all: Catatan pesan yang melaporkan kesalahan dilewati dan dicatat.
runtime.error.tolerance=none
# Tentukan bagaimana data kotor dicatat. Nilai valid: ignore, kafka, dan tablestore. Nilai default: ignore.
# ignore: Semua kesalahan diabaikan.
# kafka: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di topik Kafka yang berbeda.
# tablestore: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di tabel data Tablestore yang berbeda.
runtime.error.mode=ignore

# Jika Anda mengatur runtime.error.mode ke kafka, Anda harus menentukan alamat kluster Kafka dan topik.
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors

# Jika Anda mengatur runtime.error.mode ke tablestore, Anda harus menentukan nama tabel data Tablestore.
# runtime.error.table.name=errors

Mode Terdistribusi

Kode sampel berikut menunjukkan cara mengonfigurasi file konfigurasi dalam format .json untuk Tablestore Sink Connector dalam mode terdistribusi:

{
  "name": "tablestore-sink",
  "config": {
    // Tentukan kelas konektor.
    "connector.class":"TableStoreSinkConnector",
    // Tentukan jumlah maksimum tugas.
    "tasks.max":"3",
    // Tentukan daftar topik Kafka dari mana Anda ingin mengekspor data.
    "topics":"test",
    // Tentukan nilai untuk parameter koneksi Tablestore berikut:
    // Titik akhir instance Tablestore.
    "tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
    // Pasangan AccessKey yang terdiri dari ID AccessKey dan rahasia AccessKey.
    "tablestore.access.key.id":"xxx",
    "tablestore.access.key.secret":"xxx",
    // Nama instance Tablestore.
    "tablestore.instance.name":"xxx",

    // Tentukan parameter pemetaan data berikut:
    // Tentukan parser yang digunakan untuk mengurai catatan pesan Kafka.
    // DefaultEventParser dari Tablestore Sink Connector mendukung kelas Struct dan Map dari Kafka Connect. Anda juga dapat menggunakan EventParser kustom.
    "event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",

    // Tentukan string format untuk nama tabel Tablestore tujuan. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data.
    // topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.
    // Misalnya, jika table.name.format diatur ke kafka_<topic> dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore.
    "table.name.format":"<topic>",
    // Tentukan pemetaan antara topik Kafka dan tabel Tablestore tujuan. Nilainya harus dalam format <topic>:<tablename>. Nama topik dan nama tabel dipisahkan dengan titik dua (:). Jika Anda ingin menentukan beberapa pemetaan, pisahkan mereka dengan koma (,).
    // Jika pemetaan tidak ditentukan, konfigurasi table.name.format digunakan.
    // "topics.assign.tables":"test:test_kafka",

    // Tentukan mode primary key. Nilai valid: kafka, record_key, dan record_value. Nilai default: kafka.
    // kafka: <connect_topic>_<connect_partition> dan <connect_offset> digunakan sebagai primary key dari tabel data.
    // record_key: Bidang dalam kunci rekaman digunakan sebagai primary key dari tabel data.
    // record_value: Bidang dalam nilai rekaman digunakan sebagai primary key dari tabel data.
    "primarykey.mode":"kafka",

    // Tentukan nama dan tipe data kolom primary key di tabel data Tablestore tujuan.
    // Format nama kolom primary key adalah tablestore.<tablename>.primarykey.name. Format tipe data kolom primary key adalah tablestore.<tablename>.primarykey.type.
    // <tablename> adalah placeholder untuk nama tabel data.
    // Jika mode primary key adalah kafka, Anda tidak perlu menentukan nama dan tipe data kolom primary key. Nama kolom primary key default {"topic_partition","offset"} dan tipe data default {string, integer} dari kolom primary key digunakan.
    // Jika mode primary key adalah record_key atau record_value, Anda harus menentukan nama dan tipe data kolom primary key.
    // "tablestore.test.primarykey.name":"A,B",
    // "tablestore.test.primarykey.type":"string,integer",

    // Tentukan daftar putih kolom atribut untuk memfilter bidang dalam nilai rekaman untuk mendapatkan kolom atribut yang diperlukan.
    // Secara default, daftar putih kolom atribut kosong. Semua bidang dalam nilai rekaman digunakan sebagai kolom atribut tabel data.
    // Format nama kolom atribut adalah tablestore.<tablename>.columns.whitelist.name. Format tipe data kolom atribut adalah tablestore.<tablename>.columns.whitelist.type.
    // <tablename> adalah placeholder untuk nama tabel data.
    // "tablestore.test.columns.whitelist.name":"A,B",
    // "tablestore.test.columns.whitelist.type":"string,integer",

    // Tentukan cara menulis catatan pesan Kafka ke tabel Tablestore tujuan:
    // Tentukan mode tulis. Nilai valid: put dan update. Nilai default: put.
    // put: Data dalam tabel ditimpa oleh catatan pesan Kafka.
    // update: Data dalam tabel diperbarui oleh catatan pesan Kafka.
    "insert.mode":"put",
    // Tentukan apakah data ditulis dalam urutan data dibaca. Nilai default: true. Anda dapat menonaktifkan opsi ini untuk meningkatkan kinerja tulis.
    "insert.order.enable":"true",
    // Tentukan apakah tabel tujuan dibuat secara otomatis. Nilai default: false.
    "auto.create":"false",

    // Tentukan mode penghapusan. Nilai valid: none, row, column, dan row_and_column. Nilai default: none.
    // none: Tidak ada operasi penghapusan yang dapat dilakukan.
    // row: Baris dapat dihapus.
    // column: Kolom atribut dapat dihapus.
    // row_and_column: Baris dan kolom atribut dapat dihapus.
    "delete.mode":"none",

    // Tentukan jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2.
    "buffer.size":"1024",
    // Tentukan jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = Jumlah vCPU + 1.
    // "max.thread.count":
    // Tentukan jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10.
    "max.concurrency":"10",
    // Tentukan jumlah bucket ke mana data ditulis. Nilai default: 3. Anda dapat meningkatkan nilai parameter ini untuk meningkatkan kemampuan tulis konkuren. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan.
    "bucket.count":"3",
    // Tentukan interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000.
    "flush.Interval":"10000",

    // Tentukan cara memproses data kotor:
    // Kesalahan mungkin terjadi saat catatan pesan Kafka diurai atau ditulis ke tabel data. Anda dapat menentukan dua parameter berikut untuk menentukan cara memperbaiki kesalahan:
    // Tentukan kemampuan toleransi kesalahan. Nilai valid: none dan all. Nilai default: none.
    // none: Kesalahan menyebabkan tugas impor data yang menggunakan Tablestore Sink Connector gagal.
    // all: Catatan pesan yang melaporkan kesalahan dilewati dan dicatat.
    "runtime.error.tolerance":"none",
    // Tentukan bagaimana data kotor dicatat. Nilai valid: ignore, kafka, dan tablestore. Nilai default: ignore.
    // ignore: Semua kesalahan diabaikan.
    // kafka: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di topik Kafka yang berbeda.
    // tablestore: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di tabel data Tablestore yang berbeda.
    "runtime.error.mode":"ignore"

    // Jika Anda mengatur runtime.error.mode ke kafka, Anda harus menentukan alamat kluster Kafka dan topik.
    // "runtime.error.bootstrap.servers":"localhost:9092",
    // "runtime.error.topic.name":"errors",

    // Jika Anda mengatur runtime.error.mode ke tablestore, Anda harus menentukan nama tabel data Tablestore.
    // "runtime.error.table.name":"errors",
  }

Parameter

Tabel berikut menjelaskan parameter dalam file konfigurasi. Parameter terkait seri waktu hanya perlu dikonfigurasi saat menyinkronkan data dari Kafka ke tabel seri waktu di Tablestore.

Parameter Kafka Connect

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

name

string

Ya

tablestore-sink

Nama konektor. Nama konektor harus unik.

connector.class

class

Ya

TableStoreSinkConnector

Kelas Java konektor.

Jika Anda ingin menggunakan konektor, tentukan kelas konektor dengan menggunakan connector.class. Anda dapat mengatur connector.class ke nama lengkap atau alias kelas konektor. Nama lengkap kelas konektor adalah com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector dan alias kelas konektor adalah TableStoreSinkConnector. Contoh:

connector.class=com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector

tasks.max

integer

Ya

3

Jumlah maksimum tugas yang dapat dibuat untuk konektor.

Jika jumlah maksimum tugas gagal dibuat, lebih sedikit tugas mungkin dibuat.

key.converter

string

Tidak

org.apache.kafka.connect.json.JsonConverter

Konverter kunci yang digunakan untuk mengganti konverter kunci default yang ditentukan dalam file konfigurasi pekerja.

value.converter

string

Tidak

org.apache.kafka.connect.json.JsonConverter

Konverter nilai yang digunakan untuk mengganti konverter nilai default yang ditentukan dalam file konfigurasi pekerja.

topics

list

Ya

test

Daftar topik Kafka yang dapat ditentukan untuk konektor. Pisahkan beberapa topik Kafka dengan koma (,).

Anda harus menentukan topik untuk mengelola topik yang ditentukan untuk konektor.

Parameter Koneksi Konektor

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

tablestore.endpoint

string

Ya

https://xxx.xxx.ots.aliyuncs.com

Titik akhir instance Tablestore. Untuk informasi lebih lanjut, lihat Endpoints.

tablestore.mode

string

Ya

timeseries

Tipe tabel tujuan. Nilai default: normal. Nilai valid:

  • normal: tabel data di Tablestore.

  • timeseries: tabel seri waktu di Tablestore.

tablestore.access.key.id

string

Ya

LTAn********************

ID AccessKey dan rahasia AccessKey akun Anda. Untuk informasi lebih lanjut tentang cara memperoleh ID AccessKey dan rahasia AccessKey, lihat Buat Pasangan AccessKey.

tablestore.access.key.secret

string

Ya

zbnK**************************

tablestore.auth.mode

string

Ya

aksk

Mode autentikasi. Nilai default: aksk. Nilai valid:

  • aksk: menggunakan ID AccessKey dan rahasia AccessKey akun Alibaba Cloud atau pengguna RAM untuk autentikasi. Dalam topik ini, tablestore.auth.mode diatur ke aksk.

  • sts: menggunakan kredensial akses sementara yang diperoleh dari Security Token Service (STS) untuk autentikasi. Jika Tablestore terhubung ke Message Queue for Apache Kafka, atur tablestore.auth.mode ke sts. Ini adalah nilai default.

tablestore.instance.name

string

Ya

myotstest

Nama instance Tablestore.

Parameter Pemetaan Data Konektor

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

event.parse.class

class

Ya

DefaultEventParser

Kelas Java dari EventParser. Nilai default: DefaultEventParser. Parser mengurai catatan pesan Kafka untuk mendapatkan kolom primary key dan kolom atribut tabel data.

Penting

Tablestore memberikan batasan pada ukuran nilai kolom. Nilai kolom primary key bertipe string atau biner tidak boleh melebihi 1 KB, dan nilai kolom atribut tidak boleh melebihi 2 MB. Untuk informasi lebih lanjut, lihat Batasan.

Jika nilai kolom melebihi batas setelah tipe data dikonversi, catatan pesan Kafka diproses sebagai data kotor.

Untuk menggunakan DefaultEventParser, kunci atau nilai catatan pesan Kafka harus berupa kelas Struct atau Map dari Kafka Connect. Bidang yang dipilih dalam Struct harus berupa tipe data yang didukung oleh Tablestore Sink Connector. Bidang tersebut dikonversi ke data tipe Tablestore berdasarkan tabel pemetaan tipe data dan kemudian ditulis ke tabel data. Tipe data nilai dalam Map harus merupakan tipe data yang didukung oleh Tablestore Sink Connector. Tablestore Sink Connector mendukung tipe data yang sama dalam Struct dan Map. Nilai dalam Map dikonversi ke data bertipe biner dan kemudian ditulis ke tabel data.

Jika tipe data catatan pesan Kafka tidak kompatibel dengan Tablestore Sink Connector, Anda dapat memanggil operasi yang didefinisikan oleh com.aliyun.tablestore.kafka.connect.parsers.EventParser untuk mengonfigurasi parser.

table.name.format

string

Tidak

kafka_<topic>

String format untuk nama tabel data Tablestore tujuan. Nilai default: <topic>. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data. Misalnya, jika table.name.format diatur ke kafka_<topic>, dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore.

topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.

topics.assign.tables

list

Ya

test:destTable

Menentukan pemetaan antara topik dan tabel Tablestore tujuan dalam format <topic_1>:<tablename_1>,<topic_2>:<tablename_2>. Pisahkan beberapa pemetaan dengan koma (,). Misalnya, test:destTable menentukan bahwa catatan pesan dari topik bernama test ditulis ke tabel data bernama destTable.

topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.

primarykey.mode

string

Tidak

kafka

Mode primary key tabel data. Nilai valid:

  • kafka: <connect_topic>_<connect_partition> dan <connect_offset> digunakan sebagai primary key tabel data. Topik Kafka <connect_topic> dan partisi <connect_partition> dipisahkan dengan garis bawah (_), dan <connect_offset> menentukan offset catatan pesan dalam partisi.

  • record_key: Bidang kelas Struct atau kunci kelas Map dalam kunci rekaman digunakan sebagai primary key tabel data.

  • record_value: Bidang kelas Struct atau kunci kelas Map dalam nilai rekaman digunakan sebagai primary key tabel data.

Konfigurasikan parameter ini bersama dengan tablestore.<tablename>.primarykey.name dan tablestore.<tablename>.primarykey.type. Nilai parameter ini tidak peka huruf besar/kecil.

tablestore.<tablename>.primarykey.name

list

Tidak

A,B

Nama kolom primary key tabel data. <tablename> adalah placeholder untuk nama tabel data. Nilai parameter ini berisi satu hingga empat nama kolom primary key yang dipisahkan dengan koma (,).

Nama kolom primary key bervariasi dengan mode primary key.

  • Jika mode primary key diatur ke kafka, nilai default parameter ini adalah topic_partition,offset. Dalam mode primary key kafka, Anda tidak perlu menentukan nama kolom primary key. Bahkan jika nama kolom primary key ditentukan, nama kolom primary key default yang diutamakan.

  • Jika mode primary key diatur ke record_key, bidang kelas Struct atau kunci kelas Map yang memiliki nama yang sama dengan nama kolom primary key yang ditentukan diekstraksi dari kunci rekaman sebagai primary key tabel data. Dalam mode primary key record_key, Anda harus menentukan nama kolom primary key.

  • Jika mode primary key diatur ke record_value, bidang kelas Struct atau kunci kelas Map yang memiliki nama yang sama dengan nama kolom primary key yang ditentukan diekstraksi dari nilai rekaman sebagai primary key tabel data. Dalam mode primary key record_value, Anda harus menentukan nama kolom primary key.

Kolom primary key tabel data Tablestore berurutan. Anda perlu memperhatikan urutan kolom primary key saat Anda mendefinisikan tablestore.<tablename>.primarykey.name. Misalnya, PRIMARY KEY (A, B, C) dan PRIMARY KEY (A, C, B) memiliki skema yang berbeda.

tablestore.<tablename>.primarykey.type

list

Tidak

string, integer

Tipe data kolom primary key dalam tabel data. <tablename> adalah placeholder untuk nama tabel data. Pisahkan tipe data kolom primary key dengan koma (,). Urutan tipe data kolom primary key harus sesuai dengan urutan nama kolom primary key yang ditentukan oleh tablestore.<tablename>.primarykey.name. Nilai parameter ini tidak peka huruf besar/kecil. Nilai valid: integer, string, binary, dan auto_increment.

Tipe data kolom primary key bervariasi dengan mode primary key.

  • Jika mode primary key diatur ke kafka, nilai default parameter ini adalah string, integer.

    Dalam mode primary key kafka, Anda tidak perlu menentukan tipe data kolom primary key. Bahkan jika tipe data kolom primary key ditentukan, tipe data default kolom primary key yang diutamakan.

  • Jika mode primary key diatur ke record_key atau record_value, Anda harus menentukan tipe data kolom primary key.

    Jika tipe data kolom primary key yang ditentukan bertentangan dengan tipe data yang didefinisikan dalam skema Kafka, terjadi kesalahan penguraian. Dalam hal ini, Anda dapat mengonfigurasi parameter Runtime Error untuk memperbaiki kesalahan.

    Jika parameter ini diatur ke auto_increment, bidang catatan pesan Kafka dimasukkan ke tabel data sebagai kolom primary key auto-increment saat data ditulis ke tabel data.

tablestore.<tablename>.columns.whitelist.name

list

Tidak

A,B

Nama kolom atribut dalam daftar putih kolom atribut. <tablename> adalah placeholder untuk nama tabel data. Pisahkan nama kolom atribut dengan koma (,).

Jika Anda tidak mengonfigurasi parameter ini, semua bidang kelas Struct atau semua kunci kelas Map dalam nilai rekaman digunakan sebagai kolom atribut tabel data. Jika Anda mengonfigurasi parameter ini, bidang dalam nilai rekaman difilter berdasarkan daftar putih kolom atribut yang ditentukan untuk mendapatkan kolom atribut yang diperlukan.

tablestore.<tablename>.columns.whitelist.type

list

Tidak

string, integer

Tipe data kolom atribut dalam daftar putih kolom atribut. <tablename> adalah placeholder untuk nama tabel data. Pisahkan tipe data kolom atribut dengan koma (,). Urutan tipe data kolom atribut harus sesuai dengan urutan nama kolom atribut yang ditentukan oleh tablestore.<tablename>.columns.whitelist.name. Nilai parameter ini tidak peka huruf besar/kecil. Nilai valid: integer, string, binary, boolean, dan double.

Parameter Tulis Konektor

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

insert.mode

string

Tidak

put

Mode tulis. Nilai default: put. Nilai valid:

  • put: Data yang ada ditimpa oleh baris data yang Anda tulis ke tabel. Nilai ini sesuai dengan operasi PutRow dari Tablestore.

  • update: Saat Anda memperbarui baris data, kolom atribut ditambahkan ke baris atau nilai kolom atribut yang ada diperbarui. Nilai ini sesuai dengan operasi UpdateRow dari Tablestore.

Nilai parameter ini tidak peka huruf besar/kecil.

insert.order.enable

boolean

Tidak

true

Menentukan apakah data ditulis ke tabel data dalam urutan data dibaca. Nilai default: true. Nilai valid:

  • true: Catatan pesan Kafka ditulis ke tabel data dalam urutan catatan pesan dibaca.

  • false: Catatan pesan Kafka ditulis ke tabel data tanpa urutan tertentu. Ini meningkatkan kinerja tulis.

auto.create

boolean

Tidak

false

Menentukan apakah tabel tujuan dibuat secara otomatis. Tabel data atau tabel seri waktu dapat dibuat secara otomatis. Nilai default: false. Nilai valid:

  • true: Sistem secara otomatis membuat tabel tujuan Tablestore.

  • false: Sistem tidak secara otomatis membuat tabel tujuan Tablestore.

delete.mode

string

Tidak

none

Mode penghapusan. Konfigurasi parameter ini hanya berlaku saat data disinkronkan ke tabel data dan mode primary key diatur ke record_key. Nilai default: none. Nilai valid:

  • none: Tidak ada operasi penghapusan yang dapat dilakukan.

  • row: Baris dapat dihapus. Jika nilai rekaman kosong, baris yang sesuai dihapus.

  • column: Kolom atribut dapat dihapus. Jika nilai bidang kelas Struct atau nilai kunci kelas Map dalam nilai rekaman kosong, kolom atribut yang sesuai dihapus.

  • row_and_column: Baris dan kolom atribut dapat dihapus.

Nilai parameter ini tidak peka huruf besar/kecil.

Parameter ini ditentukan berdasarkan nilai parameter insert.mode. Untuk informasi lebih lanjut, lihat Lampiran: Sintaks Hapus.

buffer.size

integer

Tidak

1024

Jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2.

max.thread.count

integer

Tidak

3

Jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = Jumlah vCPU + 1.

max.concurrency

integer

Tidak

10

Jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10.

bucket.count

integer

Tidak

3

Jumlah bucket ke mana data ditulis. Nilai default: 3. Jika Anda meningkatkan nilai parameter ini, kemampuan tulis konkuren dapat ditingkatkan. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan.

flush.Interval

integer

Tidak

10000

Interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000.

Parameter Kesalahan Waktu Proses Konektor

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

runtime.error.tolerance

string

Tidak

none

Kebijakan penanganan kesalahan yang digunakan jika terjadi kesalahan saat catatan pesan Kafka diurai atau ditulis ke tabel. Nilai default: none. Nilai valid:

  • none: Kesalahan menyebabkan tugas impor data yang menggunakan Tablestore Sink Connector gagal.

  • all: Catatan pesan yang melaporkan kesalahan dilewati dan dicatat.

Nilai parameter ini tidak peka huruf besar/kecil.

runtime.error.mode

string

Tidak

ignore

Menentukan cara memproses catatan pesan yang melaporkan kesalahan saat catatan pesan Kafka diurai atau ditulis ke tabel. Nilai default: ignore. Nilai valid:

  • ignore: Semua kesalahan diabaikan.

  • kafka: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di topik Kafka yang berbeda. Dalam hal ini, Anda perlu menentukan runtime.error.bootstrap.servers dan runtime.error.topic.name. Kunci dan nilai catatan pesan Kafka yang melaporkan kesalahan di topik baru sama dengan kunci dan nilai catatan pesan di topik dari mana Anda ingin mengekspor data. Bidang ErrorInfo disertakan dalam header untuk mencatat pesan kesalahan.

  • tablestore: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di tabel data Tablestore yang berbeda. Dalam hal ini, Anda perlu menentukan runtime.error.table.name. Kolom primary key tabel data yang digunakan untuk mencatat catatan pesan yang melaporkan kesalahan dan pesan kesalahan adalah topic_partition (tipe string) dan offset (tipe integer). Kolom atribut tabel data adalah key (tipe bytes), value (tipe bytes), dan error_info (tipe string).

Jika runtime.error.mode diatur ke kafka, Anda perlu melakukan serialisasi header, kunci, dan nilai catatan pesan Kafka. Jika runtime.error.mode diatur ke tablestore, Anda perlu melakukan serialisasi kunci dan nilai catatan pesan Kafka. Secara default, org.apache.kafka.connect.json.JsonConverter digunakan untuk serialisasi data dan schemas.enable diatur ke true. Anda dapat menggunakan JsonConverter untuk deserialisasi data untuk mendapatkan data asli. Untuk informasi lebih lanjut tentang Converter, lihat Kafka Converter.

runtime.error.bootstrap.servers

string

Tidak

localhost:9092

Alamat kluster Kafka tempat catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan.

runtime.error.topic.name

string

Tidak

errors

Nama topik Kafka yang menyimpan catatan pesan yang melaporkan kesalahan dan pesan kesalahan.

runtime.error.table.name

string

Tidak

errors

Nama tabel Tablestore yang menyimpan catatan pesan yang melaporkan kesalahan dan pesan kesalahan.

Parameter Terkait Seri Waktu

Parameter

Tipe

Diperlukan

Contoh

Deskripsi

tablestore.timeseries.<tablename>.measurement

string

Ya

mName

Menentukan bahwa nilai yang sesuai dengan kunci yang ditentukan dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _m_name.

Jika tablestore.timeseries.<tablename>.measurement diatur ke <topic>, nilai yang sesuai dengan kunci topik catatan pesan Kafka ditulis ke tabel seri waktu sebagai nilai bidang _m_name.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. Misalnya, jika nama tabel seri waktu adalah test, nama parameternya adalah tablestore.timeseries.test.measurement.

tablestore.timeseries.<tablename>.dataSource

string

Ya

ds

Menentukan bahwa nilai yang sesuai dengan kunci ds dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _data_source.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.<tablename>.tags

list

Ya

region,level

Menentukan bahwa nilai yang sesuai dengan kunci region dan level dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang tags.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.<tablename>.time

string

Ya

timestamp

Menentukan bahwa nilai yang sesuai dengan kunci timestamp dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _time.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.<tablename>.time.unit

string

Ya

MILLISECONDS

Satuan nilai parameter tablestore.timeseries.<tablename>.time. Nilai valid: SECONDS, MILLISECONDS, MICROSECONDS, dan NANOSECONDS.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.<tablename>.field.name

list

Tidak

cpu,io

Menentukan bahwa kunci cpu dan io dalam data berformat JSON ditulis ke tabel seri waktu sebagai nama _field_name dan nilai yang sesuai dengan kunci cpu dan io dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai _field_name.

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.<tablename>.field.type

string

Tidak

double,integer

Tipe data bidang yang ditentukan oleh tablestore.timeseries.<tablename>.field.name. Nilai valid: double, integer, string, binary, dan boolean. Pisahkan beberapa tipe data dengan koma (,).

<tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda.

tablestore.timeseries.mapAll

boolean

Tidak

false

Menentukan apakah bidang selain bidang primary key dan bidang waktu dalam data berformat JSON ditulis ke tabel seri waktu sebagai bidang.

Jika tablestore.timeseries.mapAll diatur ke false, Anda harus mengonfigurasi parameter tablestore.timeseries.<tablename>.field.name dan tablestore.timeseries.<tablename>.field.type.

tablestore.timeseries.toLowerCase

boolean

Tidak

true

Menentukan apakah kunci dalam bidang dikonversi ke huruf kecil sebelum ditulis/dan kemudian ditulis ke tabel seri waktu. Kunci dalam bidang adalah kunci dalam bidang non-primary key atau non-waktu, atau kunci yang ditentukan dalam tablestore.timeseries.<tablename>.field.name.

tablestore.timeseries.rowsPerBatch

integer

Tidak

50

Jumlah maksimum baris yang dapat ditulis ke Tablestore dalam satu permintaan. Nilai maksimum dan default adalah 200.

Lampiran: Pemetaan Tipe Data antara Kafka dan Tablestore

Tabel berikut menjelaskan pemetaan tipe data antara Kafka dan Tablestore.

Skema tipe Kafka

Tipe data Tablestore

STRING

STRING

INT8, INT16, INT32, dan INT64

INTEGER

FLOAT32 dan FLOAT64

DOUBLE

BOOLEAN

BOOLEAN

BYTES

BINARY

Lampiran: Sintaks Hapus

Catatan

Fitur ini didukung hanya saat data disinkronkan dari Kafka ke tabel data di Tablestore.

Tabel berikut menjelaskan metode penulisan data ke tabel data Tablestore berdasarkan konfigurasi mode tulis (insert.mode) dan mode penghapusan (delete.mode), ketika catatan pesan berisi nilai kosong dan data disinkronkan dari Kafka ke tabel data di Tablestore.

insert.mode

put

update

delete.mode

none

row

column

row_and_column

none

row

column

row_and_column

Nilai Kosong

Timpa

Hapus baris

Timpa

Hapus baris

Data kotor

Hapus baris

Data kotor

Hapus baris

Semua bidang kosong dalam nilai

Timpa

Timpa

Timpa

Timpa

Data kotor

Data kotor

Hapus kolom

Hapus kolom

Beberapa bidang kosong dalam nilai

Timpa

Timpa

Timpa

Timpa

Abaikan nilai kosong

Abaikan nilai kosong

Hapus kolom

Hapus kolom