Flink CDC data ingestion jobs support advanced patterns for complex business scenarios: synchronizing newly added tables, excluding specific tables, enriching data with metadata and computed columns, routing tables across databases, and starting from a specific timestamp.
Synchronize newly added tables
Two methods are available, depending on whether the new tables already contain historical data.
| Method | When to use | Requires restart |
| --- | --- | --- |
| Hot synchronization | New tables are empty (no historical data) | No |
| Full + incremental synchronization | New tables already have historical data | Yes |
Hot-synchronize new empty tables (no restart required)
Set scan.binlog.newly-added-table.enabled: true to let the job capture new, empty tables during the incremental phase without restarting.
For example, if a job is synchronizing all tables in the MySQL dlf_test database and you create a new, empty products table, the job picks it up automatically:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
After the job starts, it automatically creates any new dlf_test tables in the destination.
scan.newly-added-table.enabled is effective only when scan.startup.mode is set to initial (the default). It cannot be used together with scan.binlog.newly-added-table.enabled.
Synchronize tables with historical data (restart required)
Use this method when new tables already contain historical data that must be captured. The job performs a full + incremental synchronization, which requires a restart.
Prerequisites
Before you begin, ensure that:
The job is currently running in incremental mode
You have identified the new tables to add
Procedure
Stop the job and create a savepoint.
Update the source configuration: expand tables to include the new tables, remove scan.binlog.newly-added-table.enabled (if present), and add scan.newly-added-table.enabled: true. For example, if the job was originally synchronizing only dlf_test.customers and you want to add dlf_test.products:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Restart the job from the savepoint.
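On a self-managed Flink deployment, the stop/restart cycle can be sketched with the Flink and Flink CDC command-line tools. This is a sketch only: the job ID, savepoint directory, and YAML filename are placeholders, and you should verify the flag names against your Flink and Flink CDC versions; managed platforms expose equivalent actions in their consoles.

```shell
# Stop the running job and write a savepoint (standard Flink CLI;
# <job-id> and the savepoint directory are placeholders).
./bin/flink stop --savepointPath /tmp/flink-savepoints <job-id>

# Resubmit the updated pipeline definition from that savepoint
# (Flink CDC 3.x CLI; confirm the option name for your version).
./bin/flink-cdc.sh --from-savepoint /tmp/flink-savepoints/savepoint-xxxx updated-job.yaml
```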
scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled cannot be enabled at the same time.
Exclude specific tables
Add tables.exclude to prevent specific tables from being synchronized to the destination.
For example, to synchronize all tables in dlf_test except products_tmp:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
The job creates all tables from dlf_test in the destination except products_tmp, and keeps their schemas and data synchronized in real time.
tables.exclude supports regular expressions. If a table matches both tables and tables.exclude, the exclusion takes precedence.
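The include/exclude interaction can be modeled as two regex checks. The sketch below is a simplified illustration, not the connector's implementation; it uses plain Python regexes (dlf_test\..*) rather than the connector's escaped-separator syntax (dlf_test.\.*), and the helper name is hypothetical.

```python
import re

def is_captured(table_id, include, exclude=None):
    """Sketch of how tables / tables.exclude interact: a table is
    synchronized only if it matches the include pattern and does NOT
    match the exclude pattern -- exclusion takes precedence."""
    if not re.fullmatch(include, table_id):
        return False
    if exclude and re.fullmatch(exclude, table_id):
        return False
    return True

tables = ["dlf_test.customers", "dlf_test.products", "dlf_test.products_tmp"]
captured = [t for t in tables
            if is_captured(t, r"dlf_test\..*", r"dlf_test\.products_tmp")]
# captured -> ["dlf_test.customers", "dlf_test.products"]
```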
Enhance with metadata and computed columns
Use the transform module to add metadata columns or computed columns to the downstream table. For more information, see Transform module.
Add metadata columns
The following configuration adds the table identifier, operation time (op_ts), and operation type to the dlf_test.customers table in the destination:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
When using MySQL as a source, set metadata-column.include-list: op_ts to pass the operation time as metadata to the downstream sink. For more information, see MySQL connector.
The source data includes all Change Data Capture (CDC) event types, including deletes. To convert DELETE operations into INSERT operations in the downstream table (soft delete), add converter-after-transform: SOFT_DELETE in the transform block.
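As a sketch, the transform block from this section would then look as follows (SOFT_DELETE keeps deleted rows in the destination by rewriting DELETE events as inserts; the rest of the pipeline definition is unchanged):

```yaml
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    primary-keys: id,identifier
    # Rewrite DELETE events as INSERT events downstream (soft delete).
    converter-after-transform: SOFT_DELETE
    description: add identifier, op_ts and op
```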
Add computed columns
The following configuration derives a dt partition field from the created_at column using DATE_FORMAT:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set partition fields.
    partition-keys: dt
    description: add dt
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
When using MySQL as a source, set metadata-column.include-list: op_ts to pass the operation time as metadata to the downstream sink. For more information, see MySQL connector.
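The 'yyyyMMdd' pattern follows Java-style date-format syntax. In Python terms, the derived partition value corresponds to strftime('%Y%m%d'), as this small sketch with a sample timestamp shows:

```python
from datetime import datetime

# Java-style DATE_FORMAT(created_at, 'yyyyMMdd') corresponds to
# strftime('%Y%m%d') on the same timestamp in Python.
created_at = datetime(2022, 11, 1, 8, 30, 0)
dt = created_at.strftime("%Y%m%d")
# dt == "20221101": every row created on the same calendar day shares a partition.
```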
Table name mapping
Use the route module to rename tables when synchronizing from source to destination. Two common patterns are merging sharded tables and renaming an entire database.
Merge sharded tables
Map multiple source tables matching a regex pattern to a single destination table. The following configuration merges all tables in dlf_test whose names follow the pattern product_<number> into a single dlf.products table:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Merge all tables matching product_<number> into dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Synchronize an entire database with table renaming
Use replace-symbol to apply a naming transformation to all destination tables. The following configuration synchronizes all tables from dlf_test to dlf, prefixing each destination table name with ods_:
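Conceptually, replace-symbol renaming is a match-then-substitute step: each source table that matches the route pattern has its bare table name substituted for the symbol in the sink-table pattern. The following Python sketch models this behavior (it is an illustration, not the connector's implementation; plain regexes are used instead of the connector's escaped-separator syntax, and the helper name is hypothetical):

```python
import re

def route_name(source_table, source_pattern, sink_pattern, replace_symbol):
    """Sketch of route renaming with replace-symbol: if the source table
    matches the pattern, substitute its bare table name for the symbol
    in the sink-table pattern; otherwise the route does not apply."""
    if not re.fullmatch(source_pattern, source_table):
        return None
    table_name = source_table.split(".", 1)[1]
    return sink_pattern.replace(replace_symbol, table_name)

# dlf_test.customers is routed to dlf.ods_customers.
routed = route_name("dlf_test.customers", r"dlf_test\..*", "dlf.ods_<>", "<>")
```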
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Synchronize all tables from dlf_test to dlf, prefixing each name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Comprehensive case study for complex business scenarios
The following configuration combines all the features covered in this document: table exclusion, metadata and computed columns, soft deletes, and database-wide routing with table renaming.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set partition fields.
    partition-keys: dt
    # (Optional) Convert deleted data into inserts.
    converter-after-transform: SOFT_DELETE
route:
  # Synchronize all tables from dlf_test to dlf, prefixing each name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
Start from a specific timestamp
When you start a Flink CDC data ingestion job statelessly (without restoring from a checkpoint or savepoint), you can specify a start time so that the job begins reading the binary log (binlog) from the corresponding position.
Two configuration methods are available. If both are set, the O&M page configuration takes precedence.
Configure on the O&M page
On the job's Operations and Maintenance (O&M) page, set the start time for the source table.

Configure in the job definition
For a MySQL source, set scan.startup.mode: timestamp and provide the start time in milliseconds:
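The value of scan.startup.timestamp-millis is a Unix epoch timestamp in milliseconds. As a sketch, the value 1667232000000 used in this example corresponds to 2022-11-01 00:00:00 in UTC+8 (the timezone is an assumption for illustration):

```python
from datetime import datetime, timedelta, timezone

# Epoch milliseconds for 2022-11-01 00:00:00 in UTC+8 (assumed business timezone).
start = datetime(2022, 11, 1, tzinfo=timezone(timedelta(hours=8)))
millis = int(start.timestamp() * 1000)
# millis == 1667232000000
```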
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # Start reading from a specific timestamp.
  scan.startup.mode: timestamp
  # Startup timestamp in milliseconds.
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
If you set the start time on both the O&M page and in the job definition, the O&M page configuration takes precedence.