全部产品
Search
文档中心

Hologres:Menulis data dari Apache Flink 1.11 atau yang lebih baru ke Hologres secara real time

更新时间:Nov 20, 2025

Topik ini menjelaskan cara menulis data dari Apache Flink 1.11 atau yang lebih baru ke Hologres secara real-time.

Prasyarat

  • Sebuah instance Hologres telah dibuat dan alat pengembangan digunakan untuk terhubung ke instance tersebut. Untuk informasi lebih lanjut, lihat Terhubung ke HoloWeb.

  • Sebuah klaster Apache Flink telah dibuat. Dalam contoh ini, klaster Apache Flink 1.15 digunakan. Anda dapat mengunduh file biner dari situs resmi Apache Flink dan membuat klaster Apache Flink yang beroperasi dalam mode standalone. Untuk informasi lebih lanjut, lihat Instalasi Lokal Flink.

Latar Belakang

Kode konektor Hologres telah dibuka sumbernya sejak Apache Flink 1.11 dan versi yang lebih baru. Paket rilis konektor Hologres yang sesuai dengan berbagai versi Apache Flink tersedia di Repositori Maven. File pom.xml berikut memberikan contoh konfigurasinya.

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Tabel berikut menjelaskan pemetaan versi antara Apache Flink dan konektor Hologres. Kami merekomendasikan penggunaan versi 1.15 atau yang lebih baru untuk memanfaatkan lebih banyak fitur.

Versi Apache Flink

Versi konektor Hologres

Apache Flink 1.11

hologres-connector-flink-1.11:1.0.1

Apache Flink 1.12

hologres-connector-flink-1.12:1.0.1

Apache Flink 1.13

hologres-connector-flink-1.13:1.3.2

Apache Flink 1.14

hologres-connector-flink-1.14:1.3.2

Apache Flink 1.15

hologres-connector-flink-1.15:1.4.1

Apache Flink 1.17

hologres-connector-flink-1.17:1.4.1

Kode Contoh

Anda dapat mengeksekusi pernyataan SQL Flink dengan sintaks berikut untuk menulis data ke Hologres.

        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

  • FlinkSQLToHoloExample: Aplikasi untuk menulis data ke Hologres menggunakan antarmuka SQL Flink.

  • FlinkDSAndSQLToHoloExample: Aplikasi untuk mentransformasikan aliran data menjadi tabel dan menuliskannya ke Hologres. Antarmuka DataStream Flink digunakan untuk transformasi, sementara antarmuka SQL Flink digunakan untuk penulisan data.

  • FlinkDataStreamToHoloExample: Aplikasi untuk menulis aliran data ke Hologres menggunakan antarmuka DataStream Flink.

  • FlinkRoaringBitmapAggJob: Aplikasi untuk menghitung jumlah pengunjung unik (UV) setelah deduplikasi secara real-time dan menulis statistik ke Hologres. Aplikasi ini menggunakan Flink, roaring bitmaps, dan tabel dimensi Hologres.

  • FlinkToHoloRePartitionExample: Aplikasi untuk mempartisi data dalam shard di Hologres selama penulisan data real-time menggunakan antarmuka DataStream Flink. Ini membantu mengurangi jumlah file kecil secara signifikan di Hologres, meningkatkan kinerja penulisan, dan mengurangi beban sistem. Aplikasi ini cocok untuk skenario di mana Anda ingin mengimpor data ke beberapa tabel kosong yang dikonfigurasi dengan kunci utama pada saat yang sama. Aplikasi ini mencapai efek serupa dengan operasi INSERT OVERWRITE.

Opsi konektor Hologres

Anda dapat menggunakan konektor Hologres untuk menulis data dari Apache Flink ke Hologres. Tabel berikut menjelaskan parameter yang digunakan oleh konektor Hologres.

Opsi

Diperlukan

Deskripsi

connector

Ya

Jenis tabel sink. Tetapkan nilainya ke hologres.

dbname

Ya

Nama database Hologres.

tablename

Ya

Nama tabel Hologres ke mana Anda ingin menulis data.

username

Ya

