
Realtime Compute for Apache Flink: Quick Start for Flink CDC data ingestion jobs

Last Updated: Jan 26, 2026

Realtime Compute for Apache Flink uses Flink Change Data Capture (CDC) to ingest data from a source to a destination. You can develop YAML jobs to synchronize data. This topic describes how to create a Flink CDC data ingestion job to synchronize all data from a MySQL database to StarRocks.

Prerequisites

Background information

Assume that a MySQL instance has a database named order_dw_mysql. This database contains three business tables: orders, orders_pay, and product_catalog. To develop a Flink CDC data ingestion job that synchronizes these tables and their data to the order_dw_sr database in StarRocks, follow these steps:

  1. Step 1: Prepare RDS MySQL test data

  2. Step 2: Develop a Flink CDC data ingestion job

  3. Step 3: Start the Flink CDC data ingestion job

  4. Step 4: View the synchronization result in StarRocks

Step 1: Prepare RDS MySQL test data

  1. Create a database and an account.

    On the RDS MySQL instance, create a database named order_dw_mysql and a standard account that has read and write permissions on the database. For more information, see the Create databases and accounts and Manage databases topics.

  2. Log on to the RDS MySQL instance by using Data Management (DMS).

  3. In the SQL Console, enter the following commands and click Execute to create three business tables and insert data.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Prepare data
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
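Optionally, you can dry-run the test data locally before you load it into RDS MySQL. The following Python sketch is illustrative only. It assumes Python's built-in sqlite3 module as a stand-in for MySQL, so the column types are mapped to SQLite types and default now() becomes CURRENT_TIMESTAMP. The helper names load_test_data, row_counts, and dangling_references are hypothetical. The sketch loads the same rows as the statements above, reports the row count of each table, and lists any payment that refers to a missing order or any order that refers to a missing product.

```python
# Local dry run of the Step 1 test data (illustrative sketch).
# Assumption: sqlite3 stands in for RDS MySQL; MySQL types are mapped to SQLite
# types and `default now()` becomes CURRENT_TIMESTAMP. Helper names are hypothetical.
import sqlite3

DDL = """
CREATE TABLE orders (
  order_id INTEGER NOT NULL PRIMARY KEY,
  user_id TEXT NOT NULL,
  shop_id INTEGER NOT NULL,
  product_id INTEGER NOT NULL,
  buy_fee NUMERIC NOT NULL,
  create_time TEXT NOT NULL,
  update_time TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
  state INTEGER NOT NULL
);
CREATE TABLE orders_pay (
  pay_id INTEGER NOT NULL PRIMARY KEY,
  order_id INTEGER NOT NULL,
  pay_platform INTEGER NOT NULL,
  create_time TEXT NOT NULL
);
CREATE TABLE product_catalog (
  product_id INTEGER NOT NULL PRIMARY KEY,
  catalog_name TEXT NOT NULL
);
"""

PRODUCTS = [(1, "phone_aaa"), (2, "phone_bbb"), (3, "phone_ccc"), (4, "phone_ddd"), (5, "phone_eee")]

ORDERS = [
    (100001, "user_001", 12345, 1, 5000.05, "2023-02-15 16:40:56", "2023-02-15 18:42:56", 1),
    (100002, "user_002", 12346, 2, 4000.04, "2023-02-15 15:40:56", "2023-02-15 18:42:56", 1),
    (100003, "user_003", 12347, 3, 3000.03, "2023-02-15 14:40:56", "2023-02-15 18:42:56", 1),
    (100004, "user_001", 12347, 4, 2000.02, "2023-02-15 13:40:56", "2023-02-15 18:42:56", 1),
    (100005, "user_002", 12348, 5, 1000.01, "2023-02-15 12:40:56", "2023-02-15 18:42:56", 1),
    (100006, "user_001", 12348, 1, 1000.01, "2023-02-15 11:40:56", "2023-02-15 18:42:56", 1),
    (100007, "user_003", 12347, 4, 2000.02, "2023-02-15 10:40:56", "2023-02-15 18:42:56", 1),
]

PAYMENTS = [
    (2001, 100001, 1, "2023-02-15 17:40:56"),
    (2002, 100002, 1, "2023-02-15 17:40:56"),
    (2003, 100003, 0, "2023-02-15 17:40:56"),
    (2004, 100004, 0, "2023-02-15 17:40:56"),
    (2005, 100005, 0, "2023-02-15 18:40:56"),
    (2006, 100006, 0, "2023-02-15 18:40:56"),
    (2007, 100007, 0, "2023-02-15 18:40:56"),
]


def load_test_data() -> sqlite3.Connection:
    """Create the three tables in an in-memory database and insert the Step 1 rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(DDL)
    conn.executemany("INSERT INTO product_catalog VALUES (?, ?)", PRODUCTS)
    conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ORDERS)
    conn.executemany("INSERT INTO orders_pay VALUES (?, ?, ?, ?)", PAYMENTS)
    return conn


def row_counts(conn: sqlite3.Connection) -> dict:
    """Return the number of rows in each business table."""
    tables = ("orders", "orders_pay", "product_catalog")
    return {t: conn.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] for t in tables}


def dangling_references(conn: sqlite3.Connection) -> list:
    """List payments with no matching order and orders with no matching product."""
    pays = conn.execute(
        "SELECT p.pay_id FROM orders_pay p LEFT JOIN orders o ON o.order_id = p.order_id "
        "WHERE o.order_id IS NULL"
    ).fetchall()
    orders = conn.execute(
        "SELECT o.order_id FROM orders o LEFT JOIN product_catalog c ON c.product_id = o.product_id "
        "WHERE c.product_id IS NULL"
    ).fetchall()
    return [("orders_pay", r[0]) for r in pays] + [("orders", r[0]) for r in orders]


if __name__ == "__main__":
    conn = load_test_data()
    print(row_counts(conn))
    print(dangling_references(conn))
```

With the Step 1 rows, the script prints {'orders': 7, 'orders_pay': 7, 'product_catalog': 5} followed by an empty list. These are also the row counts you can expect to find in StarRocks in Step 4.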

Step 2: Develop a Flink CDC data ingestion job

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the navigation pane on the left, choose Data Studio > Data Ingestion.

  4. Click the create icon, click New from Template, select MySQL to StarRocks Data Synchronization, and then click Next.


  5. Enter a Job Name, specify a Storage Location, select an Engine Version, and then click OK.

  6. Configure the YAML job code.

    The following code provides an example of how to synchronize all tables from the order_dw_mysql database in MySQL to the order_dw_sr database in StarRocks.

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 8601-8604
      # (Optional) Synchronize data from newly created tables during the incremental phase.
      scan.binlog.newly-added-table.enabled: true
      # (Optional) Synchronize table and field comments.
      include-comments.enabled: true
      # (Optional) Assign unbounded chunks first to reduce the risk of TaskManager OutOfMemory errors.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Deserialize only the change events of captured tables to accelerate reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      sink.buffer-flush.interval-ms: 5000 # Flush data every 5 seconds.
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in order_dw_mysql to order_dw_sr
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    The following table describes the configuration information required for this example. For more information about data ingestion parameters, see MySQL and StarRocks.

    Note

    YAML jobs support only project variables. You can use variables to avoid exposing plaintext passwords and other sensitive information. For more information, see Variable Management.

    Category: source

      hostname
        The IP address or hostname of the MySQL database. We recommend that you use a VPC address.
        Example value: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

      port
        The port number of the MySQL database service.
        Example value: 3306

      username and password
        The username and password of the MySQL database account that you created in Step 1: Prepare RDS MySQL test data.
        Example values: ${secret_values.mysqlusername} and ${secret_values.mysqlpassword}

      tables
        The names of the MySQL tables to read. Regular expressions are supported, so one job can read data from multiple tables. The first unescaped dot separates the database name from the table name, and an escaped dot (\.) is treated as the regular-expression wildcard, so order_dw_mysql.\.* selects every table in the order_dw_mysql database. This example synchronizes all tables and data in that database.
        Example value: order_dw_mysql.\.*

      server-id
        A numeric ID, or a range of IDs, for the database client. Each ID must be unique among all clients that are currently reading the binlog of the MySQL instance. If you specify a range, it should contain at least as many IDs as the parallelism of the source.
        Example value: 8601-8604

    Category: sink

      jdbc-url
        The Java Database Connectivity (JDBC) URL of the frontend (FE), in the jdbc:mysql://ip:port format, where port is the FE query port. You can view the FE internal endpoint and query port of the target instance on the Instance Details tab in the E-MapReduce console.
        Example value: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

      load-url
        The HTTP service URL used to connect to the FE node, in the ip:port format, where port is the FE HTTP port. You can view the FE internal endpoint and HTTP port of the target instance on the Instance Details tab in the E-MapReduce console.
        Example value: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

      username and password
        The username and password that you specified when you created the StarRocks instance. This example uses variables to avoid exposing plaintext passwords and other sensitive information. For more information, see Variable Management.
        Example values: ${secret_values.starrocksusername} and ${secret_values.starrockspassword}

      sink.buffer-flush.interval-ms
        The interval, in milliseconds, at which the internal buffer is flushed to StarRocks. Because the data volume in this example is small, a short interval of 5,000 ms (5 seconds) is set so that you can observe the results sooner.
        Example value: 5000

    Category: route

      source-table
        The upstream tables to route. You can use a regular expression to match multiple tables. For example, order_dw_mysql.\.* routes all tables in the order_dw_mysql database.
        Example value: order_dw_mysql.\.*

      sink-table
        The destination of the routed data. You can use the replace-symbol string as a placeholder for each upstream table name to implement many-to-many routing. For more information about routing rules, see Route module.
        Example value: order_dw_sr.<>

      replace-symbol
        The string in sink-table that stands for the upstream table name when pattern matching is used. For example, with order_dw_sr.<>, the table order_dw_mysql.orders is routed to order_dw_sr.orders.
        Example value: <>

  7. Click Deploy.
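To make the tables pattern and the route rule from step 6 more concrete, the following Python sketch mimics how they map MySQL tables to StarRocks tables. It is an illustration under stated assumptions, not the Flink CDC implementation: split_selector, matches, and route are hypothetical helpers that encode the documented rules. The first unescaped dot separates the database name from the table name, an escaped dot is a regular-expression wildcard, and the replace-symbol in sink-table stands for the upstream table name.

```python
# Illustrative sketch of how the Step 2 `tables` pattern and route rule map
# MySQL tables to StarRocks tables. split_selector, matches, and route are
# hypothetical helpers that mimic the documented rules; they are not Flink CDC code.
import re
from typing import Optional, Tuple


def split_selector(selector: str) -> Tuple[str, str]:
    """Split 'db.table' at the first unescaped dot and turn each escaped dot into a regex dot."""
    for i, ch in enumerate(selector):
        if ch == "." and (i == 0 or selector[i - 1] != "\\"):
            db_pattern, table_pattern = selector[:i], selector[i + 1:]
            return db_pattern.replace("\\.", "."), table_pattern.replace("\\.", ".")
    raise ValueError(f"no database/table delimiter in {selector!r}")


def matches(selector: str, db: str, table: str) -> bool:
    """True if the database and table names both fully match their parts of the selector."""
    db_pattern, table_pattern = split_selector(selector)
    return bool(re.fullmatch(db_pattern, db)) and bool(re.fullmatch(table_pattern, table))


def route(db: str, table: str, source_table: str, sink_table: str,
          replace_symbol: str = "<>") -> Optional[str]:
    """Return the sink table for db.table, or None if source_table does not match it."""
    if not matches(source_table, db, table):
        return None
    # The replace-symbol in sink-table is replaced by the upstream table name.
    return sink_table.replace(replace_symbol, table)


if __name__ == "__main__":
    for name in ("orders", "orders_pay", "product_catalog"):
        print(route("order_dw_mysql", name, r"order_dw_mysql.\.*", "order_dw_sr.<>"))
```

Running the script prints order_dw_sr.orders, order_dw_sr.orders_pay, and order_dw_sr.product_catalog, which are the three tables you check in Step 4. A table in another database, such as a hypothetical other_db.orders, returns None because it does not match the source-table pattern.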

Step 3: Start the Flink CDC data ingestion job

  1. On the Data Ingestion page, click Deploy. In the dialog box that appears, click OK.

  2. On the Operation Center > Job O&M page, find the target YAML job and click Start in the Actions column.

  3. Configure the start settings and click Start.

    In this example, Stateless Start is selected. For more information about parameter settings, see Start a job. After the job starts, you can monitor its runtime information and status on the Job O&M page.
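Before you click Start, you can check that the server-id range in the YAML code (8601-8604, which is four IDs) fits the parallelism that you run the job with. The following Python sketch is hypothetical: parse_server_ids and check_server_ids are illustrative helpers, and ids_in_use stands for server IDs that other clients, such as other CDC jobs, already use on the same MySQL instance. The rule it encodes follows the MySQL connector documentation: each ID must be unique, and a range needs at least as many IDs as the source parallelism.

```python
# Hypothetical pre-start check for the MySQL source `server-id` option.
# Rule encoded (from the MySQL connector documentation): every ID must be unique
# among clients reading the instance's binlog, and a range should contain at
# least as many IDs as the source parallelism.
from typing import Iterable, List


def parse_server_ids(value: str) -> range:
    """Parse an inclusive range such as '8601-8604', or a single ID such as '8601'."""
    text = str(value).strip()
    if "-" in text:
        start_text, end_text = text.split("-", 1)
        start, end = int(start_text), int(end_text)
    else:
        start = end = int(text)
    if start > end:
        raise ValueError(f"invalid server-id range: {value!r}")
    return range(start, end + 1)


def check_server_ids(value: str, parallelism: int, ids_in_use: Iterable[int] = ()) -> List[str]:
    """Return a list of problems; an empty list means the setting looks usable."""
    ids = parse_server_ids(value)
    problems = []
    if len(ids) < parallelism:
        problems.append(f"only {len(ids)} server IDs for parallelism {parallelism}")
    clashes = sorted(set(ids) & set(ids_in_use))
    if clashes:
        problems.append(f"server IDs already used by other clients: {clashes}")
    return problems


if __name__ == "__main__":
    print(check_server_ids("8601-8604", parallelism=4))
    print(check_server_ids("8601-8604", parallelism=8))
    print(check_server_ids("8601-8604", parallelism=2, ids_in_use={8603}))
```

For example, check_server_ids("8601-8604", parallelism=8) reports that four IDs are not enough; in that case you would widen the range, for example to 8601-8608.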

Step 4: View the synchronization result in StarRocks

After the YAML job enters the Running state, you can view the data synchronization results in StarRocks.

  1. Connect to the StarRocks instance using EMR StarRocks Manager.

  2. In the navigation pane on the left, click SQL Editor. On the Databases tab, click the refresh icon.

    A database named order_dw_sr appears under default_catalog.

  3. On the Query List tab, click +File to create a query script. Then, enter the following SQL statements and click Run.

    SELECT * FROM default_catalog.order_dw_sr.orders ORDER BY order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay ORDER BY pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog ORDER BY product_id;

  4. View the query results below the SQL statements.

    You can see that the tables and data from the MySQL database now exist in StarRocks.

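To confirm the synchronization beyond visual inspection, you can compare the number of rows in each table on both sides. The following Python sketch is hypothetical: compare_counts is an illustrative helper, and you collect the counts yourself, for example by running SELECT COUNT(*) on each table in DMS and in the StarRocks SQL Editor. The expected MySQL counts come from the INSERT statements in Step 1.

```python
# Hypothetical helper for Step 4: compare per-table row counts that you collect
# yourself (for example, with SELECT COUNT(*) in DMS and in the StarRocks SQL Editor).
from typing import Dict, Optional, Tuple

# Expected MySQL row counts, taken from the INSERT statements in Step 1.
SOURCE_COUNTS = {"orders": 7, "orders_pay": 7, "product_catalog": 5}


def compare_counts(source: Dict[str, int], sink: Dict[str, int]) -> Dict[str, Tuple[int, Optional[int]]]:
    """Return {table: (source_count, sink_count)} for each table whose counts differ.

    A sink_count of None means the table was not found in StarRocks.
    """
    mismatches = {}
    for table, expected in source.items():
        actual = sink.get(table)
        if actual != expected:
            mismatches[table] = (expected, actual)
    return mismatches


if __name__ == "__main__":
    # Placeholder sink counts for illustration only; replace them with what you observe.
    observed = {"orders": 7, "orders_pay": 5}
    print(compare_counts(SOURCE_COUNTS, observed))
    # {'orders_pay': (7, 5), 'product_catalog': (5, None)}
```

An empty result means that every table has the expected number of rows. Because the sink flushes its buffer every 5 seconds (sink.buffer-flush.interval-ms: 5000), counts taken immediately after a change may briefly lag; run the comparison again if a table shows fewer rows than expected.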
