全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Hologres untuk ingesti data (YAML)

更新时间:Feb 07, 2026

Topik ini menjelaskan cara menggunakan konektor Hologres untuk menyinkronkan data dalam pekerjaan ingesti data berbasis YAML.

Informasi latar belakang

Hologres adalah mesin gudang data real-time end-to-end yang mendukung ingesti data real-time berskala besar, pembaruan, dan analitik. Hologres menggunakan SQL standar dan kompatibel dengan protokol PostgreSQL, serta mendukung OLAP dan analisis ad hoc pada data berskala petabyte. Layanan ini menyediakan akses data online berlatensi rendah dan konkurensi tinggi, serta terintegrasi erat dengan MaxCompute, Flink, dan DataWorks untuk menyediakan solusi gudang data online dan offline yang lengkap. Tabel berikut mencantumkan kemampuan konektor YAML Hologres.

Kategori

Deskripsi

Tipe yang didukung

Data ingestion sink

Mode eksekusi

Mode streaming dan batch

Format data

Tidak didukung

Metrik pemantauan

  • numRecordsOut

  • numRecordsOutPerSecond

Catatan

Untuk informasi lebih lanjut mengenai metrik tersebut, lihat Metrik pemantauan.

Tipe API

YAML

Dukungan untuk memperbarui atau menghapus data di tabel sink

Ya

Fitur

Fitur

Deskripsi

Sinkronisasi semua tabel dalam database

Menyinkronkan data penuh dan inkremental dari semua tabel dalam sebuah database—atau beberapa tabel—ke masing-masing tabel sink yang sesuai.

Sinkronisasi perubahan skema tabel

Saat menyinkronkan semua tabel dalam database, juga menyinkronkan perubahan skema—seperti penambahan, penghapusan, atau penggantian nama kolom—di setiap tabel sumber ke tabel sink-nya secara real time.

Gabungkan tabel sharded

Menggunakan ekspresi reguler untuk mencocokkan tabel sumber di berbagai database sharded. Setelah data digabungkan, sinkronkan ke tabel sink downstream dengan nama yang sesuai.

Tulis ke tabel partisi

Menulis data dari tabel upstream ke tabel partisi Hologres.

Pemetaan tipe data

Memetakan tipe data upstream ke tipe data Hologres yang lebih luas menggunakan beberapa strategi pemetaan.

Sintaks

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

Parameter

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

type

Tipe sink.

String

Ya

None

Atur ke hologres.

name

Nama sink.

String

Tidak

None

None.

dbname

Nama database.

String

Ya

None

None.

username

Username. Masukkan ID AccessKey Akun Alibaba Cloud Anda.

String

Ya

None

Untuk informasi lebih lanjut, lihat Bagaimana cara melihat ID AccessKey dan Rahasia AccessKey?

Penting

Untuk melindungi Pasangan Kunci Akses Anda, gunakan variabel untuk mengonfigurasi ID AccessKey. Untuk informasi lebih lanjut, lihat Variabel proyek.

password

Password. Masukkan Rahasia AccessKey Akun Alibaba Cloud Anda.

String

Ya

None

endpoint

Titik akhir Hologres.

String

Ya

None

Untuk informasi lebih lanjut, lihat Titik akhir.

jdbcRetryCount

Jumlah maksimum percobaan ulang untuk operasi tulis dan kueri jika koneksi gagal.

Integer

Tidak

10

None.

jdbcRetrySleepInitMs

Waktu tunggu tetap sebelum setiap percobaan ulang.

Long

Tidak

1000

Unit: milidetik. Waktu tunggu aktual dihitung sebagai jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs.

jdbcRetrySleepStepMs

Waktu tambahan yang ditambahkan sebelum setiap percobaan ulang.

Long

Tidak

5000

Unit: milidetik. Waktu tunggu aktual dihitung sebagai jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs.

jdbcConnectionMaxIdleMs

Waktu idle maksimum untuk koneksi JDBC.

Long

Tidak

60000

Unit: milidetik. Jika koneksi tetap idle lebih lama dari nilai ini, koneksi akan ditutup dan dilepas.

jdbcMetaCacheTTL

Waktu hidup (TTL) untuk informasi TableSchema yang di-cache.

Long

Tidak

60000

Unit: milidetik.

jdbcMetaAutoRefreshFactor

