全部产品
Search
文档中心

E-MapReduce:Flink Connector

更新时间:Nov 10, 2025

StarRocks menyediakan konektor Apache Flink (selanjutnya disebut Flink Connector) yang memungkinkan Anda mengimpor data ke tabel StarRocks melalui Flink. Dibandingkan dengan flink-connector-jdbc bawaan Flink, Flink Connector dari StarRocks menawarkan kinerja dan stabilitas yang lebih unggul, sehingga sangat cocok untuk skenario impor data berskala besar.

Informasi latar belakang

Flink Connector StarRocks meningkatkan efisiensi impor data secara signifikan dengan menyimpan cache batch kecil data di memori dan memanfaatkan fitur Stream Load milik StarRocks untuk impor batch. Konektor ini mendukung DataStream API, Table API & SQL, serta Python API.

Prasyarat

  • Anda telah membuat kluster yang mencakup layanan Flink.

    Topik ini menggunakan kluster DataFlow dengan layanan Flink (selanjutnya disebut kluster Flink) yang dibuat di EMR pada ECS sebagai contoh. Untuk informasi selengkapnya, lihat Membuat kluster.

  • Anda telah membuat instans EMR Serverless StarRocks. Untuk informasi selengkapnya, lihat Membuat instans.

Batasan

  • Pastikan mesin tempat Flink berjalan dapat mengakses port http_port (default 8030) dan port query_port (default 9030) pada node FE di instans StarRocks, serta port be_http_port (default 8040) pada node BE.

  • Menggunakan Flink Connector untuk mengimpor data ke StarRocks memerlukan izin SELECT dan INSERT pada tabel target.

  • Persyaratan kompatibilitas versi Flink Connector dengan lingkungan Java, Scala, dan versi Flink adalah sebagai berikut.

    Konektor

    Flink

    StarRocks

    Java

    Scala

    1.2.9

    1.15~1.18

    2.1 dan di atasnya

    8

    2.11, 2.12

    1.2.8

    1.13~1.17

    2.1 dan di atasnya

    8

    2.11, 2.12

    1.2.7

    1.11~1.15

    2.1 dan di atasnya

    8

    2.11, 2.12

Konfigurasi

Bagian ini memperkenalkan pengaturan parameter StarRocks dan pemetaan tipe data yang sesuai. Untuk informasi lebih rinci, silakan merujuk ke Memuat data secara berkelanjutan dari Apache Flink® | StarRocks.

Parameter

Parameter

Wajib

Nilai default

Deskripsi

connector

Ya

NONE

Menentukan konektor sebagai StarRocks, tetap pada nilai starrocks.

jdbc-url

Ya

NONE

URL JDBC yang digunakan untuk terhubung ke StarRocks dan menjalankan kueri di StarRocks.

Contoh: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Di sini, fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com adalah alamat internal node FE dari instans EMR Serverless StarRocks.

Catatan

Untuk informasi tentang cara mendapatkan alamat internal node FE dari instans EMR Serverless StarRocks, lihat Melihat daftar dan detail instans.

load-url

Ya

NONE

Menentukan alamat internal dan port HTTP node FE, dalam format Alamat internal node FE dari instans EMR Serverless StarRocks:8030.

Contoh: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.

database-name

Ya

NONE

Nama database StarRocks.

table-name

Ya

NONE

Nama tabel StarRocks.

username

Ya

NONE

Nama pengguna instans StarRocks. Misalnya, admin default.

Menggunakan Flink Connector untuk mengimpor data ke StarRocks memerlukan izin SELECT dan INSERT pada tabel target. Jika akun pengguna Anda tidak memiliki izin tersebut, Anda perlu memberikannya. Untuk informasi selengkapnya, lihat Mengelola pengguna dan otorisasi data.

password

Ya

NONE

Kata sandi pengguna instans StarRocks.

sink.semantic

Tidak

at-least-once

