全部产品
Search
文档中心

Realtime Compute for Apache Flink:Implementasi ingesti real-time

更新时间:Mar 10, 2026

Topik ini menjelaskan praktik terbaik untuk menggunakan pekerjaan ingesti data CDC (Change Data Capture) YAML guna menulis data real-time ke gudang data.

Sinkronisasi real-time dari Flink CDC ke gudang data Hologres

Penggunaan pekerjaan ingesti data CDC YAML untuk menyinkronkan data ke Hologres dalam membangun gudang data real-time memungkinkan Anda memanfaatkan sepenuhnya kemampuan pemrosesan real-time yang andal dari Flink serta fitur-fitur Hologres, seperti binary logging, penyimpanan hibrida baris-kolom, dan isolasi resource yang kuat. Kombinasi ini mendukung pemrosesan dan analisis data real-time yang efisien serta skalabel.

Sinkronisasi real-time dari MySQL ke gudang data Hologres

Kode berikut menunjukkan pekerjaan ingesti data CDC YAML paling dasar untuk menyinkronkan seluruh database MySQL ke Hologres:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan komentar tabel dan komentar bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan data.
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
Catatan
  • Disarankan mengaktifkan include-comments.enabled di MySQL Source untuk menyinkronkan komentar tabel dan bidang saat pekerjaan pertama kali dijalankan. Untuk informasi selengkapnya, lihat MySQL.

  • Disarankan mengaktifkan scan.incremental.snapshot.unbounded-chunk-first.enabled di MySQL Source untuk mencegah potensi error OutOfMemory pada TaskManager saat pekerjaan pertama kali dijalankan. Untuk informasi selengkapnya, lihat MySQL.

Gunakan pemetaan tipe yang lebih toleran

Konektor Hologres tidak dapat memproses event perubahan tipe kolom, tetapi mendukung berbagai hubungan pemetaan tipe. Untuk mendukung perubahan pada sumber data secara lebih baik, Anda dapat memetakan beberapa tipe data MySQL ke tipe Hologres yang lebih luas. Pendekatan ini memungkinkan Anda melewati event perubahan tipe yang tidak perlu dan memastikan pekerjaan berjalan sesuai harapan. Pemetaan tersebut dapat diatur melalui item konfigurasi sink.type-normalize-strategy. Nilai default-nya adalah STANDARD. Untuk informasi selengkapnya, lihat Pemetaan tipe konektor Hologres untuk pekerjaan YAML ingesti data.

Sebagai contoh, Anda dapat menggunakan ONLY_BIGINT_OR_TEXT untuk memetakan tipe hanya ke tipe int8 dan text di Hologres. Dalam kasus ini, jika tipe kolom di MySQL berubah dari INT ke BIGINT, Hologres akan memetakan kedua tipe tersebut ke int8, sehingga pekerjaan tidak melaporkan error akibat konversi tipe yang tidak didukung.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  # (Opsional) Sinkronkan komentar tabel dan komentar bidang.
  include-comments.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Utamakan distribusi shard tak terbatas untuk mencegah potensi error OutOfMemory pada TaskManager.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Opsional) Aktifkan filter parsing untuk mempercepat pembacaan data.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

Tulis data ke tabel partisi

Saat menggunakan konektor Hologres sebagai sink dalam pekerjaan ingesti data CDC YAML, Anda dapat menulis data ke tabel partisi. Untuk informasi selengkapnya, lihat Tulis data ke tabel partisi.

Sinkronisasi real-time dari Kafka ke gudang data Hologres

Implementasi distribusi real-time menjelaskan solusi untuk menyimpan data MySQL di Kafka. Anda kemudian dapat menggunakan file YAML ingesti data untuk menyinkronkan data tersebut ke Hologres guna membangun gudang data real-time.

Misalnya, topik Kafka bernama inventory berisi data dari dua tabel, customers dan products, dalam format debezium-json. Pekerjaan berikut dapat menyinkronkan data dari kedua tabel tersebut ke tabel yang sesuai di Hologres.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
Catatan
  • Sumber data Kafka mendukung pembacaan data dalam format berikut: json, canal-json, dan debezium-json (default).

  • Jika format datanya adalah debezium-json, Anda harus menambahkan kunci primer ke tabel secara manual menggunakan aturan transform:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • Jika data satu tabel didistribusikan ke beberapa partisi, atau jika data dari tabel ter-shard di partisi berbeda perlu digabungkan, Anda harus mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  • Sumber data Kafka mendukung beberapa kebijakan inferensi skema. Anda dapat mengatur kebijakan tersebut melalui item konfigurasi schema.inference.strategy. Untuk informasi selengkapnya tentang kebijakan inferensi skema dan sinkronisasi perubahan, lihat Message Queue for Kafka.