All Products
Search
Document Center

E-MapReduce:Flink Connector

Last Updated:Mar 27, 2026

Connector Flink StarRocks memuat data ke StarRocks secara batch menggunakan Stream Load, menawarkan throughput yang jauh lebih tinggi dibandingkan connector Java Database Connectivity (JDBC) bawaan Apache Flink (flink-connector-jdbc). Alibaba Cloud menyediakan connector Flink StarRocks untuk membantu Anda menyimpan data dalam cache, lalu mengimpornya ke StarRocks sekaligus dalam mode Stream Load. Topik ini menjelaskan cara menyiapkan connector dan menulis data ke StarRocks, dilengkapi contoh lengkap untuk sinkronisasi data MySQL secara real-time.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Apache Flink 1.11 atau versi yang lebih baru (disarankan versi 1.13)

  • Kluster StarRocks dengan port HTTP frontend yang dapat diakses

Tambahkan connector ke proyek Anda

Unduh kode sumber connector dari repositori starrocks-connector-for-apache-flink di GitHub.

Tambahkan dependensi berikut ke file pom.xml proyek Anda:

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- untuk Flink 1.11 dan 1.12 -->
    <version>x.x.x_flink-1.11</version>
    <!-- untuk Flink 1.13 -->
    <version>x.x.x_flink-1.13</version>
</dependency>
Ganti x.x.x dengan versi terbaru. Periksa halaman informasi versi untuk rilis terkini.

Menulis data menggunakan connector Flink

Connector ini menyediakan dua pendekatan untuk menulis data ke StarRocks. Gunakan Metode 1 untuk aliran JSON mentah; gunakan Metode 2 untuk pembuatan tabel berbasis SQL dan data terstruktur.

Metode 1: DataStream API

// -------- sink dengan aliran string JSON mentah --------
fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
    StarRocksSink.sink(
        // opsi sink
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build()
    )
);