Menentukan tingkat jaminan semantik untuk sink, memastikan keandalan dan konsistensi saat data ditulis ke sistem target. Nilai yang valid:

  • at-least-once: Memastikan data ditulis setidaknya sekali, dengan kemungkinan adanya duplikasi data.

  • exactly-once: Memastikan data ditulis tepat satu kali, tanpa duplikasi atau kehilangan.

sink.version

Tidak

AUTO

Antarmuka untuk mengimpor data. Parameter ini didukung mulai dari Flink Connector 1.2.4.

  • V1: Menggunakan antarmuka Stream Load untuk mengimpor data. Versi Flink Connector sebelum 1.2.4 hanya mendukung mode ini.

  • V2: Menggunakan antarmuka transaksi Stream Load untuk mengimpor data. Memerlukan StarRocks versi 2.4 atau lebih tinggi. V2 direkomendasikan karena mengurangi penggunaan memori dan menyediakan implementasi exactly-once yang lebih stabil.

  • AUTO: Secara otomatis memilih V2 jika versi StarRocks mendukung antarmuka transaksi Stream Load, jika tidak maka memilih V1.

sink.label-prefix

Tidak

NONE

Menentukan awalan label yang digunakan oleh Stream Load. Jika versi Flink Connector adalah 1.2.8 atau lebih baru, dan sink menjamin semantik exactly-once, disarankan untuk mengonfigurasi awalan label.

sink.buffer-flush.max-bytes

Tidak

94.371.840(90M)

Ukuran maksimum data yang terakumulasi di memori, setelah itu data diimpor ke StarRocks sekaligus melalui Stream Load. Menetapkan nilai yang lebih besar dapat meningkatkan kinerja impor tetapi dapat menyebabkan latensi impor yang lebih tinggi.

Nilai yang valid: [64MB, 10GB].

Catatan
  • Parameter ini hanya berlaku ketika sink.semantic diatur ke at-least-once.

  • Ketika sink.semantic diatur ke exactly-once, data di memori hanya dikosongkan saat checkpoint Flink dipicu. Dalam kasus ini, parameter sink.buffer-flush.max-bytes tidak berlaku karena data tidak secara otomatis dikosongkan saat ambang batas tercapai.

sink.buffer-flush.max-rows

Tidak

500000

Jumlah maksimum baris data yang terakumulasi di memori, setelah itu data diimpor ke StarRocks sekaligus melalui Stream Load.

Nilai yang valid: [64000, 5000000].

Catatan

Parameter ini hanya berlaku ketika sink.version diatur ke V1 dan sink.semantic diatur ke at-least-once.

sink.buffer-flush.interval-ms

Tidak

300000

Menetapkan interval waktu pengiriman data, mengontrol latensi data yang ditulis ke StarRocks.

Nilai yang valid: [1000, 3600000].

Catatan

Parameter ini hanya berlaku ketika sink.semantic diatur ke at-least-once.

sink.max-retries

Tidak

3

Jumlah percobaan ulang setelah kegagalan Stream Load. Jika jumlah ini terlampaui, tugas impor data akan melaporkan kesalahan.

Nilai yang valid: [0, 10].

Catatan

Parameter ini hanya berlaku ketika sink.version diatur ke V1.

sink.connect.timeout-ms

Tidak

30000

Periode waktu habis untuk membuat koneksi HTTP dengan FE.

Nilai yang valid: [100, 60000].

Sebelum Flink Connector v1.2.9, nilai default adalah 1000.

sink.socket.timeout-ms

Tidak

-1

Parameter ini didukung mulai dari Flink connector 1.2.10. Periode waktu habis bagi klien HTTP untuk menunggu data. Satuan: milidetik. Nilai default -1 menunjukkan tidak ada waktu habis.

sink.wait-for-continue.timeout-ms

Tidak

10000

Parameter ini didukung mulai dari Flink Connector 1.2.7. Periode waktu habis untuk menunggu respons FE HTTP 100-continue.