Jika waktu cache yang tersisa kurang dari waktu pemicu, sistem akan merefresh cache secara otomatis.

Integer

Tidak

4

Waktu tersisa cache dihitung menggunakan rumus berikut: Waktu tersisa = Waktu hidup (TTL) - Waktu yang telah berlalu. Setelah cache direfresh secara otomatis, waktu yang telah berlalu diatur ulang menjadi 0.

Waktu pemicu = jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor.

mutatetype

Mode penulisan data.

String

Tidak

INSERT_OR_UPDATE

Jika kunci primary dikonfigurasi di tabel fisik Hologres, sink streaming Hologres menggunakan semantik tepat-sekali berdasarkan kunci primary. Jika beberapa catatan dengan kunci primary yang sama muncul, Anda harus menentukan parameter mutatetype untuk menentukan cara tabel sink diperbarui. Nilai valid untuk parameter mutatetype:

  • INSERT_OR_IGNORE: menyimpan catatan pertama dan mengabaikan duplikat berikutnya.

  • INSERT_OR_REPLACE: mengganti seluruh baris yang ada dengan yang baru.

  • INSERT_OR_UPDATE: hanya memperbarui kolom tertentu. Misalnya, sebuah tabel memiliki bidang a, b, c, dan d. Bidang a adalah kunci primary. Hanya a dan b yang ditulis ke Hologres. Pada kunci primary yang duplikat, hanya b yang diperbarui. Bidang c dan d tetap tidak berubah.

createparttable

Apakah akan membuat tabel partisi yang hilang secara otomatis berdasarkan nilai partisi.

Boolean

Tidak

false

None.

sink.delete-strategy

Cara menangani pesan retraction.

String

Tidak

None

Nilai valid:

  • IGNORE_DELETE: mengabaikan pesan UPDATE BEFORE dan DELETE. Gunakan ini untuk skenario di mana Anda hanya menyisipkan atau memperbarui data, tetapi tidak pernah menghapus.

  • DELETE_ROW_ON_PK: menerapkan penghapusan berdasarkan kunci primary. Pembaruan dijalankan sebagai hapus-lalu-sisip untuk memastikan akurasi.

jdbcWriteBatchSize

Jumlah maksimum baris per batch saat menulis dalam mode JDBC.

Integer

Tidak

256

Unit: baris.

Catatan

Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan.

jdbcWriteBatchByteSize

Jumlah maksimum byte per batch saat menulis dalam mode JDBC.

Long

Tidak

2097152 (2 × 1024 × 1024 byte), atau 2 MB

Catatan

Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan.

jdbcWriteFlushInterval

Waktu tunggu maksimum sebelum mem-flush batch ke Hologres dalam mode JDBC.

Long

Tidak

10000

Unit: milidetik.

Catatan

Hanya salah satu dari jdbcWriteBatchSize, jdbcWriteBatchByteSize, atau jdbcWriteFlushInterval yang perlu dipenuhi untuk memicu penulisan.

ignoreNullWhenUpdate

Apakah akan mengabaikan nilai null dalam penulisan pembaruan ketika mutatetype='insertOrUpdate'.

Boolean

Tidak

false

Nilai valid:

  • false (default): menulis nilai null ke tabel sink Hologres.

  • true: mengabaikan nilai null dalam penulisan pembaruan.

jdbcEnableDefaultForNotNullColumn

Apakah konektor akan mengisi nilai default saat menulis null ke kolom NOT NULL tanpa nilai default di tabel Hologres.

Boolean

Tidak

true

Nilai valid:

  • true (default): memungkinkan konektor mengisi dan menulis nilai default. Aturan:

    • Jika kolom bertipe STRING, tulis string kosong ("").

    • Jika kolom bertipe NUMBER, tulis 0.

    • Jika kolom bertipe DATE, TIMESTAMP, atau TIMESTAMPTZ, tulis 1970-01-01 00:00:00.

  • false: tidak mengisi nilai default. Menulis null ke kolom NOT NULL akan menghasilkan error.

remove-u0000-in-text.enabled

Apakah akan menghapus karakter tidak valid \u0000 dari data STRING sebelum menulis.

Boolean

Tidak

false

