全部产品
Search
文档中心

Hologres:Gunakan Flink untuk mengimpor data ke Hologres secara batch

更新时间:Jul 02, 2025

Hologres menyediakan versi baru dari plugin konektor Flink yang memungkinkan Anda mengimpor data ke Hologres secara batch menggunakan Flink. Hal ini meningkatkan efisiensi impor dan mengurangi beban kerja selama proses impor.

Informasi latar belakang

Dalam pemrosesan data besar, Hologres adalah sistem pemrosesan analitik online (OLAP) yang kuat yang terintegrasi dengan Flink untuk memberikan kemampuan pemrosesan data streaming real-time yang dioptimalkan. Untuk skenario yang tidak memerlukan tingkat ketepatan waktu data yang tinggi, seperti pemuatan batch data historis, pemrosesan data offline, atau agregasi log, kami menyarankan Anda menggunakan Flink untuk mengimpor data ke Hologres secara batch. Impor batch memungkinkan Anda menulis sejumlah besar data ke Hologres sekaligus dengan cara yang lebih efisien dan hemat sumber daya. Ini meningkatkan efisiensi impor data dan pemanfaatan sumber daya. Anda dapat mengimpor data secara real-time atau batch berdasarkan kebutuhan bisnis dan status sumber daya Anda. Untuk informasi lebih lanjut tentang cara mengimpor data secara real-time, lihat Gunakan fully managed Flink.

Prasyarat

Gunakan Realtime Compute for Apache Flink untuk mengimpor data secara batch

  1. Buat tabel hasil Hologres di konsol HoloWeb untuk menerima data yang diimpor menggunakan Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Hubungkan ke HoloWeb. Dalam contoh ini, tabel test_sink_customer dibuat.

    -- Buat tabel hasil Hologres.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    Catatan

    Nama dan tipe data kolom dalam tabel sumber Flink harus sesuai dengan nama dan tipe data kolom dalam tabel hasil Hologres.

  2. Masuk ke konsol Realtime Compute for Apache Flink. Di panel navigasi kiri, klik Deployments. Pada halaman Deployments, klik Create Deployment. Di kotak dialog Buat Deployment, konfigurasikan parameter dan klik Deploy. Untuk informasi lebih lanjut tentang pengaturan parameter, lihat bagian "Buat deployment JAR" dalam Buat deployment.

    Tabel berikut menjelaskan parameter tersebut.

    Parameter

    Deskripsi

    Deployment Type

    Pilih JAR.

    Deployment Mode

    Pilih Stream Mode atau Batch Mode. Dalam contoh ini, Batch Mode dipilih.

    Engine Version

    Pilih versi engine Flink yang ingin Anda gunakan untuk deployment. Untuk informasi lebih lanjut tentang versi engine, lihat Versi Engine dan Kebijakan siklus hidup. Dalam contoh ini, vvr-8.0.7-flink-1.17 digunakan.

    JAR URI

    Unggah paket konektor Flink open source hologres-connector-flink-repartition.

    Catatan

    Konektor Flink open source dapat digunakan untuk mengimpor data ke Hologres secara batch. Untuk informasi lebih lanjut tentang kode sumber konektor Flink, kunjungi perpustakaan sampel resmi Hologres.

    Entry Point Class

    Kelas titik masuk program. Nama kelas utama konektor Flink adalah com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.

    Entry Point Main Arguments

    Masukkan parameter jalur file repartition.sql. Untuk Realtime Compute for Apache Flink, file dependensi tambahan disimpan di /flink/usrlib/. Dalam contoh ini, --sqlFilePath="/flink/usrlib/repartition.sql" dimasukkan.

    Additional Dependencies

    Unggah file repartition.sql. File repartition.sql adalah file skrip SQL Flink yang digunakan untuk mendefinisikan tabel sumber, mendeklarasikan tabel hasil, dan mengonfigurasi informasi koneksi Hologres. Kode sampel berikut menunjukkan isi file repartition.sql dalam contoh ini.

    -- Pernyataan bahasa definisi data (DDL) yang digunakan untuk mendefinisikan tabel sumber. Dalam contoh ini, DataGen data uji publik Flink digunakan sebagai data sumber.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- Pernyataan DQL yang digunakan untuk mengambil data dari tabel sumber. Jumlah dan tipe data kolom yang dikembalikan oleh pernyataan DQL ini harus sesuai dengan jumlah dan tipe data kolom yang dideklarasikan dalam pernyataan DDL tabel hasil.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- Pernyataan DDL yang digunakan untuk mendeklarasikan tabel hasil dan mengonfigurasi informasi koneksi Hologres.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    Catatan

    Untuk informasi lebih lanjut tentang parameter koneksi Hologres dalam file repartition.sql, lihat Tulis data dari open source Apache Flink 1.11 atau lebih baru ke Hologres secara real-time.

  3. Klik nama deployment yang diinginkan. Pada tab Configuration, modifikasi konfigurasi deployment di bagian Resources dan nilai parameter Parallelism.

    Catatan

    Kami menyarankan Anda mengatur parameter Paralelisme ke jumlah shard dalam tabel hasil Hologres.

  4. Kueri data dari tabel hasil Hologres.

    Setelah deployment Flink dikirimkan, Anda dapat mengkueri data dari tabel hasil Hologres. Contoh pernyataan:

    SELECT * FROM test_sink_customer;

