
Realtime Compute for Apache Flink:Database synchronization from MySQL to Kafka with Flink CDC

Last Updated: Nov 20, 2025

This topic describes how to synchronize data from all tables in a MySQL database to Kafka. This reduces the load on the MySQL database caused by multiple jobs.

Background

In most cases, MySQL Change Data Capture (CDC) tables are used to read data from MySQL tables and synchronize changes to downstream systems in real time. MySQL CDC tables are also used in complex computing scenarios. For example, you can use a MySQL CDC table as a dimension table and join it with another table. Multiple jobs may use the same MySQL table, in which case the MySQL database establishes a separate connection for each job. This places a heavy load on the MySQL server and the network.
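
For reference, a MySQL CDC table is declared in Flink SQL like a regular table. The following sketch is illustrative only: the connector name and option set (for example, 'mysql' versus the open-source 'mysql-cdc') may differ in your environment.

-- Illustrative sketch of a MySQL CDC source table declaration.
-- Connector name and options may differ in your environment.
CREATE TEMPORARY TABLE user_cdc (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',        -- 'mysql-cdc' in open-source Flink CDC
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabase>',
  'table-name' = '<yourTable>'
);

Every job that declares such a table opens its own connections and binlog client against MySQL, which is why sharing a single synchronization job through Kafka reduces the load.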

Solution architecture

To reduce the load on the MySQL source, use Kafka as an intermediate buffer between the source and destination systems: a single Flink CDC job synchronizes the MySQL tables to Kafka topics in real time.

Downstream jobs then read data from these Kafka topics, for example by using the Kafka catalog.

Limitations

  • The MySQL table from which data is synchronized must contain a primary key.

  • You can synchronize data from a MySQL database to a self-managed Kafka cluster, an E-MapReduce (EMR) Kafka cluster, or an ApsaraMQ for Kafka instance. If you synchronize data to an ApsaraMQ for Kafka instance, you can access the instance only by using the default endpoint.

  • The storage space of the Kafka cluster must be larger than that of the source table. Otherwise, data may be lost because of insufficient storage space. The Kafka topics created during database synchronization are compacted topics: for each key, only the most recent message is retained, and the data in the topic does not expire. A compacted topic therefore stores about the same amount of data as the corresponding table in the source database.

Example

Assume that you want to analyze order comments in real time based on three tables: user, order, and feedback. The following figure shows the sample data contained in each table.

To display user order information or user comments, you must join the order table or the feedback table with the user table to obtain usernames, which are stored in the name field. The following sample code shows how to join the tables:

-- Join the order table and the user table to display the username and product name of each order.
SELECT `order`.id AS order_id, product, `user`.name AS user_name
FROM `order` LEFT JOIN `user`
ON `order`.user_id = `user`.id;

-- Join the feedback table and the user table to display the content and username of each comment.
SELECT feedback.id AS feedback_id, `comment`, `user`.name AS user_name
FROM feedback LEFT JOIN `user`
ON feedback.user_id = `user`.id;

The user table is used in both of the preceding jobs. When the jobs run, both of them read historical and incremental data from the MySQL database. Reading historical data requires MySQL database connections, and reading incremental data requires a binlog client. As the number of jobs increases, the number of MySQL connections and binlog clients also increases, which places a heavy load on the upstream MySQL database. To reduce this load, you can synchronize the upstream MySQL data to Kafka in real time and let multiple downstream jobs consume the synchronized data.

Prerequisites

Preparations

Create an ApsaraDB RDS for MySQL instance and prepare source data

  1. Create a database named order_dw for the ApsaraDB RDS for MySQL instance. For more information, see Create a database.

  2. Prepare sample source data.

    1. In the upper-right corner of the details page of the desired instance, click Log On to Database.

    2. In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.

    3. After the logon is successful, double-click the order_dw database in the left-side navigation pane to switch to it.

    4. On the SQL Console tab, enter the following statements to create three business tables in the order_dw database and insert data into them.

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- Prepare data.
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. Click Execute. On the panel that appears, click Execute.
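
    To confirm that the sample data is in place, you can optionally run a quick check such as the following. Each query should return a count of 2, matching the rows inserted above.

      -- Optional sanity check: each table should contain two rows.
      SELECT COUNT(*) FROM `user`;
      SELECT COUNT(*) FROM `order`;
      SELECT COUNT(*) FROM `feedback`;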

Procedure

  1. Create a Flink CDC job that replicates data from MySQL to Kafka in real time.

    This job automatically creates the Kafka topics. You can define the topic names in the route module. The numbers of topic partitions and replicas default to your Kafka cluster's settings, and cleanup.policy is set to compact.

    Use default topic names

    By default, Kafka topic names are formed by concatenating the MySQL database name and table name with a period (.). This job creates three topics: order_dw.user, order_dw.order, and order_dw.feedback.

    1. In the Development Console, navigate to Development > Data Ingestion, create a blank draft, and copy and paste the following code to the editor.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{username}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Ingesting data to ApsaraMQ for Kafka requires the following additional options.
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Click Deploy.

    3. In the left navigation menu, choose O&M > Deployments. Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.

    Customize topic names separately

    Specify a name for each topic in the route module. This job creates three topics with custom names: user1, order2, and feedback3.

    1. In the Development Console, navigate to Development > Data Ingestion, create a blank draft, and copy and paste the following code to the editor.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{username}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Ingesting data to ApsaraMQ for Kafka requires the following additional options.
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Click Deploy.

    3. In the left navigation menu, choose O&M > Deployments. Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.

    Customize topic names in a batch

    Use a pattern to define custom topic names in the route module. This job creates three topics: topic_user, topic_order, and topic_feedback.

    1. In the Development Console, navigate to Development > Data Ingestion, create a blank draft, and copy and paste the following code to the editor.

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{username}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # Ingesting data to ApsaraMQ for Kafka requires the following additional options.
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. Click Deploy.

    3. In the left navigation menu, choose O&M > Deployments. Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.

  2. Consume Kafka data in real time.

    The preceding job ingests data into Kafka in the JSON format. By consuming the Kafka topics, one or more downstream jobs can obtain the latest MySQL data. Choose one of the following consumption methods:

    Consume data using the Kafka JSON catalog

    With a Kafka JSON catalog, you can read each synchronized table as a source table directly from the Kafka topic that matches it, without declaring the schema manually.
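
    The statements below assume that a Kafka JSON catalog named kafka-catalog has already been created. If you have not created one yet, it can be registered with a statement along the following lines; the option names shown here are indicative and may vary with your Kafka cluster type, so check the Kafka JSON catalog documentation for the exact set.

      -- Indicative sketch: register a Kafka JSON catalog named kafka-catalog.
      -- Option names and required values may differ for your cluster type.
      CREATE CATALOG `kafka-catalog` WITH (
        'type' = 'kafka',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'format' = 'json',
        'default-database' = 'kafka',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_'
      );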

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL stream draft and copy the following code to the SQL editor:

      CREATE TEMPORARY TABLE print_user_product(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- This is required when you write data to multiple sinks. 
      
      -- Join the order table and the user table in the Kafka JSON catalog to display the username and product name of each order. 
      INSERT INTO print_user_product
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` -- Specify the group and startup mode.
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Specify the group and startup mode.
      ON `order`.value_user_id = `user`.key_id;
      
      -- Join the feedback table and the user table to display the content and username of each comment. 
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  -- Specify the group and startup mode.
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Specify the group and startup mode.
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- Required when you write data to multiple sinks.

      In this example, the Print connector is used to directly display the results. You can also export the results to a selected sink table for further analysis and processing. For information about the syntax of the statement that is used to write data to multiple sinks, see INSERT INTO.

      Note

      If the schema of a MySQL table changes during data synchronization, the schema that the Kafka JSON catalog parses for the matching topic may differ from the current schema of the MySQL table. For example, if fields are deleted from the MySQL table during synchronization, those fields may still appear in the parsed schema, and their values may be null.

      This happens because the schema returned by the Kafka JSON catalog contains all fields found in the consumed messages. If unexpired messages still contain fields that were later deleted, the deleted fields appear in the parsed schema and their values are null. No action is required in this case.
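
      If downstream logic must tolerate such residual fields, you can guard against the null values explicitly. The following sketch is for illustration only; value_nickname stands for a hypothetical field that was deleted upstream.

      -- Illustration only: value_nickname represents a field deleted upstream,
      -- so it resolves to NULL for newer messages.
      SELECT
        key_id,
        COALESCE(value_nickname, value_name) AS display_name
      FROM `kafka-catalog`.`kafka`.`user`;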

    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left navigation menu, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.

    Consume data using a temporary table

    Alternatively, you can read the Kafka data through temporary tables for which you specify a custom schema.

    1. In the left navigation menu, choose Development > ETL. On the page that appears, create an SQL stream draft and copy the following code to the SQL editor:

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',  -- Replace with the actual topic name created by the ingestion job, for example order_dw.user.
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',  -- Replace with the actual topic name created by the ingestion job, for example order_dw.order.
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',  -- Replace with the actual topic name created by the ingestion job, for example order_dw.feedback.
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_product(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Required when you write data to multiple sinks. 
      -- Join the order table and the user table to display the username and product name of each order. 
      INSERT INTO print_user_product
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- Join the feedback table and the user table to display the content and username of each comment. 
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- Required when you write data to multiple sinks.

      In this example, the Print connector is used to directly display the results. You can also write the results to a sink table of your choice for further analysis and processing. For more information about the syntax used to write data to multiple sinks, see INSERT INTO.

      The following list describes the connector options of a temporary table.

      • connector: The type of the connector. Set the value to kafka.

      • topic: The name of the topic. Set the value to the topic that corresponds to the table accessed through the Kafka JSON catalog.

      • properties.bootstrap.servers: The IP addresses or endpoints of the Kafka brokers. Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

      • scan.startup.mode: The start offset from which data is read from Kafka. Valid values:

        • earliest-offset: reads data from the earliest offset of each Kafka partition.

        • latest-offset: reads data from the latest offset of each Kafka partition.

        • group-offsets: reads data from the offsets committed by the consumer group whose ID is specified by the properties.group.id option. This is the default value.

        • timestamp: reads data from the timestamp specified by the scan.startup.timestamp-millis option.

        • specific-offsets: reads data from the offsets specified by the scan.startup.specific-offsets option.

        Note: This option takes effect only when the deployment is started without states. When the deployment is restarted from a checkpoint or resumed from a specified state, it preferentially resumes from the reading progress saved in the state data.

      • key.format: The format that the Flink Kafka connector uses to serialize or deserialize the key fields in a Kafka message. Set the value to json.

      • key.fields: The fields in the source table or sink table that correspond to the key fields of Kafka messages. Separate multiple field names with semicolons (;). Example: field1;field2.

      • key.fields-prefix: A custom prefix for all key fields in Kafka messages, which prevents name conflicts with value fields or metadata fields. Set the value to the key.fields-prefix value of the Kafka JSON catalog.

      • value.format: The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message. Set the value to json.

      • value.fields-prefix: A custom prefix for all value fields in Kafka messages, which prevents name conflicts with key fields or metadata fields. Set the value to the value.fields-prefix value of the Kafka JSON catalog.

      • value.fields-include: The policy that determines whether key fields are included when value fields are parsed. Set the value to EXCEPT_KEY, which excludes the key fields when the value fields are parsed.

      • value.json.infer-schema.flatten-nested-columns.enable: Specifies whether to recursively expand nested columns in the JSON text when the value fields of a JSON-formatted Kafka message are parsed. Set the value to the infer-schema.flatten-nested-columns.enable value of the Kafka JSON catalog.

      • value.json.infer-schema.primitive-as-string: Specifies whether to infer all primitive types as the STRING type when the value fields of a JSON-formatted Kafka message are parsed. Set the value to the infer-schema.primitive-as-string value of the Kafka JSON catalog.

    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel, select Initial Mode, and then click Start.

  3. View the job result.

    1. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of your job deployment.

    2. Click the Logs tab. Then, click the Running Task Managers tab and view the jobs in the Path, ID column.

    3. In the left-side pane of the Logs tab, click Logs. Then, click the Logs tab and search for logs related to PrintSinkOutputWriter.

