Connector Flink StarRocks memuat data ke StarRocks secara batch menggunakan Stream Load, menawarkan throughput yang jauh lebih tinggi dibandingkan connector Java Database Connectivity (JDBC) bawaan Apache Flink (flink-connector-jdbc). Alibaba Cloud menyediakan connector Flink StarRocks untuk membantu Anda menyimpan data dalam cache, lalu mengimpornya ke StarRocks sekaligus dalam mode Stream Load. Topik ini menjelaskan cara menyiapkan connector dan menulis data ke StarRocks, dilengkapi contoh lengkap untuk sinkronisasi data MySQL secara real-time.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Apache Flink 1.11 atau versi yang lebih baru (disarankan versi 1.13)
-
Kluster StarRocks dengan port HTTP frontend yang dapat diakses
Tambahkan connector ke proyek Anda
Unduh kode sumber connector dari repositori starrocks-connector-for-apache-flink di GitHub.
Tambahkan dependensi berikut ke file pom.xml proyek Anda:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- untuk Flink 1.11 dan 1.12 -->
<version>x.x.x_flink-1.11</version>
<!-- untuk Flink 1.13 -->
<version>x.x.x_flink-1.13</version>
</dependency>
Ganti x.x.x dengan versi terbaru. Periksa halaman informasi versi untuk rilis terkini.
Menulis data menggunakan connector Flink
Connector ini menyediakan dua pendekatan untuk menulis data ke StarRocks. Gunakan Metode 1 untuk aliran JSON mentah; gunakan Metode 2 untuk pembuatan tabel berbasis SQL dan data terstruktur.
Metode 1: DataStream API
// -------- sink dengan aliran string JSON mentah --------
fromElements(new String[]{
"{\"score\": \"99\", \"name\": \"stephen\"}",
"{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
StarRocksSink.sink(
// opsi sink
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
.withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
.withProperty("username", "xxx")
.withProperty("password", "xxx")
.withProperty("table-name", "xxx")
.withProperty("database-name", "xxx")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build()
)
);
// -------- sink dengan transformasi aliran --------
class RowData {
public int score;
public String name;
public RowData(int score, String name) {
......
}
}
fromElements(
new RowData[]{
new RowData(99, "stephen"),
new RowData(100, "lebron")
}
).addSink(
StarRocksSink.sink(
// struktur tabel
TableSchema.builder()
.field("score", DataTypes.INT())
.field("name", DataTypes.VARCHAR(20))
.build(),
// opsi sink
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
.withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
.withProperty("username", "xxx")
.withProperty("password", "xxx")
.withProperty("table-name", "xxx")
.withProperty("database-name", "xxx")
.withProperty("sink.properties.column_separator", "\\x01")
.withProperty("sink.properties.row_delimiter", "\\x02")
.build(),
// atur slot dengan streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.score;
slots[1] = streamRowData.name;
}
)
);
Metode 2: Pembuatan tabel SQL
Sebelum menjalankan metode ini, tambahkan com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory ke file src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
// buat tabel dengan `struktur` dan `properti`
tEnv.executeSql(
"CREATE TABLE USER_RESULT(" +
"name VARCHAR," +
"score BIGINT" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
"'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
"'database-name' = 'xxx'," +
"'table-name' = 'xxx'," +
"'username' = 'xxx'," +
"'password' = 'xxx'," +
"'sink.buffer-flush.max-rows' = '1000000'," +
"'sink.buffer-flush.max-bytes' = '300000000'," +
"'sink.buffer-flush.interval-ms' = '5000'," +
"'sink.properties.column_separator' = '\\x01'," +
"'sink.properties.row_delimiter' = '\\x02'," +
"'sink.max-retries' = '3'" +
"'sink.properties.*' = 'xxx'" + // properti stream load seperti `'sink.properties.columns' = 'k1, v1'`
")"
);
Parameter sink
Tabel berikut menjelaskan semua parameter untuk connector sink Flink.
| Parameter | Wajib | Nilai default | Tipe | Deskripsi |
|---|---|---|---|---|
connector |
Ya | — | String | Jenis connector. Atur ke starrocks. |
jdbc-url |
Ya | — | String | URL JDBC untuk menghubungkan ke StarRocks dan menjalankan kueri. |
load-url |
Ya | — | String | Alamat HTTP frontend, dalam format fe_ip:http_port;fe_ip:http_port. Pisahkan beberapa frontend dengan titik koma (;). |
database-name |
Ya | — | String | Nama database StarRocks. |
table-name |
Ya | — | String | Nama tabel StarRocks. |
username |
Ya | — | String | Username untuk menghubungkan ke database StarRocks. |
password |
Ya | — | String | Password untuk menghubungkan ke database StarRocks. |
sink.semantic |
Tidak | at-least-once |
String | Semantik pengiriman. Nilai yang valid: at-least-once dan exactly-once. |
sink.buffer-flush.max-bytes |
Tidak | 94371840 (90 MB) |
String | Ukuran buffer maksimum sebelum flush dipicu. Rentang valid: 64 MB hingga 10 GB. |
sink.buffer-flush.max-rows |
Tidak | 500000 |
String | Jumlah maksimum baris dalam buffer sebelum flush dipicu. Rentang valid: 64000 hingga 5000000. |
sink.buffer-flush.interval-ms |
Tidak | 300000 |
String | Interval flush buffer dalam milidetik. Rentang valid: 1000 hingga 3600000. |
sink.max-retries |
Tidak | 1 |
String | Jumlah maksimum upaya pengulangan saat terjadi kegagalan. Rentang valid: 0 hingga 10. |
sink.connect.timeout-ms |
Tidak | 1000 |
String | Batas waktu koneksi untuk alamat HTTP antarmuka depan yang ditentukan dalam load-url, dalam milidetik. Rentang valid: 100–60.000. |
sink.properties.* |
Tidak | — | String | Properti Stream Load yang diteruskan langsung ke StarRocks. |
Catatan penggunaan
Format dan pemisah default
Data diimpor dalam format CSV secara default. Untuk menggunakan pemisah kustom, atur:
-
sink.properties.row_delimiter— pemisah baris (misalnya,\\x02). Didukung di StarRocks 1.15.0 dan versi yang lebih baru. -
sink.properties.column_separator— pemisah kolom (misalnya,\\x01).
Jika Anda tidak dapat menemukan pemisah CSV yang sesuai untuk data Anda, alihkan ke format JSON dengan mengatur sink.properties.format=json dan sink.properties.strip_outer_array=true. Perhatikan bahwa performa impor JSON mungkin lebih rendah dibandingkan CSV.
Saat membuat tabel dan menyinkronkan data di client SQL, escape backslash (\):
'sink.properties.column_separator' = '\\x01'
'sink.properties.row_delimiter' = '\\x02'
Semantik exactly-once
Untuk menggunakan sink.semantic=exactly-once, sistem eksternal harus mendukung protokol two-phase commit. StarRocks tidak mendukung protokol ini secara native, sehingga pengiriman exactly-once bergantung pada checkpointing Flink. Alurnya bekerja sebagai berikut: 1. Saat Flink menghasilkan checkpoint, sekumpulan data dan labelnya disimpan dalam cache sebagai state. 2. Sistem diblokir hingga data yang di-cache berhasil ditulis ke StarRocks. 3. Setelah penulisan selesai, Flink memulai checkpoint berikutnya. Jika StarRocks menjadi tidak tersedia, operator sink stream Flink mungkin terblokir dalam waktu lama akibat kegagalan koneksi, memicu peringatan dan memaksa pekerjaan impor Flink dihentikan.
Penyelesaian masalah konektivitas
Jika kueri data berhasil tetapi penulisan gagal, verifikasi bahwa mesin Anda dapat mencapai port HTTP backend StarRocks. Saat Anda mengirimkan pekerjaan impor, frontend meneruskan operasi tulis ke alamat IP internal dan port HTTP backend.
Sebagai contoh, jika mesin Anda memiliki alamat IP publik dan frontend serta backend kluster dapat diakses melalui alamat IP publik, tetapi backend menggunakan alamat IP internal untuk komunikasi intra-kluster — menentukan alamat frontend publik dalam load-url menyebabkan frontend meneruskan penulisan ke IP internal backend. Jika mesin Anda tidak dapat mencapai IP internal tersebut, penulisan akan gagal.
Jika pekerjaan impor berhenti secara tak terduga, tingkatkan kapasitas memori pekerjaan tersebut.
Menyinkronkan data MySQL ke StarRocks menggunakan CDC
Gunakan connector change data capture (CDC) Flink dan tool migrasi StarRocks untuk menyinkronkan database MySQL ke kluster StarRocks secara near real-time. Tool migrasi StarRocks secara otomatis menghasilkan pernyataan CREATE TABLE berdasarkan skema MySQL dan konfigurasi kluster StarRocks.
Diagram dan beberapa informasi dalam bagian ini bersumber dari Realtime synchronization from MySQL dalam dokumentasi open-source StarRocks.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
-
Apache Flink 1.13 (disarankan) atau 1.11 / 1.12
-
Akses ke database MySQL dan kluster StarRocks
Siapkan lingkungan
-
Unduh Apache Flink. Disarankan menggunakan Apache Flink 1.13. Versi paling awal yang didukung adalah 1.11.
-
Unduh connector Flink CDC untuk MySQL. Pilih paket yang sesuai dengan versi Flink Anda.
-
Unduh connector Flink StarRocks.
PentingConnector Flink StarRocks berbeda untuk Apache Flink 1.13, 1.11, dan 1.12. Unduh versi yang tepat.
-
Salin kedua file
flink-sql-connector-mysql-cdc-xxx.jardanflink-connector-starrocks-xxx.jarke direktoriflink-xxx/lib/. -
Unduh paket smt.tar.gz, ekstrak, lalu edit file konfigurasi dengan detail koneksi Anda. File konfigurasi menggunakan bagian-bagian berikut: Contoh konfigurasi:
-
db— informasi koneksi database MySQL -
be_num— jumlah node dalam kluster StarRocks -
[table-rule.N]— aturan pencocokan menggunakan ekspresi reguler untuk nama database dan nama tabel. Konfigurasikan satu bagian per set aturan. -
flink.starrocks.*— konfigurasi kluster StarRocks
[db] host = 192.168.**.** port = 3306 user = root password = [other] # jumlah backend di StarRocks be_num = 3 # `decimal_v3` didukung sejak StarRocks-1.18.1 use_decimal_v3 = false # file untuk menyimpan DDL SQL hasil konversi output_dir = ./result [table-rule.1] # pola untuk mencocokkan database guna mengatur properti database = ^console_19321.*$ # pola untuk mencocokkan tabel guna mengatur properti table = ^.*$ ############################################ ### konfigurasi sink flink ### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis ############################################ flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030 flink.starrocks.load-url= 192.168.**.**:8030 flink.starrocks.username=root flink.starrocks.password= flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000 -
Buat tabel dan mulai sinkronisasi
-
Jalankan
starrocks-migrate-tooluntuk menghasilkan pernyataanCREATE TABLE. Output ditulis ke direktori/result. Jalankanls resultuntuk memverifikasi file yang dihasilkan:flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql -
Buat database dan tabel di StarRocks:
Mysql -h <alamat IP frontend> -P 9030 -u root -p < starrocks-create.1.sql -
Buat tabel Flink dan mulai sinkronisasi data berkelanjutan:
PentingJika Anda menggunakan Apache Flink versi sebelum 1.13, Anda tidak dapat menjalankan skrip SQL secara langsung. Eksekusi pernyataan SQL satu per satu dan aktifkan binary logging untuk database MySQL.
bin/sql-client.sh -f flink-create.1.sql -
Periksa status pekerjaan Flink yang sedang berjalan:
bin/flink list -runningLihat detail dan status pekerjaan di UI web Flink atau di file log di bawah
$FLINK_HOME/log.
Konfigurasi beberapa set aturan
Untuk menyinkronkan beberapa database atau tabel dengan properti berbeda, tambahkan satu bagian [table-rule.N] per set aturan. Konfigurasikan parameter flink.starrocks.sink secara independen untuk setiap set aturan, misalnya frekuensi impor yang berbeda.
Contoh konfigurasi dengan dua set aturan:
[table-rule.1]
# pola untuk mencocokkan database guna mengatur properti
database = ^console_19321.*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^.*$
############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
[table-rule.2]
# pola untuk mencocokkan database guna mengatur properti
database = ^database2.*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^.*$
############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
# Jika Anda tidak dapat memilih pemisah CSV yang sesuai, alihkan ke format JSON.
# Perhatikan bahwa performa impor JSON mungkin menurun dibandingkan CSV.
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json
Konsolidasi tabel yang di-shard
Setelah sharding horizontal, data dari tabel besar mungkin terbagi di beberapa tabel atau database. Konfigurasikan satu set aturan untuk menyinkronkan beberapa tabel sumber ke satu tabel StarRocks.
Sebagai contoh, jika edu_db_1 dan edu_db_2 masing-masing berisi course_1 dan course_2 dengan skema yang sama, aturan berikut mengonsolidasi keempat tabel tersebut ke satu tabel StarRocks:
[table-rule.3]
# pola untuk mencocokkan database guna mengatur properti
database = ^edu_db_[0-9]*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^course_[0-9]*$
############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000
Setelah menjalankan starrocks-migrate-tool, hubungan sinkronisasi many-to-one dibuat secara otomatis. Tabel StarRocks yang dihasilkan diberi nama course__auto_shard secara default. Untuk mengganti namanya, edit file SQL yang dihasilkan sebelum menjalankannya.