Topik ini menjelaskan cara menggunakan konektor Hologres untuk menyinkronkan data dalam pekerjaan ingesti data berbasis YAML.
Informasi latar belakang
Hologres adalah mesin gudang data real-time end-to-end yang mendukung ingesti data real-time berskala besar, pembaruan, dan analitik. Hologres menggunakan SQL standar dan kompatibel dengan protokol PostgreSQL, serta mendukung OLAP dan analisis ad hoc pada data berskala petabyte. Layanan ini menyediakan akses data online berlatensi rendah dan konkurensi tinggi, serta terintegrasi erat dengan MaxCompute, Flink, dan DataWorks untuk menyediakan solusi gudang data online dan offline yang lengkap. Tabel berikut mencantumkan kemampuan konektor YAML Hologres.
Kategori | Deskripsi |
Tipe yang didukung | Data ingestion sink |
Mode eksekusi | Mode streaming dan batch |
Format data | Tidak didukung |
Metrik pemantauan |
Catatan Untuk informasi lebih lanjut mengenai metrik tersebut, lihat Metrik pemantauan. |
Tipe API | YAML |
Dukungan untuk memperbarui atau menghapus data di tabel sink | Ya |
Fitur
Fitur | Deskripsi |
Menyinkronkan data penuh dan inkremental dari semua tabel dalam sebuah database—atau beberapa tabel—ke masing-masing tabel sink yang sesuai. | |
Saat menyinkronkan semua tabel dalam database, juga menyinkronkan perubahan skema—seperti penambahan, penghapusan, atau penggantian nama kolom—di setiap tabel sumber ke tabel sink-nya secara real time. | |
Menggunakan ekspresi reguler untuk mencocokkan tabel sumber di berbagai database sharded. Setelah data digabungkan, sinkronkan ke tabel sink downstream dengan nama yang sesuai. | |
Menulis data dari tabel upstream ke tabel partisi Hologres. | |
Memetakan tipe data upstream ke tipe data Hologres yang lebih luas menggunakan beberapa strategi pemetaan. |
Sintaks
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}Parameter
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Keterangan |
type | Tipe sink. | String | Ya | None | Atur ke |
name | Nama sink. | String | Tidak | None | None. |
dbname | Nama database. | String | Ya | None | None. |
username | Username. Masukkan ID AccessKey Akun Alibaba Cloud Anda. | String | Ya | None | Untuk informasi lebih lanjut, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey? Penting Untuk melindungi Pasangan Kunci Akses Anda, gunakan variabel untuk mengonfigurasi ID AccessKey. Untuk informasi lebih lanjut, lihat Variabel proyek. |
password | Password. Masukkan Rahasia AccessKey Akun Alibaba Cloud Anda. | String | Ya | None | |
endpoint | Titik akhir Hologres. | String | Ya | None | Untuk informasi lebih lanjut, lihat Titik akhir. |
jdbcRetryCount | Jumlah maksimum percobaan ulang untuk operasi tulis dan kueri jika koneksi gagal. | Integer | Tidak | 10 | None. |
jdbcRetrySleepInitMs | Waktu tunggu tetap sebelum setiap percobaan ulang. | Long | Tidak | 1000 | Unit: milidetik. Waktu tunggu aktual dihitung sebagai |
jdbcRetrySleepStepMs | Waktu tambahan yang ditambahkan sebelum setiap percobaan ulang. | Long | Tidak | 5000 | Unit: milidetik. Waktu tunggu aktual dihitung sebagai |
jdbcConnectionMaxIdleMs | Waktu idle maksimum untuk koneksi JDBC. | Long | Tidak | 60000 | Unit: milidetik. Jika koneksi tetap idle lebih lama dari nilai ini, koneksi akan ditutup dan dilepas. |
jdbcMetaCacheTTL | Waktu hidup (TTL) untuk informasi TableSchema yang di-cache. | Long | Tidak | 60000 | Unit: milidetik. |
jdbcMetaAutoRefreshFactor | Jika waktu cache yang tersisa kurang dari waktu pemicu, sistem akan merefresh cache secara otomatis. | Integer | Tidak | 4 | Waktu tersisa cache dihitung menggunakan rumus berikut: Waktu tersisa = Waktu hidup (TTL) - Waktu yang telah berlalu. Setelah cache direfresh secara otomatis, waktu yang telah berlalu diatur ulang menjadi 0. Waktu pemicu = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. |
mutatetype | Mode penulisan data. | String | Tidak | INSERT_OR_UPDATE | Jika kunci primary dikonfigurasi di tabel fisik Hologres, sink streaming Hologres menggunakan semantik tepat-sekali berdasarkan kunci primary. Jika beberapa catatan dengan kunci primary yang sama muncul, Anda harus menentukan parameter mutatetype untuk menentukan cara tabel sink diperbarui. Nilai valid untuk parameter mutatetype:
|
createparttable | Apakah akan membuat tabel partisi yang hilang secara otomatis berdasarkan nilai partisi. | Boolean | Tidak | false | None. |
sink.delete-strategy | Cara menangani pesan retraction. | String | Tidak | None | Nilai valid:
|
jdbcWriteBatchSize | Jumlah maksimum baris per batch saat menulis dalam mode JDBC. | Integer | Tidak | 256 | Unit: baris. Catatan Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan. |
jdbcWriteBatchByteSize | Jumlah maksimum byte per batch saat menulis dalam mode JDBC. | Long | Tidak | 2097152 (2 × 1024 × 1024 byte), atau 2 MB | Catatan Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan. |
jdbcWriteFlushInterval | Waktu tunggu maksimum sebelum mem-flush batch ke Hologres dalam mode JDBC. | Long | Tidak | 10000 | Unit: milidetik. Catatan Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan. |
ignoreNullWhenUpdate | Apakah akan mengabaikan nilai null dalam penulisan pembaruan ketika mutatetype='insertOrUpdate'. | Boolean | Tidak | false | Nilai valid:
|
jdbcEnableDefaultForNotNullColumn | Apakah konektor akan mengisi nilai default saat menulis null ke kolom NOT NULL tanpa nilai default di tabel Hologres. | Boolean | Tidak | true | Nilai valid:
|
remove-u0000-in-text.enabled | Apakah akan menghapus karakter tidak valid \u0000 dari data STRING sebelum menulis. | Boolean | Tidak | false | Nilai valid:
|
deduplication.enabled | Apakah akan melakukan deduplikasi selama penulisan batch dalam mode JDBC atau jdbc_fixed. | Boolean | Tidak | true | Nilai valid:
|
sink.type-normalize-strategy | Strategi pemetaan data. | String | Tidak | STANDARD | Strategi yang digunakan saat sink Hologres mengonversi tipe data upstream ke tipe Hologres.
|
table_property.* | Properti tabel fisik Hologres. | String | Tidak | None | Saat membuat tabel Hologres, Anda dapat mengatur properti tabel fisik dalam klausa WITH. Pengaturan yang tepat membantu mengorganisasi dan mengkueri data secara efisien. Peringatan Nilai default table_property.distribution_key adalah kunci primary. Jangan mengubahnya kecuali benar-benar diperlukan—hal ini memengaruhi kebenaran penulisan. |
connection.ssl.mode | Apakah akan mengaktifkan enkripsi SSL dalam transit, dan mode mana yang digunakan. | String | Tidak | disable |
Catatan
|
connection.ssl.root-cert.location | Jalur ke file sertifikat saat enkripsi dalam transit memerlukan sertifikat. | String | Tidak | None | Wajib saat connection.ssl.mode diatur ke verify-ca atau verify-full. Unggah sertifikat CA menggunakan fitur File Management di Konsol Realtime Compute. File yang diunggah disimpan di /flink/usrlib. Misalnya, jika sertifikat CA Anda bernama certificate.crt, atur parameter ini ke Catatan Untuk petunjuk mendapatkan sertifikat CA, lihat Enkripsi dalam transit — Unduh sertifikat CA. |
Pemetaan tipe data
Gunakan parameter sink.type-normalize-strategy untuk menentukan cara tipe data upstream dipetakan ke tipe data Hologres.
Aktifkan sink.type-normalize-strategy saat memulai pekerjaan YAML untuk pertama kali. Jika diaktifkan setelah startup, hapus tabel downstream dan restart pekerjaan secara tanpa status agar pengaturan berlaku.
Tipe array yang didukung meliputi INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, dan VARCHAR.
Hologres tidak mendukung NUMERIC sebagai kunci primary. Jika kunci primary dipetakan ke NUMERIC, Hologres akan mengonversinya ke VARCHAR.
STANDARD
Saat sink.type-normalize-strategy diatur ke STANDARD, pemetaannya adalah sebagai berikut:
Tipe Flink CDC | Tipe Hologres |
CHAR | bpchar |
STRING | text |
VARCHAR | text (jika panjang > 10.485.760 byte) |
varchar (jika panjang ≤ 10.485.760 byte) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Array dari berbagai tipe |
MAP | Tidak didukung |
ROW | Tidak didukung |
BROADEN
Saat sink.type-normalize-strategy diatur ke BROADEN, tipe Flink CDC dipetakan ke tipe Hologres yang lebih luas. Pemetaannya adalah sebagai berikut:
Tipe Flink CDC | Tipe Hologres |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | Array dari berbagai tipe |
MAP | Tidak didukung |
ROW | Tidak didukung |
ONLY_BIGINT_OR_TEXT
Saat sink.type-normalize-strategy diatur ke ONLY_BIGINT_OR_TEXT, semua tipe Flink CDC dipetakan ke BIGINT atau STRING di Hologres. Pemetaannya adalah sebagai berikut:
Tipe Flink CDC | Tipe Hologres |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | Array dari berbagai tipe |
MAP | Tidak didukung |
ROW | Tidak didukung |
Tulis ke tabel partisi
Sink Hologres mendukung penulisan ke tabel partisi. Gabungkan dengan modul transform untuk menulis data upstream ke tabel partisi Hologres. Perhatikan hal-hal berikut:
Kunci partisi harus merupakan bagian dari kunci primary. Menggunakan kunci non-primary dari upstream sebagai kunci partisi dapat menyebabkan ketidakkonsistenan kunci primary antara upstream dan downstream, yang berpotensi mengakibatkan inkonsistensi data selama sinkronisasi.
Anda dapat menggunakan kolom dengan tipe data TEXT, VARCHAR, dan INT sebagai kunci partisi. Di Hologres V1.3.22 dan versi yang lebih baru, Anda juga dapat menggunakan kolom dengan tipe data DATE sebagai kunci partisi.
Atur createparttable ke true untuk membuat tabel partisi anak secara otomatis. Jika tidak, buat secara manual.
Untuk contoh, lihat Tulis ke tabel partisi.
Sinkronisasi perubahan skema tabel
Pekerjaan pipeline YAML CDC menggunakan kebijakan berbeda untuk menangani evolusi skema. Kebijakan tersebut ditentukan melalui item konfigurasi tingkat pipeline schema.change.behavior. Nilai yang valid untuk schema.change.behavior adalah IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, dan EXCEPTION. Sink Hologres tidak mendukung kebijakan TRY_EVOLVE, sedangkan kebijakan LENIENT dan EVOLVE melibatkan evolusi skema. Bagian berikut menjelaskan cara menangani berbagai event perubahan skema.
LENIENT (default)
Dalam mode LENIENT, perubahan skema ditangani sebagai berikut:
Menambahkan kolom nullable: kolom ditambahkan ke akhir skema tabel sink dan data disinkronkan ke kolom tersebut.
Menghapus kolom nullable: kolom diisi dengan null alih-alih dihapus dari tabel sink.
Menambahkan kolom non-nullable: kolom ditambahkan ke akhir skema tabel sink dan data disinkronkan ke kolom tersebut. Kolom baru secara default dianggap nullable, dan data sebelum penambahan kolom diisi dengan null.
Mengganti nama kolom: diperlakukan sebagai operasi tambah + hapus. Kolom yang diganti namanya ditambahkan ke akhir tabel sink, sedangkan kolom asli diisi dengan null. Misalnya, jika col_a diubah menjadi col_b, maka col_b ditambahkan dan col_a diisi dengan null.
Mengubah tipe kolom: tidak didukung. Hologres tidak mengizinkan perubahan tipe kolom; gunakan sink.type-normalize-strategy sebagai gantinya.
Perubahan skema berikut tidak didukung:
Perubahan pada constraint seperti kunci primary atau indeks.
Penghapusan kolom non-nullable.
Mengubah dari NOT NULL menjadi nullable.
EVOLVE
Dalam mode EVOLVE, perubahan skema ditangani sebagai berikut:
Menambahkan kolom nullable: didukung.
Menghapus kolom nullable: tidak didukung.
Kolom non-null ditambahkan ke tabel sink sebagai kolom nullable.
Mengganti nama kolom: didukung; nama kolom di tabel sink diperbarui sesuai.
Mengubah tipe kolom: tidak didukung. Hologres tidak mengizinkan perubahan tipe kolom; gunakan sink.type-normalize-strategy sebagai gantinya.
Perubahan skema berikut tidak didukung:
Perubahan pada constraint seperti kunci primary atau indeks.
Penghapusan kolom non-nullable.
Mengubah dari NOT NULL menjadi nullable.
Dalam mode EVOLVE, merestart pekerjaan secara tanpa status tanpa menghapus tabel sink dapat menyebabkan ketidakkonsistenan skema antara tabel upstream dan sink, yang berpotensi mengakibatkan kegagalan pekerjaan. Sesuaikan skema tabel sink secara manual.
Untuk contoh mengaktifkan mode EVOLVE, lihat Aktifkan mode EVOLVE.
Contoh kode
Pemetaan Tipe Luas
Gunakan parameter sink.type-normalize-strategy untuk memperluas pemetaan tipe data.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineTulis ke tabel partisi
Konversi field timestamp upstream create_time ke tipe date dan gunakan sebagai kunci partisi tabel Hologres.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Membuat tabel partisi yang hilang secara otomatis.
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres PipelineAktifkan mode EVOLVE
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Membuat tabel partisi yang hilang secara otomatis.
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolveSinkronisasi satu tabel
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineSinkronisasi semua tabel dalam database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineGabungkan tabel sharded
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
route:
# Gabungkan semua tabel sharded di MySQL test_db menjadi satu tabel Hologres: test_db.user.
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres PipelineSinkronisasi ke skema tertentu
Skema di Hologres berkorespondensi dengan database di MySQL. Anda dapat menentukan skema untuk tabel sink.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
route:
# Sinkronkan semua tabel dari MySQL test_db ke skema Hologres test_db2, tanpa mengubah nama tabel.
- source-table: test_db.\.*
sink-table: test_db2.<>
replace-symbol: <>
pipeline:
name: MySQL to Hologres PipelineSinkronisasi tabel baru tanpa restart
Untuk menyinkronkan tabel yang baru dibuat secara real time selama pekerjaan berjalan, atur scan.binlog.newly-added-table.enabled ke true.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
# Tangkap data dari tabel baru yang dibuat selama pekerjaan berjalan.
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineRestart Tabel Eksisting yang Baru Ditambahkan
Jika Anda ingin menambahkan sinkronisasi untuk tabel eksisting, atur scan.newly-added-table.enabled = true dan restart pekerjaan.
Jika Anda sudah menggunakan scan.binlog.newly-added-table.enabled = true untuk menangkap tabel baru, jangan gunakan scan.newly-added-table.enabled = true lagi untuk menangkap tabel eksisting setelah restart. Melakukan hal tersebut menyebabkan data duplikat.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.startup.mode: initial
# Saat restart, periksa parameter tables untuk tabel baru dan jalankan snapshot.
# Memerlukan scan.startup.mode: initial.
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineKecualikan tabel selama sinkronisasi seluruh database
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
# Tabel yang cocok dengan regex ini tidak akan disinkronkan.
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
# Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres PipelineReferensi
Untuk referensi pengembangan mengenai source, sink, transform, dan route, lihat Referensi pengembangan ingesti data Flink CDC.
Untuk langkah-langkah mengembangkan pekerjaan ingesti data YAML, lihat Kembangkan pekerjaan ingesti data YAML (pratinjau publik).