Gunakan Apache Flink untuk mengimpor data ke Hologres secara batch

  1. Buat tabel hasil Hologres di konsol HoloWeb untuk menerima data yang diimpor menggunakan Apache Flink. Untuk informasi lebih lanjut, lihat Hubungkan ke HoloWeb. Dalam contoh ini, tabel test_sink_customer dibuat.

    -- Buat tabel hasil Hologres.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    Catatan

    Anda dapat menentukan jumlah shard berdasarkan jumlah data. Untuk informasi lebih lanjut tentang shard, lihat Panduan pengguna grup tabel dan jumlah shard.

  2. Buat file repartition.sql dan unggah file tersebut ke direktori di kluster Flink. Dalam contoh ini, file diunggah ke direktori /flink-1.15.4/src. Kode sampel berikut menunjukkan isi file repartition.sql.

    Catatan

    File repartition.sql adalah file skrip SQL Flink yang digunakan untuk mendefinisikan tabel sumber, mendeklarasikan tabel hasil, dan mengonfigurasi informasi koneksi Hologres.

    -- Pernyataan DDL yang digunakan untuk mendefinisikan tabel sumber. Dalam contoh ini, DataGen data uji publik Flink digunakan sebagai data sumber.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- Pernyataan DQL yang digunakan untuk mengambil data dari tabel sumber. Jumlah dan tipe data kolom yang dikembalikan oleh pernyataan DQL ini harus sesuai dengan jumlah dan tipe data kolom yang dideklarasikan dalam pernyataan DDL tabel hasil.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- Pernyataan DDL yang digunakan untuk mendeklarasikan tabel hasil dan mengonfigurasi informasi koneksi Hologres.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    Konfigurasikan parameter. Tabel berikut menjelaskan parameter tersebut.

    Parameter

    Diperlukan

    Deskripsi

    connector

    Ya

    Tipe tabel hasil. Atur parameter ini ke hologres.

    dbname

    Ya

    Nama database Hologres.

    tablename

    Ya

    Nama tabel Hologres ke mana data diimpor.

    username

    Ya

    ID AccessKey akun Alibaba Cloud.

    Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.

    password

    Ya

    Rahasia AccessKey akun Alibaba Cloud.

    endpoint

    Ya

    Titik akhir virtual private cloud (VPC) instance Hologres Anda. Anda dapat melihat titik akhir instance Hologres pada tab Configurations halaman detail instance di konsol Hologres.

    Catatan

    Titik akhir harus berisi nomor port dan harus dalam format Alamat IP:Nomor Port. Jika instance Hologres dan deployment Flink berada di wilayah yang sama, gunakan titik akhir VPC instance Hologres. Jika instance Hologres dan deployment Flink berada di wilayah yang berbeda, gunakan titik akhir publik instance Hologres.

    jdbccopywritemode

    Tidak

    Metode penulisan data. Nilai yang valid:

    • false: Pernyataan INSERT digunakan. Ini adalah nilai default.

    • true: Mode salinan digunakan. Mode salinan batch dan mode salinan tetap disediakan. Secara default, mode salinan tetap digunakan.

      Catatan

      Dalam mode salinan tetap, data ditulis dalam mode streaming bukan dalam batch. Ini memastikan throughput yang lebih tinggi dan latensi data yang lebih rendah serta mengonsumsi lebih sedikit sumber daya memori klien daripada penulisan data menggunakan pernyataan INSERT. Namun, data yang diperbarui dalam mode salinan tetap tidak dapat ditarik kembali.

    bulkload

    Tidak

    Menentukan apakah akan menggunakan mode salinan batch untuk menulis data. Nilai yang valid:

    • true: Mode salinan batch digunakan. Parameter ini hanya berlaku jika Anda mengatur parameter jdbccopywritemode ke true. Jika Anda mengatur parameter ini ke false, mode salinan tetap digunakan.

      Catatan
      • Dibandingkan dengan mode salinan tetap, mode salinan batch memberikan efisiensi yang lebih tinggi, pemanfaatan sumber daya yang lebih baik, dan performa yang lebih baik. Pilih mode penulisan data berdasarkan kebutuhan bisnis Anda.

      • Pada sebagian besar kasus, ketika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama menggunakan mode salinan batch, tabel tersebut dikunci. Anda dapat mengatur parameter target-shards.enabled ke true untuk mengubah kunci level tabel menjadi kunci level shard. Dengan cara ini, beberapa tugas impor data pada tabel yang sama dapat dilakukan pada saat yang bersamaan. Dibandingkan dengan mode salinan tetap, mode salinan batch dapat secara signifikan mengurangi beban pada instance Hologres sekitar 66,7% ketika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama. Pengurangan ini diverifikasi oleh tes.

      • Jika Anda ingin menulis data ke tabel yang dikonfigurasi dengan kunci utama menggunakan mode salinan batch, tabel hasil harus kosong. Jika tabel hasil tidak kosong, deduplikasi data berbasis kunci utama secara negatif memengaruhi performa penulisan data.

    • false: Mode salinan batch tidak digunakan. Ini adalah nilai default.

    target-shards.enabled

    Tidak

    Menentukan apakah akan mengaktifkan penulisan data batch pada level shard. Nilai yang valid:

    • true: Penulisan data batch pada level shard diaktifkan. Jika data sumber dipartisi berdasarkan shard, kunci level tabel diubah menjadi kunci level shard.

    • false: Penulisan data batch pada level shard dinonaktifkan. Ini adalah nilai default.

    Catatan

    Untuk informasi lebih lanjut tentang parameter koneksi Hologres dalam file repartition.sql, lihat Tulis data dari open source Apache Flink 1.11 atau lebih baru ke Hologres secara real-time.

  3. Di kluster Flink, unggah paket konektor Flink open source hologres-connector-flink-repartition ke direktori. Dalam contoh ini, paket diunggah ke direktori root.

    Catatan

    Konektor Flink open source dapat digunakan untuk mengimpor data ke Hologres secara batch. Untuk informasi lebih lanjut tentang kode sumber konektor Flink, kunjungi perpustakaan sampel resmi Hologres.

  4. Kirim deployment Flink. Contoh kode:

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    Parameter dalam kode sebelumnya:

    • Dexecution.runtime-mode: mode eksekusi deployment Flink. Untuk informasi lebih lanjut, lihat Execution Mode.

    • p: paralelisme deployment Flink. Kami menyarankan Anda mengatur parameter ini ke jumlah shard dalam tabel hasil atau nilai yang dapat dibagi dengan jumlah shard dalam tabel hasil.

    • c: nama kelas utama dan jalur file hologres-connector-flink-repartition.

    • sqlFilePath: jalur file repartition.sql.

  5. Kueri data dari tabel hasil Hologres.

    Setelah deployment Flink dikirimkan, Anda dapat mengkueri data dari tabel hasil Hologres. Contoh pernyataan:

    SELECT * FROM test_sink_customer;