ID AccessKey akun Alibaba Cloud.

Anda dapat memperoleh ID AccessKey dari halaman Pasangan AccessKey.

password

Ya

Rahasia AccessKey akun Alibaba Cloud.

Anda dapat memperoleh Rahasia AccessKey dari halaman Pasangan AccessKey.

endpoint

Ya

Titik akhir virtual private cloud (VPC) dari instance Hologres Anda. Anda dapat melihat titik akhir instance Hologres pada halaman detail instance di Konsol Hologres.

Catatan

Titik akhir harus menyertakan nomor port dan harus dalam format Alamat IP:Nomor Port. Jika instans Hologres dan kluster Apache Flink berada di wilayah yang sama, gunakan titik akhir VPC instans Hologres Anda. Jika instans Hologres dan kluster Apache Flink berada di wilayah yang berbeda, gunakan titik akhir publik instans Hologres Anda.

  • Khusus koneksi

    Opsi

    Diperlukan

    Deskripsi

    connectionSize

    Tidak

    Jumlah koneksi Java Database Connectivity (JDBC) dalam satu kolam koneksi yang dibuat oleh tugas Flink Hologres.

    Nilai default: 3. Nilai ini sebanding dengan throughput.

    connectionPoolName

    Tidak

    Nama kolam koneksi. Dalam TaskManager yang sama, tabel yang dikonfigurasikan dengan kolam koneksi yang sama dapat berbagi kolam koneksi.

    Tidak ada nilai default. Setiap tabel menggunakan kolam koneksinya sendiri secara default. Jika Anda menentukan nama kolam koneksi, nilai parameter connectionSize semua tabel harus sama.

    fixedConnectionMode

    Tidak

    Operasi penulisan data dan kueri poin tidak mengonsumsi koneksi. Fitur ini dirilis dalam versi beta dan memerlukan versi konektor 1.2.0 atau yang lebih baru serta versi Hologres 1.3 atau yang lebih baru.

    Nilai default: false.

    jdbcRetryCount

    Tidak

    Jumlah maksimum percobaan ulang yang diizinkan untuk menulis dan mengkueri data jika terjadi kegagalan koneksi.

    Nilai default: 10.

    jdbcRetrySleepInitMs

    Tidak

    Interval pelaksanaan percobaan ulang dihitung dengan menggunakan rumus berikut: Nilai parameter retrySleepInitMs + Jumlah percobaan ulang × Nilai parameter retrySleepStepMs.

    Satuan: milidetik. Nilai default: 1000.

    jdbcRetrySleepStepMs

    Tidak

    Interval pelaksanaan percobaan ulang dihitung dengan menggunakan rumus berikut: Nilai parameter retrySleepInitMs + Jumlah percobaan ulang × Nilai parameter retrySleepStepMs.

    Satuan: milidetik. Nilai default: 5000.

    jdbcConnectionMaxIdleMs

    Tidak

    Periode timeout idle yang berlaku untuk koneksi yang digunakan untuk penulisan data dan kueri poin. Jika periode timeout berakhir, koneksi dilepaskan.

    Satuan: milidetik. Nilai default: 60000.

    jdbcMetaCacheTTL

    Tidak

    Periode time-to-live (TTL) informasi skema tabel dalam cache.

    Satuan: milidetik. Nilai default: 60000.

    jdbcMetaAutoRefreshFactor

    Tidak

    Faktor yang menentukan kapan cache diperbarui secara otomatis. Jika sisa periode TTL informasi skema tabel dalam cache kurang dari hasil pembagian nilai parameter jdbcMetaCacheTTL dengan nilai parameter ini, cache akan diperbarui secara otomatis.

    Nilai default: -1. Nilai default menunjukkan bahwa cache tidak disegarkan secara otomatis.

    connection.ssl.mode

    Tidak

    Menentukan apakah akan mengaktifkan transmisi terenkripsi SSL. Nilai valid:

    • disable: Transmisi terenkripsi SSL dinonaktifkan. Ini adalah nilai default.

    • require: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi hanya koneksi yang digunakan untuk mentransmisikan data.

    • verify-ca: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi koneksi yang mentransmisikan data dan menggunakan sertifikat CA untuk memverifikasi server Hologres.

    • verify-full: Transmisi terenkripsi SSL diaktifkan. Klien menggunakan transmisi terenkripsi SSL untuk mengenkripsi koneksi yang mentransmisikan data, menggunakan sertifikat CA untuk memverifikasi server Hologres, dan memeriksa apakah CN atau Domain Name System (DNS) yang ditentukan dalam sertifikat CA konsisten dengan titik akhir Hologres yang ditentukan selama penyiapan koneksi.

    connection.ssl.root-cert.location

    Tidak

    Lokasi sertifikat CA. Anda harus memastikan bahwa sertifikat CA diunggah ke klaster Apache Flink.

    Catatan

    Parameter ini diperlukan jika Anda mengatur parameter connection.ssl.mode ke verify-ca atau verify-full.

    jdbcDirectConnect

    Tidak

    Menentukan apakah akan mengaktifkan mode koneksi langsung. Nilai valid:

    • false: dinonaktifkan. Ini adalah nilai default.

    • true: diaktifkan.

      Jumlah data yang dapat ditulis menggunakan Apache Flink ditentukan oleh throughput yang diizinkan oleh titik akhir VPC. Jika parameter ini diatur ke true, sistem memeriksa apakah Apache Flink dapat langsung terhubung ke node FE Hologres dalam lingkungan saat ini. Jika Apache Flink dapat langsung terhubung ke node FE Hologres dalam lingkungan saat ini, koneksi langsung digunakan secara default.

  • Khusus sink

    Parameter

    Diperlukan

    Deskripsi

    mutatetype

    Tidak

    Mode penulisan data. Untuk informasi lebih lanjut, lihat bagian "Semantik streaming" di Buat tabel hasil Hologres.

    Nilai default: insertorignore.

    ignoredelete

    Tidak

    Menentukan apakah akan mengabaikan pesan retract. Dalam kebanyakan kasus, pesan retract dihasilkan oleh operasi GROUP BY di Flink. Ketika pesan retract ditransfer ke konektor Hologres, permintaan Delete dihasilkan.

    Nilai default: true. Parameter ini hanya berlaku ketika semantik streaming digunakan.

    createparttable

    Tidak

    Menentukan apakah akan membuat partisi secara otomatis berdasarkan nilai kunci partisi jika Anda menulis data ke tabel yang dipartisi.

    • false: Partisi tidak dibuat secara otomatis. Ini adalah nilai default.

    • true: Partisi dibuat secara otomatis.

    Berhati-hatilah saat menggunakan parameter ini. Pastikan nilai kunci partisi tidak mengandung data kotor. Jika tidak, partisi tidak valid akan dibuat.

    ignoreNullWhenUpdate

    Tidak

    Menentukan apakah nilai null yang ditulis dalam data diabaikan jika pengaturan mutatetype='insertOrUpdate' digunakan.

    Nilai default: false.

    jdbcWriteBatchSize

    Tidak

    Jumlah maksimum entri data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres.

    Nilai default: 256.

    jdbcWriteBatchByteSize

    Tidak

    Ukuran maksimum data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres dalam satu thread. Satuan: byte.

    Nilai default: 2097152 (2 × 1024 × 1024), yaitu 2 MB.

    jdbcWriteBatchTotalByteSize

    Tidak

    Ukuran maksimum data yang dapat dikumpulkan dalam satu batch untuk node sink streaming Hologres di semua thread. Satuan: byte.

    Nilai default: 20971520 (20 × 1024 × 1024), yaitu 20 MB.

    jdbcWriteFlushInterval

    Tidak

    Periode maksimum waktu menunggu operasi FLUSH selesai. Operasi FLUSH dilakukan untuk entri data yang dikumpulkan dalam satu batch untuk node sink streaming Hologres.

    Satuan: milidetik. Nilai default: 10000, yaitu 10 detik.

    jdbcUseLegacyPutHandler

    Tidak

    Sintaksis yang digunakan oleh pernyataan SQL. Nilai valid:

    • true: Sintaks insert into xxx(c0,c1,...) values (?,,..),.. on conflict; digunakan.

    • false: Sintaks insert into xxx(c0,c1,...) select unnest(?),unnest(?),.. on conflict digunakan. Ini adalah nilai default.

    jdbcEnableDefaultForNotNullColumn

    Tidak

    Menentukan apakah akan mengonversi nilai null menjadi nilai default yang ditentukan ketika nilai null ditulis ke kolom yang menggunakan batasan NOT NULL dan tidak memiliki nilai default yang ditentukan. Jika kolom tujuan bertipe STRING, nilai null dikonversi menjadi string kosong (""). Jika kolom tujuan bertipe NUMBER, nilai null dikonversi menjadi 0. Jika kolom tujuan bertipe DATE, TIMESTAMP, atau TIMESTAMPTZ, nilai null dikonversi menjadi 1970-01-01 00:00:00.

    Nilai default: true.

    remove-u0000-in-text.enabled

    Tidak

    Menentukan apakah akan secara otomatis mengganti karakter u0000 tipe TEXT yang tidak dienkripsi dalam UTF-8. Nilai valid:

    • true: Karakter u0000 diganti.

    • false: Karakter u0000 tidak diganti. Ini adalah nilai default.

    deduplication.enabled

    Tidak

    Menentukan apakah akan melakukan deduplikasi jika jumlah data yang ditulis pada saat yang sama mencakup catatan data dengan nilai kunci utama yang sama. Nilai valid:

    • true: Deduplikasi dilakukan. Hanya catatan data terakhir yang memiliki nilai kunci utama yang sama dengan catatan data lainnya yang dipertahankan. Ini adalah nilai default.

    • false: Deduplikasi tidak dilakukan.

      Setelah data yang diproses secara batch ditulis, data yang perlu disisipkan ditulis.

      Catatan

      Jika deduplikasi dinonaktifkan, penulisan data batch mungkin tidak dilakukan dalam kasus ekstrem. Misalnya, jika semua data yang perlu ditulis memiliki nilai kunci utama yang sama, data tersebut tidak dapat ditulis pada saat yang sama. Hal ini berdampak negatif pada kinerja penulisan.

    aggressive.enabled

    Tidak

    Menentukan apakah akan mengaktifkan mode commit agresif. Nilai valid:

    • true: diaktifkan.

      Dalam mode commit agresif, sistem memaksa melakukan commit data ketika mendeteksi koneksi idle tanpa memperhatikan apakah jumlah catatan data yang diproses pada saat yang sama mencapai jumlah yang diharapkan. Dalam mode ini, latensi transmisi data dapat dikurangi ketika lalu lintas rendah.

    • false: dinonaktifkan. Ini adalah nilai default.

    jdbcCopyWriteMode

    Tidak

    Menentukan mode penulisan data. Nilai yang valid:

    • true: Menggunakan bulk load. Jika opsi jdbcCopyWriteMode juga disetel ke true, pengaturan ini akan berlaku. Jika tidak, fixed copy digunakan.

      Catatan
      • Bulk load lebih efisien daripada fixed copy. Bulk load memanfaatkan sumber daya Hologres lebih efektif dan memberikan kinerja penulisan data yang lebih tinggi.

      • Memuat data secara massal ke tabel dengan kunci utama biasanya menyebabkan penguncian tabel. Untuk mencegah hal ini, setel target-shards.enabled ke true untuk mengurangi granularitas kunci ke tingkat shard. Hal ini memungkinkan beberapa pekerjaan bulk load berjalan secara konkuren dan mengurangi terjadinya penguncian tabel. Bulk load juga secara signifikan mengurangi beban pada instans Hologres dibandingkan dengan fixed copy. Pengujian menunjukkan pengurangan beban sekitar 66,7%.

      • Jika Anda menggunakan bulk load untuk menulis ke tabel dengan kunci utama, tabel tersebut harus kosong. Hal ini untuk menghindari deduplikasi kunci utama dan degradasi kinerja selama proses penulisan.

    • false (default): Menulis data menggunakan metode INSERT.

    Catatan

    Hanya Hologres V1.3.1 dan yang lebih baru yang mendukung opsi ini.

    jdbcCopyWriteFormat

    Tidak

    Menentukan apakah akan menggunakan protokol biner.

    • binary: Protokol biner digunakan, yang memberikan kecepatan transmisi lebih cepat. Ini adalah nilai default.

    • text: Mode teks digunakan.

    Hanya Hologres V1.3.1 dan yang lebih baru yang mendukung parameter ini.

    bulkLoad

    Menentukan apakah bulk load digunakan untuk menulis data. Nilai yang valid:

    • true: Gunakan bulk load untuk menulis data. Nilai opsi ini hanya berlaku ketika jdbcCopyWriteMode disetel ke true.

      Catatan
      • Dibandingkan dengan fixed copy, bulk load memberikan pemanfaatan sumber daya yang lebih baik.

      • Secara default, bulk load hanya didukung saat Anda menulis data ke tabel tanpa kunci utama. Jika Anda menulis data ke tabel yang dikonfigurasi dengan kunci utama, kunci tingkat tabel mungkin diperoleh. Hal ini dapat menyebabkan pembatasan atau membuat penulisan data menjadi kompleks.

    • false (default): Tidak menggunakan bulk load.

    Catatan

    Hanya Hologres V1.4.0 dan yang lebih baru yang mendukung opsi ini.

    target-shards.enabled

    Menentukan apakah bulk load ke shard target diaktifkan. Nilai yang valid:

    • true

    • false (default)

    Catatan

    Hanya Hologres V1.4.1 dan yang lebih baru yang mendukung opsi ini.

  • Khusus kueri titik

    Parameter

    Diperlukan

    Deskripsi

    jdbcReadBatchSize

    Tidak

    Jumlah maksimum permintaan yang diizinkan dalam satu batch dalam satu thread untuk melakukan kueri poin berdasarkan tabel dimensi.

    Nilai default: 128.

    jdbcReadBatchQueueSize

    Tidak

    Jumlah maksimum permintaan yang diizinkan dalam antrian dalam satu thread untuk melakukan kueri poin berdasarkan tabel dimensi.

    Nilai default: 256.

    async

    Tidak

    Menentukan apakah akan menyinkronkan data dalam mode asinkron.

    Nilai default: false. Dalam mode asinkron, beberapa permintaan dan respons diproses secara bersamaan. Oleh karena itu, permintaan berturut-turut tidak memblokir satu sama lain, dan throughput kueri ditingkatkan. Namun, permintaan tidak diproses dalam urutan absolut dalam mode asinkron.

    cache

    Tidak

    Kebijakan cache.

    Nilai default: None. Nilai valid: None dan LRU. None menunjukkan bahwa tidak ada data yang di-cache. LRU menunjukkan bahwa sebagian data dalam tabel dimensi di-cache. Sistem mencari catatan data dalam cache setiap kali menerima catatan data dari tabel sumber. Jika sistem tidak menemukan catatan dalam cache, sistem mencari catatan data dalam tabel dimensi fisik.

    cachesize

    Tidak

    Jumlah maksimum catatan data yang dapat di-cache.

    Nilai default: 10000. Jika Anda mengatur parameter cache ke LRU, Anda dapat mengonfigurasi parameter cachesize.

    cachettlms

    Tidak

    Interval pembaruan cache. Satuan: milidetik.

    Jika Anda mengatur parameter cache ke LRU, Anda dapat mengonfigurasi parameter cachettlms. Secara default, entri cache tidak kedaluwarsa.

    cacheempty

    Tidak

    Menentukan apakah akan menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong.

    Nilai default: true. Nilai ini menunjukkan bahwa sistem menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong. false: Sistem tidak menyimpan cache data kueri JOIN yang hasil pengembaliannya kosong.

  • Pemetaan Tipe Data

    Untuk informasi tentang pemetaan tipe data antara Flink yang sepenuhnya dikelola dan Hologres, lihat bagian "Pemetaan Tipe Data antara Realtime Compute for Apache Flink atau Blink dan Hologres" di Tipe Data.