All Products
Search
Document Center

Realtime Compute for Apache Flink:Implement real-time ingestion

Last Updated:Mar 26, 2026

This topic covers best practices for streaming real-time changes from MySQL or Kafka into a Hologres data warehouse using a Change Data Capture (CDC) YAML data ingestion job.

Sync from MySQL to Hologres

The Hologres connector supports:

  • Automatic table creation and schema synchronization

  • Row-column hybrid storage and Binlog-based incremental reads

  • Tolerant type mapping to handle upstream schema changes without job failures

The following YAML defines the minimum configuration to synchronize an entire MySQL database to Hologres:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN

Optional parameters to set on the first job run:

Parameter Purpose Reference
include-comments.enabled: true Synchronize table comments and field comments MySQL connector
scan.incremental.snapshot.unbounded-chunk-first.enabled: true Prioritize unbounded shards to prevent TaskManager OutOfMemory errors MySQL connector
scan.only.deserialize.captured.tables.changelog.enabled: true Enable parsing filters to accelerate data reads MySQL connector

Handle upstream type changes

The Hologres connector does not support column type change events. To handle upstream schema changes without job failures, configure sink.type-normalize-strategy to map MySQL types to broader Hologres types. The default value is STANDARD.

Strategy Behavior
STANDARD Default type mapping
BROADEN Maps to broader compatible types
ONLY_BIGINT_OR_TEXT Maps all types to int8 or text

For example, with ONLY_BIGINT_OR_TEXT, a column type change from INT to BIGINT in MySQL maps both types to int8 in Hologres — the job continues without reporting a type conversion error.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test\.*
  server-id: 8601-8604
  # (Optional) Synchronize table comments and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

For the full list of supported type mappings, see Hologres connector type mapping for data ingestion YAML jobs.

Write to partitioned tables

When using the Hologres connector as the sink, you can write data to partitioned tables. For more information, see Write data to partitioned tables.

Sync from Kafka to Hologres

This section assumes MySQL changes are already flowing into Kafka — for example, using the setup described in Implement real-time distribution. The pipeline reads from Kafka and writes to Hologres.

Assume a Kafka topic named inventory contains data from two tables, customers and products, in debezium-json format. The following job synchronizes both tables to their corresponding tables in Hologres:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Supported formats

The Kafka source supports three message formats:

Format Notes
json Standard JSON format
canal-json Canal-compatible JSON format
debezium-json Default format for Debezium-sourced CDC data

Primary key requirement for debezium-json

When the format is debezium-json, the Kafka source does not infer primary keys automatically. Add a primary key explicitly using a transform rule:

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id

Distributed tables

If a single table's data spans multiple partitions, or if sharded tables across different partitions need to be merged, set debezium-json.distributed-tables or canal-json.distributed-tables to true.

Schema inference

The Kafka source supports multiple schema inference policies. Configure the policy using schema.inference.strategy. For details, see Message Queue for Apache Kafka.