Nilai yang valid: [3000, 60000].

sink.ignore.update-before

Tidak

TRUE

Parameter ini didukung mulai dari Flink Connector 1.2.8. Saat mengimpor data ke tabel kunci primer, menentukan apakah akan mengabaikan catatan UPDATE_BEFORE dari Flink. Jika parameter ini diatur ke false, catatan tersebut diperlakukan sebagai operasi DELETE pada tabel kunci primer.

sink.parallelism

Tidak

NONE

Paralelisme penulisan, hanya berlaku untuk Flink SQL. Jika tidak diatur, perencana Flink akan menentukan paralelisme. Dalam skenario multi-paralelisme, pengguna perlu memastikan bahwa data ditulis dalam urutan yang benar.

sink.properties.*

Tidak

NONE

Parameter untuk Stream Load, mengontrol perilaku impor Stream Load.

sink.properties.format

Tidak

csv

Format data untuk impor Stream Load. Flink Connector mengonversi data di memori ke format yang sesuai lalu mengimpornya ke StarRocks melalui Stream Load. Nilai yang valid: CSV atau JSON.

sink.properties.column_separator

Tidak

\t

Pemisah kolom untuk data CSV.

sink.properties.row_delimiter

Tidak

\n

Pemisah baris untuk data CSV.

sink.properties.max_filter_ratio

Tidak

0

Tingkat toleransi maksimum untuk tugas impor, yaitu proporsi maksimum baris data yang dapat difilter karena masalah kualitas data.

Nilai yang valid: 0~1.

sink.properties.partial_update

Tidak

false

Menentukan apakah akan menggunakan pembaruan parsial. Nilai yang valid termasuk TRUE dan FALSE (default).

sink.properties.partial_update_mode

Tidak

row

Menentukan mode untuk pembaruan parsial. Nilai yang valid:

  • row (default): Menentukan untuk melakukan pembaruan parsial dalam mode baris, yang lebih cocok untuk skenario pembaruan real-time dengan banyak kolom dan batch kecil.

  • column: Menentukan untuk melakukan pembaruan parsial dalam mode kolom, yang lebih cocok untuk skenario pembaruan pemrosesan batch dengan sedikit kolom dan banyak baris. Dalam skenario ini, mengaktifkan mode kolom dapat secara signifikan meningkatkan kinerja pembaruan.

sink.properties.strict_mode

Tidak

false

Menentukan apakah akan mengaktifkan mode ketat untuk Stream Load. Mode ketat memengaruhi perilaku impor saat baris yang tidak memenuhi syarat (seperti nilai kolom yang tidak konsisten) muncul dalam data yang diimpor.

Nilai yang valid: true dan false.

sink.properties.compression

Tidak

NONE

Parameter ini didukung mulai dari Flink Connector 1.2.10. Menentukan algoritma kompresi untuk Stream Load. Saat ini, hanya kompresi format JSON yang didukung.

Nilai yang valid: lz4_frame.

Catatan

Hanya StarRocks v3.2.7 dan versi yang lebih tinggi yang mendukung kompresi format JSON.

Pemetaan tipe data

Tipe data Flink

Tipe data StarRocks

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

BINARY

INT

CHAR

STRING

VARCHAR

STRING

STRING

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE(N)

DATETIME

TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)

DATETIME

ARRAY<T>

ARRAY<T>

MAP<KT,VT>

JSON STRING

ROW<arg T...>

JSON STRING

Persiapan

