Konektor Flink baru untuk Hologres memungkinkan Anda mengimpor data secara batch ke Hologres. Metode ini menyediakan impor data yang efisien dengan muatan rendah.
Informasi latar belakang
Hologres adalah sistem Pemrosesan Analitik Online (Online Analytical Processing/OLAP) yang andal. Saat diintegrasikan dengan Flink, sistem ini menyediakan kemampuan pemrosesan aliran data real-time yang kuat. Anda dapat menggunakan Flink untuk mengimpor data secara batch ke Hologres dalam skenario di mana ketepatan waktu data tidak kritis, seperti memuat data historis, memproses data offline, atau mengagregasi log. Impor batch memungkinkan Anda menulis sejumlah besar data ke Hologres sekaligus, sehingga lebih efisien, meningkatkan kecepatan impor dan pemanfaatan sumber daya. Anda dapat memilih antara impor real-time dan batch berdasarkan kebutuhan Anda. Untuk informasi selengkapnya tentang impor real-time, lihat Flink yang sepenuhnya dikelola.
Prasyarat
Anda telah membeli instans Hologres. Untuk informasi selengkapnya, lihat Beli instans Hologres.
Kluster Flink versi 1.15 atau yang lebih baru telah diterapkan. Untuk informasi selengkapnya, lihat topik berikut:
Flink open source: Deploy Flink.
Alibaba Cloud Realtime Compute for Apache Flink: Aktifkan Realtime Compute for Apache Flink.
Impor batch menggunakan Realtime Compute for Apache Flink
Sambungkan ke HoloWeb dan eksekusi kueri untuk membuat tabel sink Hologres guna menerima data dari Flink. Untuk informasi selengkapnya, lihat Sambungkan ke HoloWeb dan eksekusi kueri. Topik ini menggunakan tabel
test_sink_customersebagai contoh.-- Buat tabel sink Hologres. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );CatatanNama dan tipe bidang pada tabel sumber Flink harus sesuai dengan tabel sink Hologres.
Masuk ke Konsol Realtime Compute for Apache Flink. Di halaman Jobs, klik Deploy Job. Konfigurasi pekerjaan penerapan dan klik Deploy. Untuk informasi selengkapnya tentang parameter, lihat Deploy a JAR job.
Parameter utama dijelaskan dalam tabel berikut.
Parameter
Deskripsi
Deployment Job Type
Pilih JAR.
Deployment Mode
Mode streaming dan batch didukung. Topik ini menggunakan mode batch.
Engine Version
Untuk informasi selengkapnya tentang versi engine, lihat Engine versions dan Lifecycle policies. Topik ini menggunakan versi
vvr-8.0.7-flink-1.17sebagai contoh.JAR URI
Unggah konektor Apache Flink: hologres-connector-flink-repartition.jar.
CatatanAnda dapat menggunakan konektor Flink open source untuk mengimpor data secara batch ke Hologres. Untuk informasi selengkapnya tentang kode sumber plugin konektor Flink, lihat repositori GitHub resmi Hologres.
Entry Point Class
Kelas titik masuk program. Untuk konektor Flink, tentukan nama kelas utama sebagai
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.Entry Point Main Arguments
Berikan parameter path file
repartition.sql. Selama waktu proses Realtime Compute for Apache Flink, path file dependensi yang dilampirkan adalah/flink/usrlib/. Oleh karena itu, parameter lengkapnya adalah--sqlFilePath="/flink/usrlib/repartition.sql".Additional Dependency Files
Unggah file
repartition.sql. Filerepartition.sqladalah file skrip SQL Flink. File ini mendefinisikan sumber data, mendeklarasikan tabel sink, dan berisi informasi koneksi untuk Hologres. Kode berikut menunjukkan contoh filerepartition.sql.-- sourceDDL. Topik ini menggunakan data uji publik Flink DataGen sebagai sumber data. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- sourceDql. Pernyataan kueri untuk tabel sumber harus memastikan bahwa hasil kueri sesuai dengan tabel sink yang dideklarasikan dalam sinkDDL. Ini mencakup jumlah bidang dan tipe bidang. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL. Ini adalah deklarasi tabel sink dan konfigurasi koneksi Hologres. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );CatatanUntuk informasi selengkapnya tentang parameter koneksi ke Hologres dalam file
repartition.sql, lihat Hologres Flink Connector parameters.Klik nama pekerjaan untuk membuka panel Deployment Details. Di bagian Resource Configurations, ubah Concurrency.
CatatanAtur konkurensi ke nilai yang sama dengan jumlah shard tabel sink Hologres.
Kueri tabel sink Hologres.
Setelah pekerjaan Flink dikirimkan, kueri data yang ditulis di Hologres. Pernyataan berikut merupakan contohnya.
SELECT * FROM test_sink_customer;
Impor batch menggunakan Flink open source
Sambungkan ke HoloWeb dan eksekusi kueri untuk membuat tabel sink Hologres guna menerima data dari Flink. Untuk informasi selengkapnya, lihat Sambungkan ke HoloWeb dan eksekusi kueri. Topik ini menggunakan tabel
test_sink_customersebagai contoh.-- Buat tabel sink Hologres. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );CatatanTentukan jumlah shard berdasarkan volume data. Untuk informasi selengkapnya tentang shard, lihat Manage table groups and shard counts.
Buat file
repartition.sqldan unggah ke lokasi apa pun di lingkungan kluster Flink. Topik ini menggunakan path/flink-1.15.4/src/repartition.sqlsebagai contoh. Kode berikut menunjukkan contoh filerepartition.sql.CatatanFile
repartition.sqladalah file skrip SQL Flink yang mendefinisikan sumber data, mendeklarasikan tabel sink, serta berisi informasi koneksi untuk Hologres.-- sourceDDL. Topik ini menggunakan data uji publik Flink DataGen sebagai sumber data. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- sourceDql. Pernyataan kueri untuk tabel sumber harus memastikan bahwa hasil kueri sesuai dengan tabel sink yang dideklarasikan dalam sinkDDL. Ini mencakup jumlah bidang dan tipe bidang. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL. Ini adalah deklarasi tabel sink dan konfigurasi koneksi Hologres. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );Parameter utama dijelaskan sebagai berikut:
Parameter
Wajib
Deskripsi
connector
Ya
Jenis tabel sink. Nilainya tetap hologres.
dbname
Ya
Nama database Hologres.
tablename
Ya
Nama tabel di Hologres yang menerima data.
username
Ya
ID AccessKey Akun Alibaba Cloud Anda.
Untuk mendapatkan ID AccessKey, buka halaman AccessKey Management.
password
Ya
Rahasia AccessKey yang sesuai dengan ID AccessKey Akun Alibaba Cloud Anda.
endpoint
Ya
Titik akhir VPC instans Hologres. Buka halaman produk instans di Konsol Hologres dan peroleh titik akhir dari bagian Configurations.
CatatanTitik akhir harus mencantumkan nomor port. Formatnya adalah
ip:port. Gunakan titik akhir VPC jika aplikasi Anda berada di wilayah yang sama dengan instans Hologres. Jika tidak, gunakan titik akhir publik.jdbccopywritemode
Tidak
Mode penulisan data. Nilai yang valid:
false (default): menggunakan mode INSERT.
true: menggunakan mode COPY. Mode COPY mencakup streaming copy (Fixed Copy) dan batch copy. Secara default, mode streaming copy (Fixed Copy) digunakan.
CatatanDibandingkan dengan mode INSERT, mode Fixed Copy memberikan throughput lebih tinggi karena menggunakan pola streaming, latensi data lebih rendah, dan konsumsi memori klien lebih rendah karena tidak memerlukan batching data. Namun, mode ini tidak mendukung rollback data.
bulkload
Tidak
Menentukan apakah akan menggunakan mode batch copy. Nilai yang valid:
true: menggunakan mode batch copy. Parameter ini hanya berlaku jika parameter jdbccopywritemode juga diatur ke true. Jika tidak, mode Fixed Copy yang digunakan.
CatatanDibandingkan dengan streaming copy (Fixed Copy), batch copy lebih efisien dan memanfaatkan sumber daya Hologres lebih baik. Hal ini memberikan performa lebih baik selama penulisan data. Anda dapat memilih mode penulisan data sesuai kebutuhan.
Saat menggunakan batch copy untuk menulis data ke tabel dengan primary key, kunci tabel (table locks) mungkin terjadi. Anda dapat mengatur parameter target-shards.enabled ke true untuk mengurangi granularitas kunci hingga tingkat shard. Hal ini memungkinkan beberapa tugas impor batch berjalan secara konkuren dan mengurangi terjadinya kunci tabel. Dibandingkan dengan mode Fixed Copy, metode ini secara signifikan mengurangi muatan pada instans Hologres saat menggunakan batch copy untuk menulis data ke tabel dengan primary key. Pengujian menunjukkan bahwa muatan dapat berkurang sekitar 66,7%.
Saat menggunakan batch copy, jika tabel tujuan berisi primary key, tabel harus kosong sebelum Anda menulis data ke dalamnya. Jika tidak, deduplikasi berdasarkan primary key selama proses penulisan akan memengaruhi performa penulisan.
false (default): Pengaturan tidak aktif.
target-shards.enabled
Tidak
Menentukan apakah akan mengaktifkan penulisan batch berdasarkan shard target. Nilai yang valid:
true: mengaktifkan penulisan batch berdasarkan shard target. Saat data sumber dipartisi ulang berdasarkan shard, Anda dapat mengurangi granularitas kunci hingga tingkat shard.
false (default): Dinonaktifkan.
CatatanUntuk informasi selengkapnya tentang parameter koneksi Hologres dalam file
repartition.sql, lihat Hologres Flink Connector parameters.Di lingkungan kluster Flink, unggah konektor Flink open source hologres-connector-flink-repartition.jar ke direktori apa pun. Topik ini menggunakan direktori root sebagai contoh.
CatatanKonektor Flink open source memungkinkan impor data batch ke Hologres. Kode sumber plugin ini tersedia di repositori GitHub resmi Hologres.
Kirimkan pekerjaan Flink. Kode berikut menunjukkan contohnya.
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"Parameter dijelaskan sebagai berikut:
Dexecution.runtime-mode: Mode eksekusi pekerjaan Flink. Untuk informasi selengkapnya, lihat Execution Mode.
p: Konkurensi pekerjaan. Atur konkurensi ke nilai yang sama dengan jumlah shard tabel sink, atau ke nilai yang dapat dibagi rata oleh jumlah shard tersebut.
c: Nama dan path kelas utama dalam hologres-connector-flink-repartition.jar.
sqlFilePath: Path file
repartition.sql.
Kueri tabel sink Hologres.
Setelah pekerjaan Flink dikirimkan, kueri data yang ditulis di Hologres. Pernyataan berikut merupakan contohnya.
SELECT * FROM test_sink_customer;