A Flink CDC data ingestion job is defined as a YAML file composed of up to five modules. The Source and Sink modules are required; Transform, Route, and Pipeline modules are optional.
Job structure overview
A complete job file follows this structure:
source: # Required: defines the data source
  ...
sink: # Required: defines the destination
  ...
transform: # Optional: filters and reshapes data before writing
  - ...
route: # Optional: maps source tables to different sink tables
  - ...
pipeline: # Optional: sets global job parameters
  ...
Minimal configuration
The following example syncs all tables matched by the tables parameter from MySQL to Paimon. It uses the two required modules plus the optional Pipeline module:
# Source module
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: admin
  password: <yourPassword>
  tables: adb.*
# Sink module
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
# Pipeline module
pipeline:
  name: source-database-sync-pipe
  schema.change.behavior: evolve
Full configuration
The following example adds Transform and Route modules to project fields, compute expressions, filter rows, and remap tables:
# Source module
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: admin
  password: <yourPassword>
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\.*
# Sink module
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
# Transform module
transform:
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name) AS product_name
    filter: id > 10 AND order_id > 100
    primary-keys: id
    partition-keys: product_name
    table-options: comment=app order
    description: project fields from source table
    converter-after-transform: SOFT_DELETE
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row
# Route module
route:
  - source-table: mydb.app_order_.*
    sink-table: odsdb.app_order
    description: merge all app_order shards into one table
  - source-table: mydb.web_order
    sink-table: odsdb.ods_web_order
    description: sync table to a sink table with the ods_ prefix
# Pipeline module
pipeline:
  name: source-database-sync-pipe
  schema.change.behavior: evolve
Source module
The Source module defines where Flink CDC reads data from. The type parameter selects the source connector, and the remaining parameters are specific to that connector:
source:
  type: mysql # Connector type
  name: mysql source
  # Connector-specific parameters follow
  xxx: ...
For all supported parameters, see the documentation for the corresponding connector.
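The fragment below is a sketch of a more complete MySQL source. It assumes the option names of the open-source Flink CDC MySQL pipeline connector (hostname, server-id, scan.startup.mode), and the tables orders and customers are hypothetical; confirm each option against the connector documentation before relying on it.

```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: admin
  password: <yourPassword>
  # Hypothetical tables, listed explicitly and separated by commas
  tables: adb.orders, adb.customers
  # Assumption: server ID range for the binlog reader; it should cover the source parallelism
  server-id: 5400-5404
  # Assumption: read a full snapshot first, then continue from the binlog
  scan.startup.mode: initial
```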
Sink module
The Sink module defines where Flink CDC writes data to. The type parameter selects the sink connector, and the remaining parameters are specific to that connector:
sink:
  type: hologres # Connector type
  name: hologres sink
  # Connector-specific parameters follow
  xxx: ...
For all supported parameters, see the documentation for the corresponding connector.
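As a concrete counterpart to the placeholder above, this sketch annotates the Paimon sink used in the earlier examples. The table.properties.bucket line is an assumption based on the open-source Flink CDC Paimon connector, which forwards options with the table.properties. prefix to the Paimon tables it creates.

```yaml
sink:
  # Selects the sink connector
  type: paimon
  # Descriptive name for the sink
  name: Paimon Sink
  # Paimon catalog settings: filesystem metastore rooted at the warehouse path
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
  # Assumption: forwarded to auto-created Paimon tables as the table option bucket=4
  table.properties.bucket: 4
```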
Transform module
The Transform module applies projection, calculation, and filtering rules to source table data before it reaches the sink. Define one rule per source table pattern.
transform:
  - source-table: db.tbl1
    projection: ...
    filter: ...
  - source-table: db.tbl2
    projection: ...
    filter: ...
| Parameter | Description | Required |
|---|---|---|
| source-table | Source table identifier; supports regular expressions | Required |
| projection | Column selection and expressions, similar to the SQL SELECT clause | Optional |
| filter | Row filter condition, similar to the SQL WHERE clause | Optional |
| primary-keys | Sink table primary keys, comma-separated | Optional |
| partition-keys | Sink table partition keys, comma-separated | Optional |
| table-options | Table creation options applied when the sink table is auto-created | Optional |
| converter-after-transform | Converter applied to change events after the transform rule runs | Optional |
| description | Rule description | Optional |
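The sketch below exercises several of these parameters on hypothetical tables. The first rule filters rows without a projection; treating a missing projection as "keep all columns" is an assumption. The second rule keeps all columns and adds the originating table name through the __table_name__ metadata field, which is also an assumption based on the open-source Flink CDC transform documentation.

```yaml
transform:
  # Filter only: no projection, so all source columns are kept (assumption)
  - source-table: mydb.web_order
    filter: id > 10 AND order_id > 100
    description: keep orders with large IDs
  # Keep every column and record which shard each row came from
  - source-table: mydb.app_order_.*
    projection: __table_name__ AS source_table, *
    # Composite key: once a route rule merges the shards into one table,
    # rows with the same id from different shards do not overwrite each other
    primary-keys: source_table,id
    table-options: comment=merged app orders
    description: tag each row with its source shard
```

The second rule is intended to be paired with a route rule that maps mydb.app_order_.* to a single sink table.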
For expression syntax details, see Flink CDC Transform module.
Route module
The Route module maps source tables to sink tables. Use it to merge sharded tables, add table name prefixes, or redirect tables to different destinations.
route:
  - source-table: db.tbl1
    sink-table: sinkdb.tbl1
  - source-table: db.tbl2
    sink-table: sinkdb.tbl2
| Parameter | Description | Required |
|---|---|---|
| source-table | Source table identifier; supports regular expressions | Required |
| sink-table | Target sink table identifier | Required |
| description | Route rule description | Optional |
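The three uses of the Route module (merging shards, adding a prefix, and redirecting a table) can be sketched with only source-table, sink-table, and description. All database and table names here are hypothetical.

```yaml
route:
  # Merge sharded tables (user_01, user_02, ...) into one sink table
  - source-table: mydb.user_[0-9]+
    sink-table: ods_db.user
    description: merge user shards into one table
  # Rename a single table by adding the ods_ prefix
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: add ods_ prefix
  # Send a table to a different database on the sink side
  - source-table: mydb.audit_log
    sink-table: archive_db.audit_log
    description: redirect audit log to the archive database
```

Only the first rule relies on regular-expression matching in source-table; the other two each map exactly one table.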
For details on route rules, see Flink CDC Route module.
Pipeline module
The Pipeline module sets global parameters that apply to the entire job.
pipeline:
  name: CDC YAML job
  schema.change.behavior: lenient
| Parameter | Description | Required |
|---|---|---|
| name | Job name displayed in the Flink dashboard | Optional |
| schema.change.behavior | How the job handles upstream schema changes. Accepted values: exception, evolve, try_evolve, lenient (default), and ignore. | Optional |
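A sketch of a pipeline block that discards upstream schema changes instead of applying them to the sink. The parallelism parameter does not appear in the table above; it is an assumption based on the open-source Flink CDC pipeline options.

```yaml
pipeline:
  name: mysql-to-paimon-sync
  # Drop upstream schema change events instead of applying them to the sink
  schema.change.behavior: ignore
  # Assumption: job-wide parallelism, as in open-source Flink CDC
  parallelism: 2
```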
For all configurable parameters, see Flink CDC Pipeline module.