This topic covers best practices for streaming real-time changes from MySQL or Kafka into a Hologres data warehouse using a Change Data Capture (CDC) YAML data ingestion job.
## Sync from MySQL to Hologres

The Hologres connector supports:

- Automatic table creation and schema synchronization
- Row-column hybrid storage and Binlog-based incremental reads
- Tolerant type mapping to handle upstream schema changes without job failures
The following YAML defines the minimum configuration to synchronize an entire MySQL database to Hologres:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test\.*
  server-id: 8601-8604

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
```
Optional parameters to set on the first job run:
| Parameter | Purpose | Reference |
|---|---|---|
| `include-comments.enabled: true` | Synchronize table comments and field comments | MySQL connector |
| `scan.incremental.snapshot.unbounded-chunk-first.enabled: true` | Prioritize unbounded shards to prevent TaskManager OutOfMemory errors | MySQL connector |
| `scan.only.deserialize.captured.tables.changelog.enabled: true` | Enable parsing filters to accelerate data reads | MySQL connector |
### Handle upstream type changes

The Hologres connector does not support column type change events. To handle upstream schema changes without job failures, configure `sink.type-normalize-strategy` to map MySQL types to broader Hologres types. The default value is `STANDARD`.
| Strategy | Behavior |
|---|---|
| `STANDARD` | Default type mapping |
| `BROADEN` | Maps MySQL types to broader compatible Hologres types |
| `ONLY_BIGINT_OR_TEXT` | Maps all types to Hologres `int8` or `text` |
For example, with `ONLY_BIGINT_OR_TEXT`, a column type change from INT to BIGINT in MySQL maps both types to `int8` in Hologres, so the job continues without reporting a type conversion error. The following job uses this strategy:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test\.*
  server-id: 8601-8604
  # (Optional) Synchronize table comments and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
```
For the full list of supported type mappings, see Hologres connector type mapping for data ingestion YAML jobs.
### Write to partitioned tables
When using the Hologres connector as the sink, you can write data to partitioned tables. For more information, see Write data to partitioned tables.
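As a rough sketch of what this looks like in a job definition, the sink below enables automatic creation of partition child tables. The `sink.create-partition-table` option name is an assumption here; confirm the exact option in Write data to partitioned tables.

```yaml
sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  # Assumed option name: create the partition child table automatically
  # when data arrives for a partition value that has no child table yet.
  sink.create-partition-table: true
```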
## Sync from Kafka to Hologres

This section assumes MySQL changes are already flowing into Kafka, for example through the setup described in Implement real-time distribution. The pipeline reads from Kafka and writes to Hologres.

Assume a Kafka topic named `inventory` contains data from two tables, `customers` and `products`, in debezium-json format. The following job synchronizes both tables to their corresponding tables in Hologres:
```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
```
### Supported formats
The Kafka source supports three message formats:
| Format | Notes |
|---|---|
| `json` | Standard JSON format |
| `canal-json` | Canal-compatible JSON format |
| `debezium-json` | Default format for Debezium-sourced CDC data |
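For orientation, a debezium-json change event wraps the row in `before` and `after` images plus an `op` code ("c" for insert, "u" for update, "d" for delete). The record below is a hand-written sample for the `products` table, not output captured from this pipeline:

```json
{
  "before": null,
  "after": {
    "id": 101,
    "name": "scooter",
    "weight": 3.14
  },
  "source": {
    "db": "inventory",
    "table": "products"
  },
  "op": "c"
}
```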
### Primary key requirement for debezium-json

When the format is `debezium-json`, the Kafka source does not infer primary keys automatically. Add a primary key explicitly using a transform rule:
```yaml
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
```
### Distributed tables

If a single table's data spans multiple Kafka partitions, or if sharded tables across different partitions need to be merged, set `debezium-json.distributed-tables` or `canal-json.distributed-tables` to `true`, as in the sketch below.
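For example, a source that reads canal-json data for sharded tables spread across partitions might look like this (the topic name is illustrative):

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: canal-json
  # Merge records for the same logical table that arrive on different partitions.
  canal-json.distributed-tables: true
```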
### Schema inference

The Kafka source supports multiple schema inference policies. Configure the policy using `schema.inference.strategy`. For details, see Message Queue for Apache Kafka.
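A minimal sketch of where the option sits in a source definition; the `continuous` value below is a placeholder rather than a confirmed policy name, so check Message Queue for Apache Kafka for the supported values:

```yaml
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  value.format: debezium-json
  # Placeholder policy value; see the Kafka connector reference for supported options.
  schema.inference.strategy: continuous
```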