Develop Flink CDC jobs (public preview)
Realtime Compute for Apache Flink lets you define data synchronization pipelines in YAML and run them as Flink jobs, without writing Java or SQL. This topic explains how to create a Flink CDC data ingestion job.
How Flink CDC data ingestion works
Flink CDC data ingestion uses Flink CDC to capture changes from a source and write them to a destination. You define extract, transform, and load (ETL) pipelines in YAML, and Flink CDC automatically converts them into Flink computing logic.
Supported synchronization modes include full database synchronization, single-table synchronization, sharded database and table synchronization, newly added table synchronization, schema evolution, custom computed columns, ETL processing, WHERE clause filtering, and column pruning.
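For example, sharded source tables can be merged into a single destination table with a route rule. The following is a minimal sketch only: the app_db database, the user_* shard tables, and the Hologres connection placeholders are illustrative assumptions, not values from this topic.

```yaml
# Minimal sketch: merge sharded MySQL tables (user_01, user_02, ...) into one Hologres table.
# All database, table, and connection values below are illustrative placeholders.
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.user_\.*
  server-id: 5400-5404

sink:
  type: hologres
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

route:
  # Route every matching shard table to a single destination table
  - source-table: app_db.user_\.*
    sink-table: app_db.user
    description: merge sharded user tables

pipeline:
  name: Merge sharded user tables into Hologres
```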
Limits
- Use Ververica Runtime (VVR) 11.1. For VVR 8.x, use VVR 8.0.11.
- Each job synchronizes data from one source to one destination. To read from multiple sources or write to multiple destinations, create separate jobs.
- Flink CDC jobs cannot be deployed to a session cluster.
- Automatic tuning is not supported for Flink CDC jobs.
Supported connectors
| Connector | Source | Sink |
|---|---|---|
| MySQL (supports RDS for MySQL, PolarDB for MySQL, and self-managed MySQL) | Yes | No |
| Streaming data lakehouse Paimon | No | Yes |
| Message Queue for Kafka (source requires VVR 8.0.10 or later) | Yes | Yes |
| Upsert Kafka | No | Yes |
| StarRocks | No | Yes |
| Hologres | No | Yes |
| Simple Log Service (SLS) (requires VVR 11.1 or later) | Yes | No |
| MongoDB (requires VVR 11.2 or later) | Yes | No |
| MaxCompute (requires VVR 11.1 or later) | No | Yes |
| SelectDB (requires VVR 11.1 or later) | No | Yes |
| Postgres CDC (public preview) (requires VVR 11.4 or later) | Yes | No |
| No | Yes | |
| Iceberg (requires VVR 11.6 or later) | No | Yes |
Submit a ticket or contact us via DingTalk to request support for additional source or destination connectors.
Reuse an existing catalog for connection properties
Starting from VVR 11.5, you can reference a built-in catalog from the Data Management page in your Flink CDC job. This pulls in connection properties — such as URL, username, and password — without manual entry.
Use the using.built-in-catalog field in the source or sink block:
```yaml
source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog

sink:
  type: paimon
  using.built-in-catalog: paimon_dlf_catalog
```
In this example, mysql_rds_catalog already stores hostname, username, and password. You do not need to specify them again in the YAML job.
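The catalog supplies only connection properties; job-specific options still belong in the YAML. The following sketch assumes the same mysql_rds_catalog and a hypothetical app_db database; the tables and server-id values are placeholders.

```yaml
source:
  type: mysql
  # Connection properties (hostname, username, password) are resolved from the catalog
  using.built-in-catalog: mysql_rds_catalog
  # Job-specific options are still declared in the YAML (placeholder values)
  tables: app_db.\.*
  server-id: 5400-5404
```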
Connectors that support catalog reuse:
- MySQL (source)
- Kafka (source)
- Upsert Kafka (sink)
- StarRocks (sink)
- Hologres (sink)
- Paimon (sink)
- SLS (source)
- Iceberg (sink)
Catalog parameters that are incompatible with CDC YAML do not take effect. See the parameter list for each connector.
Create a Flink CDC data ingestion job
Choose the creation method that fits your situation:
| Method | When to use |
|---|---|
| From a template | Fastest path for common MySQL-to-sink pipelines |
| From a CTAS/CDAS job | Converting an existing SQL job |
| From the open source community | Reusing an existing Flink CDC YAML file |
| From scratch | Writing a custom YAML configuration |
From a synchronization job template
- Log on to the Realtime Compute for Apache Flink console.
- Click Console in the Actions column of the target workspace.
- In the left navigation pane, select Development > Data Ingestion.
- Click the new draft icon, and then click Create From Template.
- Select a data synchronization template. Currently supported templates: MySQL to StarRocks, MySQL to Paimon, and MySQL to Hologres.

- Enter the job name, storage location, and engine version, and click OK.
- Configure the source and sink parameters. See the documentation for the corresponding connector.
From a CTAS/CDAS job
- If a job contains multiple CTAS/CDAS statements, Flink detects and transforms only the first statement.
- Because Flink SQL and Flink CDC have different built-in function support, generated transform rules may need adjustment. Review and update them manually before running the job.
- If the source is MySQL and the original CTAS/CDAS job is still running, change the `server-id` in the new Flink CDC job to avoid conflicts, as shown in the sketch after this list.
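For example, if the running CTAS/CDAS job already occupies a `server-id` range, give the new job a range that does not overlap. The ranges below are placeholders, not values taken from this topic.

```yaml
source:
  type: mysql
  # Assume the existing CTAS/CDAS job uses server-id 5400-5404 (placeholder).
  # Assign the new Flink CDC job a non-overlapping range.
  server-id: 5405-5409
```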
- Log on to the Realtime Compute for Apache Flink console.
- Click Console in the Actions column of the target workspace.
- In the left navigation pane, select Development > Data Ingestion.
- Click the new draft icon, click New Draft From CTAS/CDAS, select the target CTAS or CDAS job, and click OK. The list shows only valid CTAS and CDAS jobs. Jobs with syntax errors and non-ETL jobs are excluded.
- Enter the job name, storage location, and engine version, and click OK.
Migrate a job from the open source community
- Log on to the Realtime Compute for Apache Flink console.
- Click Console in the Actions column of the target workspace.
- In the left navigation pane, select Development > Data Ingestion.
- Click the new draft icon, select New Data Ingestion Draft, set the File Name and Engine Version, and click Create.
- Copy the Flink CDC YAML from the open source community into the draft.
- (Optional) Click Depth Check to validate syntax, network connectivity, and access permissions.
Create a job from scratch
- Log on to the Realtime Compute for Apache Flink console.
- Click Console in the Actions column of the target workspace.
- In the left navigation pane, select Development > Data Ingestion.
- Click the new draft icon, select New Data Ingestion Draft, enter the File Name and Engine Version, and click Create.
- Write the YAML configuration. A Flink CDC job consists of up to five blocks:

| Block | Required | Description |
|---|---|---|
| source | Yes | Defines the data source. Flink CDC captures change data from here. |
| sink | Yes | Defines the destination. Flink CDC writes captured changes here. |
| pipeline | Optional | Sets job-level properties, such as the pipeline name. |
| transform | Optional | Applies transformation rules: ETL processing, WHERE filtering, column pruning, and computed columns. |
| route | Optional | Maps source tables to destination tables. If omitted, the job runs full database or target table synchronization. |

In a Flink CDC YAML job, separate each key and value with a space: `key: value`.

The following template shows all five blocks with inline comments:

```yaml
# Required
source:
  # Data source type
  type: <replace-with-source-connector-type>
  # Data source configurations; see the connector documentation for available parameters
  ...

# Required
sink:
  # Destination type
  type: <replace-with-sink-connector-type>
  # Destination configurations; see the connector documentation for available parameters
  ...

# Optional
transform:
  # Transformation rule for the flink_test.customers table
  - source-table: flink_test.customers
    # Projection: specify columns to include and apply computed expressions
    projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
    # Filter: synchronize only rows where id > 10
    filter: id > 10
    description: append calculated columns based on source table

# Optional
route:
  # Map source table flink_test.customers to sink table db.customers_o
  - source-table: flink_test.customers
    sink-table: db.customers_o
    description: sync customers table
  - source-table: flink_test.customers_suffix
    sink-table: db.customers_s
    description: sync customers_suffix table

# Optional
pipeline:
  name: MySQL to Hologres Pipeline
```

For the full syntax reference and all configuration parameters, see Development reference for Flink CDC data ingestion jobs.

The following example synchronizes all tables from the app_db database in MySQL to Hologres. Credentials are stored as variables to avoid hardcoding sensitive information.

```yaml
# Sync all tables in app_db from MySQL to Hologres
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404
  # (Optional) Capture tables created after the job starts (incremental phase only)
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and column comments
  include-comments.enabled: true
  # (Optional) Dispatch unbounded chunks first to reduce TaskManager out-of-memory risk
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Parse only captured tables to accelerate reads
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres
```

For sensitive fields such as `username` and `password`, use the `${secret_values.<key>}` syntax and manage credentials with Variable Management.
- (Optional) Click Depth Check to validate syntax, network connectivity, and access permissions.
Flink CDC vs. other job types
Realtime Compute for Apache Flink supports three job types for data synchronization: Flink CDC data ingestion jobs, SQL jobs, and DataStream jobs.
Flink CDC vs. Flink SQL
Flink CDC and SQL jobs transfer different data types:
- SQL jobs transfer `RowData`. Each row carries a change type: insert (+I), update_before (-U), update_after (+U), or delete (-D).
- Flink CDC jobs transfer `DataChangeEvent` for data changes (inserts, updates, deletes) and `SchemaChangeEvent` for schema changes (table creation, column addition, table truncation). Update messages contain both the before and after values, so raw change data is written to the destination as-is.
| Flink CDC data ingestion | Flink SQL |
|---|---|
| Automatically detects schemas and supports full database synchronization | Requires manual CREATE TABLE and INSERT statements |
| Supports multiple schema evolution policies | Does not support schema evolution |
| Supports raw changelog synchronization | Alters the raw changelog structure |
| Reads from and writes to multiple tables | Reads from and writes to a single table |
Compared with CTAS or CDAS statements, Flink CDC jobs also:
- Propagate schema changes in ancestor tables immediately, without waiting for new data
- Preserve the raw changelog: update messages are not split
- Support more schema evolution types, including `TRUNCATE TABLE` and `DROP TABLE`
- Support table mapping for flexible destination table naming
- Support configurable schema evolution behavior (see the sketch after this list)
- Support data filtering using a WHERE clause
- Support column pruning
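A minimal sketch of how a few of these capabilities can appear in a YAML job is shown below. The table names app_db.orders and ods.orders_clean, the projection and filter expressions, and the lenient value are illustrative assumptions; whether the schema.change.behavior option is available depends on your VVR and Flink CDC version.

```yaml
# Illustrative sketch only; all table names and option values are placeholders.
transform:
  # Column pruning plus WHERE-style filtering on a hypothetical orders table
  - source-table: app_db.orders
    projection: order_id, user_id, amount
    filter: amount > 0
    description: keep three columns and drop non-positive amounts

route:
  # Table mapping: write the source table to a differently named destination table
  - source-table: app_db.orders
    sink-table: ods.orders_clean
    description: rename the destination table

pipeline:
  name: orders sync
  # Configurable schema evolution behavior (value shown is an example)
  schema.change.behavior: lenient
```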
Flink CDC vs. Flink DataStream
| Flink CDC data ingestion | Flink DataStream |
|---|---|
| Accessible to users at all levels, not just Java experts | Requires familiarity with Java and distributed systems |
| Hides underlying framework details | Requires familiarity with the Flink framework |
| Uses YAML — easy to read and learn | Requires tooling knowledge (e.g., Maven) to manage dependencies |
| Jobs are easy to reuse and share | Code reuse is difficult |
What's next
- Deploy the job: Deploy a job
- Build a MySQL-to-StarRocks pipeline end to end: Quick start for Flink CDC data ingestion jobs