Realtime Compute for Apache Flink: Quick start with Flink CDC data ingestion

Last Updated: Mar 26, 2026

This tutorial shows how to synchronize an entire MySQL database to StarRocks using a Flink Change Data Capture (CDC) YAML job. It covers full-database sync, table routing, and real-time incremental change replication, so you can see both the initial data load and live CDC updates reflected in the destination.

What you'll do:

  1. Prepare test tables and data in RDS MySQL

  2. Create a Flink CDC data ingestion job from a YAML template

  3. Deploy and start the job

  4. Verify the initial sync and real-time incremental changes in StarRocks

Prerequisites

Before you begin, make sure you have a Realtime Compute for Apache Flink workspace, an RDS MySQL instance, and a StarRocks instance.

Note

The RDS MySQL and StarRocks instances must be in the same virtual private cloud (VPC) as the Flink workspace. If they are in different VPCs, establish a network connection and configure an IP address whitelist for the RDS MySQL instance. See How do I access other services across VPCs?, How do I access the Internet?, and How do I configure a whitelist?.

Background

This tutorial uses a MySQL database named order_dw_mysql that contains three business tables: orders, orders_pay, and product_catalog. The goal is to synchronize all tables and their data to a database named order_dw_sr in StarRocks.

Step 1: Prepare RDS MySQL test data

  1. On the RDS MySQL instance, create a database named order_dw_mysql and a standard account with read and write permissions on that database. See Create databases and accounts and Manage databases.

  2. Log on to the RDS MySQL instance using DMS. See Log on to an RDS MySQL instance using DMS.

  3. In the SQL Console, enter the following SQL and click Execute to create three tables and insert test data.

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null,
      create_time timestamp not null
    );
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Insert test data
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');
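
Before moving on, it can help to confirm that the test data landed as expected. The following is a minimal sanity check rather than part of the original procedure. The row counts follow directly from the INSERT statements above. The two binlog checks rest on an assumption: that the MySQL CDC source reads row-based binary logs, which is the usual requirement for binlog-based CDC. Verify the exact requirements for your engine version in the MySQL connector reference.

```sql
-- Run in the same SQL Console used to create the tables.
-- Expected row counts, derived from the INSERT statements above:
--   orders = 7, orders_pay = 7, product_catalog = 5
SELECT 'orders' AS table_name, COUNT(*) AS row_count FROM order_dw_mysql.orders
UNION ALL
SELECT 'orders_pay', COUNT(*) FROM order_dw_mysql.orders_pay
UNION ALL
SELECT 'product_catalog', COUNT(*) FROM order_dw_mysql.product_catalog;

-- Assumption: the CDC source needs binary logging in ROW format.
SHOW VARIABLES LIKE 'log_bin';        -- binary logging should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- format should be ROW
```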

Step 2: Create a Flink CDC data ingestion job

  1. Log on to the Realtime Compute for Apache Flink console.

  2. Click Console to navigate to the required workspace.

  3. In the left navigation pane, choose Data Studio > Data Ingestion.

  4. Click the image icon, click New from Template, select MySQL to StarRocks Data Synchronization, and then click Next.

  5. Enter a Job Name, specify a Storage Location, select an Engine Version, and then click OK.

  6. Replace the default YAML with the following configuration. This example synchronizes all tables in order_dw_mysql to order_dw_sr in StarRocks.

    Note

    YAML jobs support only project variables. Use variables (as shown in this example) to avoid exposing plaintext passwords and other sensitive information in your job configuration. See Variable Management.

    | Section | Parameter | Required | Description | Example value |
    | --- | --- | --- | --- | --- |
    | source | hostname | Required | The IP address or hostname of the MySQL database. Use the VPC address for better performance. | rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com |
    | source | port | Required | The port of the MySQL database service. | 3306 |
    | source | username | Required | The username for the MySQL database. Use the account created in step 1. | ${secret_values.mysqlusername} |
    | source | password | Required | The password for the MySQL database account. | ${secret_values.mysqlpassword} |
    | source | tables | Required | The tables to synchronize. Supports regular expressions. order_dw_mysql.\.* matches all tables in the database. | order_dw_mysql.\.* |
    | source | server-id | Required | A numeric ID range for the database client. | 5405-5415 |
    | sink | jdbc-url | Required | The JDBC URL for the StarRocks frontend (FE) node. Format: jdbc:mysql://ip:port. Find the Internal Network Address and Query Port on the Instance Details tab in the E-MapReduce console. | jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 |
    | sink | load-url | Required | The HTTP service URL for the FE node. Find the Internal Endpoint and HTTP Port on the Instance Details tab in the E-MapReduce console. | fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 |
    | sink | username | Required | The username for the StarRocks connection. | ${secret_values.starrocksusername} |
    | sink | password | Required | The password for the StarRocks connection. | ${secret_values.starrockspassword} |
    | sink | sink.buffer-flush.interval-ms | Optional | The interval at which the internal buffer flushes data to StarRocks. Set to 5000 (5 seconds) in this example to observe results sooner. | 5000 |
    | route | source-table | Required | The source tables to route. Use a regular expression to match multiple tables. | order_dw_mysql.\.* |
    | route | sink-table | Required | The destination for routing. Use the replace-symbol placeholder to preserve each source table name in the destination. See Route module. | order_dw_sr.<> |
    | route | replace-symbol | Required | The string that acts as a placeholder for each source table name during routing. | <> |

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 8601-8604
      # (Optional) Synchronize data from newly created tables during the incremental phase.
      scan.binlog.newly-added-table.enabled: true
      # (Optional) Synchronize table and field comments.
      include-comments.enabled: true
      # (Optional) Prioritize unbounded chunks to prevent TaskManager OutOfMemory errors.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # (Optional) Enable parsing filters to accelerate reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      sink.buffer-flush.interval-ms: 5000  # Flush data every 5 seconds.
    
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    Replace the parameters in the table above with your own values. For the full parameter reference, see MySQL and StarRocks.

  7. Click Deploy.

Step 3: Start the Flink CDC data ingestion job

  1. On the Data Ingestion page, click Deploy. In the dialog box that appears, click OK.

  2. Go to Operation Center > Job O&M, find the YAML job, and click Start in the Actions column.

  3. Click Start. This example uses Stateless Start. For other start options, see Start a job. After the job starts, monitor its runtime status on the Job O&M page.
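
As an optional check from the database side, you can look for the job's binlog connection on the RDS MySQL instance. This is a sketch under stated assumptions, not a step from the original procedure: it assumes the job has moved past the initial snapshot and is reading the binary log, and that you are logged on with the same account the job uses (without the PROCESS privilege, MySQL lists only the threads of the current account).

```sql
-- Run on the RDS MySQL instance once the job is in the Running state.
-- A client that streams the binary log typically appears with a
-- Command value of "Binlog Dump" or "Binlog Dump GTID".
SHOW PROCESSLIST;
```

If no such thread is listed, the job may still be in the snapshot phase, or the account may not be allowed to see it. The Job O&M page remains the authoritative place to check job status.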

Step 4: Verify the synchronization result in StarRocks

After the job enters the Running state, verify that the data has been synchronized to StarRocks.

  1. Connect to the StarRocks instance using EMR StarRocks Manager.

  2. In the left navigation pane, click SQL Editor. On the Databases tab, click the image icon. A database named order_dw_sr appears under default_catalog.

  3. On the Query List tab, click +File to create a query script. Enter the following SQL and click Run.

    SELECT * FROM default_catalog.order_dw_sr.orders ORDER BY order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay ORDER BY pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog ORDER BY product_id;

  4. Check the results. The tables and data from MySQL now exist in StarRocks.
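
To see incremental replication as well as the initial load, you can change a few rows in MySQL and query StarRocks again. The statements below are a minimal sketch. The new key values (100008, 2008) and the new state value (2) are arbitrary test values chosen for illustration, not values from the original tutorial. It assumes the job is still running with the 5-second flush interval configured in the sink.

```sql
-- Part 1: run in the RDS MySQL SQL Console.
INSERT INTO order_dw_mysql.orders VALUES
(100008, 'user_001', 12345, 3, 3000.03, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1);
INSERT INTO order_dw_mysql.orders_pay VALUES
(2008, 100008, 1, '2023-02-15 18:40:56');
UPDATE order_dw_mysql.orders SET state = 2 WHERE order_id = 100001;  -- arbitrary new state
DELETE FROM order_dw_mysql.orders_pay WHERE pay_id = 2007;

-- Part 2: run in the StarRocks SQL Editor after waiting a few seconds
-- for the sink buffer to flush.
-- The new order 100008 should appear, and order 100001 should show state = 2.
SELECT order_id, state
FROM default_catalog.order_dw_sr.orders
WHERE order_id IN (100001, 100008)
ORDER BY order_id;

-- Payment 2008 should be present and payment 2007 should be gone.
SELECT pay_id, order_id
FROM default_catalog.order_dw_sr.orders_pay
WHERE pay_id IN (2007, 2008);
```

If the changes do not appear, check the job status on the Job O&M page before re-running the queries.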

What's next