Nilai valid:

  • false (default): Konektor tidak melakukan operasi apa pun pada data. Namun, jika menemui data kotor, operasi penulisan dapat melemparkan exception berikut: ERROR: invalid byte sequence for encoding "UTF8": 0x00

    Dalam kasus ini, Anda perlu menghapus data kotor dari tabel sumber terlebih dahulu atau menentukan logika pemrosesan data kotor dalam pernyataan SQL.

  • true: konektor menghapus \u0000 dari data STRING untuk mencegah error.

deduplication.enabled

Apakah akan melakukan deduplikasi selama penulisan batch dalam mode JDBC atau jdbc_fixed.

Boolean

Tidak

true

Nilai valid:

  • true (default): menghapus duplikat baris dengan kunci primary yang sama dalam satu batch. Hanya menyimpan baris yang tiba terakhir. Contoh: dua bidang, bidang pertama adalah PK.

    • INSERT (1,'a') dan INSERT (1,'b') tiba secara berurutan. Setelah deduplikasi, hanya (1,'b') yang ditulis ke tabel sink Hologres.

    • (1,'a') sudah ada di tabel sink. Kemudian DELETE (1,'a') dan INSERT (1,'b') tiba secara berurutan. Hanya (1,'b') yang ditulis. Ini berperilaku seperti pembaruan langsung—bukan hapus-lalu-sisip.

  • false: tidak melakukan deduplikasi selama batching. Jika data baru memiliki PK yang sama dengan data dalam batch saat ini, batch ditulis terlebih dahulu, lalu data baru ditulis.

sink.type-normalize-strategy

Strategi pemetaan data.

String

Tidak

STANDARD

Strategi yang digunakan saat sink Hologres mengonversi tipe data upstream ke tipe Hologres.

  • STANDARD: memetakan tipe Flink CDC ke tipe PostgreSQL sesuai standar.

  • BROADEN: memetakan tipe Flink CDC ke tipe Hologres yang lebih luas.

  • ONLY_BIGINT_OR_TEXT: memetakan semua tipe Flink CDC ke BIGINT atau STRING di Hologres.

table_property.*

Properti tabel fisik Hologres.

String

Tidak

None

Saat membuat tabel Hologres, Anda dapat mengatur properti tabel fisik dalam klausa WITH. Pengaturan yang tepat membantu mengorganisasi dan mengkueri data secara efisien.

Peringatan

Nilai default table_property.distribution_key adalah kunci primary. Jangan mengubahnya kecuali benar-benar diperlukan—hal ini memengaruhi kebenaran penulisan.

connection.ssl.mode

Apakah akan mengaktifkan enkripsi SSL dalam transit, dan mode mana yang digunakan.

String

Tidak

disable

  • disable (default): menonaktifkan enkripsi dalam transit.

  • require: mengaktifkan SSL dan hanya mengenkripsi tautan data.

  • verify-ca: mengaktifkan SSL, mengenkripsi tautan data, dan memverifikasi identitas server Hologres menggunakan sertifikat CA.

  • verify-full: mengaktifkan SSL, mengenkripsi tautan data, memverifikasi identitas server Hologres menggunakan sertifikat CA, dan memeriksa apakah CN atau DNS dalam sertifikat cocok dengan titik akhir Hologres yang dikonfigurasi.

Catatan
  • Versi Hologres 2.1 dan yang lebih baru mendukung verify-ca dan verify-full. Lihat Enkripsi dalam transit.

  • Saat menggunakan verify-ca atau verify-full, atur juga connection.ssl.root-cert.location.

connection.ssl.root-cert.location

Jalur ke file sertifikat saat enkripsi dalam transit memerlukan sertifikat.

String

Tidak

None

Wajib saat connection.ssl.mode diatur ke verify-ca atau verify-full. Unggah sertifikat CA menggunakan fitur File Management di Konsol Realtime Compute. File yang diunggah disimpan di /flink/usrlib. Misalnya, jika sertifikat CA Anda bernama certificate.crt, atur parameter ini ke '/flink/usrlib/certificate.crt'.

Catatan

Untuk petunjuk mendapatkan sertifikat CA, lihat Enkripsi dalam transit — Unduh sertifikat CA.

Pemetaan tipe data

Gunakan parameter sink.type-normalize-strategy untuk menentukan cara tipe data upstream dipetakan ke tipe data Hologres.

