This guide explains how to create a data ingestion job in Realtime Compute for Apache Flink by using Flink CDC. This feature simplifies data replication from a source to a destination.
Background
Realtime Compute for Apache Flink leverages Flink CDC for a robust, low-code data ingestion solution. It simplifies the definition of complex ETL pipelines using YAML configurations. This enables seamless data replication from various sources to destinations, supporting both database and table synchronization. Furthermore, it facilitates the consolidation of data from distributed sources into a single destination and replicates both data and schema changes effectively. Advanced features like data filtering, column pruning, and computed column definitions are fully supported, significantly improving data integration efficiency and reliability.
Advantages of Flink CDC
In Realtime Compute for Apache Flink, you can build a data synchronization pipeline with Flink CDC, Flink SQL, or a DataStream program. The following sections describe the advantages of Flink CDC over the other two options.
Flink CDC vs. Flink SQL
Flink CDC and Flink SQL use different data types for data transmission:
- Flink SQL: Transmits top-level records as `RowData` instances. Each `RowData` includes a `RowKind` to signify the change type: insert (+I), update before (-U), update after (+U), and delete (-D).
- Flink CDC: Uses `DataChangeEvent` for data modifications (INSERT, UPDATE, DELETE) and `SchemaChangeEvent` for schema evolution (for example, table creation, column additions, and table truncation). Crucially, Flink CDC's update messages contain both the "before" and "after" state of the row, enabling you to write the original change data to your sink.
The following table lists general advantages of Flink CDC over Flink SQL.
| Flink CDC | Flink SQL |
| --- | --- |
| Automatic schema detection and database synchronization | Manual |
| Supports multiple schema evolution policies | No native schema evolution support |
| Preserves the original changelog structure | Disrupts the original changelog structure |
| Reads from/writes to multiple tables | Reads from/writes to a single table |
Flink CDC offers enhanced capabilities beyond Flink SQL's CTAS/CDAS statements, including the following (a configuration sketch follows this list):

- Immediate schema evolution, without requiring new data writes
- Original changelog preservation
- Comprehensive support for schema change types, such as `TRUNCATE TABLE` and `DROP TABLE`
- Flexible table mapping and definition of sink table names
- Configurable schema evolution
- Data filtering using `WHERE` clauses
- Column pruning
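To illustrate the filtering and column-pruning capabilities, the following is a minimal sketch of a `transform` rule; the table name `app_db.orders` and the column names are hypothetical, and the syntax follows the YAML format described later in this guide.

```yaml
transform:
  - source-table: app_db.orders          # hypothetical source table
    # Column pruning and a computed column: only the listed columns reach the sink.
    projection: order_id, customer_id, UPPER(status) as status_upper
    # Row filtering, comparable to a WHERE clause in SQL.
    filter: order_amount > 100
    description: keep large orders and drop unused columns
```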
Flink CDC vs. DataStream
The following table lists the advantages of Flink CDC data ingestion jobs over DataStream jobs.
| Flink CDC | DataStream |
| --- | --- |
| Accessible to all users | Requires Java and distributed systems expertise |
| Hides underlying complexity and simplifies development | Requires Flink framework knowledge |
| Easy-to-learn YAML | Requires Maven for dependency management |
| High reusability of existing jobs | Difficult to reuse existing code |
Limitations
- Recommended runtime: Develop Flink CDC jobs with Ververica Runtime (VVR) 11.1 or later. If you use VVR 8.x, use version 8.0.11.
- Single source/sink: A single Flink CDC job supports ingestion from one source system to one destination system. Multiple data sources or sinks require separate Flink CDC jobs.
- Session cluster deployment: Flink CDC jobs cannot be deployed on a session cluster.
- Automatic tuning: Automatic tuning is not supported for Flink CDC jobs.
Flink CDC connectors
The following table lists the connectors supported as sources and sinks for Flink CDC jobs.
Supported connectors
| Connector | Supported as source | Supported as sink |
| --- | --- | --- |
| MySQL (connects to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL) | √ | × |
|  | × | √ |
|  | √ (requires VVR 8.0.10 or later) | √ |
|  | × | √ |
|  | × | √ |
|  | × | √ |
|  | √ (requires VVR 11.1 or later) | × |
|  | √ (requires VVR 11.2 or later) | × |
|  | × | √ (requires VVR 11.1 or later) |
|  | × | √ (requires VVR 11.1 or later) |
|  | × | √ |
Create a Flink CDC job
Use a template
Log on to the Realtime Compute for Apache Flink Management Console.
Click Console in the Actions column of your workspace.
In the left navigation menu, choose .
Click , and then select New Draft with Template.
Select a template.

Set the draft name, location, and engine version, and click OK.
Configure the source and sink information.
For more information, see the documents for the corresponding source and sink connectors.
Use existing CTAS/CDAS code
- Flink detects and converts only the first `CTAS` or `CDAS` statement in a job draft that contains multiple such statements.
- Due to differences in built-in functions between Flink SQL and Flink CDC, the generated `transform` rules may require manual review and adjustment.
- If the source is MySQL and the original `CTAS`/`CDAS` job is still active, manually adjust the source's server ID in the Flink CDC data ingestion job to prevent conflicts (see the snippet after this list).
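For example, if the original job reads MySQL with server IDs 5400-5404 (as in the worked example later in this guide), you might assign the new ingestion job a non-overlapping range. The values below are illustrative only; choose IDs that do not collide with any other job reading the same MySQL instance.

```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  # Use a server ID range that does not overlap with the original CTAS/CDAS job.
  server-id: 5405-5409
```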
Log on to the Realtime Compute for Apache Flink Management Console.
Click Console in the Actions column of your workspace.
In the left navigation menu, choose .
Click , and select New Draft from CTAS/CDAS. Select the target CTAS or CDAS job draft and click OK.
Only valid CTAS and CDAS job drafts are displayed.
Set the draft name, location, and engine version, and click OK.
Migrate from Apache Flink CDC
Log on to the Realtime Compute for Apache Flink Management Console.
Click Console in the Actions column of your workspace.
In the left navigation menu, choose .
Click  and select New Draft. In the dialog, set the name and engine version, and click Create.
Copy and paste the job YAML code into the editor.
(Optional) Click Validate.
This option checks the syntax, network connectivity, and access permissions.
Create a job from scratch
Log on to the Realtime Compute for Apache Flink Management Console.
Click Console in the Actions column of your workspace.
In the left navigation menu, choose .
Click  and select New Draft. In the dialog, set the name and engine version, and click Create.
Write code. Example:
```yaml
# Required
source:
  # Source connector type
  type: <Replace with your source connector type>
  # Source configurations.
  ...

# Required
sink:
  # Sink connector type
  type: <Replace with your sink connector type>
  # Sink configurations.
  ...

# Optional
transform:
  # Transform rule for the flink_test.customers table
  - source-table: flink_test.customers
    # Projection settings. Specifies the columns to sync and performs data transformation.
    projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
    # Filter condition. Sync only data where id is greater than 10.
    filter: id > 10
    # Description of the transform rule
    description: append computed columns based on source table

# Optional
route:
  # Routing rule, which specifies the mapping between source tables and sink tables.
  - source-table: flink_test.customers
    sink-table: db.customers_o
    # Description of the routing rule
    description: sync customers table
  - source-table: flink_test.customers_suffix
    sink-table: db.customers_s
    # Description of the routing rule
    description: sync customers_suffix table

# Optional
pipeline:
  # Job name
  name: MySQL to Hologres Pipeline
```

Note: Separate a key-value pair with a colon and a space. Format: `Key: Value`.

The following table describes the code blocks.
| Required | Module | Description |
| --- | --- | --- |
| Required | source | The start of the data pipeline. Flink CDC captures change data from the source. Note: Currently, the Flink CDC source is restricted to MySQL. For enhanced security, use variables rather than hardcoding your credentials. For more information, see Variable Management. |
| Required | sink | The end of the data pipeline. Flink CDC transmits the captured data changes to the sink system. Note: For information about supported sink systems, see Flink CDC connectors. For enhanced security, use variables rather than hardcoding your credentials. For more information, see Variable Management. |
| Optional | pipeline | Defines basic configurations for the entire data pipeline job, such as the pipeline name. |
| Optional | transform | Specifies data transformation rules for operating on data as it flows through your Flink pipeline. It enables comprehensive data manipulation, including ETL processing, data filtering (`WHERE`), column pruning, and computed columns. Use the `transform` block whenever you need to adapt raw change data to your sink systems. |
| Optional | route | Defines the mapping between source and destination tables. It enables conditional data routing to different destinations based on specific rules (see the sketch below the table). If the `route` module is not configured, the entire database or table is synchronized by default. For more information, see Data Ingestion with Flink CDC. |
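As an illustration of the `route` module, the following minimal sketch merges two hypothetical source tables into a single sink table. The table names are placeholders, and the rule syntax mirrors the route entries in the template above.

```yaml
route:
  # Both source tables are written to the same sink table.
  - source-table: app_db.orders_2023
    sink-table: ods.orders_all
    description: merge 2023 orders into the consolidated table
  - source-table: app_db.orders_2024
    sink-table: ods.orders_all
    description: merge 2024 orders into the consolidated table
```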
The following code provides an example of synchronizing all tables from the `app_db` MySQL database to a Hologres database.

```yaml
source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres
```

(Optional) Click Validate.
This option will check the syntax, network connectivity, and access permissions.
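If you also want to control how schema changes are applied at the sink (one of the schema evolution policies mentioned earlier), open source Flink CDC exposes a pipeline-level `schema.change.behavior` option. Whether your VVR version supports it, and which values it accepts, should be confirmed in the connector documentation; treat the following as a sketch based on that assumption, not a verified configuration.

```yaml
pipeline:
  name: Sync MySQL Database to Hologres
  # Assumption: schema.change.behavior is available in your engine version.
  # Typical open source Flink CDC values include evolve, lenient, try_evolve,
  # exception, and ignore; verify against your VVR release before use.
  schema.change.behavior: lenient
```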