Sebelum memulai Tablestore Sink Connector, Anda perlu menentukan pasangan nilai kunci untuk melewatkan parameter ke proses Kafka Connect. Topik ini memberikan contoh konfigurasi dan deskripsi parameter untuk mengonfigurasi Tablestore Sink Connector.
Contoh Konfigurasi
Item konfigurasi bervariasi tergantung pada apakah data disinkronkan dari Kafka ke tabel data atau tabel seri waktu di Tablestore. Contoh konfigurasi file berbeda berdasarkan mode kerja. Bagian ini menyediakan contoh konfigurasi sinkronisasi data dari Kafka ke tabel data di Tablestore. Untuk menyinkronkan data ke tabel seri waktu, tambahkan item konfigurasi spesifik untuk sinkronisasi data dari Kafka ke tabel seri waktu di Tablestore.
Mode Mandiri
Kode sampel berikut menunjukkan cara mengonfigurasi file konfigurasi dalam format .properties untuk Tablestore Sink Connector dalam mode mandiri:
# Tentukan nama konektor.
name=tablestore-sink
# Tentukan kelas konektor.
connector.class=TableStoreSinkConnector
# Tentukan jumlah maksimum tugas.
tasks.max=1
# Tentukan daftar topik Kafka dari mana data diekspor.
topics=test
# Tentukan nilai untuk parameter koneksi Tablestore berikut:
# Titik akhir instance Tablestore.
tablestore.endpoint=https://xxx.xxx.ots.aliyuncs.com
# Pasangan AccessKey yang terdiri dari ID AccessKey dan rahasia AccessKey.
tablestore.access.key.id=xxx
tablestore.access.key.secret=xxx
# Nama instance Tablestore.
tablestore.instance.name=xxx
# Tentukan parameter pemetaan data berikut:
# Tentukan parser yang digunakan untuk mengurai catatan pesan Kafka.
# DefaultEventParser dari Tablestore Sink Connector mendukung kelas Struct dan Map dari Kafka Connect. Anda juga dapat menggunakan EventParser kustom.
event.parse.class=com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser
# Tentukan string format untuk nama tabel Tablestore tujuan. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data.
# topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.
# Misalnya, jika table.name.format diatur ke kafka_<topic> dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore.
table.name.format=<topic>
# Tentukan pemetaan antara topik Kafka dan tabel Tablestore tujuan. Nilainya harus dalam format <topic>:<tablename>. Nama topik dan nama tabel dipisahkan dengan titik dua (:). Jika Anda ingin menentukan beberapa pemetaan, pisahkan mereka dengan koma (,).
# Jika pemetaan tidak ditentukan, konfigurasi table.name.format digunakan.
# topics.assign.tables=test:test_kafka
# Tentukan mode primary key. Nilai valid: kafka, record_key, dan record_value. Nilai default: kafka.
# kafka: <connect_topic>_<connect_partition> dan <connect_offset> digunakan sebagai primary key dari tabel data.
# record_key: Bidang dalam kunci rekaman digunakan sebagai primary key dari tabel data.
# record_value: Bidang dalam nilai rekaman digunakan sebagai primary key dari tabel data.
primarykey.mode=kafka
# Tentukan nama dan tipe data kolom primary key di tabel data Tablestore tujuan.
# Format nama kolom primary key adalah tablestore.<tablename>.primarykey.name. Format tipe data kolom primary key adalah tablestore.<tablename>.primarykey.type.
# <tablename> adalah placeholder untuk nama tabel data.
# Jika mode primary key adalah kafka, Anda tidak perlu menentukan nama dan tipe data kolom primary key. Nama kolom primary key default {"topic_partition","offset"} dan tipe data default {string, integer} dari kolom primary key digunakan.
# Jika mode primary key adalah record_key atau record_value, Anda harus menentukan nama dan tipe data kolom primary key.
# tablestore.test.primarykey.name=A,B
# tablestore.test.primarykey.type=string,integer
# Tentukan daftar putih kolom atribut untuk memfilter bidang dalam nilai rekaman untuk mendapatkan kolom atribut yang diperlukan.
# Secara default, daftar putih kolom atribut kosong. Semua bidang dalam nilai rekaman digunakan sebagai kolom atribut tabel data.
# Format nama kolom atribut adalah tablestore.<tablename>.columns.whitelist.name. Format tipe data kolom atribut adalah tablestore.<tablename>.columns.whitelist.type.
# <tablename> adalah placeholder untuk nama tabel data.
# tablestore.test.columns.whitelist.name=A,B
# tablestore.test.columns.whitelist.type=string,integer
# Tentukan cara menulis catatan pesan Kafka ke tabel Tablestore tujuan:
# Tentukan mode tulis. Nilai valid: put dan update. Nilai default: put.
# put: Data dalam tabel tujuan ditimpa oleh catatan pesan Kafka.
# update: Data dalam tabel tujuan diperbarui oleh catatan pesan Kafka.
insert.mode=put
# Tentukan apakah data ditulis dalam urutan data dibaca. Nilai default: true. Anda dapat menonaktifkan opsi ini untuk meningkatkan kinerja tulis.
insert.order.enable=true
# Tentukan apakah tabel tujuan dibuat secara otomatis. Nilai default: false.
auto.create=false
# Tentukan mode penghapusan. Nilai valid: none, row, column, dan row_and_column. Nilai default: none.
# none: Tidak ada operasi penghapusan yang dapat dilakukan.
# row: Baris dapat dihapus.
# column: Kolom atribut dapat dihapus.
# row_and_column: Baris dan kolom atribut dapat dihapus.
delete.mode=none
# Tentukan jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2.
buffer.size=1024
# Tentukan jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = Jumlah vCPU + 1.
# max.thread.count=
# Tentukan jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10.
max.concurrency=10
# Tentukan jumlah bucket ke mana data ditulis. Nilai default: 3. Jika Anda meningkatkan nilai parameter ini, kemampuan tulis konkuren dapat ditingkatkan. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan.
bucket.count=3
# Tentukan interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000.
flush.Interval=10000
# Tentukan cara memproses data kotor:
# Kesalahan mungkin terjadi saat catatan pesan Kafka diurai atau ditulis ke tabel data. Anda dapat menentukan dua parameter berikut untuk menentukan cara memperbaiki kesalahan:
# Tentukan kemampuan toleransi kesalahan. Nilai valid: none dan all. Nilai default: none.
# none: Kesalahan menyebabkan tugas impor data yang menggunakan Tablestore Sink Connector gagal.
# all: Catatan pesan yang melaporkan kesalahan dilewati dan dicatat.
runtime.error.tolerance=none
# Tentukan bagaimana data kotor dicatat. Nilai valid: ignore, kafka, dan tablestore. Nilai default: ignore.
# ignore: Semua kesalahan diabaikan.
# kafka: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di topik Kafka yang berbeda.
# tablestore: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di tabel data Tablestore yang berbeda.
runtime.error.mode=ignore
# Jika Anda mengatur runtime.error.mode ke kafka, Anda harus menentukan alamat kluster Kafka dan topik.
# runtime.error.bootstrap.servers=localhost:9092
# runtime.error.topic.name=errors
# Jika Anda mengatur runtime.error.mode ke tablestore, Anda harus menentukan nama tabel data Tablestore.
# runtime.error.table.name=errorsMode Terdistribusi
Kode sampel berikut menunjukkan cara mengonfigurasi file konfigurasi dalam format .json untuk Tablestore Sink Connector dalam mode terdistribusi:
{
"name": "tablestore-sink",
"config": {
// Tentukan kelas konektor.
"connector.class":"TableStoreSinkConnector",
// Tentukan jumlah maksimum tugas.
"tasks.max":"3",
// Tentukan daftar topik Kafka dari mana Anda ingin mengekspor data.
"topics":"test",
// Tentukan nilai untuk parameter koneksi Tablestore berikut:
// Titik akhir instance Tablestore.
"tablestore.endpoint":"https://xxx.xxx.ots.aliyuncs.com",
// Pasangan AccessKey yang terdiri dari ID AccessKey dan rahasia AccessKey.
"tablestore.access.key.id":"xxx",
"tablestore.access.key.secret":"xxx",
// Nama instance Tablestore.
"tablestore.instance.name":"xxx",
// Tentukan parameter pemetaan data berikut:
// Tentukan parser yang digunakan untuk mengurai catatan pesan Kafka.
// DefaultEventParser dari Tablestore Sink Connector mendukung kelas Struct dan Map dari Kafka Connect. Anda juga dapat menggunakan EventParser kustom.
"event.parse.class":"com.aliyun.tablestore.kafka.connect.parsers.DefaultEventParser",
// Tentukan string format untuk nama tabel Tablestore tujuan. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data.
// topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format.
// Misalnya, jika table.name.format diatur ke kafka_<topic> dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore.
"table.name.format":"<topic>",
// Tentukan pemetaan antara topik Kafka dan tabel Tablestore tujuan. Nilainya harus dalam format <topic>:<tablename>. Nama topik dan nama tabel dipisahkan dengan titik dua (:). Jika Anda ingin menentukan beberapa pemetaan, pisahkan mereka dengan koma (,).
// Jika pemetaan tidak ditentukan, konfigurasi table.name.format digunakan.
// "topics.assign.tables":"test:test_kafka",
// Tentukan mode primary key. Nilai valid: kafka, record_key, dan record_value. Nilai default: kafka.
// kafka: <connect_topic>_<connect_partition> dan <connect_offset> digunakan sebagai primary key dari tabel data.
// record_key: Bidang dalam kunci rekaman digunakan sebagai primary key dari tabel data.
// record_value: Bidang dalam nilai rekaman digunakan sebagai primary key dari tabel data.
"primarykey.mode":"kafka",
// Tentukan nama dan tipe data kolom primary key di tabel data Tablestore tujuan.
// Format nama kolom primary key adalah tablestore.<tablename>.primarykey.name. Format tipe data kolom primary key adalah tablestore.<tablename>.primarykey.type.
// <tablename> adalah placeholder untuk nama tabel data.
// Jika mode primary key adalah kafka, Anda tidak perlu menentukan nama dan tipe data kolom primary key. Nama kolom primary key default {"topic_partition","offset"} dan tipe data default {string, integer} dari kolom primary key digunakan.
// Jika mode primary key adalah record_key atau record_value, Anda harus menentukan nama dan tipe data kolom primary key.
// "tablestore.test.primarykey.name":"A,B",
// "tablestore.test.primarykey.type":"string,integer",
// Tentukan daftar putih kolom atribut untuk memfilter bidang dalam nilai rekaman untuk mendapatkan kolom atribut yang diperlukan.
// Secara default, daftar putih kolom atribut kosong. Semua bidang dalam nilai rekaman digunakan sebagai kolom atribut tabel data.
// Format nama kolom atribut adalah tablestore.<tablename>.columns.whitelist.name. Format tipe data kolom atribut adalah tablestore.<tablename>.columns.whitelist.type.
// <tablename> adalah placeholder untuk nama tabel data.
// "tablestore.test.columns.whitelist.name":"A,B",
// "tablestore.test.columns.whitelist.type":"string,integer",
// Tentukan cara menulis catatan pesan Kafka ke tabel Tablestore tujuan:
// Tentukan mode tulis. Nilai valid: put dan update. Nilai default: put.
// put: Data dalam tabel ditimpa oleh catatan pesan Kafka.
// update: Data dalam tabel diperbarui oleh catatan pesan Kafka.
"insert.mode":"put",
// Tentukan apakah data ditulis dalam urutan data dibaca. Nilai default: true. Anda dapat menonaktifkan opsi ini untuk meningkatkan kinerja tulis.
"insert.order.enable":"true",
// Tentukan apakah tabel tujuan dibuat secara otomatis. Nilai default: false.
"auto.create":"false",
// Tentukan mode penghapusan. Nilai valid: none, row, column, dan row_and_column. Nilai default: none.
// none: Tidak ada operasi penghapusan yang dapat dilakukan.
// row: Baris dapat dihapus.
// column: Kolom atribut dapat dihapus.
// row_and_column: Baris dan kolom atribut dapat dihapus.
"delete.mode":"none",
// Tentukan jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2.
"buffer.size":"1024",
// Tentukan jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = Jumlah vCPU + 1.
// "max.thread.count":
// Tentukan jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10.
"max.concurrency":"10",
// Tentukan jumlah bucket ke mana data ditulis. Nilai default: 3. Anda dapat meningkatkan nilai parameter ini untuk meningkatkan kemampuan tulis konkuren. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan.
"bucket.count":"3",
// Tentukan interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000.
"flush.Interval":"10000",
// Tentukan cara memproses data kotor:
// Kesalahan mungkin terjadi saat catatan pesan Kafka diurai atau ditulis ke tabel data. Anda dapat menentukan dua parameter berikut untuk menentukan cara memperbaiki kesalahan:
// Tentukan kemampuan toleransi kesalahan. Nilai valid: none dan all. Nilai default: none.
// none: Kesalahan menyebabkan tugas impor data yang menggunakan Tablestore Sink Connector gagal.
// all: Catatan pesan yang melaporkan kesalahan dilewati dan dicatat.
"runtime.error.tolerance":"none",
// Tentukan bagaimana data kotor dicatat. Nilai valid: ignore, kafka, dan tablestore. Nilai default: ignore.
// ignore: Semua kesalahan diabaikan.
// kafka: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di topik Kafka yang berbeda.
// tablestore: Catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan di tabel data Tablestore yang berbeda.
"runtime.error.mode":"ignore"
// Jika Anda mengatur runtime.error.mode ke kafka, Anda harus menentukan alamat kluster Kafka dan topik.
// "runtime.error.bootstrap.servers":"localhost:9092",
// "runtime.error.topic.name":"errors",
// Jika Anda mengatur runtime.error.mode ke tablestore, Anda harus menentukan nama tabel data Tablestore.
// "runtime.error.table.name":"errors",
}Parameter
Tabel berikut menjelaskan parameter dalam file konfigurasi. Parameter terkait seri waktu hanya perlu dikonfigurasi saat menyinkronkan data dari Kafka ke tabel seri waktu di Tablestore.
Parameter Kafka Connect
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
name | string | Ya | tablestore-sink | Nama konektor. Nama konektor harus unik. |
connector.class | class | Ya | TableStoreSinkConnector | Kelas Java konektor. Jika Anda ingin menggunakan konektor, tentukan kelas konektor dengan menggunakan connector.class. Anda dapat mengatur connector.class ke nama lengkap atau alias kelas konektor. Nama lengkap kelas konektor adalah com.aliyun.tablestore.kafka.connect.TableStoreSinkConnector dan alias kelas konektor adalah TableStoreSinkConnector. Contoh: |
tasks.max | integer | Ya | 3 | Jumlah maksimum tugas yang dapat dibuat untuk konektor. Jika jumlah maksimum tugas gagal dibuat, lebih sedikit tugas mungkin dibuat. |
key.converter | string | Tidak | org.apache.kafka.connect.json.JsonConverter | Konverter kunci yang digunakan untuk mengganti konverter kunci default yang ditentukan dalam file konfigurasi pekerja. |
value.converter | string | Tidak | org.apache.kafka.connect.json.JsonConverter | Konverter nilai yang digunakan untuk mengganti konverter nilai default yang ditentukan dalam file konfigurasi pekerja. |
topics | list | Ya | test | Daftar topik Kafka yang dapat ditentukan untuk konektor. Pisahkan beberapa topik Kafka dengan koma (,). Anda harus menentukan topik untuk mengelola topik yang ditentukan untuk konektor. |
Parameter Koneksi Konektor
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
tablestore.endpoint | string | Ya | https://xxx.xxx.ots.aliyuncs.com | Titik akhir instance Tablestore. Untuk informasi lebih lanjut, lihat Endpoints. |
tablestore.mode | string | Ya | timeseries | Tipe tabel tujuan. Nilai default: normal. Nilai valid:
|
tablestore.access.key.id | string | Ya | LTAn******************** | ID AccessKey dan rahasia AccessKey akun Anda. Untuk informasi lebih lanjut tentang cara memperoleh ID AccessKey dan rahasia AccessKey, lihat Buat Pasangan AccessKey. |
tablestore.access.key.secret | string | Ya | zbnK************************** | |
tablestore.auth.mode | string | Ya | aksk | Mode autentikasi. Nilai default: aksk. Nilai valid:
|
tablestore.instance.name | string | Ya | myotstest | Nama instance Tablestore. |
Parameter Pemetaan Data Konektor
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
event.parse.class | class | Ya | DefaultEventParser | Kelas Java dari EventParser. Nilai default: DefaultEventParser. Parser mengurai catatan pesan Kafka untuk mendapatkan kolom primary key dan kolom atribut tabel data. Penting Tablestore memberikan batasan pada ukuran nilai kolom. Nilai kolom primary key bertipe string atau biner tidak boleh melebihi 1 KB, dan nilai kolom atribut tidak boleh melebihi 2 MB. Untuk informasi lebih lanjut, lihat Batasan. Jika nilai kolom melebihi batas setelah tipe data dikonversi, catatan pesan Kafka diproses sebagai data kotor. Untuk menggunakan DefaultEventParser, kunci atau nilai catatan pesan Kafka harus berupa kelas Struct atau Map dari Kafka Connect. Bidang yang dipilih dalam Struct harus berupa tipe data yang didukung oleh Tablestore Sink Connector. Bidang tersebut dikonversi ke data tipe Tablestore berdasarkan tabel pemetaan tipe data dan kemudian ditulis ke tabel data. Tipe data nilai dalam Map harus merupakan tipe data yang didukung oleh Tablestore Sink Connector. Tablestore Sink Connector mendukung tipe data yang sama dalam Struct dan Map. Nilai dalam Map dikonversi ke data bertipe biner dan kemudian ditulis ke tabel data. Jika tipe data catatan pesan Kafka tidak kompatibel dengan Tablestore Sink Connector, Anda dapat memanggil operasi yang didefinisikan oleh com.aliyun.tablestore.kafka.connect.parsers.EventParser untuk mengonfigurasi parser. |
table.name.format | string | Tidak | kafka_<topic> | String format untuk nama tabel data Tablestore tujuan. Nilai default: <topic>. <topic> dapat digunakan dalam string sebagai placeholder untuk topik dari mana Anda ingin mengekspor data. Misalnya, jika table.name.format diatur ke kafka_<topic>, dan nama topik Kafka dari mana Anda ingin mengekspor data adalah test, catatan pesan Kafka dari topik test dipetakan ke tabel bernama kafka_test di Tablestore. topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format. |
topics.assign.tables | list | Ya | test:destTable | Menentukan pemetaan antara topik dan tabel Tablestore tujuan dalam format topics.assign.tables diberi prioritas lebih tinggi daripada table.name.format. Jika topics.assign.tables ditentukan, abaikan konfigurasi table.name.format. |
primarykey.mode | string | Tidak | kafka | Mode primary key tabel data. Nilai valid:
Konfigurasikan parameter ini bersama dengan tablestore.<tablename>.primarykey.name dan tablestore.<tablename>.primarykey.type. Nilai parameter ini tidak peka huruf besar/kecil. |
tablestore.<tablename>.primarykey.name | list | Tidak | A,B | Nama kolom primary key tabel data. <tablename> adalah placeholder untuk nama tabel data. Nilai parameter ini berisi satu hingga empat nama kolom primary key yang dipisahkan dengan koma (,). Nama kolom primary key bervariasi dengan mode primary key.
Kolom primary key tabel data Tablestore berurutan. Anda perlu memperhatikan urutan kolom primary key saat Anda mendefinisikan tablestore.<tablename>.primarykey.name. Misalnya, PRIMARY KEY (A, B, C) dan PRIMARY KEY (A, C, B) memiliki skema yang berbeda. |
tablestore.<tablename>.primarykey.type | list | Tidak | string, integer | Tipe data kolom primary key dalam tabel data. <tablename> adalah placeholder untuk nama tabel data. Pisahkan tipe data kolom primary key dengan koma (,). Urutan tipe data kolom primary key harus sesuai dengan urutan nama kolom primary key yang ditentukan oleh tablestore.<tablename>.primarykey.name. Nilai parameter ini tidak peka huruf besar/kecil. Nilai valid: integer, string, binary, dan auto_increment. Tipe data kolom primary key bervariasi dengan mode primary key.
|
tablestore.<tablename>.columns.whitelist.name | list | Tidak | A,B | Nama kolom atribut dalam daftar putih kolom atribut. <tablename> adalah placeholder untuk nama tabel data. Pisahkan nama kolom atribut dengan koma (,). Jika Anda tidak mengonfigurasi parameter ini, semua bidang kelas Struct atau semua kunci kelas Map dalam nilai rekaman digunakan sebagai kolom atribut tabel data. Jika Anda mengonfigurasi parameter ini, bidang dalam nilai rekaman difilter berdasarkan daftar putih kolom atribut yang ditentukan untuk mendapatkan kolom atribut yang diperlukan. |
tablestore.<tablename>.columns.whitelist.type | list | Tidak | string, integer | Tipe data kolom atribut dalam daftar putih kolom atribut. |
Parameter Tulis Konektor
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
insert.mode | string | Tidak | put | Mode tulis. Nilai default: put. Nilai valid:
Nilai parameter ini tidak peka huruf besar/kecil. |
insert.order.enable | boolean | Tidak | true | Menentukan apakah data ditulis ke tabel data dalam urutan data dibaca. Nilai default: true. Nilai valid:
|
auto.create | boolean | Tidak | false | Menentukan apakah tabel tujuan dibuat secara otomatis. Tabel data atau tabel seri waktu dapat dibuat secara otomatis. Nilai default: false. Nilai valid:
|
delete.mode | string | Tidak | none | Mode penghapusan. Konfigurasi parameter ini hanya berlaku saat data disinkronkan ke tabel data dan mode primary key diatur ke record_key. Nilai default: none. Nilai valid:
Nilai parameter ini tidak peka huruf besar/kecil. Parameter ini ditentukan berdasarkan nilai parameter insert.mode. Untuk informasi lebih lanjut, lihat Lampiran: Sintaks Hapus. |
buffer.size | integer | Tidak | 1024 | Jumlah maksimum baris yang dapat dimasukkan dalam antrian buffer di memori saat data ditulis ke tabel data. Nilai default: 1024. Nilai parameter ini harus merupakan eksponen dari 2. |
max.thread.count | integer | Tidak | 3 | Jumlah thread callback yang digunakan saat data ditulis ke tabel data. Nilai default = |
max.concurrency | integer | Tidak | 10 | Jumlah maksimum permintaan tulis konkuren yang dapat dikirim untuk menulis data ke tabel data. Nilai default: 10. |
bucket.count | integer | Tidak | 3 | Jumlah bucket ke mana data ditulis. Nilai default: 3. Jika Anda meningkatkan nilai parameter ini, kemampuan tulis konkuren dapat ditingkatkan. Namun, Anda tidak dapat mengatur nilai parameter ini lebih besar dari jumlah maksimum permintaan tulis konkuren yang Anda tentukan. |
flush.Interval | integer | Tidak | 10000 | Interval refresh antrian buffer saat data ditulis ke tabel data. Unit: milidetik. Nilai default: 10000. |
Parameter Kesalahan Waktu Proses Konektor
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
runtime.error.tolerance | string | Tidak | none | Kebijakan penanganan kesalahan yang digunakan jika terjadi kesalahan saat catatan pesan Kafka diurai atau ditulis ke tabel. Nilai default: none. Nilai valid:
Nilai parameter ini tidak peka huruf besar/kecil. |
runtime.error.mode | string | Tidak | ignore | Menentukan cara memproses catatan pesan yang melaporkan kesalahan saat catatan pesan Kafka diurai atau ditulis ke tabel. Nilai default: ignore. Nilai valid:
Jika runtime.error.mode diatur ke kafka, Anda perlu melakukan serialisasi header, kunci, dan nilai catatan pesan Kafka. Jika runtime.error.mode diatur ke tablestore, Anda perlu melakukan serialisasi kunci dan nilai catatan pesan Kafka. Secara default, org.apache.kafka.connect.json.JsonConverter digunakan untuk serialisasi data dan schemas.enable diatur ke true. Anda dapat menggunakan JsonConverter untuk deserialisasi data untuk mendapatkan data asli. Untuk informasi lebih lanjut tentang Converter, lihat Kafka Converter. |
runtime.error.bootstrap.servers | string | Tidak | localhost:9092 | Alamat kluster Kafka tempat catatan pesan yang melaporkan kesalahan dan pesan kesalahan disimpan. |
runtime.error.topic.name | string | Tidak | errors | Nama topik Kafka yang menyimpan catatan pesan yang melaporkan kesalahan dan pesan kesalahan. |
runtime.error.table.name | string | Tidak | errors | Nama tabel Tablestore yang menyimpan catatan pesan yang melaporkan kesalahan dan pesan kesalahan. |
Parameter Terkait Seri Waktu
Parameter | Tipe | Diperlukan | Contoh | Deskripsi |
tablestore.timeseries.<tablename>.measurement | string | Ya | mName | Menentukan bahwa nilai yang sesuai dengan kunci yang ditentukan dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _m_name. Jika tablestore.timeseries.<tablename>.measurement diatur ke <topic>, nilai yang sesuai dengan kunci topik catatan pesan Kafka ditulis ke tabel seri waktu sebagai nilai bidang _m_name. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. Misalnya, jika nama tabel seri waktu adalah test, nama parameternya adalah tablestore.timeseries.test.measurement. |
tablestore.timeseries.<tablename>.dataSource | string | Ya | ds | Menentukan bahwa nilai yang sesuai dengan kunci ds dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _data_source. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.<tablename>.tags | list | Ya | region,level | Menentukan bahwa nilai yang sesuai dengan kunci region dan level dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang tags. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.<tablename>.time | string | Ya | timestamp | Menentukan bahwa nilai yang sesuai dengan kunci timestamp dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai bidang _time. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.<tablename>.time.unit | string | Ya | MILLISECONDS | Satuan nilai parameter tablestore.timeseries.<tablename>.time. Nilai valid: SECONDS, MILLISECONDS, MICROSECONDS, dan NANOSECONDS. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.<tablename>.field.name | list | Tidak | cpu,io | Menentukan bahwa kunci cpu dan io dalam data berformat JSON ditulis ke tabel seri waktu sebagai nama _field_name dan nilai yang sesuai dengan kunci cpu dan io dalam data berformat JSON ditulis ke tabel seri waktu sebagai nilai _field_name. <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.<tablename>.field.type | string | Tidak | double,integer | Tipe data bidang yang ditentukan oleh tablestore.timeseries.<tablename>.field.name. Nilai valid: double, integer, string, binary, dan boolean. Pisahkan beberapa tipe data dengan koma (,). <tablename> dalam parameter adalah placeholder untuk nama tabel seri waktu. Ubah nama parameter berdasarkan kebutuhan bisnis Anda. |
tablestore.timeseries.mapAll | boolean | Tidak | false | Menentukan apakah bidang selain bidang primary key dan bidang waktu dalam data berformat JSON ditulis ke tabel seri waktu sebagai bidang. Jika tablestore.timeseries.mapAll diatur ke false, Anda harus mengonfigurasi parameter tablestore.timeseries.<tablename>.field.name dan tablestore.timeseries.<tablename>.field.type. |
tablestore.timeseries.toLowerCase | boolean | Tidak | true | Menentukan apakah kunci dalam bidang dikonversi ke huruf kecil sebelum ditulis/dan kemudian ditulis ke tabel seri waktu. Kunci dalam bidang adalah kunci dalam bidang non-primary key atau non-waktu, atau kunci yang ditentukan dalam tablestore.timeseries.<tablename>.field.name. |
tablestore.timeseries.rowsPerBatch | integer | Tidak | 50 | Jumlah maksimum baris yang dapat ditulis ke Tablestore dalam satu permintaan. Nilai maksimum dan default adalah 200. |
Lampiran: Pemetaan Tipe Data antara Kafka dan Tablestore
Tabel berikut menjelaskan pemetaan tipe data antara Kafka dan Tablestore.
Skema tipe Kafka | Tipe data Tablestore |
STRING | STRING |
INT8, INT16, INT32, dan INT64 | INTEGER |
FLOAT32 dan FLOAT64 | DOUBLE |
BOOLEAN | BOOLEAN |
BYTES | BINARY |
Lampiran: Sintaks Hapus
Fitur ini didukung hanya saat data disinkronkan dari Kafka ke tabel data di Tablestore.
Tabel berikut menjelaskan metode penulisan data ke tabel data Tablestore berdasarkan konfigurasi mode tulis (insert.mode) dan mode penghapusan (delete.mode), ketika catatan pesan berisi nilai kosong dan data disinkronkan dari Kafka ke tabel data di Tablestore.
insert.mode | put | update | ||||||
delete.mode | none | row | column | row_and_column | none | row | column | row_and_column |
Nilai Kosong | Timpa | Hapus baris | Timpa | Hapus baris | Data kotor | Hapus baris | Data kotor | Hapus baris |
Semua bidang kosong dalam nilai | Timpa | Timpa | Timpa | Timpa | Data kotor | Data kotor | Hapus kolom | Hapus kolom |
Beberapa bidang kosong dalam nilai | Timpa | Timpa | Timpa | Timpa | Abaikan nilai kosong | Abaikan nilai kosong | Hapus kolom | Hapus kolom |