Realtime Compute for Apache Flink: Best practices for handling complex data ingestion needs with Flink CDC

Last Updated: Nov 14, 2025

This topic describes best practices for using Flink CDC in Realtime Compute for Apache Flink to address complex data ingestion needs. It covers how to synchronize newly added tables, exclude specific tables from synchronization, enrich data with metadata and computed columns, implement soft deletes, merge sharded sources, replicate an entire database, and start a job from a specific timestamp.

Synchronize new tables

A data ingestion job can synchronize newly added tables in two ways:

  • Hot-sync new empty tables: New tables without historical data can be captured dynamically without a job restart.

  • Sync tables with historical data: A new table that already contains historical data requires a full and incremental synchronization, which involves a job restart.

Hot-sync new empty tables

To synchronize newly created empty tables in real time during incremental reading without a restart, enable the scan.binlog.newly-added-table.enabled option in the job configuration.

For example, if a new, empty products table is created in the dlf_test database while the job is running, setting scan.binlog.newly-added-table.enabled: true in the job configuration lets the job capture the table without a restart. Sample code:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync data from new tables during incremental reading.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelogs of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

This job automatically creates destination tables for newly added tables in the dlf_test database.

Synchronize new tables with historical data

Suppose your MySQL database has customers and products tables, but you only want to synchronize customers when the job starts. The initial job configuration is:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.customers
  server-id: 8601-8604
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Later on, you want to synchronize all tables and their data from this database. Follow these steps:

  1. Stop the job with a savepoint.

  2. Modify the tables configuration in the source module to capture all tables, and enable scan.newly-added-table.enabled.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync historical and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelogs of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

  3. Restart the job from the savepoint.

Important

Do not enable scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled simultaneously.

Exclude specific tables from synchronization

Exclude specific tables during data ingestion to prevent them from being synchronized downstream.

Consider a MySQL database dlf_test containing tables like customers and products. To exclude the products_tmp table, configure the job as follows:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables from replication.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Sync data from tables newly created during incremental sync.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

This job automatically synchronizes the schemas and data of all tables in the dlf_test database (except products_tmp) to the destination in real time.

Note

The tables.exclude option supports regular expressions. If a table name matches both tables and tables.exclude, it will be excluded.
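
For example, assuming a hypothetical naming convention in which temporary tables end with _tmp, a source block along the following lines would exclude all of them at once. The pattern follows the same escaping convention as the tables option above:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # Hypothetical pattern: exclude every table in dlf_test whose name ends with _tmp.
  tables.exclude: dlf_test.\.*_tmp
  server-id: 8601-8604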

Enrich data with metadata and computed columns

Add metadata columns

Use the transform module to enrich data during ingestion by adding metadata columns. For instance, the following job configuration appends a table identifier (schema name and table name), the operation timestamp (when the change occurred), and the operation type as columns in the downstream table. For details, see Transform.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use operation timestamp as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When using a MySQL source, add metadata-column.include-list: op_ts in the source module to include the operation timestamp as metadata. For details, see MySQL connector.

The source table captures all change event types by default. To implement a soft delete, which converts delete operations into insert operations in the downstream table, add the converter-after-transform: SOFT_DELETE configuration in the transform module.
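
For example, the transform block above could be extended as follows; the rest of the job configuration stays unchanged:

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # Convert delete events into insert events in the downstream table (soft delete).
    converter-after-transform: SOFT_DELETE
    description: add identifier, op_ts and op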

Add computed columns

Use the transform module to add computed columns. The following job generates a dt field from the created_at field and uses it as the partition field for the downstream table.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set the partition field.
    partition-keys: dt
    description: add dt
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When using a MySQL source, add metadata-column.include-list: op_ts in the source module to include the operation time as metadata. For details, see MySQL connector.

Map table names

Use the route module to map upstream and downstream table names.
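
For a simple one-to-one mapping, list the source table and the destination table explicitly. A minimal sketch (the dlf.customers_new destination name is a hypothetical example):

route:
  # Hypothetical mapping: write dlf_test.customers to a destination table with a different name.
  - source-table: dlf_test.customers
    sink-table: dlf.customers_new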

Merge distributed sources with identical structures

The following job uses a regular expression in the route module to merge sharded tables that share the same schema into a single destination table:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
 
route:
  # Merge all tables in the dlf_test database whose names start with 'product_' and end with a number into dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Synchronize an entire database

The following job synchronizes every table in the dlf_test database and uses the route module to add a uniform ods_ prefix to the destination table names:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Sync full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  
route:
  # Uniformly modify table names. Synchronize all tables from the dlf_test database to the dlf database. The new table names are prefixed with 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
        
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Comprehensive example for a complex business scenario

The following Flink CDC data ingestion job combines the features described above into a single comprehensive example. You can adapt this configuration to your specific requirements.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory issues.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of only captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set the partition key.
    partition-keys: dt
    # (Optional) Convert deletes into inserts.
    converter-after-transform: SOFT_DELETE
    
route:
  # Sync all tables from the dlf_test database to the dlf database. The new table names are prefixed with 'ods_'.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>
    
sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Start a job from a specific timestamp

When starting a job without existing state (a fresh start rather than a restore from a savepoint), you can specify a start time so that the source begins reading from a specific binlog position.

Use the Development Console

In the Development Console, navigate to O&M > Deployments, and start your job deployment. In the Start Job panel, specify the source's start time.


Use job code

In the job draft, specify the source's start time.

For a MySQL source, you can set scan.startup.mode: timestamp in the job configuration. For example:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # Set the startup mode to timestamp.
  scan.startup.mode: timestamp
  # Specify the startup timestamp.
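  # For reference, 1667232000000 milliseconds corresponds to 2022-11-01 00:00:00 (UTC+8).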
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Sync data from tables newly created during incremental sync.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Sync table and field comments.
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to prevent potential TaskManager OOMs.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Deserialize the changelog of captured tables to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) The commit user. Set different usernames for different jobs to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

Job startup configurations set in the Development Console take precedence over those in your code.