
Realtime Compute for Apache Flink: Best practices for handling complex data ingestion needs with Flink CDC

Last Updated: Mar 26, 2026

Flink CDC data ingestion jobs support advanced patterns for complex business scenarios: synchronizing newly added tables, excluding specific tables, enriching data with metadata and computed columns, routing tables across databases, and starting from a specific timestamp.

Synchronize newly added tables

Two methods are available, depending on whether the new tables already contain historical data.

| Method | When to use | Requires restart |
| --- | --- | --- |
| Hot synchronization | New tables are empty (no historical data) | No |
| Full + incremental synchronization | New tables already have historical data | Yes |

Hot-synchronize new empty tables (no restart required)

Set scan.binlog.newly-added-table.enabled: true to let the job capture new, empty tables during the incremental phase without restarting.

For example, if a job is synchronizing all tables in the MySQL dlf_test database and you create a new, empty products table, the job picks it up automatically:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

After the job starts, it automatically creates any new dlf_test tables in the destination.

Note

scan.newly-added-table.enabled is effective only when scan.startup.mode is set to initial (the default). It cannot be used together with scan.binlog.newly-added-table.enabled.

Synchronize tables with historical data (restart required)

Use this method when new tables already contain historical data that must be captured. The job performs a full + incremental synchronization, which requires a restart.

Prerequisites

Before you begin, ensure that:

  • The job is currently running in incremental mode

  • You have identified the new tables to add

Procedure

  1. Stop the job and create a savepoint.

  2. Update the source configuration: expand tables to cover the new tables, remove scan.binlog.newly-added-table.enabled if it is set, and add scan.newly-added-table.enabled: true. For example, if the job was originally synchronizing only dlf_test.customers and you want to add dlf_test.products:

    source:
      type: mysql
      name: MySQL Source
      hostname: localhost
      port: 3306
      username: username
      password: password
      tables: dlf_test.customers,dlf_test.products
      server-id: 8601-8604
      # (Optional) Synchronize full and incremental data for newly added tables.
      scan.newly-added-table.enabled: true
      # (Optional) Synchronize table and field comments.
      include-comments.enabled: true
      # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Enable parsing filters to speed up reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: paimon
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
      commit.user: your_job_name
      # (Optional) Enable deletion vectors to improve read performance.
      table.properties.deletion-vectors.enabled: true
  3. Restart the job from the savepoint.

Important

scan.binlog.newly-added-table.enabled and scan.newly-added-table.enabled cannot be enabled at the same time.

Exclude specific tables

Add tables.exclude to prevent specific tables from being synchronized to the destination.

For example, to synchronize all tables in dlf_test except products_tmp:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

The job creates all tables from dlf_test in the destination except products_tmp, and keeps their schemas and data synchronized in real time.

Note

tables.exclude supports regular expressions. If a table matches both tables and tables.exclude, the exclusion takes precedence.
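
For example, a regular-expression exclusion such as the following in the source block (a hypothetical pattern, shown only for illustration) skips every dlf_test table whose name ends with _tmp:

  tables: dlf_test.\.*
  # Exclude all dlf_test tables whose names end with _tmp.
  tables.exclude: dlf_test.\.*_tmp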

Enhance with metadata and computed columns

Use the transform module to add metadata columns or computed columns to the downstream table. For more information, see Transform module.

Add metadata columns

The following configuration adds the table identifier, operation time (op_ts), and operation type to the dlf_test.customers table in the destination:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    description: add identifier, op_ts and op

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When using MySQL as a source, set metadata-column.include-list: op_ts to pass the operation time as metadata to the downstream sink. For more information, see MySQL connector.

The source data includes all Change Data Capture (CDC) event types, including deletes. To convert DELETE operations into INSERT operations in the downstream table (soft delete), add converter-after-transform: SOFT_DELETE in the transform block.
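
For example, extending the transform block shown above, a minimal soft-delete sketch looks like this (only the converter-after-transform line is new relative to the earlier example):

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # Convert DELETE operations into INSERT operations in the destination table (soft delete).
    converter-after-transform: SOFT_DELETE
    description: add identifier, op_ts and op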

Add computed columns

The following configuration derives a dt partition field from the created_at column using DATE_FORMAT:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Set partition fields.
    partition-keys: dt
    description: add dt

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

When using MySQL as a source, set metadata-column.include-list: op_ts to pass the operation time as metadata to the downstream sink. For more information, see MySQL connector.

Table name mapping

Use the route module to rename tables when synchronizing from source to destination. Two common patterns are merging sharded tables and renaming an entire database.

Merge sharded tables

Map multiple source tables matching a regex pattern to a single destination table. The following configuration merges all tables in dlf_test whose names follow the pattern product_<number> into a single dlf.products table:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

route:
  # Merge all tables matching product_<number> into dlf.products.
  - source-table: dlf_test.product_[0-9]+
    sink-table: dlf.products

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Synchronize an entire database with table renaming

Use replace-symbol to define a placeholder in sink-table that is replaced with each source table's name. The following configuration synchronizes all tables from dlf_test to dlf and prefixes each destination table name with ods_; for example, dlf_test.customers is written to dlf.ods_customers:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

route:
  # Synchronize all tables from dlf_test to dlf, prefixing each name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Comprehensive case study for complex business scenarios

The following configuration combines all the features covered in this document: table exclusion, metadata and computed columns, soft deletes, and database-wide routing with table renaming.

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  # (Optional) Exclude tables that you do not want to synchronize.
  tables.exclude: dlf_test.products_tmp
  server-id: 8601-8604
  # (Optional) Synchronize full and incremental data for newly added tables.
  scan.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # Use the operation time as metadata.
  metadata-column.include-list: op_ts

transform:
  - source-table: dlf_test.customers
    projection: __schema_name__ || '.' || __table_name__ as identifier, op_ts, __data_event_type__ as op, DATE_FORMAT(created_at, 'yyyyMMdd') as dt, *
    # (Optional) Modify the primary key.
    primary-keys: id,identifier
    # (Optional) Set partition fields.
    partition-keys: dt
    # (Optional) Convert deleted data into inserts.
    converter-after-transform: SOFT_DELETE

route:
  # Synchronize all tables from dlf_test to dlf, prefixing each name with ods_.
  - source-table: dlf_test.\.*
    sink-table: dlf.ods_<>
    replace-symbol: <>

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Start from a specific timestamp

When you start a Flink CDC data ingestion job without existing state (a stateless start), you can specify a start time so that the job reads from the corresponding binary log (binlog) position.

Two configuration methods are available. If both are set, the O&M page configuration takes precedence.

Configure on the O&M page

On the job's Operations and Maintenance (O&M) page, set the start time for the source table.


Configure in the job definition

For a MySQL source, set scan.startup.mode: timestamp and provide the start time in milliseconds:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: dlf_test.\.*
  server-id: 8601-8604
  # Start reading from a specific timestamp.
  scan.startup.mode: timestamp
  # Startup timestamp in milliseconds. For example, 1667232000000 corresponds to 2022-11-01 00:00:00 (UTC+8).
  scan.startup.timestamp-millis: 1667232000000
  # (Optional) Synchronize data from tables newly created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize dispatching unbounded chunks to prevent potential TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing filters to speed up reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Specify a commit username. Set a different username for each job to avoid conflicts.
  commit.user: your_job_name
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

Note

If you set the start time on both the O&M page and in the job definition, the O&M page configuration takes precedence.