Realtime Compute for Apache Flink: Develop a Flink CDC job (Beta)

Last Updated: Dec 08, 2025

This topic describes how to create a Flink CDC data ingestion job in Realtime Compute for Apache Flink. This feature simplifies data replication from a source to a destination.

Background

Realtime Compute for Apache Flink leverages Flink CDC for a robust, low-code data ingestion solution. It simplifies the definition of complex ETL pipelines using YAML configurations. This enables seamless data replication from various sources to destinations, supporting both database and table synchronization. Furthermore, it facilitates the consolidation of data from distributed sources into a single destination and replicates both data and schema changes effectively. Advanced features like data filtering, column pruning, and computed column definitions are fully supported, significantly improving data integration efficiency and reliability.

Advantages of Flink CDC

In Realtime Compute for Apache Flink, you can build a data synchronization pipeline with Flink CDC, Flink SQL, or a DataStream program. The following sections describe the advantages of Flink CDC over the other two options.

Flink CDC vs. Flink SQL

Flink CDC and Flink SQL use different data types for data transmission:

  • Flink SQL: Transmits top-level records as RowData instances. Each RowData includes a RowKind to signify the change type: insert (+I), update before (-U), update after (+U), and delete (-D).

  • Flink CDC: Utilizes DataChangeEvent for data modifications (INSERT, UPDATE, DELETE) and SchemaChangeEvent for schema evolution (e.g., table creation, column additions, table truncation). Crucially, Flink CDC's update messages contain both the "before" and "after" state of the row, enabling you to write the original change data to your sink.

The following table lists general advantages of Flink CDC over Flink SQL.

| Flink CDC | Flink SQL |
| --- | --- |
| Automatic schema detection and database synchronization | Manual CREATE TABLE / INSERT statements |
| Supports multiple schema evolution policies | No native schema evolution support |
| Preserves the original changelog structure | Disrupts the original changelog structure |
| Reads from and writes to multiple tables | Reads from and writes to a single table |

Flink CDC offers capabilities beyond Flink SQL's CTAS/CDAS statements, including the following (a YAML sketch follows this list):

  • Immediate schema evolution, without requiring new data writes

  • Original changelog preservation

  • Comprehensive support for schema change types, such as TRUNCATE TABLE and DROP TABLE

  • Flexible table mapping and definition of sink table names

  • Configurable schema evolution

  • Data filtering using WHERE clauses

  • Column pruning
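
The following sketch illustrates several of these capabilities in a Flink CDC YAML draft. It is only a hypothetical example: the table names (flink_test.orders, db.orders_cleaned), columns, and values are placeholders, and the expressions that are available depend on the connector and VVR version you use.

    # Hypothetical transform rule: column pruning, a computed column, and a filter.
    transform:
      - source-table: flink_test.orders
        # Only the listed columns are synchronized; UPPER(...) adds a computed column.
        projection: order_id, amount, UPPER(status) as status_upper
        # Only rows that match the condition are synchronized.
        filter: amount > 0
        description: prune columns and filter orders

    # Hypothetical routing rule: write to a sink table with a different name.
    route:
      - source-table: flink_test.orders
        sink-table: db.orders_cleaned
        description: route orders to a custom sink table name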

Flink CDC vs. DataStream

The following table lists the advantages of Flink CDC data ingestion jobs over DataStream jobs.

| Flink CDC | DataStream |
| --- | --- |
| Accessible to all users | Requires Java and distributed-systems expertise |
| Hides underlying complexity and simplifies development | Requires knowledge of the Flink framework |
| Easy-to-learn YAML | Requires Maven for dependency management |
| High reusability of existing jobs | Existing code is difficult to reuse |
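
To give a sense of how compact the YAML is, the following is a minimal, hypothetical draft that reads change data from MySQL and writes every change event to the Print sink for inspection. The hostname, credentials, and table pattern are placeholders; the exact options each connector accepts are described in the connector documentation.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404

    sink:
      # The Print sink writes each change event to the job logs, which is useful for debugging.
      type: print

    pipeline:
      name: MySQL to Print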

Limitations

  • Recommended runtime: Develop Flink CDC jobs with Ververica Runtime (VVR) 11.1 or later. If you use VVR 8.x, use version 8.0.11.

  • Single source/sink: A single Flink CDC job supports ingestion from one source system to one destination system. Multiple data sources or sinks require the creation of separate Flink CDC jobs.

  • Session cluster deployment: Flink CDC jobs cannot be deployed on a session cluster.

  • Automatic tuning: Automatic tuning is not supported for Flink CDC jobs.

Flink CDC connectors

The following table lists the connectors supported as sources and sinks for Flink CDC jobs.

| Connector | Source | Sink |
| --- | --- | --- |
| MySQL (connects to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL) | ✓ | × |
| Paimon | × | ✓ |
| Kafka (requires Ververica Runtime (VVR) 8.0.10 or later) | ✓ | ✓ |
| Upsert Kafka | × | ✓ |
| StarRocks | × | ✓ |
| Hologres | × | ✓ |
| SLS (requires VVR 11.1 or later) | ✓ | × |
| MongoDB (requires VVR 11.2 or later) | ✓ | × |
| MaxCompute (requires VVR 11.1 or later) | × | ✓ |
| SelectDB (requires VVR 11.1 or later) | × | ✓ |
| Print | × | ✓ |

Create a Flink CDC job

Use a template

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of your workspace.

  3. In the left navigation menu, choose Development > Data Ingestion.

  4. Click the new draft icon, and then select New Draft with Template.

  5. Select a template.


  6. Set the draft name, location, and engine version, and click OK.

  7. Configure the source and sink information.

    For more information, see the documents for the corresponding source and sink connectors.

Use existing CTAS/CDAS code

Important
  • Flink detects and converts only the first CTAS or CDAS statement in a job draft containing multiple such statements.

  • Due to differences in built-in functions between Flink SQL and Flink CDC, generated transform rules may require manual review and adjustment.

  • If the source is MySQL and the original CTAS/CDAS job is still running, manually adjust the source's server ID in the Flink CDC data ingestion job to prevent conflicts, as shown in the sketch below.
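
  For example, the following hypothetical snippet shows how an explicit server-id range might be set on the MySQL source of the generated data ingestion job. The values are placeholders; choose a range that does not overlap with the range used by the running CTAS/CDAS job.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      # Use a server ID range that does not overlap with the original CTAS/CDAS job.
      server-id: 6400-6404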

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of your workspace.

  3. In the left navigation menu, choose Development > Data Ingestion.

  4. Click the new draft icon and select New Draft from CTAS/CDAS. Select the target CTAS or CDAS job draft and click OK.

    Only valid CTAS and CDAS job drafts are displayed.

  5. Set the draft name, location, and engine version, and click OK.

Migrate from Apache Flink CDC

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of your workspace.

  3. In the left navigation menu, choose Development > Data Ingestion.

  4. Click the new draft icon and select New Draft. In the dialog, set the name and engine version, and click Create.

  5. Copy the YAML code of your Apache Flink CDC job and paste it into the editor.

  6. (Optional) Click Validate.

    This option checks the syntax, network connectivity, and access permissions.

Create a job from scratch

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console in the Actions column of your workspace.

  3. In the left navigation menu, choose Development > Data Ingestion.

  4. Click the new draft icon and select New Draft. In the dialog, set the name and engine version, and click Create.

  5. Write code. Example:

    # Required
    source:
      # Source connector type
      type: <Replace with your source connector type>
      # Source configurations.
      ...
    
    # Required
    sink:
      # Sink connector type
      type: <Replace with your sink connector type>
      # Sink configurations.
      ...
    
    # Optional
    transform:
      # Transform rule for the flink_test.customers table
      - source-table: flink_test.customers
        # Projection settings. Specifies the columns to sync and performs data transformation.
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # Filter condition. Sync only data where id is greater than 10.
        filter: id > 10
        # Description of the transform rule
        description: append computed columns based on source table
    
    # Optional
    route:
      # Routing rule, which specifies the mapping between source tables and sink tables.
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # Description of the routing rule
        description: sync customers table
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # Description of the routing rule
        description: sync customers_suffix table
    
    # Optional
    pipeline:
      # Job name
      name: MySQL to Hologres Pipeline
    Note

    Separate a key-value pair with a colon and a space. Format: Key: Value.
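
    For example, the following illustrative lines show the difference (the connector type is a placeholder):

    # Correct: a space follows the colon.
    type: mysql
    # Incorrect: "type:mysql" (no space after the colon) is not parsed as a key-value pair.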

    The following table describes the code blocks.

    | Module | Required/Optional | Description |
    | --- | --- | --- |
    | source | Required | The start of the data pipeline. Flink CDC captures change data from the source. Note: Currently, the Flink CDC source is restricted to MySQL. For enhanced security, use variables rather than hardcoding your credentials. For more information, see Variable Management. |
    | sink | Required | The end of the data pipeline. Flink CDC transmits the captured data changes to the sink system. |
    | pipeline | Optional | Defines basic configurations for the entire data pipeline job, such as the pipeline name. |
    | transform | Optional | Specifies data transformation rules that operate on data as it flows through the pipeline, including ETL processing, data filtering (WHERE), column pruning, and computed columns. Use the transform block whenever you need to adapt raw change data to your sink system. |
    | route | Optional | Defines the mapping between source and sink tables and enables routing data to different destinations based on specific rules. If the route module is not configured, the entire database or table is synchronized to the destination by default. |

    For more information, see Data Ingestion with Flink CDC.

    The following code provides an example of synchronizing all tables from the app_db MySQL database to a Hologres database.

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  6. (Optional) Click Validate.

    This option checks the syntax, network connectivity, and access permissions.

References