Topik ini menjelaskan cara menulis data dari Apache Flink 1.11 atau yang lebih baru ke Hologres secara real-time.
Prasyarat
Sebuah instance Hologres telah dibuat dan alat pengembangan digunakan untuk terhubung ke instance tersebut. Untuk informasi lebih lanjut, lihat Terhubung ke HoloWeb.
Sebuah klaster Apache Flink telah dibuat. Dalam contoh ini, klaster Apache Flink 1.15 digunakan. Anda dapat mengunduh file biner dari situs resmi Apache Flink dan membuat klaster Apache Flink yang beroperasi dalam mode standalone. Untuk informasi lebih lanjut, lihat Instalasi Lokal Flink.
Latar Belakang
Kode konektor Hologres telah dibuka sumbernya sejak Apache Flink 1.11 dan versi yang lebih baru. Paket rilis konektor Hologres yang sesuai dengan berbagai versi Apache Flink tersedia di Repositori Maven. File pom.xml berikut memberikan contoh konfigurasinya.
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>Tabel berikut menjelaskan pemetaan versi antara Apache Flink dan konektor Hologres. Kami merekomendasikan penggunaan versi 1.15 atau yang lebih baru untuk memanfaatkan lebih banyak fitur.
Versi Apache Flink | Versi konektor Hologres |
Apache Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
Apache Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
Apache Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
Apache Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
Apache Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
Apache Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
Kode Contoh
Anda dapat mengeksekusi pernyataan SQL Flink dengan sintaks berikut untuk menulis data ke Hologres.
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");
FlinkSQLToHoloExample: Aplikasi untuk menulis data ke Hologres menggunakan antarmuka SQL Flink.
FlinkDSAndSQLToHoloExample: Aplikasi untuk mentransformasikan aliran data menjadi tabel dan menuliskannya ke Hologres. Antarmuka DataStream Flink digunakan untuk transformasi, sementara antarmuka SQL Flink digunakan untuk penulisan data.
FlinkDataStreamToHoloExample: Aplikasi untuk menulis aliran data ke Hologres menggunakan antarmuka DataStream Flink.
FlinkRoaringBitmapAggJob: Aplikasi untuk menghitung jumlah pengunjung unik (UV) setelah deduplikasi secara real-time dan menulis statistik ke Hologres. Aplikasi ini menggunakan Flink, roaring bitmaps, dan tabel dimensi Hologres.
FlinkToHoloRePartitionExample: Aplikasi untuk mempartisi data dalam shard di Hologres selama penulisan data real-time menggunakan antarmuka DataStream Flink. Ini membantu mengurangi jumlah file kecil secara signifikan di Hologres, meningkatkan kinerja penulisan, dan mengurangi beban sistem. Aplikasi ini cocok untuk skenario di mana Anda ingin mengimpor data ke beberapa tabel kosong yang dikonfigurasi dengan kunci utama pada saat yang sama. Aplikasi ini mencapai efek serupa dengan operasi INSERT OVERWRITE.
Opsi konektor Hologres
Anda dapat menggunakan konektor Hologres untuk menulis data dari Apache Flink ke Hologres. Tabel berikut menjelaskan parameter yang digunakan oleh konektor Hologres.
Opsi | Diperlukan | Deskripsi |
connector | Ya | Jenis tabel sink. Tetapkan nilainya ke hologres. |
dbname | Ya | Nama database Hologres. |
tablename | Ya | Nama tabel Hologres ke mana Anda ingin menulis data. |
username | Ya | ID AccessKey akun Alibaba Cloud. Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey. |
password | Ya | Rahasia AccessKey akun Alibaba Cloud. Anda dapat memperoleh Rahasia AccessKey dari halaman Pasangan AccessKey. |
endpoint | Ya | Titik akhir virtual private cloud (VPC) dari instance Hologres Anda. Anda dapat melihat titik akhir instance Hologres pada halaman detail instance di Konsol Hologres. Catatan Titik akhir harus menyertakan nomor port dan harus dalam format |
Khusus koneksi
Opsi
Diperlukan
Deskripsi
connectionSize
Tidak
Jumlah koneksi Java Database Connectivity (JDBC) dalam satu kolam koneksi yang dibuat oleh tugas Flink Hologres.
Nilai default: 3. Nilai ini sebanding dengan throughput.
connectionPoolName
Tidak
Nama kolam koneksi. Dalam TaskManager yang sama, tabel yang dikonfigurasikan dengan kolam koneksi yang sama dapat berbagi kolam koneksi.
Tidak ada nilai default. Setiap tabel menggunakan kolam koneksinya sendiri secara default. Jika Anda menentukan nama kolam koneksi, nilai parameter connectionSize semua tabel harus sama.
fixedConnectionMode
Tidak
Operasi penulisan data dan kueri poin tidak mengonsumsi koneksi. Fitur ini dirilis dalam versi beta dan memerlukan versi konektor 1.2.0 atau yang lebih baru serta versi Hologres 1.3 atau yang lebih baru.
Nilai default: false.
jdbcRetryCount
Tidak
Jumlah maksimum percobaan ulang yang diizinkan untuk menulis dan mengkueri data jika terjadi kegagalan koneksi.
Nilai default: 10.
jdbcRetrySleepInitMs
Tidak
Interval pelaksanaan percobaan ulang dihitung dengan menggunakan rumus berikut: Nilai parameter retrySleepInitMs + Jumlah percobaan ulang × Nilai parameter retrySleepStepMs.
Satuan: milidetik. Nilai default: 1000.
jdbcRetrySleepStepMs
Tidak
Interval pelaksanaan percobaan ulang dihitung dengan menggunakan rumus berikut: Nilai parameter retrySleepInitMs + Jumlah percobaan ulang × Nilai parameter retrySleepStepMs.
Satuan: milidetik. Nilai default: 5000.
jdbcConnectionMaxIdleMs
Tidak
Periode timeout idle yang berlaku untuk koneksi yang digunakan untuk penulisan data dan kueri poin. Jika periode timeout berakhir, koneksi dilepaskan.
Satuan: milidetik. Nilai default: 60000.
jdbcMetaCacheTTL
Tidak
Periode time-to-live (TTL) informasi skema tabel dalam cache.
Satuan: milidetik. Nilai default: 60000.
jdbcMetaAutoRefreshFactor
Tidak
Faktor yang menentukan kapan cache diperbarui secara otomatis. Jika sisa periode TTL informasi skema tabel dalam cache kurang dari hasil pembagian nilai parameter
jdbcMetaCacheTTLdengan nilai parameter ini, cache akan diperbarui secara otomatis.Nilai default: -1. Nilai default menunjukkan bahwa cache tidak disegarkan secara otomatis.
connection.ssl.mode
Tidak
Menentukan apakah akan mengaktifkan transmisi terenkripsi SSL. Nilai valid:
disable: Transmisi terenkripsi SSL dinonaktifkan. Ini adalah nilai default.
require: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi hanya koneksi yang digunakan untuk mentransmisikan data.
verify-ca: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi koneksi yang mentransmisikan data dan menggunakan sertifikat CA untuk memverifikasi server Hologres.
verify-full: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi koneksi yang mentransmisikan data, menggunakan sertifikat CA untuk memverifikasi server Hologres, dan memeriksa apakah CN atau Domain Name System (DNS) yang ditentukan dalam sertifikat CA konsisten dengan titik akhir Hologres yang ditentukan selama penyiapan koneksi.
connection.ssl.root-cert.location
Tidak
Lokasi sertifikat CA. Anda harus memastikan bahwa sertifikat CA diunggah ke klaster Apache Flink.
CatatanParameter ini diperlukan jika Anda mengatur parameter connection.ssl.mode ke verify-ca atau verify-full.
jdbcDirectConnect
Tidak
Menentukan apakah akan mengaktifkan mode koneksi langsung. Nilai valid:
false: dinonaktifkan. Ini adalah nilai default.
true: diaktifkan.
Jumlah data yang dapat ditulis menggunakan Apache Flink ditentukan oleh throughput yang diizinkan oleh titik akhir VPC. Jika parameter ini diatur ke true, sistem memeriksa apakah Apache Flink dapat langsung terhubung ke node FE Hologres dalam lingkungan saat ini. Jika Apache Flink dapat langsung terhubung ke node FE Hologres dalam lingkungan saat ini, koneksi langsung digunakan secara default.
Khusus sink
Parameter
Diperlukan
Deskripsi
mutatetype
Tidak
Mode penulisan data. Untuk informasi lebih lanjut, lihat bagian "Semantik streaming" di Buat tabel hasil Hologres.
Nilai default: insertorignore.
ignoredelete
Tidak
Menentukan apakah akan mengabaikan pesan retract. Dalam kebanyakan kasus, pesan retract dihasilkan oleh operasi GROUP BY di Flink. Ketika pesan retract ditransfer ke konektor Hologres, permintaan Delete dihasilkan.
Nilai default: true. Parameter ini hanya berlaku ketika semantik streaming digunakan.
createparttable
Tidak
Menentukan apakah akan membuat partisi secara otomatis berdasarkan nilai kunci partisi jika Anda menulis data ke tabel yang dipartisi.
false: Partisi tidak dibuat secara otomatis. Ini adalah nilai default.
true: Partisi dibuat secara otomatis.
Berhati-hatilah saat menggunakan parameter ini. Pastikan nilai kunci partisi tidak mengandung data kotor. Jika tidak, partisi tidak valid akan dibuat.
ignoreNullWhenUpdate
Tidak
Menentukan apakah nilai null yang ditulis dalam data diabaikan jika pengaturan
mutatetype='insertOrUpdate'digunakan.Nilai default: false.
jdbcWriteBatchSize
Tidak
Jumlah maksimum entri data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres.
Nilai default: 256.
jdbcWriteBatchByteSize
Tidak
Ukuran maksimum data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres dalam satu thread. Satuan: byte.
Nilai default: 2097152 (2 × 1024 × 1024), yaitu 2 MB.
jdbcWriteBatchTotalByteSize
Tidak
Ukuran maksimum data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres di semua thread. Satuan: byte.
Nilai default: 20971520 (20 × 1024 × 1024), yaitu 20 MB.
jdbcWriteFlushInterval
Tidak
Periode maksimum waktu menunggu operasi FLUSH selesai. Operasi FLUSH dilakukan untuk entri data yang dikumpulkan dalam satu batch untuk node sink streaming Hologres.
Satuan: milidetik. Nilai default: 10000, yaitu 10 detik.
jdbcUseLegacyPutHandler
Tidak
Sintaksis yang digunakan oleh pernyataan SQL. Nilai valid:
true: Sintaks
insert into xxx(c0,c1,...) values (?,,..),.. on conflict;digunakan.false: Sintaks
insert into xxx(c0,c1,...) select unnest(?),unnest(?),.. on conflictdigunakan. Ini adalah nilai default.
jdbcEnableDefaultForNotNullColumn
Tidak
Menentukan apakah akan mengonversi nilai null menjadi nilai default yang ditentukan ketika nilai null ditulis ke kolom yang menggunakan batasan NOT NULL dan tidak memiliki nilai default yang ditentukan. Jika kolom tujuan bertipe STRING, nilai null dikonversi menjadi string kosong (""). Jika kolom tujuan bertipe NUMBER, nilai null dikonversi menjadi 0. Jika kolom tujuan bertipe DATE, TIMESTAMP, atau TIMESTAMPTZ, nilai null dikonversi menjadi 1970-01-01 00:00:00.
Nilai default: true.
remove-u0000-in-text.enabled
Tidak
Menentukan apakah akan secara otomatis mengganti karakter u0000 tipe TEXT yang tidak dienkripsi dalam UTF-8. Nilai valid:
true: Karakter u0000 diganti.
false: Karakter u0000 tidak diganti. Ini adalah nilai default.
deduplication.enabled
Tidak
Menentukan apakah akan melakukan deduplikasi jika jumlah data yang ditulis pada saat yang sama mencakup catatan data dengan nilai kunci utama yang sama. Nilai valid:
true: Deduplikasi dilakukan. Hanya catatan data terakhir yang memiliki nilai kunci utama yang sama dengan catatan data lainnya yang dipertahankan. Ini adalah nilai default.
false: Deduplikasi tidak dilakukan.
Setelah data yang diproses secara batch ditulis, data yang perlu disisipkan ditulis.
CatatanJika deduplikasi dinonaktifkan, penulisan data batch mungkin tidak dilakukan dalam kasus ekstrem. Misalnya, jika semua data yang perlu ditulis memiliki nilai kunci utama yang sama, data tersebut tidak dapat ditulis pada saat yang sama. Hal ini berdampak negatif pada kinerja penulisan.
aggressive.enabled
Tidak
Menentukan apakah akan mengaktifkan mode commit agresif. Nilai valid:
true: diaktifkan.
Dalam mode commit agresif, sistem memaksa melakukan commit data ketika mendeteksi koneksi idle tanpa memperhatikan apakah jumlah catatan data yang diproses pada saat yang sama mencapai jumlah yang diharapkan. Dalam mode ini, latensi transmisi data dapat dikurangi ketika lalu lintas rendah.
false: dinonaktifkan. Ini adalah nilai default.
jdbcCopyWriteMode
Tidak
Menentukan mode penulisan data. Nilai yang valid:
true: Menggunakan bulk load. Jika opsi
jdbcCopyWriteModejuga disetel ketrue, pengaturan ini akan berlaku. Jika tidak, fixed copy digunakan.CatatanBulk load lebih efisien daripada fixed copy. Bulk load memanfaatkan sumber daya Hologres lebih efektif dan memberikan kinerja penulisan data yang lebih tinggi.
Memuat data secara massal ke tabel dengan kunci utama biasanya menyebabkan penguncian tabel. Untuk mencegah hal ini, setel
target-shards.enabledketrueuntuk mengurangi granularitas kunci ke tingkat shard. Hal ini memungkinkan beberapa pekerjaan bulk load berjalan secara konkuren dan mengurangi terjadinya penguncian tabel. Bulk load juga secara signifikan mengurangi beban pada instans Hologres dibandingkan dengan fixed copy. Pengujian menunjukkan pengurangan beban sekitar 66,7%.Jika Anda menggunakan bulk load untuk menulis ke tabel dengan kunci utama, tabel tersebut harus kosong. Hal ini untuk menghindari deduplikasi kunci utama dan degradasi kinerja selama proses penulisan.
false (default): Menulis data menggunakan metode INSERT.
CatatanHanya Hologres V1.3.1 dan yang lebih baru yang mendukung opsi ini.
jdbcCopyWriteFormat
Tidak
Menentukan apakah akan menggunakan protokol biner.
binary: Protokol biner digunakan, yang memberikan kecepatan transmisi lebih cepat. Ini adalah nilai default.
text: Mode teks digunakan.
Hanya Hologres V1.3.1 dan yang lebih baru yang mendukung parameter ini.
bulkLoad
Menentukan apakah bulk load digunakan untuk menulis data. Nilai yang valid:
true: Gunakan bulk load untuk menulis data. Nilai opsi ini hanya berlaku ketika
jdbcCopyWriteModedisetel ketrue.CatatanDibandingkan dengan fixed copy, bulk load memberikan pemanfaatan sumber daya yang lebih baik.
Secara default, bulk load hanya didukung saat Anda menulis data ke tabel tanpa kunci utama. Jika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama, kunci tingkat tabel mungkin diperoleh. Hal ini dapat menyebabkan pembatasan atau membuat penulisan data menjadi kompleks.
false (default): Tidak menggunakan bulk load.
CatatanHanya Hologres V1.4.0 dan yang lebih baru yang mendukung opsi ini.
target-shards.enabled
Menentukan apakah bulk load ke shard target diaktifkan. Nilai yang valid:
true
false (default)
CatatanHanya Hologres V1.4.1 dan yang lebih baru yang mendukung opsi ini.
Khusus kueri titik
Parameter
Diperlukan
Deskripsi
jdbcReadBatchSize
Tidak
Jumlah maksimum permintaan yang diizinkan dalam satu batch dalam satu thread untuk melakukan kueri poin berdasarkan tabel dimensi.
Nilai default: 128.
jdbcReadBatchQueueSize
Tidak
Jumlah maksimum permintaan yang diizinkan dalam antrian dalam satu thread untuk melakukan kueri poin berdasarkan tabel dimensi.
Nilai default: 256.
async
Tidak
Menentukan apakah akan menyinkronkan data dalam mode asinkron.
Nilai default: false. Dalam mode asinkron, beberapa permintaan dan respons diproses secara bersamaan. Oleh karena itu, permintaan berturut-turut tidak memblokir satu sama lain, dan throughput kueri ditingkatkan. Namun, permintaan tidak diproses dalam urutan absolut dalam mode asinkron.
cache
Tidak
Kebijakan cache.
Nilai default: None. Nilai valid: None dan LRU. None menunjukkan bahwa tidak ada data yang di-cache. LRU menunjukkan bahwa sebagian data dalam tabel dimensi di-cache. Sistem mencari catatan data dalam cache setiap kali menerima catatan data dari tabel sumber. Jika sistem tidak menemukan catatan dalam cache, sistem mencari catatan data dalam tabel dimensi fisik.
cachesize
Tidak
Jumlah maksimum catatan data yang dapat di-cache.
Nilai default: 10000. Jika Anda mengatur parameter cache ke LRU, Anda dapat mengonfigurasi parameter cachesize.
cachettlms
Tidak
Interval pembaruan cache. Satuan: milidetik.
Jika Anda mengatur parameter cache ke LRU, Anda dapat mengonfigurasi parameter cachettlms. Secara default, entri cache tidak kedaluwarsa.
cacheempty
Tidak
Menentukan apakah akan menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong.
Nilai default: true. Nilai ini menunjukkan bahwa sistem menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong. false: Sistem tidak menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong.
Pemetaan Tipe Data
Untuk informasi tentang pemetaan tipe data antara Flink yang sepenuhnya dikelola dan Hologres, lihat bagian "Pemetaan Tipe Data antara Realtime Compute for Apache Flink atau Blink dan Hologres" di Tipe Data.