Realtime Compute for Apache Flink: Case study: Build data ingestion jobs for complex business scenarios using Flink CDC

Last Updated: Feb 04, 2026

This topic describes best practices for using Flink CDC data ingestion jobs in complex business scenarios:

  • Handling source table schema evolution.

  • Enhancing data logic by injecting metadata, adding computed columns, and performing soft deletes.

  • Implementing heterogeneous routing to merge sharded tables and synchronize entire databases.

  • Exercising precise control by filtering tables and starting jobs from a specific timestamp.

Synchronize newly added tables

Flink CDC data ingestion jobs support synchronizing newly added tables in two ways:

  • Hot synchronization of new empty tables: This method applies to new tables that have no historical data and will only contain subsequent changes. The job can dynamically capture these tables without a restart.

  • Synchronization of tables with historical data: This method applies to new tables that already contain historical data. This process requires a full and incremental synchronization, and the job must be restarted for the changes to take effect.

Hot synchronize new empty tables without historical data

You can enable the scan.binlog.newly-added-table.enabled parameter to allow your Flink CDC job to synchronize new, empty tables in real time during the incremental phase. This method does not require a job restart and is the recommended approach.

For example, if a Flink CDC data ingestion job is synchronizing all tables in the MySQL `dlf_test` database and a new, empty table named products is created in the source database, you can synchronize this new table without restarting the job. To do this, set the scan.binlog.newly-added-table.enabled parameter to `true` in your job configuration. The following is an example configuration:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

After you run a CDC YAML job with this configuration, the job automatically creates any new tables from the `dlf_test` database in the destination.

Important

The scan.newly-added-table.enabled parameter is effective only when scan.startup.mode is set to initial (the default).

Synchronize tables with historical data

Assume a MySQL database contains the `customers` and `products` tables, but you only want to synchronize the `customers` table at startup. The initial job configuration is as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

After the job has been running for some time, if you want to synchronize all tables and their historical data from the database, you must restart the job. To do this, follow these steps:

  1. Stop the job and create a savepoint.

  2. Modify the `tables` configuration in the MySQL data source to include all the tables that you want to synchronize. If the job sets the scan.binlog.newly-added-table.enabled parameter, remove it. Then set the scan.newly-added-table.enabled parameter to `true`. The updated configuration is as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

  3. Restart the job from the savepoint.

Important

You cannot enable scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled at the same time.

Exclude specific tables

In a Flink CDC data ingestion job, you can exclude specific tables to prevent them from being created and synchronized to the downstream destination.

For example, if the `dlf_test` database in MySQL contains multiple tables, such as `customers` and `products`, and you want to exclude the `products_tmp` table, you can configure the job as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

A Flink CDC data ingestion job with this configuration automatically creates all tables from the `dlf_test` database in the destination, except for the `products_tmp` table. The job also keeps the table schemas and data synchronized in real time.

Note

The `tables.exclude` parameter supports using regular expressions to match multiple tables. If there is an overlap between the tables specified in `tables.exclude` and `tables`, the overlapping tables are excluded and not synchronized. This means that exclusions take precedence over inclusions.
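
As a minimal sketch of the regular-expression form, the following fragment excludes every table in `dlf_test` whose name ends with `_tmp`. The `_tmp` naming convention and the exact pattern are assumptions for illustration. The pattern uses the same escaping style as the `tables` option in the preceding example, and the connection options are omitted for brevity.

```yaml
source:
  type: mysql
  # Capture every table in the dlf_test database ...
  tables: dlf_test.\.*
  # ... except tables whose names end with _tmp (assumed naming convention).
  tables.exclude: dlf_test.\.*_tmp
```

Because exclusions take precedence, a table such as `products_tmp` is skipped even though it also matches the `tables` pattern.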

Enhance with metadata and computed columns

Add metadata columns

When writing data, you can use the transform module to add metadata columns. For example, the following job configuration adds the table name, operation time, and operation type to the downstream table. For more information, see Transform module.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When you use MySQL as a source, you must set `metadata-column.include-list: op_ts` to send the operation time as metadata to the downstream sink. For more information, see MySQL.

The source data for the ingestion job contains all change data capture (CDC) event types, including deletes. To convert `DELETE` operations into `INSERT` operations in the downstream table and implement soft deletes, add the `converter-after-transform: SOFT_DELETE` configuration in the transform module.
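
The soft-delete setting described above could look like the following transform fragment. This is a sketch only: it reuses the `dlf_test.customers` table and the `op` column alias from the earlier example, the description text is illustrative, and the source and sink sections are omitted.

```yaml
transform:
  - source-table: dlf_test.customers
    # Keep all source columns and expose the change type as a regular column named op.
    projection: __data_event_type__ as op, *
    # Write DELETE events to the destination as INSERT events instead of removing rows.
    converter-after-transform: SOFT_DELETE
    description: soft delete with op column
```

Keeping the operation type in the `op` column lets downstream queries tell soft-deleted rows apart from live ones.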

Add computed columns

When writing data, you can use the transform module to add computed columns. For example, the following job configuration transforms the `created_at` field to generate a `dt` field and then uses the new `dt` field as the partition field for the downstream table.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set partition fields.
    partition-keys: dt
    description: add dt
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When you use MySQL as a source, you must set `metadata-column.include-list: op_ts` to send the operation time as metadata to the downstream sink. For more information, see MySQL.

Table name mapping

When you synchronize upstream tables to downstream tables, you may need to replace table names using the route module. The following sections provide example configurations for typical scenarios where you replace table names in a Flink CDC data ingestion job.

Merge sharded databases and tables

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
 
route:
  # Merge all tables in the dlf_test database whose names start with product_ and end with a number into the dlf.products table.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Synchronize an entire database

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  
route:
  # Uniformly modify table names. Synchronize all tables from the dlf_test database to the dlf database, prefixing each destination table name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
        
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Comprehensive case study for complex business scenarios

The following Flink CDC data ingestion job configuration is a comprehensive example for a complex business scenario that combines the features described in the previous sections. You can adapt this code to meet your specific business requirements.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set partition fields.
    partition-keys: dt
    # (Optional) Convert deleted data into inserts.
    converter-after-transform: SOFT_DELETE
    
route:
  # Synchronize all tables from the dlf_test database to the dlf database, prefixing each destination table name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Start from a specific timestamp

When you perform a stateless start of a Flink CDC data ingestion job, you can specify a start time for the data source. The job then starts reading the binary log (binlog) from the position that corresponds to that time.

Configure on the O&M page

On the job's Operations and Maintenance (O&M) page, you can specify a start time for the source table for a stateless start.

Configure job parameters

In a job draft, you can specify the start time for the source table by configuring parameters.

For a MySQL source, you can set scan.startup.mode to `timestamp` in the job configuration to specify the start time. The following is an example configuration:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Start in the mode that specifies a start time for the source table.
  scan.startup.mode: timestamp
  # Specify the startup timestamp in timestamp startup mode.
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. We recommend setting a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

If you specify the start time both on the O&M page and in the job parameters, the configuration on the O&M page takes precedence.
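
For reference, the value of scan.startup.timestamp-millis is a Unix epoch timestamp in milliseconds. The following fragment repeats the two startup options from the example above and decodes the sample value in a comment. The other source options are omitted.

```yaml
source:
  type: mysql
  scan.startup.mode: timestamp
  # 1667232000000 ms since the Unix epoch = 2022-10-31 16:00:00 UTC (2022-11-01 00:00:00 UTC+8).
  scan.startup.timestamp-millis: 1667232000000
```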