All Products
Search
Document Center

Hologres:Impor data batch dengan Flink

Last Updated:Mar 11, 2026

Konektor Flink baru untuk Hologres memungkinkan Anda mengimpor data secara batch ke Hologres. Metode ini menyediakan impor data yang efisien dengan muatan rendah.

Informasi latar belakang

Hologres adalah sistem Pemrosesan Analitik Online (Online Analytical Processing/OLAP) yang andal. Saat diintegrasikan dengan Flink, sistem ini menyediakan kemampuan pemrosesan aliran data real-time yang kuat. Anda dapat menggunakan Flink untuk mengimpor data secara batch ke Hologres dalam skenario di mana ketepatan waktu data tidak kritis, seperti memuat data historis, memproses data offline, atau mengagregasi log. Impor batch memungkinkan Anda menulis sejumlah besar data ke Hologres sekaligus, sehingga lebih efisien, meningkatkan kecepatan impor dan pemanfaatan sumber daya. Anda dapat memilih antara impor real-time dan batch berdasarkan kebutuhan Anda. Untuk informasi selengkapnya tentang impor real-time, lihat Flink yang sepenuhnya dikelola.

Prasyarat

Impor batch menggunakan Realtime Compute for Apache Flink

  1. Sambungkan ke HoloWeb dan eksekusi kueri untuk membuat tabel sink Hologres guna menerima data dari Flink. Untuk informasi selengkapnya, lihat Sambungkan ke HoloWeb dan eksekusi kueri. Topik ini menggunakan tabel test_sink_customer sebagai contoh.

    -- Buat tabel sink Hologres.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
      orientation="column"
    );
    Catatan

    Nama dan tipe bidang pada tabel sumber Flink harus sesuai dengan tabel sink Hologres.

  2. Masuk ke Konsol Realtime Compute for Apache Flink. Di halaman Jobs, klik Deploy Job. Konfigurasi pekerjaan penerapan dan klik Deploy. Untuk informasi selengkapnya tentang parameter, lihat Deploy a JAR job.

    Parameter utama dijelaskan dalam tabel berikut.

    Parameter

    Deskripsi

    Deployment Job Type

    Pilih JAR.

    Deployment Mode

    Mode streaming dan batch didukung. Topik ini menggunakan mode batch.

    Engine Version

    Untuk informasi selengkapnya tentang versi engine, lihat Engine versions dan Lifecycle policies. Topik ini menggunakan versi vvr-8.0.7-flink-1.17 sebagai contoh.

    JAR URI

    Unggah konektor Apache Flink: hologres-connector-flink-repartition.jar.

    Catatan

    Anda dapat menggunakan konektor Flink open source untuk mengimpor data secara batch ke Hologres. Untuk informasi selengkapnya tentang kode sumber plugin konektor Flink, lihat repositori GitHub resmi Hologres.

    Entry Point Class

    Kelas titik masuk program. Untuk konektor Flink, tentukan nama kelas utama sebagai com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.

    Entry Point Main Arguments

    Berikan parameter path file repartition.sql. Selama waktu proses Realtime Compute for Apache Flink, path file dependensi yang dilampirkan adalah /flink/usrlib/. Oleh karena itu, parameter lengkapnya adalah --sqlFilePath="/flink/usrlib/repartition.sql".

    Additional Dependency Files

    Unggah file repartition.sql. File repartition.sql adalah file skrip SQL Flink. File ini mendefinisikan sumber data, mendeklarasikan tabel sink, dan berisi informasi koneksi untuk Hologres. Kode berikut menunjukkan contoh file repartition.sql.

    -- sourceDDL. Topik ini menggunakan data uji publik Flink DataGen sebagai sumber data.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql. Pernyataan kueri untuk tabel sumber harus memastikan bahwa hasil kueri sesuai dengan tabel sink yang dideklarasikan dalam sinkDDL. Ini mencakup jumlah bidang dan tipe bidang.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL. Ini adalah deklarasi tabel sink dan konfigurasi koneksi Hologres.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    Catatan

    Untuk informasi selengkapnya tentang parameter koneksi ke Hologres dalam file repartition.sql, lihat Hologres Flink Connector parameters.

  3. Klik nama pekerjaan untuk membuka panel Deployment Details. Di bagian Resource Configurations, ubah Concurrency.

    Catatan

    Atur konkurensi ke nilai yang sama dengan jumlah shard tabel sink Hologres.

  4. Kueri tabel sink Hologres.

    Setelah pekerjaan Flink dikirimkan, kueri data yang ditulis di Hologres. Pernyataan berikut merupakan contohnya.

    SELECT * FROM test_sink_customer;