Catatan
  • Aktifkan sink.type-normalize-strategy saat memulai pekerjaan YAML untuk pertama kali. Jika diaktifkan setelah startup, hapus tabel downstream dan restart pekerjaan secara tanpa status agar pengaturan berlaku.

  • Tipe array yang didukung meliputi INTEGER, BIGINT, FLOAT, DOUBLE, BOOLEAN, CHAR, dan VARCHAR.

  • Hologres tidak mendukung NUMERIC sebagai kunci primary. Jika kunci primary dipetakan ke NUMERIC, Hologres akan mengonversinya ke VARCHAR.

STANDARD

Saat sink.type-normalize-strategy diatur ke STANDARD, pemetaannya adalah sebagai berikut:

Tipe Flink CDC

Tipe Hologres

CHAR

bpchar

STRING

text

VARCHAR

text (jika panjang > 10.485.760 byte)

varchar (jika panjang ≤ 10.485.760 byte)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

Array dari berbagai tipe

MAP

Tidak didukung

ROW

Tidak didukung

BROADEN

Saat sink.type-normalize-strategy diatur ke BROADEN, tipe Flink CDC dipetakan ke tipe Hologres yang lebih luas. Pemetaannya adalah sebagai berikut:

Tipe Flink CDC

Tipe Hologres

CHAR

text

STRING

VARCHAR

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int8

SMALLINT

INTEGER

BIGINT

FLOAT

float8

DOUBLE

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

Array dari berbagai tipe

MAP

Tidak didukung

ROW

Tidak didukung

ONLY_BIGINT_OR_TEXT

Saat sink.type-normalize-strategy diatur ke ONLY_BIGINT_OR_TEXT, semua tipe Flink CDC dipetakan ke BIGINT atau STRING di Hologres. Pemetaannya adalah sebagai berikut:

Tipe Flink CDC

Tipe Hologres

TINYINT

int8

SMALLINT

INTEGER

BIGINT

BOOLEAN

text

BINARY

VARBINARY

DECIMAL

FLOAT

DOUBLE

DATE

TIME_WITHOUT_TIME_ZONE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_WITH_LOCAL_TIME_ZONE

ARRAY

Array dari berbagai tipe

MAP

Tidak didukung

ROW

Tidak didukung

Tulis ke tabel partisi

Sink Hologres mendukung penulisan ke tabel partisi. Gabungkan dengan modul transform untuk menulis data upstream ke tabel partisi Hologres. Perhatikan hal-hal berikut:

  • Kunci partisi harus merupakan bagian dari kunci primary. Menggunakan kunci non-primary dari upstream sebagai kunci partisi dapat menyebabkan ketidakkonsistenan kunci primary antara upstream dan downstream, yang berpotensi mengakibatkan inkonsistensi data selama sinkronisasi.

  • Anda dapat menggunakan kolom dengan tipe data TEXT, VARCHAR, dan INT sebagai kunci partisi. Di Hologres V1.3.22 dan versi yang lebih baru, Anda juga dapat menggunakan kolom dengan tipe data DATE sebagai kunci partisi.

  • Atur createparttable ke true untuk membuat tabel partisi anak secara otomatis. Jika tidak, buat secara manual.

Untuk contoh, lihat Tulis ke tabel partisi.

Sinkronisasi perubahan skema tabel

Pekerjaan pipeline YAML CDC menggunakan kebijakan berbeda untuk menangani evolusi skema. Kebijakan tersebut ditentukan melalui item konfigurasi tingkat pipeline schema.change.behavior. Nilai yang valid untuk schema.change.behavior adalah IGNORE, LENIENT, TRY_EVOLVE, EVOLVE, dan EXCEPTION. Sink Hologres tidak mendukung kebijakan TRY_EVOLVE, sedangkan kebijakan LENIENT dan EVOLVE melibatkan evolusi skema. Bagian berikut menjelaskan cara menangani berbagai event perubahan skema.

LENIENT (default)