Mendapatkan JAR Flink Connector dan mengunggahnya ke kluster Flink

  1. Anda dapat memperoleh paket JAR Flink Connector melalui metode berikut.

    Metode 1: Unduh langsung

    Ambil file JAR Flink Connector versi berbeda dari Repositori Maven Central.

    Metode 2: Dependensi Maven

    Dalam file pom.xml proyek Maven Anda, tambahkan Flink Connector sebagai dependensi menggunakan format berikut.

    • Untuk Flink Connector yang berlaku untuk Flink versi 1.15 dan seterusnya.

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}</version>
      </dependency>
    • Untuk Flink Connector yang berlaku untuk versi Flink sebelum 1.15.

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
      </dependency>

    Metode 3: Kompilasi manual

    1. Unduh kode Flink Connector.

    2. Jalankan perintah berikut untuk mengompilasi kode sumber Flink Connector menjadi file JAR.

      sh build.sh <flink_version>

      Sebagai contoh, jika versi Flink di lingkungan Anda adalah 1.17, Anda perlu menjalankan perintah berikut.

      sh build.sh 1.17
    3. Setelah kompilasi selesai, temukan file JAR yang dihasilkan di direktori target/.

      Sebagai contoh, nama file biasanya dalam format flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar.

      Catatan

      Rilis tidak resmi versi Flink Connector akan memiliki akhiran SNAPSHOT.

    Format penamaan file JAR Flink Connector adalah sebagai berikut:

    • Untuk Flink Connector yang berlaku untuk Flink versi 1.15 dan seterusnya, format penamaannya adalah flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar. Sebagai contoh, jika Anda telah menginstal Flink 1.17 dan ingin menggunakan Flink Connector versi 1.2.8, Anda dapat menggunakan flink-connector-starrocks-1.2.8_flink-1.17.jar.

    • Untuk Flink Connector yang berlaku untuk versi Flink sebelum 1.15, format penamaannya adalah flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar. Sebagai contoh, jika Anda telah menginstal Flink 1.14 dan Scala 2.12, serta ingin menggunakan Flink Connector versi 1.2.7, Anda dapat menggunakan flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar.

      Catatan

      Silakan ganti informasi berikut sesuai dengan situasi aktual Anda:

      • flink_version: Nomor versi Flink.

      • scala_version: Nomor versi Scala.

      • connector_version: Nomor versi Flink Connector.

  2. Unggah file JAR Flink Connector yang diperoleh ke direktori flink-{flink_version}/lib kluster Flink.

    Sebagai contoh, jika Anda menggunakan kluster EMR dengan versi EMR-5.19.0, file JAR harus ditempatkan di direktori /opt/apps/FLINK/flink-current/lib.

Memulai kluster Flink

  1. Masuk ke node master kluster Flink. Untuk informasi selengkapnya, lihat Masuk ke kluster.

  2. Jalankan perintah berikut untuk memulai kluster Flink.

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh

Contoh

Menulis data menggunakan Flink SQL

  1. Buat database bernama test di StarRocks, dan buat tabel kunci primer bernama score_board di dalamnya.

    CREATE DATABASE test;
    
    CREATE TABLE test.score_board(
        id int(11) NOT NULL COMMENT "",
        name varchar(65533) NULL DEFAULT "" COMMENT "",
        score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. Masuk ke node master kluster Flink. Untuk informasi selengkapnya, lihat Masuk ke kluster.

  3. Jalankan perintah berikut untuk memulai Flink SQL:

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. Jalankan perintah berikut untuk membuat tabel bernama score_board dan memasukkan data ke dalamnya.

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
    );
    
    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    Jika tujuannya adalah mengimpor data ke tabel kunci primer StarRocks, Anda harus secara eksplisit menentukan kunci primer dalam DDL tabel Flink. Untuk jenis tabel StarRocks lainnya (seperti tabel Duplicate Key), mendefinisikan kunci primer bersifat opsional.

Menulis data menggunakan Flink DataStream

