StarRocks menyediakan konektor Apache Flink (selanjutnya disebut Flink Connector) yang memungkinkan Anda mengimpor data ke tabel StarRocks melalui Flink. Dibandingkan dengan flink-connector-jdbc bawaan Flink, Flink Connector dari StarRocks menawarkan kinerja dan stabilitas yang lebih unggul, sehingga sangat cocok untuk skenario impor data berskala besar.
Informasi latar belakang
Flink Connector StarRocks meningkatkan efisiensi impor data secara signifikan dengan menyimpan cache batch kecil data di memori dan memanfaatkan fitur Stream Load milik StarRocks untuk impor batch. Konektor ini mendukung DataStream API, Table API & SQL, serta Python API.
Prasyarat
Anda telah membuat kluster yang mencakup layanan Flink.
Topik ini menggunakan kluster DataFlow dengan layanan Flink (selanjutnya disebut kluster Flink) yang dibuat di EMR pada ECS sebagai contoh. Untuk informasi selengkapnya, lihat Membuat kluster.
Anda telah membuat instans EMR Serverless StarRocks. Untuk informasi selengkapnya, lihat Membuat instans.
Batasan
Pastikan mesin tempat Flink berjalan dapat mengakses port http_port (default
8030) dan port query_port (default9030) pada node FE di instans StarRocks, serta port be_http_port (default8040) pada node BE.Menggunakan Flink Connector untuk mengimpor data ke StarRocks memerlukan izin SELECT dan INSERT pada tabel target.
Persyaratan kompatibilitas versi Flink Connector dengan lingkungan Java, Scala, dan versi Flink adalah sebagai berikut.
Konektor
Flink
StarRocks
Java
Scala
1.2.9
1.15~1.18
2.1 dan di atasnya
8
2.11, 2.12
1.2.8
1.13~1.17
2.1 dan di atasnya
8
2.11, 2.12
1.2.7
1.11~1.15
2.1 dan di atasnya
8
2.11, 2.12
Konfigurasi
Bagian ini memperkenalkan pengaturan parameter StarRocks dan pemetaan tipe data yang sesuai. Untuk informasi lebih rinci, silakan merujuk ke Memuat data secara berkelanjutan dari Apache Flink® | StarRocks.
Parameter
Parameter | Wajib | Nilai default | Deskripsi |
| Ya | NONE | Menentukan konektor sebagai StarRocks, tetap pada nilai |
| Ya | NONE | URL JDBC yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks. Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Di sini, Catatan Untuk informasi tentang cara mendapatkan alamat internal node FE dari instans EMR Serverless StarRocks, lihat Melihat daftar dan detail instans. |
| Ya | NONE | Menentukan alamat internal dan port HTTP node FE, dalam format Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030. |
| Ya | NONE | Nama database StarRocks. |
| Ya | NONE | Nama tabel StarRocks. |
| Ya | NONE | Nama pengguna instans StarRocks. Misalnya, admin default. Menggunakan Flink Connector untuk mengimpor data ke StarRocks memerlukan izin SELECT dan INSERT pada tabel target. Jika akun pengguna Anda tidak memiliki izin tersebut, Anda perlu memberikannya. Untuk informasi selengkapnya, lihat Mengelola pengguna dan otorisasi data. |
| Ya | NONE | Kata sandi pengguna instans StarRocks. |
| Tidak | at-least-once | Menentukan tingkat jaminan semantik untuk sink, memastikan keandalan dan konsistensi saat data ditulis ke sistem target. Nilai yang valid:
|
| Tidak | AUTO | Antarmuka untuk mengimpor data. Parameter ini didukung mulai dari Flink Connector 1.2.4.
|
| Tidak | NONE | Menentukan awalan label yang digunakan oleh Stream Load. Jika versi Flink Connector adalah 1.2.8 atau lebih baru, dan sink menjamin semantik exactly-once, disarankan untuk mengonfigurasi awalan label. |
| Tidak | 94.371.840(90M) | Ukuran maksimum data yang terakumulasi di memori, setelah itu data diimpor ke StarRocks sekaligus melalui Stream Load. Menetapkan nilai yang lebih besar dapat meningkatkan kinerja impor tetapi dapat menyebabkan latensi impor yang lebih tinggi. Nilai yang valid: [64MB, 10GB]. Catatan
|
sink.buffer-flush.max-rows | Tidak | 500000 | Jumlah maksimum baris data yang terakumulasi di memori, setelah itu data diimpor ke StarRocks sekaligus melalui Stream Load. Nilai yang valid: [64000, 5000000]. Catatan Parameter ini hanya berlaku ketika |
sink.buffer-flush.interval-ms | Tidak | 300000 | Menetapkan interval waktu pengiriman data, mengontrol latensi data yang ditulis ke StarRocks. Nilai yang valid: [1000, 3600000]. Catatan Parameter ini hanya berlaku ketika |
sink.max-retries | Tidak | 3 | Jumlah percobaan ulang setelah kegagalan Stream Load. Jika jumlah ini terlampaui, tugas impor data akan melaporkan kesalahan. Nilai yang valid: [0, 10]. Catatan Parameter ini hanya berlaku ketika |
sink.connect.timeout-ms | Tidak | 30000 | Periode waktu habis untuk membuat koneksi HTTP dengan FE. Nilai yang valid: [100, 60000]. Sebelum Flink Connector v1.2.9, nilai default adalah |
sink.socket.timeout-ms | Tidak | -1 | Parameter ini didukung mulai dari Flink connector 1.2.10. Periode waktu habis bagi klien HTTP untuk menunggu data. Satuan: milidetik. Nilai default |
sink.wait-for-continue.timeout-ms | Tidak | 10000 | Parameter ini didukung mulai dari Flink Connector 1.2.7. Periode waktu habis untuk menunggu respons FE HTTP 100-continue. Nilai yang valid: [3000, 60000]. |
sink.ignore.update-before | Tidak | TRUE | Parameter ini didukung mulai dari Flink Connector 1.2.8. Saat mengimpor data ke tabel kunci primer, menentukan apakah akan mengabaikan catatan UPDATE_BEFORE dari Flink. Jika parameter ini diatur ke false, catatan tersebut diperlakukan sebagai operasi DELETE pada tabel kunci primer. |
sink.parallelism | Tidak | NONE | Paralelisme penulisan, hanya berlaku untuk Flink SQL. Jika tidak diatur, perencana Flink akan menentukan paralelisme. Dalam skenario multi-paralelisme, pengguna perlu memastikan bahwa data ditulis dalam urutan yang benar. |
sink.properties.* | Tidak | NONE | Parameter untuk Stream Load, mengontrol perilaku impor Stream Load. |
sink.properties.format | Tidak | csv | Format data untuk impor Stream Load. Flink Connector mengonversi data di memori ke format yang sesuai lalu mengimpornya ke StarRocks melalui Stream Load. Nilai yang valid: CSV atau JSON. |
sink.properties.column_separator | Tidak | \t | Pemisah kolom untuk data CSV. |
sink.properties.row_delimiter | Tidak | \n | Pemisah baris untuk data CSV. |
sink.properties.max_filter_ratio | Tidak | 0 | Tingkat toleransi maksimum untuk tugas impor, yaitu proporsi maksimum baris data yang dapat difilter karena masalah kualitas data. Nilai yang valid: 0~1. |
sink.properties.partial_update | Tidak | false | Menentukan apakah akan menggunakan pembaruan parsial. Nilai yang valid termasuk |
sink.properties.partial_update_mode | Tidak | row | Menentukan mode untuk pembaruan parsial. Nilai yang valid:
|
sink.properties.strict_mode | Tidak | false | Menentukan apakah akan mengaktifkan mode ketat untuk Stream Load. Mode ketat memengaruhi perilaku impor saat baris yang tidak memenuhi syarat (seperti nilai kolom yang tidak konsisten) muncul dalam data yang diimpor. Nilai yang valid: |
sink.properties.compression | Tidak | NONE | Parameter ini didukung mulai dari Flink Connector 1.2.10. Menentukan algoritma kompresi untuk Stream Load. Saat ini, hanya kompresi format JSON yang didukung. Nilai yang valid: Catatan Hanya StarRocks v3.2.7 dan versi yang lebih tinggi yang mendukung kompresi format JSON. |
Pemetaan tipe data
Tipe data Flink | Tipe data StarRocks |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
Persiapan
Mendapatkan JAR Flink Connector dan mengunggahnya ke kluster Flink
Anda dapat memperoleh paket JAR Flink Connector melalui metode berikut.
Metode 1: Unduh langsung
Ambil file JAR Flink Connector versi berbeda dari Repositori Maven Central.
Metode 2: Dependensi Maven
Dalam file
pom.xmlproyek Maven Anda, tambahkan Flink Connector sebagai dependensi menggunakan format berikut.Untuk Flink Connector yang berlaku untuk Flink versi 1.15 dan seterusnya.
<dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <version>${connector_version}_flink-${flink_version}</version> </dependency>Untuk Flink Connector yang berlaku untuk versi Flink sebelum 1.15.
<dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <version>${connector_version}_flink-${flink_version}_${scala_version}</version> </dependency>
Metode 3: Kompilasi manual
Unduh kode Flink Connector.
Jalankan perintah berikut untuk mengompilasi kode sumber Flink Connector menjadi file JAR.
sh build.sh <flink_version>Sebagai contoh, jika versi Flink di lingkungan Anda adalah 1.17, Anda perlu menjalankan perintah berikut.
sh build.sh 1.17Setelah kompilasi selesai, temukan file JAR yang dihasilkan di direktori
target/.Sebagai contoh, nama file biasanya dalam format
flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar.CatatanRilis tidak resmi versi Flink Connector akan memiliki akhiran
SNAPSHOT.
Format penamaan file JAR Flink Connector adalah sebagai berikut:
Untuk Flink Connector yang berlaku untuk Flink versi 1.15 dan seterusnya, format penamaannya adalah
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar. Sebagai contoh, jika Anda telah menginstal Flink 1.17 dan ingin menggunakan Flink Connector versi 1.2.8, Anda dapat menggunakanflink-connector-starrocks-1.2.8_flink-1.17.jar.Untuk Flink Connector yang berlaku untuk versi Flink sebelum 1.15, format penamaannya adalah
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar. Sebagai contoh, jika Anda telah menginstal Flink 1.14 dan Scala 2.12, serta ingin menggunakan Flink Connector versi 1.2.7, Anda dapat menggunakanflink-connector-starrocks-1.2.7_flink-1.14_2.12.jar.CatatanSilakan ganti informasi berikut sesuai dengan situasi aktual Anda:
flink_version: Nomor versi Flink.scala_version: Nomor versi Scala.connector_version: Nomor versi Flink Connector.
Unggah file JAR Flink Connector yang diperoleh ke direktori
flink-{flink_version}/libkluster Flink.Sebagai contoh, jika Anda menggunakan kluster EMR dengan versi EMR-5.19.0, file JAR harus ditempatkan di direktori
/opt/apps/FLINK/flink-current/lib.
Memulai kluster Flink
Masuk ke node master kluster Flink. Untuk informasi selengkapnya, lihat Masuk ke kluster.
Jalankan perintah berikut untuk memulai kluster Flink.
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
Contoh
Menulis data menggunakan Flink SQL
Buat database bernama
testdi StarRocks, dan buat tabel kunci primer bernamascore_boarddi dalamnya.CREATE DATABASE test; CREATE TABLE test.score_board( id int(11) NOT NULL COMMENT "", name varchar(65533) NULL DEFAULT "" COMMENT "", score int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(id) DISTRIBUTED BY HASH(id);Masuk ke node master kluster Flink. Untuk informasi selengkapnya, lihat Masuk ke kluster.
Jalankan perintah berikut untuk memulai Flink SQL:
/opt/apps/FLINK/flink-current/bin/sql-client.shJalankan perintah berikut untuk membuat tabel bernama
score_boarddan memasukkan data ke dalamnya.CREATE TABLE `score_board` ( `id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', ); INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);Jika tujuannya adalah mengimpor data ke tabel kunci primer StarRocks, Anda harus secara eksplisit menentukan kunci primer dalam DDL tabel Flink. Untuk jenis tabel StarRocks lainnya (seperti tabel Duplicate Key), mendefinisikan kunci primer bersifat opsional.
Menulis data menggunakan Flink DataStream
Tulis pekerjaan Flink DataStream yang sesuai berdasarkan jenis catatan input yang berbeda.
Menulis data string dalam format CSV
Jika catatan input berupa string dalam format CSV, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadCsvRecords.
/** * Generate CSV-format records. Each record has three values separated by "\t". * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table. */ String[] records = new String[]{ "1\tstarrocks-csv\t100", "2\tflink-csv\t100" }; DataStream<String> source = env.fromElements(records); /** * Configure the Flink connector with the required properties. * You also need to add properties "sink.properties.format" and "sink.properties.column_separator" * to tell the Flink connector the input records are CSV-format, and the column separator is "\t". * You can also use other column separators in the CSV-format records, * but remember to modify the "sink.properties.column_separator" correspondingly. */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .withProperty("sink.properties.format", "csv") .withProperty("sink.properties.column_separator", "\t") .build(); // Create the sink with the options. SinkFunction<String> starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink);Menulis data string dalam format JSON
Jika catatan input berupa string dalam format JSON, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadJsonRecords.
/** * Generate JSON-format records. * Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table. */ String[] records = new String[]{ "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}", "{\"id\":2, \"name\":\"flink-json\", \"score\":100}", }; DataStream<String> source = env.fromElements(records); /** * Configure the Flink connector with the required properties. * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array" * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build(); // Create the sink with the options. SinkFunction<String> starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink);Menulis data objek Java kustom
Jika catatan input berupa objek Java kustom, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadCustomJavaRecords.
Dalam contoh ini, kelas POJO sederhana
RowDatadidefinisikan untuk merepresentasikan setiap catatan.public static class RowData { public int id; public String name; public int score; public RowData() {} public RowData(int id, String name, int score) { this.id = id; this.name = name; this.score = score; } }Kode utamanya ditunjukkan di bawah ini.
// Generate records which use RowData as the container. RowData[] records = new RowData[]{ new RowData(1, "starrocks-rowdata", 100), new RowData(2, "flink-rowdata", 100), }; DataStream<RowData> source = env.fromElements(records); // Configure the Flink connector with the required properties. StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .build(); /** * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table, * and each element is the value for a column. * You need to define the schema of the Object[] which matches that of the StarRocks table. */ TableSchema schema = TableSchema.builder() .field("id", DataTypes.INT().notNull()) .field("name", DataTypes.STRING()) .field("score", DataTypes.INT()) // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`. .primaryKey("id") .build(); // Transform the RowData to the Object[] according to the schema. RowDataTransformer transformer = new RowDataTransformer(); // Create the sink with the schema, options, and transformer. SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer); source.addSink(starRockSink);RowDataTransformerdidefinisikan sebagai berikut.private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> { /** * Set each element of the object array according to the input RowData. * The schema of the array matches that of the StarRocks table. */ @Override public void accept(Object[] internalRow, RowData rowData) { internalRow[0] = rowData.id; internalRow[1] = rowData.name; internalRow[2] = rowData.score; // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation. internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); } }
Menyinkronkan data menggunakan Flink CDC 3.0
Framework Flink CDC 3.0 memungkinkan Anda dengan mudah membangun pipeline ELT streaming dari sumber data CDC (seperti MySQL, Kafka) ke StarRocks. Melalui pipeline ini, Anda dapat mencapai fungsi-fungsi berikut:
Pembuatan database dan tabel secara otomatis
Sinkronisasi data penuh dan inkremental
Sinkronisasi Perubahan Skema
Mulai dari StarRocks Flink Connector v1.2.9, konektor ini telah diintegrasikan ke dalam framework Flink CDC 3.0 dan diberi nama StarRocks Pipeline Connector. Konektor ini memiliki semua fitur di atas dan direkomendasikan untuk digunakan dengan StarRocks v3.2.1 dan di atasnya agar dapat memanfaatkan sepenuhnya fitur fast_schema_evolution, yang lebih lanjut meningkatkan kecepatan penambahan dan penghapusan kolom serta mengurangi konsumsi sumber daya.
Praktik terbaik
Impor ke tabel kunci primer
Buat database bernama
testdi StarRocks, dan buat tabel kunci primerscore_boarddi dalamnya.CREATE DATABASE `test`; CREATE TABLE `test`.`score_board` ( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`);Masukkan data ke tabel StarRocks.
INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);Jalankan perintah berikut untuk memulai klien Flink SQL.
/opt/apps/FLINK/flink-current/bin/sql-client.shPerbarui data.
Pembaruan parsial
Pembaruan parsial memungkinkan Anda memperbarui hanya kolom tertentu (seperti
name) tanpa memengaruhi kolom lainnya (sepertiscore).Buat tabel
score_boarddi klien Flink SQL dan aktifkan fitur pembaruan parsial.CREATE TABLE `score_board` ( `id` INT, `name` STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', 'sink.properties.partial_update' = 'true', -- only for Flink connector version <= 1.2.7 'sink.properties.columns' = 'id,name,__op' );sink.properties.partial_update: Mengaktifkan pembaruan parsial.sink.properties.columns: Menentukan kolom yang akan diperbarui. Jika versi Flink Connector kurang dari atau sama dengan 1.2.7, Anda juga perlu mengatur opsisink.properties.columnsmenjadiid,name,__opuntuk memberi tahu konektor Flink kolom mana yang perlu diperbarui. Perhatikan bahwa Anda perlu menambahkan bidang__opdi akhir. Bidang__opmenunjukkan apakah impor tersebut merupakan operasi UPSERT atau DELETE, dan nilainya diatur secara otomatis oleh konektor Flink.
Masukkan data pembaruan.
Masukkan dua baris data dengan kunci primer yang sama seperti data yang sudah ada, tetapi dengan nilai yang dimodifikasi pada kolom
name.INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');Kueri tabel StarRocks di SQL Editor.
SELECT * FROM `test`.`score_board`;Anda akan melihat bahwa hanya nilai pada kolom
nameyang berubah, sedangkan kolomscoretetap tidak berubah.
Pembaruan bersyarat
Contoh ini menunjukkan cara melakukan pembaruan bersyarat berdasarkan nilai kolom
score. Baris data diperbarui hanya ketika nilai kolomscoredalam data yang diimpor lebih besar dari atau sama dengan nilai saat ini di tabel StarRocks.Buat tabel
score_boarddi klien Flink SQL sebagai berikut.CREATE TABLE `score_board` ( `id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', 'sink.properties.merge_condition' = 'score', 'sink.version' = 'V1' );sink.properties.merge_condition: Diatur kescore, menunjukkan bahwa saat data ditulis, Flink Connector akan menggunakan kolomscoresebagai kondisi pembaruan.sink.version: Diatur keV1, menunjukkan bahwa Flink Connector menggunakan antarmuka Stream Load untuk mengimpor data.
Masukkan dua baris data ke tabel di klien Flink SQL.
Baris data memiliki kunci primer yang sama dengan baris di tabel StarRocks. Baris data pertama memiliki nilai yang lebih kecil pada kolom
score, sedangkan baris data kedua memiliki nilai yang lebih besar pada kolomscore.INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);Kueri tabel StarRocks di SQL Editor.
SELECT * FROM `test`.`score_board`;Anda akan melihat bahwa hanya baris data kedua yang berubah, sedangkan baris data pertama tetap tidak berubah.

Impor ke kolom Bitmap
Tipe Bitmap umumnya digunakan untuk mempercepat skenario penghitungan deduplikasi eksak, seperti menghitung pengunjung unik (UV). Berikut adalah contoh lengkap yang menunjukkan cara mengimpor data ke kolom Bitmap dalam tabel StarRocks melalui Flink SQL dan mengkueri jumlah UV di StarRocks.
Buat tabel agregat StarRocks di SQL Editor.
Buat tabel agregat bernama
page_uvdi databasetest, di mana:Kolom
visit_usersdidefinisikan sebagai tipe BITMAP dan dikonfigurasi dengan fungsi agregat BITMAP_UNION.page_iddanvisit_dateberfungsi sebagai kunci agregat (AGGREGATE KEY) untuk pengelompokan dan deduplikasi.
CREATE TABLE `test`.`page_uv` ( `page_id` INT NOT NULL COMMENT 'page ID', `visit_date` datetime NOT NULL COMMENT 'access time', `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID' ) ENGINE=OLAP AGGREGATE KEY(`page_id`, `visit_date`) DISTRIBUTED BY HASH(`page_id`);Buat tabel di klien Flink SQL.
Karena Flink tidak mendukung tipe Bitmap, pemetaan kolom dan konversi tipe perlu diimplementasikan sebagai berikut:
Di tabel Flink, definisikan kolom
visit_user_idsebagai tipe BIGINT untuk merepresentasikan kolomvisit_usersdi tabel StarRocks.Gunakan konfigurasi
sink.properties.columnsuntuk mengonversi data di kolomvisit_user_idke tipe Bitmap melalui fungsi to_bitmap.
CREATE TABLE `page_uv` ( `page_id` INT, `visit_date` TIMESTAMP, `visit_user_id` BIGINT ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'page_uv', 'username' = 'admin', 'password' = '<password>', 'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)' );Masukkan data di klien Flink SQL.
Masukkan beberapa baris data ke tabel
page_uv, mensimulasikan pengguna berbeda yang mengakses halaman pada waktu berbeda.visit_user_idbertipe BIGINT, dan Flink akan secara otomatis mengonversinya ke tipe Bitmap.INSERT INTO `page_uv` VALUES (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13), (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23), (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33), (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13), (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);Kueri jumlah UV di SQL Editor.
Gunakan kemampuan agregasi StarRocks untuk menghitung jumlah pengunjung unik (UV) untuk setiap halaman menggunakan
COUNT(DISTINCT visit_users).SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;Hasil yang dikembalikan ditunjukkan di bawah ini.

Impor ke kolom HLL
HLL (HyperLogLog) adalah tipe data yang digunakan untuk penghitungan deduplikasi perkiraan, cocok untuk menghitung pengunjung unik (UV) dalam skenario data berskala besar. Berikut adalah contoh lengkap yang menunjukkan cara mengimpor data ke kolom HLL dalam tabel StarRocks melalui Flink SQL dan mengkueri jumlah UV di StarRocks.
Buat tabel agregat StarRocks di SQL Editor.
Buat tabel agregat bernama
hll_uvdi databasetest, di mana:Kolom
visit_usersdidefinisikan sebagai tipe HLL dan dikonfigurasi dengan fungsi agregat HLL_UNION.page_iddanvisit_dateberfungsi sebagai kunci agregat (AGGREGATE KEY) untuk pengelompokan dan deduplikasi.
CREATE TABLE `test`.`hll_uv` ( `page_id` INT NOT NULL COMMENT 'page ID', `visit_date` DATETIME NOT NULL COMMENT 'access time', `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID' ) ENGINE=OLAP AGGREGATE KEY(`page_id`, `visit_date`) DISTRIBUTED BY HASH(`page_id`);Buat tabel di klien Flink SQL.
Karena Flink tidak mendukung tipe HLL, pemetaan kolom dan konversi tipe perlu diimplementasikan sebagai berikut:
Di tabel Flink, definisikan kolom
visit_user_idsebagai tipe BIGINT untuk merepresentasikan kolomvisit_usersdi tabel StarRocks.Gunakan konfigurasi
sink.properties.columnsuntuk pemetaan kolom, dan konversikan data tipe BIGINTvisit_user_idke tipe HLL melalui fungsi hll_hash.
CREATE TABLE `hll_uv` ( `page_id` INT, `visit_date` TIMESTAMP, `visit_user_id` BIGINT ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'hll_uv', 'username' = 'admin', 'password' = '<password>', 'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)' );Masukkan data di klien Flink SQL.
Masukkan beberapa baris data ke tabel
hll_uv, mensimulasikan pengguna berbeda yang mengakses halaman pada waktu berbeda.visit_user_idbertipe BIGINT, dan Flink akan secara otomatis mengonversinya ke tipe HLL.INSERT INTO `hll_uv` VALUES (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78), (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2), (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);Kueri jumlah UV di SQL Editor.
Gunakan kemampuan agregasi StarRocks untuk menghitung jumlah pengunjung unik (UV) untuk setiap halaman menggunakan
COUNT(DISTINCT visit_users).SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;Hasil yang dikembalikan ditunjukkan di bawah ini.