Impor batch menggunakan Flink open source

  1. Sambungkan ke HoloWeb dan eksekusi kueri untuk membuat tabel sink Hologres guna menerima data dari Flink. Untuk informasi selengkapnya, lihat Sambungkan ke HoloWeb dan eksekusi kueri. Topik ini menggunakan tabel test_sink_customer sebagai contoh.

    -- Buat tabel sink Hologres.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    Catatan

    Tentukan jumlah shard berdasarkan volume data. Untuk informasi selengkapnya tentang shard, lihat Manage table groups and shard counts.

  2. Buat file repartition.sql dan unggah ke lokasi apa pun di lingkungan kluster Flink. Topik ini menggunakan path /flink-1.15.4/src/repartition.sql sebagai contoh. Kode berikut menunjukkan contoh file repartition.sql.

    Catatan

    File repartition.sql adalah file skrip SQL Flink yang mendefinisikan sumber data, mendeklarasikan tabel sink, serta berisi informasi koneksi untuk Hologres.

    -- sourceDDL. Topik ini menggunakan data uji publik Flink DataGen sebagai sumber data.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql. Pernyataan kueri untuk tabel sumber harus memastikan bahwa hasil kueri sesuai dengan tabel sink yang dideklarasikan dalam sinkDDL. Ini mencakup jumlah bidang dan tipe bidang.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL. Ini adalah deklarasi tabel sink dan konfigurasi koneksi Hologres.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    Parameter utama dijelaskan sebagai berikut:

    Parameter

    Wajib

    Deskripsi

    connector

    Ya

    Jenis tabel sink. Nilainya tetap hologres.

    dbname

    Ya

    Nama database Hologres.

    tablename

    Ya

    Nama tabel di Hologres yang menerima data.

    username

    Ya

    ID AccessKey Akun Alibaba Cloud Anda.

    Untuk mendapatkan ID AccessKey, buka halaman AccessKey Management.

    password

    Ya

    Rahasia AccessKey yang sesuai dengan ID AccessKey Akun Alibaba Cloud Anda.

    endpoint

    Ya

    Titik akhir VPC instans Hologres. Buka halaman produk instans di Konsol Hologres dan peroleh titik akhir dari bagian Configurations.

    Catatan

    Titik akhir harus mencantumkan nomor port. Formatnya adalah ip:port. Gunakan titik akhir VPC jika aplikasi Anda berada di wilayah yang sama dengan instans Hologres. Jika tidak, gunakan titik akhir publik.

    jdbccopywritemode

    Tidak

    Mode penulisan data. Nilai yang valid:

    • false (default): menggunakan mode INSERT.

    • true: menggunakan mode COPY. Mode COPY mencakup streaming copy (Fixed Copy) dan batch copy. Secara default, mode streaming copy (Fixed Copy) digunakan.

      Catatan

      Dibandingkan dengan mode INSERT, mode Fixed Copy memberikan throughput lebih tinggi karena menggunakan pola streaming, latensi data lebih rendah, dan konsumsi memori klien lebih rendah karena tidak memerlukan batching data. Namun, mode ini tidak mendukung rollback data.

    bulkload

    Tidak

    Menentukan apakah akan menggunakan mode batch copy. Nilai yang valid:

    • true: menggunakan mode batch copy. Parameter ini hanya berlaku jika parameter jdbccopywritemode juga diatur ke true. Jika tidak, mode Fixed Copy yang digunakan.

      Catatan
      • Dibandingkan dengan streaming copy (Fixed Copy), batch copy lebih efisien dan memanfaatkan sumber daya Hologres lebih baik. Hal ini memberikan performa lebih baik selama penulisan data. Anda dapat memilih mode penulisan data sesuai kebutuhan.

      • Saat menggunakan batch copy untuk menulis data ke tabel dengan primary key, kunci tabel (table locks) mungkin terjadi. Anda dapat mengatur parameter target-shards.enabled ke true untuk mengurangi granularitas kunci hingga tingkat shard. Hal ini memungkinkan beberapa tugas impor batch berjalan secara konkuren dan mengurangi terjadinya kunci tabel. Dibandingkan dengan mode Fixed Copy, metode ini secara signifikan mengurangi muatan pada instans Hologres saat menggunakan batch copy untuk menulis data ke tabel dengan primary key. Pengujian menunjukkan bahwa muatan dapat berkurang sekitar 66,7%.

      • Saat menggunakan batch copy, jika tabel tujuan berisi primary key, tabel harus kosong sebelum Anda menulis data ke dalamnya. Jika tidak, deduplikasi berdasarkan primary key selama proses penulisan akan memengaruhi performa penulisan.

    • false (default): Pengaturan tidak aktif.

    target-shards.enabled

    Tidak

    Menentukan apakah akan mengaktifkan penulisan batch berdasarkan shard target. Nilai yang valid:

    • true: mengaktifkan penulisan batch berdasarkan shard target. Saat data sumber dipartisi ulang berdasarkan shard, Anda dapat mengurangi granularitas kunci hingga tingkat shard.

    • false (default): Dinonaktifkan.

    Catatan

    Untuk informasi selengkapnya tentang parameter koneksi Hologres dalam file repartition.sql, lihat Hologres Flink Connector parameters.

  3. Di lingkungan kluster Flink, unggah konektor Flink open source hologres-connector-flink-repartition.jar ke direktori apa pun. Topik ini menggunakan direktori root sebagai contoh.

    Catatan

    Konektor Flink open source memungkinkan impor data batch ke Hologres. Kode sumber plugin ini tersedia di repositori GitHub resmi Hologres.

  4. Kirimkan pekerjaan Flink. Kode berikut menunjukkan contohnya.

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    Parameter dijelaskan sebagai berikut:

    • Dexecution.runtime-mode: Mode eksekusi pekerjaan Flink. Untuk informasi selengkapnya, lihat Execution Mode.

    • p: Konkurensi pekerjaan. Atur konkurensi ke nilai yang sama dengan jumlah shard tabel sink, atau ke nilai yang dapat dibagi rata oleh jumlah shard tersebut.

    • c: Nama dan path kelas utama dalam hologres-connector-flink-repartition.jar.

    • sqlFilePath: Path file repartition.sql.

  5. Kueri tabel sink Hologres.

    Setelah pekerjaan Flink dikirimkan, kueri data yang ditulis di Hologres. Pernyataan berikut merupakan contohnya.

    SELECT * FROM test_sink_customer;