// -------- sink dengan transformasi aliran --------
class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        ......
    }
}
fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
).addSink(
    StarRocksSink.sink(
        // struktur tabel
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // opsi sink
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")
            .build(),
        // atur slot dengan streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);

Metode 2: Pembuatan tabel SQL

Sebelum menjalankan metode ini, tambahkan com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory ke file src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.

// buat tabel dengan `struktur` dan `properti`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
        "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.buffer-flush.max-rows' = '1000000'," +
        "'sink.buffer-flush.max-bytes' = '300000000'," +
        "'sink.buffer-flush.interval-ms' = '5000'," +
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        "'sink.max-retries' = '3'" +
        "'sink.properties.*' = 'xxx'" + // properti stream load seperti `'sink.properties.columns' = 'k1, v1'`
    ")"
);

Parameter sink

Tabel berikut menjelaskan semua parameter untuk connector sink Flink.

Parameter Wajib Nilai default Tipe Deskripsi
connector Ya String Jenis connector. Atur ke starrocks.
jdbc-url Ya String URL JDBC untuk menghubungkan ke StarRocks dan menjalankan kueri.
load-url Ya String Alamat HTTP frontend, dalam format fe_ip:http_port;fe_ip:http_port. Pisahkan beberapa frontend dengan titik koma (;).
database-name Ya String Nama database StarRocks.
table-name Ya String Nama tabel StarRocks.
username Ya String Username untuk menghubungkan ke database StarRocks.
password Ya String Password untuk menghubungkan ke database StarRocks.
sink.semantic Tidak at-least-once String Semantik pengiriman. Nilai yang valid: at-least-once dan exactly-once.
sink.buffer-flush.max-bytes Tidak 94371840 (90 MB) String Ukuran buffer maksimum sebelum flush dipicu. Rentang valid: 64 MB hingga 10 GB.
sink.buffer-flush.max-rows Tidak 500000 String Jumlah maksimum baris dalam buffer sebelum flush dipicu. Rentang valid: 64000 hingga 5000000.
sink.buffer-flush.interval-ms Tidak 300000 String Interval flush buffer dalam milidetik. Rentang valid: 1000 hingga 3600000.
sink.max-retries Tidak 1 String Jumlah maksimum upaya pengulangan saat terjadi kegagalan. Rentang valid: 0 hingga 10.
sink.connect.timeout-ms Tidak 1000 String Batas waktu koneksi untuk alamat HTTP antarmuka depan yang ditentukan dalam load-url, dalam milidetik. Rentang valid: 100–60.000.
sink.properties.* Tidak String Properti Stream Load yang diteruskan langsung ke StarRocks.

Catatan penggunaan

Format dan pemisah default

Data diimpor dalam format CSV secara default. Untuk menggunakan pemisah kustom, atur:

  • sink.properties.row_delimiter — pemisah baris (misalnya, \\x02). Didukung di StarRocks 1.15.0 dan versi yang lebih baru.

  • sink.properties.column_separator — pemisah kolom (misalnya, \\x01).

Jika Anda tidak dapat menemukan pemisah CSV yang sesuai untuk data Anda, alihkan ke format JSON dengan mengatur sink.properties.format=json dan sink.properties.strip_outer_array=true. Perhatikan bahwa performa impor JSON mungkin lebih rendah dibandingkan CSV.

Saat membuat tabel dan menyinkronkan data di client SQL, escape backslash (\):

'sink.properties.column_separator' = '\\x01'
'sink.properties.row_delimiter' = '\\x02'

Semantik exactly-once

Penting

Untuk menggunakan sink.semantic=exactly-once, sistem eksternal harus mendukung protokol two-phase commit. StarRocks tidak mendukung protokol ini secara native, sehingga pengiriman exactly-once bergantung pada checkpointing Flink. Alurnya bekerja sebagai berikut: 1. Saat Flink menghasilkan checkpoint, sekumpulan data dan labelnya disimpan dalam cache sebagai state. 2. Sistem diblokir hingga data yang di-cache berhasil ditulis ke StarRocks. 3. Setelah penulisan selesai, Flink memulai checkpoint berikutnya. Jika StarRocks menjadi tidak tersedia, operator sink stream Flink mungkin terblokir dalam waktu lama akibat kegagalan koneksi, memicu peringatan dan memaksa pekerjaan impor Flink dihentikan.

Penyelesaian masalah konektivitas

Jika kueri data berhasil tetapi penulisan gagal, verifikasi bahwa mesin Anda dapat mencapai port HTTP backend StarRocks. Saat Anda mengirimkan pekerjaan impor, frontend meneruskan operasi tulis ke alamat IP internal dan port HTTP backend.

Sebagai contoh, jika mesin Anda memiliki alamat IP publik dan frontend serta backend kluster dapat diakses melalui alamat IP publik, tetapi backend menggunakan alamat IP internal untuk komunikasi intra-kluster — menentukan alamat frontend publik dalam load-url menyebabkan frontend meneruskan penulisan ke IP internal backend. Jika mesin Anda tidak dapat mencapai IP internal tersebut, penulisan akan gagal.

Jika pekerjaan impor berhenti secara tak terduga, tingkatkan kapasitas memori pekerjaan tersebut.

Menyinkronkan data MySQL ke StarRocks menggunakan CDC

Gunakan connector change data capture (CDC) Flink dan tool migrasi StarRocks untuk menyinkronkan database MySQL ke kluster StarRocks secara near real-time. Tool migrasi StarRocks secara otomatis menghasilkan pernyataan CREATE TABLE berdasarkan skema MySQL dan konfigurasi kluster StarRocks.

Flink
Diagram dan beberapa informasi dalam bagian ini bersumber dari Realtime synchronization from MySQL dalam dokumentasi open-source StarRocks.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Apache Flink 1.13 (disarankan) atau 1.11 / 1.12

  • Akses ke database MySQL dan kluster StarRocks

Siapkan lingkungan

  1. Unduh Apache Flink. Disarankan menggunakan Apache Flink 1.13. Versi paling awal yang didukung adalah 1.11.

  2. Unduh connector Flink CDC untuk MySQL. Pilih paket yang sesuai dengan versi Flink Anda.

  3. Unduh connector Flink StarRocks.

    Penting

    Connector Flink StarRocks berbeda untuk Apache Flink 1.13, 1.11, dan 1.12. Unduh versi yang tepat.

  4. Salin kedua file flink-sql-connector-mysql-cdc-xxx.jar dan flink-connector-starrocks-xxx.jar ke direktori flink-xxx/lib/.

  5. Unduh paket smt.tar.gz, ekstrak, lalu edit file konfigurasi dengan detail koneksi Anda. File konfigurasi menggunakan bagian-bagian berikut: Contoh konfigurasi:

    • db — informasi koneksi database MySQL

    • be_num — jumlah node dalam kluster StarRocks

    • [table-rule.N] — aturan pencocokan menggunakan ekspresi reguler untuk nama database dan nama tabel. Konfigurasikan satu bagian per set aturan.

    • flink.starrocks.* — konfigurasi kluster StarRocks

    [db]
    host = 192.168.**.**
    port = 3306
    user = root
    password =
    
    [other]
    # jumlah backend di StarRocks
    be_num = 3
    # `decimal_v3` didukung sejak StarRocks-1.18.1
    use_decimal_v3 = false
    # file untuk menyimpan DDL SQL hasil konversi
    output_dir = ./result
    
    [table-rule.1]
    # pola untuk mencocokkan database guna mengatur properti
    database = ^console_19321.*$
    # pola untuk mencocokkan tabel guna mengatur properti
    table = ^.*$
    
    ############################################
    ### konfigurasi sink flink
    ### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000

Buat tabel dan mulai sinkronisasi

  1. Jalankan starrocks-migrate-tool untuk menghasilkan pernyataan CREATE TABLE. Output ditulis ke direktori /result. Jalankan ls result untuk memverifikasi file yang dihasilkan:

    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
  2. Buat database dan tabel di StarRocks:

    Mysql -h <alamat IP frontend> -P 9030 -u root -p < starrocks-create.1.sql
  3. Buat tabel Flink dan mulai sinkronisasi data berkelanjutan:

    Penting

    Jika Anda menggunakan Apache Flink versi sebelum 1.13, Anda tidak dapat menjalankan skrip SQL secara langsung. Eksekusi pernyataan SQL satu per satu dan aktifkan binary logging untuk database MySQL.

    bin/sql-client.sh -f flink-create.1.sql
  4. Periksa status pekerjaan Flink yang sedang berjalan:

    bin/flink list -running

    Lihat detail dan status pekerjaan di UI web Flink atau di file log di bawah $FLINK_HOME/log.

Konfigurasi beberapa set aturan

Untuk menyinkronkan beberapa database atau tabel dengan properti berbeda, tambahkan satu bagian [table-rule.N] per set aturan. Konfigurasikan parameter flink.starrocks.sink secara independen untuk setiap set aturan, misalnya frekuensi impor yang berbeda.

Contoh konfigurasi dengan dua set aturan:

[table-rule.1]
# pola untuk mencocokkan database guna mengatur properti
database = ^console_19321.*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^.*$

############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000

[table-rule.2]
# pola untuk mencocokkan database guna mengatur properti
database = ^database2.*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^.*$

############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
# Jika Anda tidak dapat memilih pemisah CSV yang sesuai, alihkan ke format JSON.
# Perhatikan bahwa performa impor JSON mungkin menurun dibandingkan CSV.
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json

Konsolidasi tabel yang di-shard

Setelah sharding horizontal, data dari tabel besar mungkin terbagi di beberapa tabel atau database. Konfigurasikan satu set aturan untuk menyinkronkan beberapa tabel sumber ke satu tabel StarRocks.

Sebagai contoh, jika edu_db_1 dan edu_db_2 masing-masing berisi course_1 dan course_2 dengan skema yang sama, aturan berikut mengonsolidasi keempat tabel tersebut ke satu tabel StarRocks:

[table-rule.3]
# pola untuk mencocokkan database guna mengatur properti
database = ^edu_db_[0-9]*$
# pola untuk mencocokkan tabel guna mengatur properti
table = ^course_[0-9]*$

############################################
### konfigurasi sink flink
### JANGAN atur `connector`, `table-name`, `database-name`, karena di-generate otomatis
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000

Setelah menjalankan starrocks-migrate-tool, hubungan sinkronisasi many-to-one dibuat secara otomatis. Tabel StarRocks yang dihasilkan diberi nama course__auto_shard secara default. Untuk mengganti namanya, edit file SQL yang dihasilkan sebelum menjalankannya.