Topik ini menjelaskan cara menggunakan Konektor StarRocks.
Latar Belakang
StarRocks adalah gudang data Massively Parallel Processing (MPP) generasi baru yang menyediakan kinerja query sangat cepat di berbagai skenario. StarRocks dirancang untuk memberikan pengalaman analitik data yang cepat dan terpadu. Berikut adalah manfaat utama StarRocks:
Kompatibel dengan protokol MySQL, sehingga Anda dapat menggunakan klien MySQL atau alat business intelligence (BI) umum untuk mengakses StarRocks dalam analitik data.
Menggunakan arsitektur terdistribusi dengan kemampuan berikut:
Membagi tabel secara horizontal dan menyimpan data dalam beberapa replika.
Menyediakan skalabilitas cluster yang fleksibel untuk mendukung analitik hingga 10 PB data.
Mendukung arsitektur MPP untuk mempercepat komputasi data.
Menyediakan beberapa replika untuk memastikan toleransi kesalahan.
Konektor Flink menyimpan data dalam cache dan menggunakan Stream Load untuk mengimpor data secara batch guna menghasilkan tabel hasil, serta membaca data secara batch untuk menghasilkan tabel sumber. Tabel berikut menjelaskan kemampuan yang didukung oleh Konektor StarRocks.
Item | Deskripsi |
Tipe tabel | Tabel sumber, tabel dimensi, tabel sink, dan sink pemasukan data |
Mode operasi | Mode streaming dan mode batch |
Format data | CSV |
Metrik | Tidak tersedia |
Tipe API | DataStream API, SQL API, dan YAML API untuk pemasukan data |
Pembaruan atau penghapusan data dalam tabel hasil | Didukung |
Prasyarat
Cluster StarRocks telah dibuat. Cluster tersebut dapat berupa cluster StarRocks EMR atau cluster StarRocks yang dikelola sendiri yang dihosting pada instance Elastic Compute Service (ECS).
Batasan
Konektor StarRocks hanya mendukung semantik at-least-once dan exactly-once.
Hanya Ververica Runtime (VVR) 11.1 atau versi lebih baru yang mendukung lookup joins dengan tabel dimensi StarRocks.
Untuk mencegah pembatasan jaringan, masukkan port berikut ke dalam daftar putih kluster StarRocks Anda di grup keamanan atau firewall:
9030,8030,8040,9060,8060,9020.
Pernyataan SQL
Fitur
StarRocks dari E-MapReduce (EMR) mendukung pernyataan CREATE TABLE AS (CTAS) dan CREATE DATABASE AS (CDAS). Pernyataan CREATE TABLE AS digunakan untuk menyinkronkan skema dan data dari satu tabel, sedangkan pernyataan CREATE DATABASE AS digunakan untuk menyinkronkan data seluruh database atau skema dan data dari beberapa tabel dalam database yang sama. Untuk informasi lebih lanjut, lihat Gunakan pernyataan CREATE TABLE AS dan CREATE DATABASE AS dari Realtime Compute for Apache Flink untuk menyinkronkan data dari instance ApsaraDB RDS for MySQL ke cluster StarRocks.
Sintaksis
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);Opsi konektor dalam klausa WITH
Kategori | Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
Umum | connector | Tipe tabel. | String | Ya | Tidak ada nilai default | Setel nilainya menjadi |
jdbc-url | URL Java Database Connectivity (JDBC) yang digunakan untuk terhubung ke database. | String | Ya | Tidak ada nilai default | IP address dan port JDBC frontend (FE) yang ditentukan digunakan. Nilai opsi ini dalam format | |
database-name | Nama database StarRocks. | String | Ya | Tidak ada nilai default | N/A | |
table-name | Nama tabel StarRocks. | String | Ya | Tidak ada nilai default | N/A | |
username | Nama pengguna yang digunakan untuk terhubung ke database StarRocks. | String | Ya | Tidak ada nilai default | N/A | |
password | Kata sandi yang digunakan untuk terhubung ke database StarRocks. | String | Ya | Tidak ada nilai default | N/A | |
starrocks.create.table.properties | Properti tabel StarRocks. | String | Tidak | Tidak ada nilai default | Properti awal tabel StarRocks, seperti mesin dan jumlah replika, ditentukan. Contoh: 'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1' | |
Spesifik sumber | scan-url | URL untuk pemindaian data. | String | Tidak | Tidak ada nilai default | IP address dan port HTTP frontend (FE) yang ditentukan digunakan. Nilai opsi ini dalam format Catatan Pisahkan beberapa pasangan IP address dan nomor port dengan titik koma (;). |
scan.connect.timeout-ms | Periode timeout untuk konektor StarRocks dari Realtime Compute for Apache Flink untuk terhubung ke database StarRocks. Jika durasi koneksi melebihi nilai opsi ini, kesalahan akan dikembalikan. | String | Tidak | 1000 | Satuan: milidetik. | |
scan.params.keep-alive-min | Periode keep-alive tugas query. | String | Tidak | 10 | N/A | |
scan.params.query-timeout-s | Periode timeout tugas query. Jika tidak ada hasil query yang dikembalikan dalam periode yang ditentukan oleh opsi ini, tugas query dihentikan. | String | Tidak | 600 | Satuan: detik. | |
scan.params.mem-limit-byte | Memori maksimum untuk satu query dalam node backend (BE). | String | Tidak | 1073741824 (1 GB) | Satuan: byte. | |
scan.max-retries | Jumlah maksimum percobaan ulang saat query gagal. Jika jumlah percobaan ulang mencapai nilai opsi ini, kesalahan akan dikembalikan. | String | Tidak | 1 | N/A | |
Spesifik sink | load-url | URL untuk impor data. | String | Ya | Tidak ada nilai default | IP address dan port HTTP frontend (FE) yang ditentukan digunakan. Nilai opsi ini dalam format Catatan Pisahkan beberapa pasangan IP address dan nomor port dengan titik koma (;). |
sink.semantic | Semantik untuk penulisan data. | String | Tidak | at-least-once | Nilai valid:
| |
sink.buffer-flush.max-bytes | Jumlah maksimum data yang diizinkan dalam buffer. | String | Tidak | 94371840 (90 MB) | Nilai valid: 64 MB hingga 10 GB. | |
sink.buffer-flush.max-rows | Jumlah maksimum baris yang diizinkan dalam buffer. | String | Tidak | 500000 | Nilai valid: 64000 hingga 5000000. | |
sink.buffer-flush.interval-ms | Interval refresh buffer. | String | Tidak | 300000 | Nilai valid: 1000 hingga 3600000. Satuan: milidetik. | |
sink.max-retries | Jumlah maksimum percobaan ulang untuk menulis data ke tabel. | String | Tidak | 3 | Nilai valid: 0 hingga 10. | |
sink.connect.timeout-ms | Periode timeout untuk terhubung ke database StarRocks. | String | Tidak | 1000 | Nilai valid: 100 hingga 60000. Satuan: milidetik. | |
sink.properties.* | Properti tabel sink. | String | Tidak | Tidak ada nilai default | Properti impor Stream Load. Misalnya, properti sink.properties.format menentukan format data yang diimpor dalam mode Stream Load. Format data bisa berupa CSV. Untuk informasi lebih lanjut tentang opsi, lihat Stream Load. | |
Spesifik tabel dimensi | lookup.cache.enabled | Menentukan apakah akan menyimpan cache tabel dimensi. | Boolean | Tidak | true | Nilai valid:
Penting
|
Pemetaan tipe data
Tipe data StarRocks | Tipe data Realtime Compute for Apache Flink |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED Catatan Hanya VVR 8.0.10 atau lebih baru yang mendukung pemetaan tipe data ini. | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(m) Catatan
| CHAR(n) |
VARCHAR(m) Catatan
| CHAR(n) |
VARCHAR | STRING |
VARBINARY Catatan Hanya VVR 8.0.10 atau lebih baru yang mendukung pemetaan tipe data ini. | VARBINARY |
Kode contoh
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;StarRocks memungkinkan kolom kunci primer bernilai null, tetapi Flink mengharuskan kunci primer tidak bernilai null dan unik demi konsistensi data. Kolom kunci primer bernilai null di StarRocks akan menghasilkan error: Invalid primary key. Column 'xxx' is nullable. Untuk informasi selengkapnya, lihat Mengapa saya mendapatkan error "Invalid primary key. Column 'xxx' is nullable." saat menulis ke tabel?.
Pemasukan data
Anda dapat menggunakan Konektor Pipeline StarRocks untuk menulis catatan data dan perubahan skema tabel dari sumber data hulu ke database StarRocks eksternal. Baik StarRocks open source maupun EMR Serverless StarRocks yang dikelola sepenuhnya didukung.
Fitur
Pembuatan database dan tabel otomatis
Jika database dan tabel hulu tidak ada di instance StarRocks hilir, database dan tabel akan dibuat secara otomatis. Anda dapat mengonfigurasi opsi
table.create.properties.*untuk menentukan opsi pembuatan tabel otomatis.Sinkronisasi perubahan skema tabel
Konektor StarRocks secara otomatis menerapkan event CreateTableEvent, AddColumnEvent, dan DropColumnEvent ke database hilir.
Mulai dari VVR 11.1, konversi tipe kolom yang kompatibel didukung. Untuk informasi lebih lanjut, lihat ALTER TABLE dalam dokumentasi StarRocks.
Catatan penggunaan
Konektor StarRocks hanya mendukung semantik at-least-once dan menggunakan tabel kunci utama untuk memastikan idempotensi operasi penulisan.
Tabel tempat data disinkronkan harus berisi kunci utama. Untuk tabel yang tidak memiliki kunci utama, Anda harus menentukan kunci utama untuk setiap tabel dalam blok pernyataan
TRANSFORMsebelum data dalam tabel dapat ditulis ke database hilir. Contoh kode:transform: - source-table: ... primary-keys: id, ...Kunci bucket tabel yang dibuat secara otomatis harus sama dengan kunci utama, dan tabel tidak boleh berisi kunci partisi.
Selama sinkronisasi perubahan skema tabel, kolom baru hanya dapat ditambahkan di akhir kolom yang ada. Secara default, mode Lenient digunakan untuk evolusi skema. Dalam mode ini, kolom yang dimasukkan di posisi lain dalam tabel secara otomatis dipindahkan ke akhir kolom yang ada.
Jika Anda menggunakan versi StarRocks lebih awal dari 2.5.7, Anda harus secara eksplisit menentukan jumlah bucket menggunakan opsi
table.create.num-buckets. Jika Anda menggunakan StarRocks 2.5.7 atau lebih baru, jumlah bucket ditentukan secara otomatis. Untuk informasi lebih lanjut, lihat Distribusi Data.Jika Anda menggunakan StarRocks 3.2 atau lebih baru, kami sarankan Anda mengatur opsi
table.create.properties.fast_schema_evolutionke true untuk mempercepat perubahan skema tabel.Masalah data stream mungkin muncul selama ingest data ke EMR Serverless StarRocks menggunakan Flink CDC. Untuk menghindari masalah ini, gunakan salah satu opsi berikut:
Gunakan pekerjaan SQL dan atur
sink.version=V1dalam draft pekerjaan.Tetap gunakan Flink CDC, tetapi aktifkan
FE emr_internal_redirect.Gunakan instans EMR Serverless StarRocks dengan Private Zone bawaan daripada SLB untuk load balancing.
Sintaksis
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
sink.buffer-flush.interval-ms: 5000 # Flush buffered data every 5 seconds.Opsi konektor
Opsi | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
| Nama konektor. | String | Ya | Tidak ada nilai default | Setel nilainya menjadi |
| Nama tampilan sink. | String | Tidak | Tidak ada nilai default | N/A |
| URL JDBC yang digunakan untuk terhubung ke database. | String | Ya | Tidak ada nilai default | Anda dapat menentukan beberapa URL. Pisahkan URL dengan koma ( |
| URL HTTP yang digunakan untuk terhubung ke node FE. | String | Ya | Tidak ada nilai default | Anda dapat menentukan beberapa URL. Pisahkan URL dengan titik koma ( |
| Nama pengguna yang digunakan untuk terhubung ke database StarRocks. | String | Ya | Tidak ada nilai default | Izin SELECT dan INSERT pada tabel tujuan harus diberikan kepada pengguna. Anda dapat memberikan izin yang diperlukan kepada pengguna dengan menggunakan perintah GRANT dari StarRocks. |
| Kata sandi yang digunakan untuk terhubung ke database StarRocks. | String | Ya | Tidak ada nilai default | N/A |
| Semantik untuk penulisan data. | String | Tidak | at-least-once | Nilai valid:
|
| Awalan label yang digunakan untuk Stream Load. | String | Tidak | Tidak ada nilai default | N/A |
| Periode timeout untuk koneksi HTTP. | Integer | Tidak | 30000 | Satuan: milidetik. Nilai valid: 100 hingga 60000. |
| Periode timeout untuk klien menunggu respons 100 Continue dari server. | Integer | Tidak | 30000 | Satuan: milidetik. Nilai valid: 3000 hingga 600000. |
| Ukuran data yang dapat disimpan dalam cache di memori sebelum data ditulis ke database StarRocks. | Long | Tidak | 157286400 | Satuan: byte. Nilai valid: 64 MB hingga 10 GB. Catatan
|
| Jumlah rekaman yang dapat disimpan dalam cache di memori sebelum data ditulis ke database StarRocks. | Long | Tidak | 500000 | Nilai valid: 64000 hingga 5000000. |
| Interval antara dua operasi flush berturut-turut untuk setiap tabel. | Long | Tidak | 300000 | Satuan: milidetik. Catatan Untuk dataset yang lebih kecil, kurangi opsi ini untuk memastikan data dalam buffer segera di-flush. |
| Jumlah maksimum percobaan ulang. | Long | Tidak | 3 | Nilai valid: 0 hingga 1000. |
| Interval antara dua pemeriksaan berturut-turut untuk mendeteksi apakah operasi flush perlu dilakukan. | Long | Tidak | 50 | Satuan: milidetik. |
| Jumlah thread selama impor data dalam mode Stream Load. | Integer | Tidak | 2 | N/A |
| Menentukan apakah akan menggunakan antarmuka transaksi Stream Load untuk impor data. | Boolean | Tidak | true | Pengaturan opsi ini hanya berlaku saat database yang didukung digunakan. |
| Opsi tambahan yang disediakan untuk sink. | String | Tidak | Tidak ada nilai default | Anda dapat melihat opsi yang didukung dalam mode Stream Load. |
| Jumlah bucket dari tabel yang dibuat secara otomatis. | Integer | Tidak | Tidak ada nilai default |
|
| Opsi tambahan yang harus ditentukan saat tabel dibuat secara otomatis. | String | Tidak | Tidak ada nilai default | Sebagai contoh, Anda dapat menambahkan konfigurasi |
| Durasi timeout untuk operasi perubahan skema. | Durasi | Tidak | 30 min | Nilai opsi ini harus diatur ke bilangan bulat. Satuan: detik. Catatan Jika durasi operasi perubahan skema melebihi nilai yang ditentukan oleh opsi ini, penyebaran gagal. |
| Jumlah byte yang dialokasikan untuk setiap karakter Unicode. | Integer | Tidak | 3 | Dalam Flink CDC, panjang VARCHAR diukur dalam karakter, sedangkan di StarRocks diukur dalam byte. Secara umum, encoding UTF-8 menggunakan paling banyak 3 byte per karakter Unicode. Namun, beberapa karakter langka dan emoji mungkin memerlukan lebih dari 4 byte. |
Pemetaan tipe data
StarRocks tidak mendukung semua tipe data Change Data Capture (CDC) YAML. Jika Anda menulis data dengan tipe yang tidak didukung ke database hilir, pekerjaan tersebut akan gagal. Anda dapat menggunakan fungsi bawaan CAST dalam komponen transform untuk mengonversi tipe data yang tidak didukung atau menggunakan pernyataan proyeksi untuk menghapus data dengan tipe yang tidak didukung dari tabel sink. Untuk informasi lebih lanjut, lihat Referensi Pengembangan Pemasukan Data.
Tipe data CDC | Tipe data StarRocks | Keterangan |
TINYINT | TINYINT | Tidak berlaku. |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
DECIMAL(p, s) | DECIMAL(p, s) | StarRocks tidak mendukung DECIMAL sebagai tipe data kunci. Oleh karena itu, jika kolom dengan tipe data DECIMAL pada tabel data hulu digunakan sebagai kunci primer, tipe data kunci dalam skema tabel yang disinkronkan ke StarRocks secara otomatis diubah dari DECIMAL menjadi VARCHAR. |
CHAR(n) (n ≤ 85) | CHAR(n × 3) | Panjang kolom bertipe CHAR di CDC menentukan jumlah karakter yang dapat disimpan, sedangkan panjang kolom bertipe CHAR di StarRocks menentukan jumlah byte dalam encoding UTF-8 yang dapat disimpan. Dalam kebanyakan kasus, satu karakter Tionghoa yang dikodekan dalam UTF-8 tidak melebihi tiga byte. Oleh karena itu, setelah kolom bertipe CHAR di CDC dipetakan ke kolom bertipe CHAR di StarRocks, panjang kolom tersebut menjadi tiga kali lipat dari panjang aslinya sebelum pemetaan. Catatan
|
CHAR(n) (n > 85) | VARCHAR(n × 3) | Panjang kolom bertipe CHAR di CDC menentukan jumlah karakter yang dapat disimpan, sedangkan panjang kolom bertipe CHAR di StarRocks menentukan jumlah byte dalam encoding UTF-8 yang dapat disimpan. Dalam kebanyakan kasus, satu karakter Tionghoa yang dikodekan dalam UTF-8 tidak melebihi tiga byte. Oleh karena itu, setelah kolom bertipe CHAR di CDC dipetakan ke kolom bertipe VARCHAR di StarRocks, panjang kolom tersebut menjadi tiga kali lipat dari panjang aslinya. Catatan
|
VARCHAR(n) | VARCHAR(n × 3) | Panjang kolom bertipe VARCHAR di CDC menentukan jumlah karakter yang dapat disimpan. Namun, panjang kolom bertipe VARCHAR di StarRocks menentukan jumlah byte dalam encoding UTF-8 yang dapat disimpan. Dalam kebanyakan kasus, satu karakter Tionghoa dalam UTF-8 memerlukan tidak lebih dari tiga byte. Oleh karena itu, setelah pemetaan dari CDC ke StarRocks, panjang kolom VARCHAR menjadi tiga kali lipat dari nilai aslinya. |
BINARY(n) | BINARY(n+2) | Dua byte padding ditambahkan untuk mencegah kesalahan. |
VARBINARY(n) | VARBINARY(n+1) | Satu byte padding ditambahkan untuk mencegah kesalahan. |
Evolusi skema
Sebagai sink ingesti data, StarRocks mendukung event evolusi skema berikut:
CREATE TABLE
CatatanJika tabel StarRocks sudah ada, konektor tidak mencoba membuatnya lagi. Anda harus memastikan skema tabel yang ada kompatibel dengan skema sumber.
ADD COLUMN
CatatanStarRocks mensyaratkan kolom kunci primer selalu berada di posisi pertama. Kolom yang baru ditambahkan juga harus mematuhi pembatasan ini.
MODIFY COLUMN TYPE
CatatanUntuk informasi selengkapnya mengenai konversi yang didukung, lihat dokumentasi resmi StarRocks.
DROP COLUMN
TRUNCATE TABLE
DROP TABLE