This document details best practices for using Realtime Compute for Apache Flink's Change Data Capture (CDC) to address complex data ingestion needs. You'll learn how to manage source schema evolution, enrich data with metadata or computed columns, implement soft deletes, merge distributed sources, replicate entire databases, and apply granular controls like table filtering and timestamp-based startup.
Synchronize new tables
A data ingestion job can synchronize newly added tables in two ways:
Hot-sync new empty tables: New tables without historical data can be captured dynamically without a job restart.
Sync tables with historical data: A new table that already contains historical data requires a full and incremental synchronization, which involves a job restart.
Hot-sync new empty tables
To synchronize newly created empty tables in real time during incremental reading without a restart, enable the scan.binlog.newly-added-table.enabled option in the job configuration.
When a new, empty products table is created in the dlf_test database during synchronization, setting scan.binlog.newly-added-table.enabled: true in your job configuration eliminates the need for a job restart. Sample code:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelogs of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

This job automatically creates destination tables for newly added tables in the dlf_test database.
Synchronize new tables with historical data
Suppose your MySQL database has customers and products tables, but you only want to synchronize customers when the job starts. The initial job configuration is:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Later, suppose you want to synchronize all tables and their data from this database. Follow these steps:
Stop the job with a savepoint.
Modify the tables configuration in the source module to capture all tables, and enable scan.newly-added-table.enabled.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync historical and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelogs of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Restart the job from the savepoint.
Do not enable scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled simultaneously.
Exclude specific tables from synchronization
Exclude specific tables during data ingestion to prevent them from being synchronized downstream.
Consider a MySQL database dlf_test containing tables like customers and products. To exclude the products_tmp table, configure the job as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables from replication.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Sync data from tables newly created during incremental sync.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

This job automatically synchronizes all tables from the dlf_test database (excluding products_tmp) to the target destination in real time, including their schemas and data.
The tables.exclude option supports regular expressions. If a table name matches both tables and tables.exclude, it will be excluded.
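For example, assuming a naming convention where temporary tables end with _tmp, a regular expression like the following sketch would exclude all of them at once. The pattern is illustrative; adjust it to your own naming convention.
source:
  type: mysql
  tables: dlf_test.\.*
  # Illustrative pattern: excludes every dlf_test table whose name ends with "_tmp".
  tables.exclude: dlf_test.\.*_tmp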
Enrich data with metadata and computed columns
Add metadata columns
Use the transform module to enrich data during ingestion by adding metadata columns. For instance, the following job configuration appends the table name, operation timestamp (when the operation occurred), and operation type to the downstream table. For details, see Transform.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation timestamp as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

When using a MySQL source, add metadata-column.include-list: op_ts in the source module to include the operation timestamp as metadata. For details, see MySQL connector.
The source table captures all change event types by default. To implement a soft delete, which converts delete operations into insert operations in the downstream table, add the converter-after-transform: SOFT_DELETE configuration in the transform module.
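For example, a transform rule with soft delete enabled might look like the following sketch. The table name and projection are abbreviated for illustration.
transform:
  - source-table: dlf_test.customers
    projection: __data_event_type__ as op, *
    # Convert delete events into insert events in the downstream table (soft delete).
    converter-after-transform: SOFT_DELETE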
Add computed columns
Use the transform module to add computed columns. The following job generates a dt field from the created_at field and uses it as the partition field for the downstream table.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set the partition field.
    partition-keys: dt
    description: add dt
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

When using a MySQL source, add metadata-column.include-list: op_ts in the source module to include the operation time as metadata. For details, see MySQL connector.
Map table names
Use the route module to map upstream and downstream table names.
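For example, a route rule that renames a single upstream table in the destination might look like the following sketch. The table names are illustrative.
route:
  # Illustrative mapping: write dlf_test.customers to the dlf.customers_ods table.
  - source-table: dlf_test.customers
    sink-table: dlf.customers_ods
    description: rename customers table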
Merge distributed sources with identical structures
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Merge all tables in the dlf_test database whose names start with 'product_' and end with a number into dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Synchronize an entire database
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
route:
  # Uniformly modify table names. Synchronize all tables from the dlf_test database to the dlf database. The new table names are prefixed with 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Comprehensive example for a complex business scenario
The following Flink CDC data ingestion job combines the features described above into a single comprehensive example. You can adapt this configuration to meet your specific requirements.
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts
transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set the partition key.
    partition-keys: dt
    # (Optional) Convert deletes into inserts.
    converter-after-transform: SOFT_DELETE
route:
  # Sync all tables from the dlf_test database to the dlf database. The new table names are prefixed with 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Start a job from a specific timestamp
When you start a job without existing state, you can specify a start time so that the data source begins reading from the corresponding binlog position.
Use the Development Console
In the Development Console, open your job deployment and start it. In the Start Job panel, specify the source's start time.

Use job code
In the job draft, specify the source's start time.
For a MySQL source, you can set scan.startup.mode: timestamp in the job configuration. For example:
source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Set startup mode to timestamp.
  scan.startup.mode: timestamp
  # Specify the startup timestamp in milliseconds.
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Sync data from tables newly created during incremental sync.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Job startup configurations set in the Development Console take precedence over those in your code.