Topik ini menjelaskan cara menggunakan Konektor Lindorm.
Latar Belakang
Lindorm adalah layanan basis data hiper-konvergen cloud-native yang dioptimalkan untuk menyimpan dan memproses data multimodal dalam berbagai skenario seperti IoT, Internet, dan Internet of Vehicles (IoV). Lindorm cocok untuk skenario seperti pencatatan, pemantauan, penagihan, periklanan, jejaring sosial, perjalanan, dan manajemen risiko. Lindorm juga merupakan salah satu layanan basis data yang mendukung bisnis inti Alibaba Group.
Lindorm menyediakan fitur-fitur berikut:
Mendukung akses dan pemrosesan terpadu dari berbagai jenis data seperti tabel lebar, deret waktu, teks, objek, aliran, dan ruang.
Kompatibel dengan beberapa antarmuka standar seperti SQL, Apache HBase, Apache Cassandra, Amazon S3, Time Series Database (TSDB), Hadoop Distributed File System (HDFS), Apache Solr, dan Kafka. Lindorm juga dapat diintegrasikan secara mulus dengan alat ekosistem pihak ketiga.
Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor Lindorm.
Item | Deskripsi |
Jenis tabel | Tabel dimensi dan tabel sink |
Mode operasi | Mode streaming |
Format data | Tidak tersedia |
Metrik | |
Jenis API | SQL API |
Mesin Lindorm | LindormTable |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Catatan penggunaan
Tabel HBase Lindorm tidak didukung.
Mesin tabel lebar Lindorm dan tabel Lindorm harus dibuat terlebih dahulu. Untuk informasi lebih lanjut, lihat Buat Instans.
Koneksi jaringan harus dibuat antara kluster Lindorm dan ruang kerja Flink. Sebagai contoh, kluster Lindorm dan Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama.
Sintaksis
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);Opsi Konektor
Umum
Opsi
Deskripsi
Tipe
Diperlukan
Nilai default
Catatan
connector
Jenis tabel.
String
Ya
Tidak ada nilai default
Setel nilainya menjadi lindorm.
seedserver
Titik akhir server Lindorm.
String
Ya
Tidak ada nilai default
Realtime Compute for Apache Flink memanggil API Java ApsaraDB untuk HBase untuk mengakses Lindorm dan menggunakan LindormTable. Titik akhir server Lindorm berada dalam format
host:port. Untuk informasi lebih lanjut, lihat Gunakan Flink untuk terhubung dan menggunakan LindormTable.namespace
Namespace database Lindorm.
String
Ya
Tidak ada nilai default
Tidak tersedia
username
Nama pengguna yang digunakan untuk mengakses database Lindorm.
String
Ya
Tidak ada nilai default
Tidak tersedia
password
Kata sandi yang digunakan untuk mengakses database Lindorm.
String
Ya
Tidak ada nilai default
Tidak tersedia
tableName
Nama tabel Lindorm.
String
Ya
Tidak ada nilai default
Tidak tersedia
columnFamily
Nama keluarga kolom tabel Lindorm.
String
Ya
Tidak ada nilai default
Jika nama keluarga kolom tidak ditentukan saat Anda membuat tabel Lindorm, masukkan nama keluarga kolom default f.
retryIntervalMs
Interval waktu operasi baca diulang ketika pembacaan data gagal.
Integer
Tidak
1000
Satuan: milidetik.
maxRetryTimes
Jumlah maksimum percobaan ulang untuk membaca atau menulis data.
Integer
Tidak
5
Tidak tersedia
Spesifik Sink
Opsi
Deskripsi
Tipe
Diperlukan
Nilai default
Catatan
bufferSize
Jumlah catatan data yang dapat ditulis pada satu waktu.
Integer
Tidak
500
Tidak tersedia
flushIntervalMs
Interval waktu data ditulis ke tabel ketika jumlah data sedikit.
Integer
Tidak
2000
Satuan: milidetik.
ignoreDelete
Menentukan apakah akan melewati operasi penghapusan.
Boolean
Tidak
false
Nilai valid:
true
false (default)
dynamicColumnSink
Menentukan apakah akan mengaktifkan fitur tabel dinamis. Untuk informasi lebih lanjut tentang fitur tabel dinamis, lihat bagian Tabel Dinamis dari topik ini.
Boolean
Tidak
false
Nilai valid:
true
false (default)
excludeUpdateColumns
Bidang yang tidak diperbarui. Nilai pembaruan dari bidang yang ditentukan tidak dimasukkan ke dalam tabel sink.
String
Tidak
Tidak ada nilai default
Pisahkan beberapa bidang dengan koma (
,). Sebagai contoh, jika Anda menyetel opsiexcludeUpdateColumnsmenjadia,b,c, pembaruan pada bidanga, b, dan cdiabaikan.CatatanHanya VVR 8.0.9 atau yang lebih baru yang mendukung opsi ini.
Spesifik Tabel Dimensi
Opsi
Deskripsi
Tipe
Diperlukan
Nilai default
Catatan
partitionedJoin
Menentukan apakah akan menggunakan JoinKey untuk partisi.
Boolean
Tidak
false
Nilai valid:
true: JoinKey digunakan untuk partisi. Data didistribusikan ke setiap node JOIN untuk meningkatkan tingkat hit cache.
false (default): JoinKey tidak digunakan untuk partisi.
shuffleEmptyKey
Menentukan apakah akan mengacak kunci upstream kosong secara acak ke node downstream.
Boolean
Tidak
false
Nilai valid:
true: Sistem mengacak kunci upstream kosong secara acak ke node downstream.
false (default): Sistem mengacak kunci upstream kosong ke thread paralel pertama dari setiap node downstream. Thread paralel pertama bernomor 0.
cache
Kebijakan cache.
String
Tidak
None
Nilai valid:
None (default): Tidak ada data yang di-cache.
LRU: Hanya data yang baru saja diakses dalam tabel dimensi yang di-cache.
Jika Anda menyetel opsi
cachemenjadiLRU, Anda harus mengonfigurasi opsicacheSizedancacheTTLMs.cacheSize
Jumlah baris data yang dapat di-cache.
Integer
Tidak
1000
Jika Anda menyetel opsi
cachemenjadiLRU, Anda dapat mengonfigurasi opsicacheSize.cacheTTLMs
Periode waktu habis cache.
Integer
Tidak
Tidak ada nilai default
Satuan: milidetik. Jika Anda menyetel opsi
cachemenjadiLRU, Anda dapat mengonfigurasi opsicacheTTLMs. Secara default, entri cache tidak kedaluwarsa.cacheEmpty
Menentukan apakah akan menyimpan cache kueri JOIN yang nilai balikannya kosong.
Boolean
Tidak
true
CatatanKonektor Lindorm mendukung join pencarian satu-ke-banyak. Perhatikan dengan cermat strategi caching dan throughput untuk performa optimal.
async
Menentukan apakah akan mengaktifkan sinkronisasi data dalam mode asinkron.
Boolean
Tidak
false
Nilai valid:
true
false (default)
asyncLindormRpcTimeoutMs
Periode timeout ketika data diminta dalam mode asinkron.
Integer
Tidak
300000
Satuan: milidetik.
Tabel dinamis
Fitur tabel dinamis cocok untuk skenario di mana tidak ada kolom yang ditentukan dalam tabel dan kolom dibuat serta disisipkan ke dalam tabel berdasarkan status penyebaran. Sebagai contoh, Anda menggunakan hari sebagai kunci utama dan jam sebagai kolom untuk menghitung volume transaksi per jam setiap hari. Data untuk setiap jam dihasilkan secara dinamis. Tabel berikut menunjukkan tabel dinamis.
Kunci utama | Nama kolom: 00:00 | Nama kolom: 01:00 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
Tabel dinamis harus mematuhi aturan DDL berikut: Beberapa kolom pertama didefinisikan sebagai kunci utama. Nilai kolom pertama di dua kolom terakhir digunakan sebagai nama kolom, nilai kolom terakhir digunakan sebagai nilai dari kolom sebelumnya, dan tipe data dari dua kolom terakhir harus VARCHAR. Contoh kode:
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);Dalam contoh sebelumnya, pk1, pk2, dan pk3 digunakan sebagai kunci utama. c1 dan c2 adalah dua kolom yang diperlukan untuk tabel dinamis dan harus menjadi dua kolom terakhir. Kecuali c1 dan c2, kolom yang tidak digunakan sebagai kunci utama tidak diperbolehkan. Setiap kali data ditulis ke tabel sink Lindorm, kolom ditambahkan atau dimodifikasi dalam catatan data yang sesuai dengan nilai <pk1, pk2, pk3> dari kunci utama. Nilai c1 digunakan sebagai nama kolom, dan nilai c2 digunakan sebagai nilai kolom. Setiap kali catatan data diterima, hanya nilai dalam satu kolom yang ditambahkan atau diubah. Nilai kolom lainnya tetap tidak berubah.
Pemetaan tipe data
Semua data di Lindorm berada dalam format biner. Tabel berikut menunjukkan cara mengonversi data menjadi byte data biner atau mengurai byte data biner berdasarkan tipe data bidang di Realtime Compute for Apache Flink.
Tipe data SQL Flink | Metode untuk mengonversi data menjadi byte untuk Lindorm | Metode untuk mengurai byte dari Lindorm |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | Konversi langsung data menjadi byte. | Kembalikan byte secara langsung. |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | Enkapsulasi langsung data ke byte pertama dari byte[]. | Kembalikan bytes[0] secara langsung. |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) setelah jumlah hari sejak 1 Januari 1970 diperoleh. | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toInt untuk mendapatkan jumlah hari sejak 1 Januari 1970. |
TIME | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) setelah jumlah milidetik sejak 00:00:00 hari saat ini diperoleh. | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toInt untuk mendapatkan jumlah milidetik sejak 00:00:00 hari saat ini. |
TIMESTAMP | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) setelah jumlah milidetik sejak 00:00:00 pada 1 Januari 1970 diperoleh. | Panggil com.alibaba.lindorm.client.core.utils.Bytes::toLong untuk mendapatkan jumlah milidetik sejak 00:00:00 pada 1 Januari 1970. |
Contoh kode
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;