Tulis pekerjaan Flink DataStream yang sesuai berdasarkan jenis catatan input yang berbeda.

  • Menulis data string dalam format CSV

    Jika catatan input berupa string dalam format CSV, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadCsvRecords.

    /**
     * Generate CSV-format records. Each record has three values separated by "\t". 
     * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
     */
    String[] records = new String[]{
            "1\tstarrocks-csv\t100",
            "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);
    
    /**
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
     * to tell the Flink connector the input records are CSV-format, and the column separator is "\t".
     * You can also use other column separators in the CSV-format records,
     * but remember to modify the "sink.properties.column_separator" correspondingly.
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "csv")
            .withProperty("sink.properties.column_separator", "\t")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Menulis data string dalam format JSON

    Jika catatan input berupa string dalam format JSON, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadJsonRecords.

    /**
     * Generate JSON-format records. 
     * Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table.
     */
    String[] records = new String[]{
            "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
            "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);
    
    /** 
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
     * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. 
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Menulis data objek Java kustom

    Jika catatan input berupa objek Java kustom, kode utama pekerjaan Flink DataStream yang sesuai ditunjukkan di bawah ini. Untuk kode lengkapnya, lihat LoadCustomJavaRecords.

    • Dalam contoh ini, kelas POJO sederhana RowData didefinisikan untuk merepresentasikan setiap catatan.

      public static class RowData {
              public int id;
              public String name;
              public int score;
        
              public RowData() {}
        
              public RowData(int id, String name, int score) {
                  this.id = id;
                  this.name = name;
                  this.score = score;
              }
          }
      
    • Kode utamanya ditunjukkan di bawah ini.

      // Generate records which use RowData as the container.
      RowData[] records = new RowData[]{
              new RowData(1, "starrocks-rowdata", 100),
              new RowData(2, "flink-rowdata", 100),
          };
      DataStream<RowData> source = env.fromElements(records);
      
      // Configure the Flink connector with the required properties.
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
              .withProperty("jdbc-url", jdbcUrl)
              .withProperty("load-url", loadUrl)
              .withProperty("database-name", "test")
              .withProperty("table-name", "score_board")
              .withProperty("username", "root")
              .withProperty("password", "")
              .build();
      
      /**
       * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
       * and each element is the value for a column.
       * You need to define the schema of the Object[] which matches that of the StarRocks table.
       */
      TableSchema schema = TableSchema.builder()
              .field("id", DataTypes.INT().notNull())
              .field("name", DataTypes.STRING())
              .field("score", DataTypes.INT())
              // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
              .primaryKey("id")
              .build();
      // Transform the RowData to the Object[] according to the schema.
      RowDataTransformer transformer = new RowDataTransformer();
      // Create the sink with the schema, options, and transformer.
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
      

      RowDataTransformer didefinisikan sebagai berikut.

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {
      
          /**
           * Set each element of the object array according to the input RowData.
           * The schema of the array matches that of the StarRocks table.
           */
          @Override
          public void accept(Object[] internalRow, RowData rowData) {
              internalRow[0] = rowData.id;
              internalRow[1] = rowData.name;
              internalRow[2] = rowData.score;
              // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
              internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
          }
      }  

Menyinkronkan data menggunakan Flink CDC 3.0

Framework Flink CDC 3.0 memungkinkan Anda dengan mudah membangun pipeline ELT streaming dari sumber data CDC (seperti MySQL, Kafka) ke StarRocks. Melalui pipeline ini, Anda dapat mencapai fungsi-fungsi berikut:

  • Pembuatan database dan tabel secara otomatis

  • Sinkronisasi data penuh dan inkremental

  • Sinkronisasi Perubahan Skema

Mulai dari StarRocks Flink Connector v1.2.9, konektor ini telah diintegrasikan ke dalam framework Flink CDC 3.0 dan diberi nama StarRocks Pipeline Connector. Konektor ini memiliki semua fitur di atas dan direkomendasikan untuk digunakan dengan StarRocks v3.2.1 dan di atasnya agar dapat memanfaatkan sepenuhnya fitur fast_schema_evolution, yang lebih lanjut meningkatkan kecepatan penambahan dan penghapusan kolom serta mengurangi konsumsi sumber daya.

Praktik terbaik

Impor ke tabel kunci primer

  1. Buat database bernama test di StarRocks, dan buat tabel kunci primer score_board di dalamnya.

    CREATE DATABASE `test`;
    
    CREATE TABLE `test`.`score_board`
    (
        `id` int(11) NOT NULL COMMENT "",
        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`);
  2. Masukkan data ke tabel StarRocks.

    INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
  3. Jalankan perintah berikut untuk memulai klien Flink SQL.

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. Perbarui data.

    Pembaruan parsial

    Pembaruan parsial memungkinkan Anda memperbarui hanya kolom tertentu (seperti name) tanpa memengaruhi kolom lainnya (seperti score).

    1. Buat tabel score_board di klien Flink SQL dan aktifkan fitur pembaruan parsial.

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.partial_update' = 'true',
          -- only for Flink connector version <= 1.2.7
          'sink.properties.columns' = 'id,name,__op'
      ); 
      • sink.properties.partial_update: Mengaktifkan pembaruan parsial.

      • sink.properties.columns: Menentukan kolom yang akan diperbarui. Jika versi Flink Connector kurang dari atau sama dengan 1.2.7, Anda juga perlu mengatur opsi sink.properties.columns menjadi id,name,__op untuk memberi tahu konektor Flink kolom mana yang perlu diperbarui. Perhatikan bahwa Anda perlu menambahkan bidang __op di akhir. Bidang __op menunjukkan apakah impor tersebut merupakan operasi UPSERT atau DELETE, dan nilainya diatur secara otomatis oleh konektor Flink.

    2. Masukkan data pembaruan.

      Masukkan dua baris data dengan kunci primer yang sama seperti data yang sudah ada, tetapi dengan nilai yang dimodifikasi pada kolom name.

      INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');
    3. Kueri tabel StarRocks di SQL Editor.

      SELECT * FROM `test`.`score_board`;

      Anda akan melihat bahwa hanya nilai pada kolom name yang berubah, sedangkan kolom score tetap tidak berubah.

      image

    Pembaruan bersyarat

    Contoh ini menunjukkan cara melakukan pembaruan bersyarat berdasarkan nilai kolom score. Baris data diperbarui hanya ketika nilai kolom score dalam data yang diimpor lebih besar dari atau sama dengan nilai saat ini di tabel StarRocks.

    1. Buat tabel score_board di klien Flink SQL sebagai berikut.

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          `score` INT,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.merge_condition' = 'score',
          'sink.version' = 'V1'
      );
      • sink.properties.merge_condition: Diatur ke score, menunjukkan bahwa saat data ditulis, Flink Connector akan menggunakan kolom score sebagai kondisi pembaruan.

      • sink.version: Diatur ke V1, menunjukkan bahwa Flink Connector menggunakan antarmuka Stream Load untuk mengimpor data.

    2. Masukkan dua baris data ke tabel di klien Flink SQL.

      Baris data memiliki kunci primer yang sama dengan baris di tabel StarRocks. Baris data pertama memiliki nilai yang lebih kecil pada kolom score, sedangkan baris data kedua memiliki nilai yang lebih besar pada kolom score.

      INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
    3. Kueri tabel StarRocks di SQL Editor.

      SELECT * FROM `test`.`score_board`;

      Anda akan melihat bahwa hanya baris data kedua yang berubah, sedangkan baris data pertama tetap tidak berubah.

      image

Impor ke kolom Bitmap

Tipe Bitmap umumnya digunakan untuk mempercepat skenario penghitungan deduplikasi eksak, seperti menghitung pengunjung unik (UV). Berikut adalah contoh lengkap yang menunjukkan cara mengimpor data ke kolom Bitmap dalam tabel StarRocks melalui Flink SQL dan mengkueri jumlah UV di StarRocks.

  1. Buat tabel agregat StarRocks di SQL Editor.

    Buat tabel agregat bernama page_uv di database test, di mana:

    • Kolom visit_users didefinisikan sebagai tipe BITMAP dan dikonfigurasi dengan fungsi agregat BITMAP_UNION.

    • page_id dan visit_date berfungsi sebagai kunci agregat (AGGREGATE KEY) untuk pengelompokan dan deduplikasi.

    CREATE TABLE `test`.`page_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` datetime NOT NULL COMMENT 'access time',
      `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Buat tabel di klien Flink SQL.

    Karena Flink tidak mendukung tipe Bitmap, pemetaan kolom dan konversi tipe perlu diimplementasikan sebagai berikut:

    • Di tabel Flink, definisikan kolom visit_user_id sebagai tipe BIGINT untuk merepresentasikan kolom visit_users di tabel StarRocks.

    • Gunakan konfigurasi sink.properties.columns untuk mengonversi data di kolom visit_user_id ke tipe Bitmap melalui fungsi to_bitmap.

    CREATE TABLE `page_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'page_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Masukkan data di klien Flink SQL.

    Masukkan beberapa baris data ke tabel page_uv, mensimulasikan pengguna berbeda yang mengakses halaman pada waktu berbeda.

    visit_user_id bertipe BIGINT, dan Flink akan secara otomatis mengonversinya ke tipe Bitmap.
    INSERT INTO `page_uv` VALUES
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
       (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
       (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Kueri jumlah UV di SQL Editor.

    Gunakan kemampuan agregasi StarRocks untuk menghitung jumlah pengunjung unik (UV) untuk setiap halaman menggunakan COUNT(DISTINCT visit_users).

    SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;

    Hasil yang dikembalikan ditunjukkan di bawah ini.

    image

Impor ke kolom HLL

HLL (HyperLogLog) adalah tipe data yang digunakan untuk penghitungan deduplikasi perkiraan, cocok untuk menghitung pengunjung unik (UV) dalam skenario data berskala besar. Berikut adalah contoh lengkap yang menunjukkan cara mengimpor data ke kolom HLL dalam tabel StarRocks melalui Flink SQL dan mengkueri jumlah UV di StarRocks.

  1. Buat tabel agregat StarRocks di SQL Editor.

    Buat tabel agregat bernama hll_uv di database test, di mana:

    • Kolom visit_users didefinisikan sebagai tipe HLL dan dikonfigurasi dengan fungsi agregat HLL_UNION.

    • page_id dan visit_date berfungsi sebagai kunci agregat (AGGREGATE KEY) untuk pengelompokan dan deduplikasi.

    CREATE TABLE `test`.`hll_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` DATETIME NOT NULL COMMENT 'access time',
      `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Buat tabel di klien Flink SQL.

    Karena Flink tidak mendukung tipe HLL, pemetaan kolom dan konversi tipe perlu diimplementasikan sebagai berikut:

    • Di tabel Flink, definisikan kolom visit_user_id sebagai tipe BIGINT untuk merepresentasikan kolom visit_users di tabel StarRocks.

    • Gunakan konfigurasi sink.properties.columns untuk pemetaan kolom, dan konversikan data tipe BIGINT visit_user_id ke tipe HLL melalui fungsi hll_hash.

    CREATE TABLE `hll_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'hll_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. Masukkan data di klien Flink SQL.

    Masukkan beberapa baris data ke tabel hll_uv, mensimulasikan pengguna berbeda yang mengakses halaman pada waktu berbeda.

    visit_user_id bertipe BIGINT, dan Flink akan secara otomatis mengonversinya ke tipe HLL.
    INSERT INTO `hll_uv` VALUES
       (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
       (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
       (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Kueri jumlah UV di SQL Editor.

    Gunakan kemampuan agregasi StarRocks untuk menghitung jumlah pengunjung unik (UV) untuk setiap halaman menggunakan COUNT(DISTINCT visit_users).

    SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;

    Hasil yang dikembalikan ditunjukkan di bawah ini.

    image