This topic describes best practices for Flink CDC data ingestion jobs in complex business scenarios. These scenarios include handling source table schema evolution, enriching data by injecting metadata columns, adding computed columns, and performing soft deletes, routing tables to merge sharded tables or synchronize entire databases, and exercising precise control by filtering tables and starting jobs from a specific timestamp.
Synchronize newly added tables
Flink CDC data ingestion jobs support synchronizing newly added tables in two ways:
Hot synchronization of new empty tables: This method applies to new tables that have no historical data and will only contain subsequent changes. The job can dynamically capture these tables without a restart.
Synchronization of tables with historical data: This method applies to new tables that already contain historical data. This process requires a full and incremental synchronization, and the job must be restarted for the changes to take effect.
Hot synchronize new empty tables without historical data
You can enable the scan.binlog.newly-added-table.enabled parameter to allow your Flink CDC job to synchronize new, empty tables in real time during the incremental phase. This method does not require a job restart and is the recommended approach.
For example, if a Flink CDC data ingestion job is synchronizing all tables in the MySQL `dlf_test` database and a new, empty table named products is created in the source database, you can synchronize this new table without restarting the job. To do this, set the scan.binlog.newly-added-table.enabled parameter to `true` in your job configuration. The following is an example configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

After you run a CDC YAML job with this configuration, the job automatically creates any new tables from the `dlf_test` database in the destination.
The scan.newly-added-table.enabled parameter is effective only when scan.startup.mode is set to initial (the default).
Synchronize tables with historical data
Assume a MySQL database contains the `customers` and `products` tables, but you only want to synchronize the `customers` table at startup. The initial job configuration is as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

After the job has been running for some time, if you want to synchronize all tables and their historical data from the database, you must restart the job. To do this, follow these steps:
Terminate the job and create a savepoint.
Modify the `tables` configuration in the MySQL data source to include all the tables that you want to synchronize. In addition, remove the `scan.binlog.newly-added-table.enabled` parameter and enable the `scan.newly-added-table.enabled` parameter.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Restart the job from the savepoint.
You cannot enable scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled at the same time.
Exclude specific tables
In a Flink CDC data ingestion job, you can exclude specific tables to prevent them from being created and synchronized to the downstream destination.
For example, if the `dlf_test` database in MySQL contains multiple tables, such as `customers` and `products`, and you want to exclude the `products_tmp` table, you can configure the job as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

A Flink CDC data ingestion job with this configuration automatically creates all tables from the `dlf_test` database in the destination, except for the `products_tmp` table. The job also keeps the table schemas and data synchronized in real time.
The `tables.exclude` parameter supports using regular expressions to match multiple tables. If there is an overlap between the tables specified in `tables.exclude` and `tables`, the overlapping tables are excluded and not synchronized. This means that exclusions take precedence over inclusions.
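The include-then-exclude semantics can be illustrated with a small Python sketch. This is a hypothetical model of the filtering behavior, not Flink CDC internals, and the regex syntax shown is standard Python rather than the connector's own pattern dialect:

```python
import re

def is_captured(table, include_pattern, exclude_pattern=None):
    """Return True if a table is synchronized: it must match the include
    pattern, and exclusions take precedence over inclusions."""
    if not re.fullmatch(include_pattern, table):
        return False
    if exclude_pattern and re.fullmatch(exclude_pattern, table):
        return False
    return True

tables = ["dlf_test.customers", "dlf_test.products", "dlf_test.products_tmp"]
# products_tmp matches the include pattern but is still dropped by the exclusion.
captured = [t for t in tables
            if is_captured(t, r"dlf_test\..*", r"dlf_test\.products_tmp")]
```

Here `captured` keeps only `dlf_test.customers` and `dlf_test.products`, mirroring the behavior of the example job configuration above.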
Enhance with metadata and computed columns
Add metadata columns
When writing data, you can use the transform module to add metadata columns. For example, the following job configuration adds the table name, operation time, and operation type to the downstream table. For more information, see Transform module.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

When you use MySQL as a source, you must set `metadata-column.include-list: op_ts` to send the operation time as metadata to the downstream sink. For more information, see MySQL.
The source data for the ingestion job contains all change data capture (CDC) event types, including deletes. To convert `DELETE` operations into `INSERT` operations in the downstream table and implement soft deletes, add the `converter-after-transform: SOFT_DELETE` configuration in the transform module.
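The effect of a soft delete can be sketched with a simplified Python model of a change event. This is an illustration of the behavior described above, not the actual Flink CDC event classes: a delete event is rewritten as an insert, so the row survives downstream while the `op` column (populated from `__data_event_type__`) still records that it was deleted:

```python
def soft_delete(event):
    """Simplified model of converter-after-transform: SOFT_DELETE.
    A DELETE change event becomes an INSERT that keeps the row;
    the 'op' column records the original operation type."""
    if event["kind"] == "DELETE":
        return {"kind": "INSERT", "row": dict(event["row"], op="-D")}
    return event  # inserts and updates pass through unchanged

evt = soft_delete({"kind": "DELETE", "row": {"id": 1, "name": "Alice"}})
```

With this conversion, downstream consumers can filter on the `op` column to distinguish logically deleted rows instead of losing them.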
Add computed columns
When writing data, you can use the transform module to add computed columns. For example, the following job configuration transforms the `created_at` field to generate a `dt` field and then uses the new `dt` field as the partition field for the downstream table.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set partition fields.
    partition-keys: dt
    description: add dt
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

When you use MySQL as a source, you must set `metadata-column.include-list: op_ts` to send the operation time as metadata to the downstream sink. For more information, see MySQL.
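The `DATE_FORMAT(created_at, 'yyyyMMdd')` expression derives a day-granularity partition value from a timestamp. Its behavior can be sketched in Python; the sample timestamp below is hypothetical:

```python
from datetime import datetime

def date_format_yyyymmdd(ts):
    """Python equivalent of DATE_FORMAT(created_at, 'yyyyMMdd'):
    collapse a timestamp to a day-granularity partition value."""
    return ts.strftime("%Y%m%d")

# A row created at 2024-05-03 10:15:00 lands in partition dt = "20240503".
dt = date_format_yyyymmdd(datetime(2024, 5, 3, 10, 15, 0))
```

Because every row from the same calendar day maps to the same `dt` value, the downstream Paimon table can be partitioned by `dt` without the source table needing a date column of its own.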
Table name mapping
When you synchronize upstream source tables to downstream destination tables, you may need to rename tables using the route module. The following sections provide example configurations for typical table renaming scenarios in a Flink CDC data ingestion job.
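The pattern-based renaming that route rules perform can be approximated with a Python sketch. The helper below is hypothetical, not part of Flink CDC, and uses standard Python regex syntax; it models the two scenarios in this section: mapping many matching tables to one sink table, and rewriting names via a `replace-symbol` placeholder:

```python
import re

def route(table_name, source_pattern, sink_table, replace_symbol=None):
    """Approximate a route rule: if the table matches source-pattern,
    map it to sink-table, substituting replace-symbol (if any) with
    the bare table name."""
    if not re.fullmatch(source_pattern, table_name):
        return table_name  # not routed; keep the original name
    if replace_symbol:
        bare_name = table_name.split(".", 1)[1]
        return sink_table.replace(replace_symbol, bare_name)
    return sink_table

# Merge sharded tables: dlf_test.product_3 -> dlf.products
merged = route("dlf_test.product_3", r"dlf_test\.product_[0-9]+", "dlf.products")
# Whole-database prefixing: dlf_test.customers -> dlf.ods_customers
prefixed = route("dlf_test.customers", r"dlf_test\..*", "dlf.ods_<>", "<>")
```

The first call collapses every matching shard into a single destination table; the second keeps a one-to-one mapping but prefixes each destination table name with `ods_`.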
Merge sharded databases and tables
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Merge all tables in the dlf_test database whose names start with product_ and end with a number into the dlf.products table.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Synchronize an entire database
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Uniformly modify table names. Synchronize all tables from the dlf_test database to the dlf database, prefixing each destination table name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Comprehensive case study for complex business scenarios
The following Flink CDC data ingestion job configuration is a comprehensive example for a complex business scenario that combines the features described in the previous sections. You can adapt this code to meet your specific business requirements.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set partition fields.
    partition-keys: dt
    # (Optional) Convert deleted data into inserts.
    converter-after-transform: SOFT_DELETE
route:
  # Synchronize all tables from the dlf_test database to the dlf database, prefixing each destination table name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Start from a specific timestamp
When you perform a stateless start of a Flink CDC data ingestion job, you can specify a start time for the data source. This lets you resume reading data from a specific binary logging (binlog) position.
Configure on the O&M page
On the job's Operations and Maintenance (O&M) page, you can specify a start time for the source table for a stateless start.

Configure job parameters
In a job draft, you can specify the start time for the source table by configuring parameters.
For a MySQL source, you can set scan.startup.mode to `timestamp` in the job configuration to specify the start time. The following is an example configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Start in the mode that specifies a start time for the source table.
  scan.startup.mode: timestamp
  # Specify the startup timestamp in timestamp startup mode.
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

If you specify the start time on both the O&M page and in the job parameters, the configuration on the O&M page takes precedence.
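The `scan.startup.timestamp-millis` value is a Unix epoch timestamp in milliseconds. A short Python sketch for computing such a value from a human-readable time; the example below shows that the value `1667232000000` used in the configuration above corresponds to 2022-11-01 00:00:00 in UTC+8:

```python
from datetime import datetime, timezone, timedelta

def to_timestamp_millis(dt):
    """Convert a timezone-aware datetime to the epoch-millisecond
    value expected by scan.startup.timestamp-millis."""
    return int(dt.timestamp() * 1000)

# 2022-11-01 00:00:00 in UTC+8 -> 1667232000000
start = datetime(2022, 11, 1, 0, 0, 0, tzinfo=timezone(timedelta(hours=8)))
millis = to_timestamp_millis(start)
```

Always construct the datetime with an explicit time zone; a naive datetime would be interpreted in the local time zone of the machine running the script and could silently shift the binlog start position by several hours.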