Hologres menyediakan versi baru dari plugin konektor Flink yang memungkinkan Anda mengimpor data ke Hologres secara batch menggunakan Flink. Hal ini meningkatkan efisiensi impor dan mengurangi beban kerja selama proses impor.
Informasi latar belakang
Dalam pemrosesan data besar, Hologres adalah sistem pemrosesan analitik online (OLAP) yang kuat yang terintegrasi dengan Flink untuk memberikan kemampuan pemrosesan data streaming real-time yang dioptimalkan. Untuk skenario yang tidak memerlukan tingkat ketepatan waktu data yang tinggi, seperti pemuatan batch data historis, pemrosesan data offline, atau agregasi log, kami menyarankan Anda menggunakan Flink untuk mengimpor data ke Hologres secara batch. Impor batch memungkinkan Anda menulis sejumlah besar data ke Hologres sekaligus dengan cara yang lebih efisien dan hemat sumber daya. Ini meningkatkan efisiensi impor data dan pemanfaatan sumber daya. Anda dapat mengimpor data secara real-time atau batch berdasarkan kebutuhan bisnis dan status sumber daya Anda. Untuk informasi lebih lanjut tentang cara mengimpor data secara real-time, lihat Gunakan fully managed Flink.
Prasyarat
Sebuah instance Hologres telah dibeli. Untuk informasi lebih lanjut tentang cara membeli instance Hologres, lihat Beli instance Hologres.
Sebuah kluster Flink versi 1.15 atau lebih baru telah diterapkan. Untuk informasi lebih lanjut, lihat topik-topik berikut:
Apache Flink: Deploy Flink
Realtime Compute for Apache Flink: Aktifkan Realtime Compute for Apache Flink
Gunakan Realtime Compute for Apache Flink untuk mengimpor data secara batch
Buat tabel hasil Hologres di konsol HoloWeb untuk menerima data yang diimpor menggunakan Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Hubungkan ke HoloWeb. Dalam contoh ini, tabel
test_sink_customerdibuat.-- Buat tabel hasil Hologres. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );CatatanNama dan tipe data kolom dalam tabel sumber Flink harus sesuai dengan nama dan tipe data kolom dalam tabel hasil Hologres.
Masuk ke konsol Realtime Compute for Apache Flink. Di panel navigasi kiri, klik Deployments. Pada halaman Deployments, klik Create Deployment. Di kotak dialog Buat Deployment, konfigurasikan parameter dan klik Deploy. Untuk informasi lebih lanjut tentang pengaturan parameter, lihat bagian "Buat deployment JAR" dalam Buat deployment.
Tabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
Deployment Type
Pilih JAR.
Deployment Mode
Pilih Stream Mode atau Batch Mode. Dalam contoh ini, Batch Mode dipilih.
Engine Version
Pilih versi engine Flink yang ingin Anda gunakan untuk deployment. Untuk informasi lebih lanjut tentang versi engine, lihat Versi Engine dan Kebijakan siklus hidup. Dalam contoh ini,
vvr-8.0.7-flink-1.17digunakan.JAR URI
Unggah paket konektor Flink open source hologres-connector-flink-repartition.
CatatanKonektor Flink open source dapat digunakan untuk mengimpor data ke Hologres secara batch. Untuk informasi lebih lanjut tentang kode sumber konektor Flink, kunjungi perpustakaan sampel resmi Hologres.
Entry Point Class
Kelas titik masuk program. Nama kelas utama konektor Flink adalah
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.Entry Point Main Arguments
Masukkan parameter jalur file
repartition.sql. Untuk Realtime Compute for Apache Flink, file dependensi tambahan disimpan di/flink/usrlib/. Dalam contoh ini,--sqlFilePath="/flink/usrlib/repartition.sql"dimasukkan.Additional Dependencies
Unggah file
repartition.sql. Filerepartition.sqladalah file skrip SQL Flink yang digunakan untuk mendefinisikan tabel sumber, mendeklarasikan tabel hasil, dan mengonfigurasi informasi koneksi Hologres. Kode sampel berikut menunjukkan isi filerepartition.sqldalam contoh ini.-- Pernyataan bahasa definisi data (DDL) yang digunakan untuk mendefinisikan tabel sumber. Dalam contoh ini, DataGen data uji publik Flink digunakan sebagai data sumber. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- Pernyataan DQL yang digunakan untuk mengambil data dari tabel sumber. Jumlah dan tipe data kolom yang dikembalikan oleh pernyataan DQL ini harus sesuai dengan jumlah dan tipe data kolom yang dideklarasikan dalam pernyataan DDL tabel hasil. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- Pernyataan DDL yang digunakan untuk mendeklarasikan tabel hasil dan mengonfigurasi informasi koneksi Hologres. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );CatatanUntuk informasi lebih lanjut tentang parameter koneksi Hologres dalam file
repartition.sql, lihat Tulis data dari open source Apache Flink 1.11 atau lebih baru ke Hologres secara real-time.Klik nama deployment yang diinginkan. Pada tab Configuration, modifikasi konfigurasi deployment di bagian Resources dan nilai parameter Parallelism.
CatatanKami menyarankan Anda mengatur parameter Paralelisme ke jumlah shard dalam tabel hasil Hologres.
Kueri data dari tabel hasil Hologres.
Setelah deployment Flink dikirimkan, Anda dapat mengkueri data dari tabel hasil Hologres. Contoh pernyataan:
SELECT * FROM test_sink_customer;
Gunakan Apache Flink untuk mengimpor data ke Hologres secara batch
Buat tabel hasil Hologres di konsol HoloWeb untuk menerima data yang diimpor menggunakan Apache Flink. Untuk informasi lebih lanjut, lihat Hubungkan ke HoloWeb. Dalam contoh ini, tabel
test_sink_customerdibuat.-- Buat tabel hasil Hologres. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );CatatanAnda dapat menentukan jumlah shard berdasarkan jumlah data. Untuk informasi lebih lanjut tentang shard, lihat Panduan pengguna grup tabel dan jumlah shard.
Buat file
repartition.sqldan unggah file tersebut ke direktori di kluster Flink. Dalam contoh ini, file diunggah ke direktori/flink-1.15.4/src. Kode sampel berikut menunjukkan isi filerepartition.sql.CatatanFile
repartition.sqladalah file skrip SQL Flink yang digunakan untuk mendefinisikan tabel sumber, mendeklarasikan tabel hasil, dan mengonfigurasi informasi koneksi Hologres.-- Pernyataan DDL yang digunakan untuk mendefinisikan tabel sumber. Dalam contoh ini, DataGen data uji publik Flink digunakan sebagai data sumber. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- Pernyataan DQL yang digunakan untuk mengambil data dari tabel sumber. Jumlah dan tipe data kolom yang dikembalikan oleh pernyataan DQL ini harus sesuai dengan jumlah dan tipe data kolom yang dideklarasikan dalam pernyataan DDL tabel hasil. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- Pernyataan DDL yang digunakan untuk mendeklarasikan tabel hasil dan mengonfigurasi informasi koneksi Hologres. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );Konfigurasikan parameter. Tabel berikut menjelaskan parameter tersebut.
Parameter
Diperlukan
Deskripsi
connector
Ya
Tipe tabel hasil. Atur parameter ini ke hologres.
dbname
Ya
Nama database Hologres.
tablename
Ya
Nama tabel Hologres ke mana data diimpor.
username
Ya
ID AccessKey akun Alibaba Cloud.
Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.
password
Ya
Rahasia AccessKey akun Alibaba Cloud.
endpoint
Ya
Titik akhir virtual private cloud (VPC) instance Hologres Anda. Anda dapat melihat titik akhir instance Hologres pada tab Configurations halaman detail instance di konsol Hologres.
CatatanTitik akhir harus berisi nomor port dan harus dalam format
Alamat IP:Nomor Port. Jika instance Hologres dan deployment Flink berada di wilayah yang sama, gunakan titik akhir VPC instance Hologres. Jika instance Hologres dan deployment Flink berada di wilayah yang berbeda, gunakan titik akhir publik instance Hologres.jdbccopywritemode
Tidak
Metode penulisan data. Nilai yang valid:
false: Pernyataan INSERT digunakan. Ini adalah nilai default.
true: Mode salinan digunakan. Mode salinan batch dan mode salinan tetap disediakan. Secara default, mode salinan tetap digunakan.
CatatanDalam mode salinan tetap, data ditulis dalam mode streaming bukan dalam batch. Ini memastikan throughput yang lebih tinggi dan latensi data yang lebih rendah serta mengonsumsi lebih sedikit sumber daya memori klien daripada penulisan data menggunakan pernyataan INSERT. Namun, data yang diperbarui dalam mode salinan tetap tidak dapat ditarik kembali.
bulkload
Tidak
Menentukan apakah akan menggunakan mode salinan batch untuk menulis data. Nilai yang valid:
true: Mode salinan batch digunakan. Parameter ini hanya berlaku jika Anda mengatur parameter jdbccopywritemode ke true. Jika Anda mengatur parameter ini ke false, mode salinan tetap digunakan.
CatatanDibandingkan dengan mode salinan tetap, mode salinan batch memberikan efisiensi yang lebih tinggi, pemanfaatan sumber daya yang lebih baik, dan performa yang lebih baik. Pilih mode penulisan data berdasarkan kebutuhan bisnis Anda.
Pada sebagian besar kasus, ketika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama menggunakan mode salinan batch, tabel tersebut dikunci. Anda dapat mengatur parameter target-shards.enabled ke true untuk mengubah kunci level tabel menjadi kunci level shard. Dengan cara ini, beberapa tugas impor data pada tabel yang sama dapat dilakukan pada saat yang bersamaan. Dibandingkan dengan mode salinan tetap, mode salinan batch dapat secara signifikan mengurangi beban pada instance Hologres sekitar 66,7% ketika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama. Pengurangan ini diverifikasi oleh tes.
Jika Anda ingin menulis data ke tabel yang dikonfigurasi dengan kunci utama menggunakan mode salinan batch, tabel hasil harus kosong. Jika tabel hasil tidak kosong, deduplikasi data berbasis kunci utama secara negatif memengaruhi performa penulisan data.
false: Mode salinan batch tidak digunakan. Ini adalah nilai default.
target-shards.enabled
Tidak
Menentukan apakah akan mengaktifkan penulisan data batch pada level shard. Nilai yang valid:
true: Penulisan data batch pada level shard diaktifkan. Jika data sumber dipartisi berdasarkan shard, kunci level tabel diubah menjadi kunci level shard.
false: Penulisan data batch pada level shard dinonaktifkan. Ini adalah nilai default.
CatatanUntuk informasi lebih lanjut tentang parameter koneksi Hologres dalam file
repartition.sql, lihat Tulis data dari open source Apache Flink 1.11 atau lebih baru ke Hologres secara real-time.Di kluster Flink, unggah paket konektor Flink open source hologres-connector-flink-repartition ke direktori. Dalam contoh ini, paket diunggah ke direktori root.
CatatanKonektor Flink open source dapat digunakan untuk mengimpor data ke Hologres secara batch. Untuk informasi lebih lanjut tentang kode sumber konektor Flink, kunjungi perpustakaan sampel resmi Hologres.
Kirim deployment Flink. Contoh kode:
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"Parameter dalam kode sebelumnya:
Dexecution.runtime-mode: mode eksekusi deployment Flink. Untuk informasi lebih lanjut, lihat Execution Mode.
p: paralelisme deployment Flink. Kami menyarankan Anda mengatur parameter ini ke jumlah shard dalam tabel hasil atau nilai yang dapat dibagi dengan jumlah shard dalam tabel hasil.
c: nama kelas utama dan jalur file hologres-connector-flink-repartition.
sqlFilePath: jalur file
repartition.sql.
Kueri data dari tabel hasil Hologres.
Setelah deployment Flink dikirimkan, Anda dapat mengkueri data dari tabel hasil Hologres. Contoh pernyataan:
SELECT * FROM test_sink_customer;