
Realtime Compute for Apache Flink: Develop Flink CDC jobs (public preview)

Last Updated: Mar 26, 2026

Realtime Compute for Apache Flink lets you define data synchronization pipelines in YAML and run them as Flink jobs, without writing Java or SQL. This topic describes how to create a Flink CDC data ingestion job.

How Flink CDC data ingestion works

Flink CDC data ingestion uses Flink CDC to capture changes from a source and write them to a destination. You define extract, transform, and load (ETL) pipelines in YAML, and Flink CDC automatically converts them into Flink computing logic.

Supported capabilities include full database synchronization, single-table synchronization, synchronization of sharded databases and tables, synchronization of newly added tables, schema evolution, custom computed columns, ETL processing, WHERE clause filtering, and column pruning.
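As an illustration of sharded table synchronization, the following route block merges several shards into one destination table. This is a minimal sketch: the names app_db, order_01, order_02, and ods_db.orders are hypothetical, and the regular expression in source-table uses the same syntax as the tables option in the MySQL example later in this topic. The source and sink blocks are omitted.

```yaml
route:
  # Hypothetical shards app_db.order_01, app_db.order_02, ... all match this pattern
  - source-table: app_db.order_\.*
    # Every matching shard is written to this single destination table
    sink-table: ods_db.orders
    description: merge sharded order tables
```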

Limits

  • Develop Flink CDC data ingestion jobs on Ververica Runtime (VVR) 11.1. If you need to use VVR 8.x, use VVR 8.0.11.

  • Each job synchronizes data from one source to one destination. To read from multiple sources or write to multiple destinations, create separate jobs.

  • Flink CDC jobs cannot be deployed to a session cluster.

  • Automatic tuning is not supported for Flink CDC jobs.

Supported connectors

Connector | Source | Sink
MySQL (RDS for MySQL, PolarDB for MySQL, and self-managed MySQL) | Yes | No
Streaming data lakehouse Paimon | No | Yes
Message Queue for Kafka (source requires VVR 8.0.10 or later) | Yes | Yes
Upsert Kafka | No | Yes
StarRocks | No | Yes
Hologres | No | Yes
Simple Log Service (SLS) (requires VVR 11.1 or later) | Yes | No
MongoDB (requires VVR 11.2 or later) | Yes | No
MaxCompute (requires VVR 11.1 or later) | No | Yes
SelectDB (requires VVR 11.1 or later) | No | Yes
Postgres CDC (public preview; requires VVR 11.4 or later) | Yes | No
Print | No | Yes
Iceberg (requires VVR 11.6 or later) | No | Yes
Submit a ticket or contact us via DingTalk to request support for additional source or destination connectors.
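For example, while you check what a source emits, you can point the job at the Print sink instead of a real destination. The following is a minimal sketch that assumes the Print connector needs no option other than type; see the Print connector documentation for the options available in your VVR version. The hostname and variable names are placeholders.

```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  # Debugging only: print change events instead of writing them to a storage system
  type: print
```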

Reuse an existing catalog for connection properties

Starting with VVR 11.5, a Flink CDC job can reference a built-in catalog created on the Data Management page. The job then reuses the connection properties stored in the catalog, such as the URL, username, and password, so you do not have to enter them manually.

Use the using.built-in-catalog field in the source or sink block:

source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog

sink:
  type: paimon
  using.built-in-catalog: paimon_dlf_catalog

In this example, mysql_rds_catalog already stores hostname, username, and password. You do not need to specify them again in the YAML job.

Connectors that support catalog reuse:

  • MySQL (source)

  • Kafka (source)

  • Upsert Kafka (sink)

  • StarRocks (sink)

  • Hologres (sink)

  • Paimon (sink)

  • SLS (source)

  • Iceberg (sink)

Catalog parameters that Flink CDC YAML jobs do not support are ignored. For the supported parameters, see the documentation for each connector.

Create a Flink CDC data ingestion job

Choose the creation method that fits your situation:

Method | When to use
From a template | Fastest path for common MySQL-to-sink pipelines
From a CTAS/CDAS job | Converting an existing SQL job
From the open source community | Reusing an existing Flink CDC YAML file
From scratch | Writing a custom YAML configuration

From a synchronization job template

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, select Development > Data Ingestion.

  4. Click the create icon, and then click Create From Template.

  5. Select a data synchronization template. The following templates are currently available: MySQL to StarRocks, MySQL to Paimon, and MySQL to Hologres.


  6. Enter the job name, storage location, and engine version, and click OK.

  7. Configure the source and sink parameters. See the documentation for the corresponding connector.

From a CTAS/CDAS job

Important
  • If a job contains multiple CTAS/CDAS statements, Flink detects and transforms only the first statement.

  • Because Flink SQL and Flink CDC have different built-in function support, generated transform rules may need adjustment. Review and update them manually before running the job.

  • If the source is MySQL and the original CTAS/CDAS job is still running, set a server-id in the new Flink CDC job that differs from the one used by the original job to avoid conflicts.
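For example, if the running CTAS/CDAS job reads MySQL with the server ID range 5400-5404 (a hypothetical value), the new job can use a range that does not overlap with it. Each parallel MySQL source reader needs its own server ID, so the range should contain at least as many IDs as the source parallelism. Only the relevant part of the source block is shown:

```yaml
source:
  type: mysql
  # Assumption: the running CTAS/CDAS job uses 5400-5404, so this range does not overlap
  server-id: 5405-5409
```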

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, select Data Studio > Data Ingestion.

  4. Click the create icon, click New Draft From CTAS/CDAS, select the target CTAS or CDAS job, and click OK. The list shows only valid CTAS and CDAS jobs. Jobs with syntax errors and non-ETL jobs are excluded.

  5. Enter the job name, storage location, and engine version, and click OK.

Migrate a job from the open source community

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, select Development > Data Ingestion.

  4. Click the create icon, select New Data Ingestion Draft, set the File Name and Engine Version, and click Create.

  5. Paste the Flink CDC YAML configuration from the open source community into the draft.

  6. (Optional) Click Depth Check to validate syntax, network connectivity, and access permissions.
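Open source examples often store credentials in plain text. After pasting, you can replace them with variables as in the following sketch. The variable names mysqlusername and mysqlpassword are placeholders that you would first create in Variable Management; the other values are also placeholders.

```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  # Replace plaintext credentials from the open source file with variable references
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404
```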

Create a job from scratch

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of the target workspace.

  3. In the left navigation pane, select Data Development > Data Ingestion.

  4. Click the create icon, select New Data Ingestion Draft, enter the File Name and Engine Version, and click Create.

  5. Write the YAML configuration. A Flink CDC job consists of up to five blocks, which are described in the following table. The template after the table shows all five blocks with inline comments.

    In a Flink CDC YAML job, put a space after the colon that separates each key from its value, as in key: value.
    Block | Required | Description
    source | Yes | Defines the data source. Flink CDC captures change data from here.
    sink | Yes | Defines the destination. Flink CDC writes captured changes here.
    pipeline | No | Sets job-level properties, such as the pipeline name.
    transform | No | Applies transformation rules: ETL processing, WHERE filtering, column pruning, and computed columns.
    route | No | Maps source tables to destination tables. If omitted, each source table is written to a destination table with the same name, as in full database or single-table synchronization.
    # Required
    source:
      # Data source type
      type: <replace-with-source-connector-type>
      # Data source configurations — see the connector documentation for available parameters
      ...
    
    # Required
    sink:
      # Destination type
      type: <replace-with-sink-connector-type>
      # Destination configurations — see the connector documentation for available parameters
      ...
    
    # Optional
    transform:
      # Transformation rule for the flink_test.customers table
      - source-table: flink_test.customers
        # Projection: specify columns to include and apply computed expressions
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # Filter: synchronize only rows where id > 10
        filter: id > 10
        description: append calculated columns based on source table
    
    # Optional
    route:
      # Map source table flink_test.customers to sink table db.customers_o
      - source-table: flink_test.customers
        sink-table: db.customers_o
        description: sync customers table
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        description: sync customers_suffix table
    
    # Optional
    pipeline:
      name: MySQL to Hologres Pipeline

    For the full syntax reference and all configuration parameters, see Development reference for Flink CDC data ingestion jobs. The following example synchronizes all tables from the app_db database in MySQL to Hologres. Credentials are stored as variables to avoid hardcoding sensitive information.

    # Sync all tables in app_db from MySQL to Hologres
    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
      # (Optional) Capture tables created after the job starts (incremental phase only)
      scan.binlog.newly-added-table.enabled: true
      # (Optional) Synchronize table and column comments
      include-comments.enabled: true
      # (Optional) Dispatch unbounded chunks first to reduce TaskManager out-of-memory risk
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Parse only captured tables to accelerate reads
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres

    For sensitive fields such as username and password, use the ${secret_values.<key>} syntax and manage credentials with Variable Management.

  6. (Optional) Click Depth Check to validate syntax, network connectivity, and access permissions.
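The same structure applies to other destinations. For instance, a MySQL-to-StarRocks variant of the previous example might look like the following sketch. The sink option names jdbc-url and load-url come from the open source Flink CDC StarRocks pipeline connector and are assumptions here; confirm them in the StarRocks connector documentation for your VVR version. The host and variable names are placeholders.

```yaml
# Sync all tables in app_db from MySQL to StarRocks
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: starrocks
  # StarRocks FE query endpoint (MySQL protocol, default port 9030)
  jdbc-url: jdbc:mysql://<fe-host>:9030
  # StarRocks FE HTTP endpoint used for loading data (default port 8030)
  load-url: <fe-host>:8030
  username: ${secret_values.srusername}
  password: ${secret_values.srpassword}

pipeline:
  name: Sync MySQL Database to StarRocks
```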

Flink CDC vs. other job types

Realtime Compute for Apache Flink supports three job types for data synchronization: Flink CDC data ingestion jobs, SQL jobs, and DataStream jobs.

Flink CDC vs. Flink SQL

Flink CDC and SQL jobs transfer different data types:

  • SQL jobs transfer RowData. Each row carries a change type: insert (+I), update_before (-U), update_after (+U), or delete (-D).

  • Flink CDC jobs transfer DataChangeEvent for data changes (inserts, updates, deletes) and SchemaChangeEvent for schema changes (table creation, column addition, table truncation). Update messages contain both the before and after values, so raw change data is written to the destination as-is.

Flink CDC data ingestion | Flink SQL
Automatically detects schemas and supports full database synchronization | Requires manual CREATE TABLE and INSERT statements
Supports multiple schema evolution policies | Does not support schema evolution
Supports raw changelog synchronization | Alters the raw changelog structure
Reads from and writes to multiple tables | Reads from and writes to a single table
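The schema evolution policy is set at the job level in the pipeline block. The following minimal sketch assumes that the schema.change.behavior option and its values (ignore, lenient, try_evolve, evolve, exception) behave as in open source Flink CDC 3.1 and later; check the development reference for the values your VVR version accepts.

```yaml
pipeline:
  name: Sync MySQL Database to Hologres
  # Assumed values: ignore | lenient | try_evolve | evolve | exception
  # lenient applies schema changes in a tolerant, lossless way (assumed default)
  schema.change.behavior: lenient
```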

Compared with CTAS or CDAS statements, Flink CDC jobs also:

  • Propagate schema changes in upstream tables immediately, without waiting for new data to arrive

  • Preserve the raw changelog: update messages are not split into separate before and after records

  • Support more schema evolution types, including TRUNCATE TABLE and DROP TABLE

  • Support table mapping for flexible destination table naming

  • Support configurable schema evolution behavior

  • Support data filtering using a WHERE clause

  • Support column pruning
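The following sketch combines three of these capabilities: a transform rule that filters rows and prunes columns by listing only the columns to keep, and a route rule that maps every source table to a renamed destination table. The table and column names are hypothetical, and the replace-symbol option is taken from the open source Flink CDC route syntax.

```yaml
transform:
  # Column pruning: only these four columns are synchronized
  # Filtering: only rows with status 'PAID' are synchronized
  - source-table: app_db.orders
    projection: id, customer_id, amount, status
    filter: status = 'PAID'

route:
  # Table mapping: app_db.orders -> ods_db.ods_orders, app_db.users -> ods_db.ods_users, ...
  - source-table: app_db.\.*
    sink-table: ods_db.ods_<>
    # <> in sink-table is replaced with the source table name
    replace-symbol: <>
```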

Flink CDC vs. Flink DataStream

Flink CDC data ingestion | Flink DataStream
Accessible to users at all levels, not just Java experts | Requires familiarity with Java and distributed systems
Hides underlying framework details | Requires familiarity with the Flink framework
Uses YAML, which is easy to read and learn | Requires build tooling, such as Maven, to manage dependencies
Jobs are easy to reuse and share | Code reuse is difficult

What's next