Dalam mode LENIENT, perubahan skema ditangani sebagai berikut:

  • Menambahkan kolom nullable: kolom ditambahkan ke akhir skema tabel sink dan data disinkronkan ke kolom tersebut.

  • Menghapus kolom nullable: kolom diisi dengan null alih-alih dihapus dari tabel sink.

  • Menambahkan kolom non-nullable: kolom ditambahkan ke akhir skema tabel sink dan data disinkronkan ke kolom tersebut. Kolom baru secara default dianggap nullable, dan data sebelum penambahan kolom diisi dengan null.

  • Mengganti nama kolom: diperlakukan sebagai operasi tambah + hapus. Kolom yang diganti namanya ditambahkan ke akhir tabel sink, sedangkan kolom asli diisi dengan null. Misalnya, jika col_a diubah menjadi col_b, maka col_b ditambahkan dan col_a diisi dengan null.

  • Mengubah tipe kolom: tidak didukung. Hologres tidak mengizinkan perubahan tipe kolom; gunakan sink.type-normalize-strategy sebagai gantinya.

  • Perubahan skema berikut tidak didukung:

    • Perubahan pada constraint seperti kunci primary atau indeks.

    • Penghapusan kolom non-nullable.

    • Mengubah dari NOT NULL menjadi nullable.

EVOLVE

Dalam mode EVOLVE, perubahan skema ditangani sebagai berikut:

  • Menambahkan kolom nullable: didukung.

  • Menghapus kolom nullable: tidak didukung.

  • Kolom non-null ditambahkan ke tabel sink sebagai kolom nullable.

  • Mengganti nama kolom: didukung; nama kolom di tabel sink diperbarui sesuai.

  • Mengubah tipe kolom: tidak didukung. Hologres tidak mengizinkan perubahan tipe kolom; gunakan sink.type-normalize-strategy sebagai gantinya.

  • Perubahan skema berikut tidak didukung:

    • Perubahan pada constraint seperti kunci primary atau indeks.

    • Penghapusan kolom non-nullable.

    • Mengubah dari NOT NULL menjadi nullable.

Peringatan

Dalam mode EVOLVE, merestart pekerjaan secara tanpa status tanpa menghapus tabel sink dapat menyebabkan ketidakkonsistenan skema antara tabel upstream dan sink, yang berpotensi mengakibatkan kegagalan pekerjaan. Sesuaikan skema tabel sink secara manual.

Untuk contoh mengaktifkan mode EVOLVE, lihat Aktifkan mode EVOLVE.

Contoh kode

Pemetaan Tipe Luas

Gunakan parameter sink.type-normalize-strategy untuk memperluas pemetaan tipe data.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Tulis ke tabel partisi

Konversi field timestamp upstream create_time ke tipe date dan gunakan sebagai kunci partisi tabel Hologres.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Membuat tabel partisi yang hilang secara otomatis.
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

Aktifkan mode EVOLVE

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Membuat tabel partisi yang hilang secara otomatis.
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

Sinkronisasi satu tabel

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Sinkronisasi semua tabel dalam database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Gabungkan tabel sharded

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.  
  sink.type-normalize-strategy: BROADEN
  
route:
  # Gabungkan semua tabel sharded di MySQL test_db menjadi satu tabel Hologres: test_db.user.
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

Sinkronisasi ke skema tertentu

Skema di Hologres berkorespondensi dengan database di MySQL. Anda dapat menentukan skema untuk tabel sink.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN
  
route:
  # Sinkronkan semua tabel dari MySQL test_db ke skema Hologres test_db2, tanpa mengubah nama tabel.
  - source-table: test_db.\.*
    sink-table: test_db2.<>
    replace-symbol: <>

pipeline:
  name: MySQL to Hologres Pipeline

Sinkronisasi tabel baru tanpa restart

Untuk menyinkronkan tabel yang baru dibuat secara real time selama pekerjaan berjalan, atur scan.binlog.newly-added-table.enabled ke true.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  # Tangkap data dari tabel baru yang dibuat selama pekerjaan berjalan.
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Restart Tabel Eksisting yang Baru Ditambahkan

Jika Anda ingin menambahkan sinkronisasi untuk tabel eksisting, atur scan.newly-added-table.enabled = true dan restart pekerjaan.

Peringatan

Jika Anda sudah menggunakan scan.binlog.newly-added-table.enabled = true untuk menangkap tabel baru, jangan gunakan scan.newly-added-table.enabled = true lagi untuk menangkap tabel eksisting setelah restart. Melakukan hal tersebut menyebabkan data duplikat.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.startup.mode: initial
  # Saat restart, periksa parameter tables untuk tabel baru dan jalankan snapshot.
  # Memerlukan scan.startup.mode: initial.
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Kecualikan tabel selama sinkronisasi seluruh database

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  # Tabel yang cocok dengan regex ini tidak akan disinkronkan.
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  # Memetakan tipe data CDC ke tipe Hologres yang lebih luas.
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

Referensi