全部产品
Search
文档中心

Realtime Compute for Apache Flink:Ingesti Data Real-time ke Gudang Data

更新时间:Aug 09, 2025

Topik ini menjelaskan praktik terbaik untuk menggunakan pekerjaan ingesti data CDC YAML dalam menulis data real-time ke gudang data.

Sinkronisasi real-time dari Flink CDC ke gudang data Hologres

Menggunakan pekerjaan ingesti data CDC YAML untuk menyinkronkan data ke Hologres dalam membangun gudang data real-time memungkinkan Anda memanfaatkan sepenuhnya kemampuan pemrosesan real-time yang kuat dari Flink dan fitur-fitur Hologres, seperti binary logging, penyimpanan hibrid baris-kolom, serta isolasi sumber daya yang kuat. Kombinasi ini mendukung pemrosesan dan analisis data real-time yang efisien dan skalabel.

Sinkronisasi real-time dari MySQL ke gudang data Hologres

Kode berikut menunjukkan pekerjaan ingesti data CDC YAML paling dasar untuk menyinkronkan seluruh database MySQL ke Hologres:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan komentar tabel dan komentar bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan data.
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
Catatan
  • Disarankan untuk mengatur include-comments.enabled di MySQL Source agar komentar tabel dan bidang disinkronkan saat pekerjaan dimulai untuk pertama kalinya. Untuk informasi lebih lanjut, lihat MySQL.

  • Disarankan untuk mengatur scan.incremental.snapshot.unbounded-chunk-first.enabled di MySQL Source guna mencegah potensi kesalahan TaskManager OutOfMemory saat pekerjaan dimulai untuk pertama kalinya. Untuk informasi lebih lanjut, lihat MySQL.

Gunakan pemetaan tipe yang lebih toleran

Konektor Hologres tidak dapat memproses peristiwa perubahan tipe kolom, tetapi mendukung beberapa hubungan pemetaan tipe. Untuk mendukung perubahan pada sumber data dengan lebih baik, Anda dapat memetakan beberapa tipe data MySQL ke tipe Hologres yang lebih luas. Hal ini memungkinkan Anda melewati peristiwa perubahan tipe yang tidak perlu dan memastikan pekerjaan berjalan sesuai harapan. Pemetaan dapat diubah menggunakan item konfigurasi sink.type-normalize-strategy, dengan nilai default STANDARD. Untuk informasi lebih lanjut, lihat Pemetaan tipe konektor Hologres untuk pekerjaan YAML ingesti data.

Sebagai contoh, Anda dapat menggunakan ONLY_BIGINT_OR_TEXT untuk memetakan tipe hanya ke tipe int8 dan text di Hologres. Dalam hal ini, jika tipe kolom di MySQL berubah dari INT menjadi BIGINT, Hologres akan memetakan kedua tipe tersebut ke tipe int8. Akibatnya, pekerjaan tidak melaporkan kesalahan untuk konversi tipe yang tidak didukung.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan komentar tabel dan komentar bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi kesalahan TaskManager OutOfMemory.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter penguraian untuk mempercepat pembacaan data.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

Tulis data ke tabel partisi

Saat menggunakan konektor Hologres sebagai sink dalam pekerjaan ingesti data CDC YAML, Anda dapat menulis data ke tabel partisi. Untuk informasi lebih lanjut, lihat Tulis Data ke Tabel Partisi.

Sinkronisasi real-time dari Kafka ke gudang data Hologres

Implementasikan Distribusi Data Real-time Menggunakan Flink CDC menjelaskan solusi untuk menyimpan data MySQL di Kafka. Anda juga dapat menggunakan pekerjaan ingesti data CDC YAML untuk menyinkronkan data dari Kafka ke Hologres dalam membangun gudang data real-time.

Asumsikan bahwa topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut dapat menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di Hologres.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Catatan
  • Sumber data Kafka mendukung membaca data dalam format berikut: json, canal-json, dan debezium-json (default).

  • Jika format datanya adalah debezium-json, Anda harus secara manual menambahkan kunci utama ke tabel menggunakan aturan transformasi:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Jika data dari satu tabel didistribusikan di beberapa partisi, atau jika data dari tabel bersharding di partisi berbeda perlu digabungkan, Anda harus mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables menjadi true.

  • Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat mengatur kebijakan tersebut menggunakan item konfigurasi schema.inference.strategy. Untuk informasi lebih lanjut tentang inferensi skema dan kebijakan sinkronisasi perubahan, lihat Message Queue for Kafka.