Realtime Compute for Apache Flink:Synchronize data from all tables in a MySQL database to Kafka

Last Updated: Mar 03, 2025

This topic describes how to synchronize data from all tables in a MySQL database to Kafka. This solution reduces the load that multiple deployments place on the MySQL database.

Background information

In most cases, MySQL Change Data Capture (CDC) data tables are used to read data from MySQL tables and synchronize changes in the tables to downstream systems in real time. MySQL CDC data tables are also used in complex computing scenarios. For example, you can use a MySQL CDC data table as a dimension table and join it with another data table. Multiple deployments may use the same MySQL table. In this case, each deployment establishes its own connections to the MySQL database, which places a heavy load on the MySQL server and the network.
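
The following sketch illustrates the dimension-table pattern that is mentioned above. This is a minimal, hypothetical example: the orders source uses the datagen connector only for illustration, the connection values are placeholders, and the mysql connector name is an assumption based on Realtime Compute for Apache Flink. Adjust the options to match your environment.

-- Minimal sketch: look up usernames in a MySQL dimension table for each order.
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  user_id BIGINT,
  proc_time AS PROCTIME()   -- Processing-time attribute that is required for lookup joins.
) WITH (
  'connector' = 'datagen'   -- Placeholder source that is used only for illustration.
);

CREATE TEMPORARY TABLE user_dim (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',               -- Assumption: the MySQL connector of Realtime Compute for Apache Flink.
  'hostname' = '<yourMySQLHost>',      -- Placeholder.
  'port' = '3306',
  'username' = '<yourUserName>',       -- Placeholder.
  'password' = '<yourPassword>',       -- Placeholder.
  'database-name' = '<yourDatabase>',  -- Placeholder.
  'table-name' = 'user'
);

-- Look up the username of each order at processing time.
SELECT o.order_id, u.name AS user_name
FROM orders AS o
LEFT JOIN user_dim FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.id;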

Solution architecture

To reduce the load on an upstream MySQL database, Realtime Compute for Apache Flink allows you to synchronize data from all tables in the MySQL database to Kafka. This solution uses Kafka as the intermediate layer for data synchronization. You can execute the CREATE DATABASE AS or CREATE TABLE AS statement to synchronize data from all tables or a single table in a MySQL database to Kafka in real time in one deployment. Data in each MySQL table is written to the related Kafka topic in upsert fashion. Downstream deployments use the Kafka JSON catalog to read data from the topics instead of accessing the MySQL database directly. This effectively reduces the load on the MySQL database caused by multiple deployments.


Limits

  • The MySQL table from which data is synchronized must contain a primary key.

  • You can synchronize data from a MySQL database to a self-managed Kafka cluster, an E-MapReduce (EMR) Kafka cluster, or an ApsaraMQ for Kafka instance. If you synchronize data to an ApsaraMQ for Kafka instance, you can access the instance only by using the default endpoint.

  • Upsert Kafka tables cannot be used as source tables for the CREATE TABLE AS or CREATE DATABASE AS statement. They can be used only as result tables of the synchronization that is performed by using these statements.

  • The storage space of the Kafka cluster must be greater than the storage space of the source tables. Otherwise, data may be lost because of insufficient storage space. The Kafka topics that are created during database synchronization are compacted topics. For each key, a compacted topic retains only the most recent message, and the data in the topic does not expire. Therefore, a compacted topic stores approximately the same amount of data as the related table in the source database.

Best practices

In this example, you want to analyze order comments in real time based on three tables: the user table named user, the order table named order, and the comment table named feedback. Sample data for each table is prepared in the Preparations section of this topic.

If you want to display user order information or user comments, you must join the order table or the feedback table with the user table to obtain usernames. The usernames are specified by the name field. The following sample code provides an example on how to join the tables:

-- Join the order table and the user table to display the username and commodity name of each order. 
SELECT `order`.id as order_id, product, `user`.name as user_name
FROM `order` LEFT JOIN `user`
ON `order`.user_id = `user`.id;

-- Join the feedback table and the user table to display the content and username of each comment. 
SELECT feedback.id as feedback_id, comment, `user`.name as user_name
FROM feedback LEFT JOIN `user`
ON feedback.user_id = `user`.id;

The user table is used in both of the preceding SQL deployments. When the deployments are running, both of them read full and incremental data from the MySQL database. To read full data, each deployment creates connections to the MySQL database. To read incremental data, each deployment creates a binlog client. As the number of deployments increases, the numbers of MySQL database connections and binlog clients also increase. This places a heavy load on the upstream MySQL database. To reduce the load on the upstream MySQL database, you can execute the CREATE DATABASE AS or CREATE TABLE AS statement to synchronize upstream MySQL data to Kafka in real time in one deployment. The synchronized data can then be consumed by multiple downstream deployments.
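
For reference, the following is a minimal sketch of the MySQL CDC source table that each of the preceding deployments would otherwise declare for the user table. The connector name is an assumption based on Realtime Compute for Apache Flink, and the connection values are placeholders. Every deployment that declares such a table opens its own database connections for the full read and its own binlog client for the incremental read.

-- Sketch of a per-deployment MySQL CDC source table for the user table.
CREATE TEMPORARY TABLE user_cdc (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',            -- Assumption: the MySQL connector of Realtime Compute for Apache Flink.
  'hostname' = '<yourMySQLHost>',   -- Placeholder.
  'port' = '3306',
  'username' = '<yourUserName>',    -- Placeholder.
  'password' = '<yourPassword>',    -- Placeholder.
  'database-name' = 'order_dw',
  'table-name' = 'user'
);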

Prerequisites

Preparations

Create a database for the ApsaraDB RDS for MySQL instance and prepare a MySQL CDC data source

  1. Create a database for the ApsaraDB RDS for MySQL instance. For more information, see Create a database.

    Create a database named order_dw for the ApsaraDB RDS for MySQL instance.

  2. Prepare a MySQL CDC data source.

    1. In the upper-right corner of the details page of the desired instance, click Log On to Database.

    2. In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.

    3. After the logon is successful, double-click the order_dw database in the left-side navigation pane to switch to the database.

    4. On the SQL Console tab, enter the DDL statements that are used to create three business tables in the order_dw database and insert data into the business tables.

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- Prepare data.
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. Click Execute. On the page that appears, click Execute.

Create a topic and a group for the ApsaraMQ for Kafka instance

Create Kafka topics and a group. In this example, create three topics whose names are the same as the names of the tables in the MySQL database: user, order, and feedback. For more information, see Step 3: Create resources.

Procedure

  1. Create a MySQL catalog. For more information, see Create a MySQL catalog.

    In this example, a catalog named mysql-catalog is created and the default database is order_dw.

  2. Create a Kafka JSON catalog named kafka-catalog. For more information, see Create a Kafka JSON catalog.

  3. Create and start a data synchronization task by executing the CREATE DATABASE AS or CREATE TABLE AS statement. Upstream MySQL data is synchronized to Kafka in real time for consumption by multiple downstream deployments.

    Data synchronization by executing the CREATE DATABASE AS statement

    The name of the Kafka topic that is created in the database synchronization task is the same as the name of the related MySQL table. For the number of partitions and the number of replicas, the default configuration of the Kafka cluster is used. The cleanup.policy parameter is set to compact.

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft and copy the following code to the SQL editor:

      CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
      AS DATABASE `mysql-catalog`.`order_dw` INCLUDING ALL TABLES;
      Note

      Kafka does not involve databases. Therefore, you do not need to create a database. When you execute the CREATE DATABASE AS statement to synchronize data to Kafka, you must add IF NOT EXISTS to the statement to skip database creation.

    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.

    Data synchronization by executing the CREATE TABLE AS statement

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft and copy the following code to the SQL editor:

      BEGIN STATEMENT SET;
      
      -- Synchronize the user table. 
      CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`user`
      AS TABLE `mysql-catalog`.`order_dw`.`user`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Synchronize the order table. 
      CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`order`
      AS TABLE `mysql-catalog`.`order_dw`.`order`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- Synchronize the feedback table. 
      CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`feedback`
      AS TABLE `mysql-catalog`.`order_dw`.`feedback`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.

  4. Consume Kafka data in real time.

    Data in the upstream MySQL database is written to Kafka in the JSON format. Data in one Kafka topic can be consumed by multiple downstream deployments to obtain the latest data in the database. Data in the tables that are synchronized to Kafka can be consumed by using one of the following methods.
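
    To understand the column names that are used in the following examples: for each row, the primary key fields are written to the Kafka message key and the remaining fields are written to the message value. For example, for the row (1, 'Tom') in the user table, the message key is approximately {"id": 1} and the message value is approximately {"name": "Tom"}. The Kafka JSON catalog prefixes the parsed key fields with key_ and the value fields with value_, which is why the examples reference columns such as key_id and value_name. The exact message layout may vary with your configuration.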

    Consume data by using the Kafka JSON catalog

    If a table is used as a source table, data in the table can be read from the Kafka topic that matches the table by using the Kafka JSON catalog.

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft and copy the following code to the SQL editor:

      CREATE TEMPORARY TABLE print_user_product(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- This is required when you write data to multiple sinks. 
      
      -- Join the order table and the user table in the Kafka JSON catalog to display the username and product name of each order. 
      INSERT INTO print_user_product
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` -- Specify the group and startup mode.
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Specify the group and startup mode.
      ON `order`.value_user_id = `user`.key_id;
      
      -- Join the feedback table and the user table to display the content and username of each comment. 
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  -- Specify the group and startup mode.
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- Specify the group and startup mode.
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- Required when you write data to multiple sinks.

      In this example, the Print connector is used to directly display the results. You can also write the results to the result table of another connector for further analysis and processing. For more information about the syntax of the statement that is used to write data to multiple sinks, see INSERT INTO statement.
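
      For example, the following is a minimal sketch of an Upsert Kafka result table that could replace the print_user_product table. The topic name and broker endpoint are placeholders, and only the required connector options are shown; adjust them to match your environment.

      CREATE TEMPORARY TABLE kafka_user_product (
        order_id BIGINT,
        product STRING,
        user_name STRING,
        PRIMARY KEY (order_id) NOT ENFORCED   -- Upsert Kafka requires a primary key.
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '<yourResultTopic>',                          -- Placeholder.
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',  -- Placeholder.
        'key.format' = 'json',
        'value.format' = 'json'
      );

      -- Then change "INSERT INTO print_user_product" to "INSERT INTO kafka_user_product".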

      Note

      If the schema of a MySQL table is changed during data synchronization, the schema that is returned after the table is parsed by the Kafka JSON catalog may be different from the schema of the MySQL table. For example, if specific fields are deleted from the MySQL table during data synchronization but the deleted fields appear in the schema that is returned after the table is parsed by the Kafka JSON catalog, the values of the fields may be null.

      The schema that is returned by the Kafka JSON catalog contains the fields of the messages that are consumed. If a deleted field still exists in a consumed message that has not expired, the field may appear in the schema that is returned after the table is parsed by the Kafka JSON catalog. In this case, the value of the field is null and no action is required.

    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.

    Consume data by creating a temporary table

    You can specify a custom schema to read data from a temporary table.

    1. In the left-side navigation pane, choose Development > ETL. On the page that appears, create an SQL streaming draft and copy the following code to the SQL editor:

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_product(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- Required when you write data to multiple sinks. 
      -- Join the order table and the user table in the Kafka JSON catalog to display the username and product name of each order. 
      INSERT INTO print_user_product
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- Join the feedback table and the user table to display the content and username of each comment. 
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- Required when you write data to multiple sinks.

      In this example, the Print connector is used to directly display the results. You can also write the results to the result table of another connector for further analysis and processing. For more information about the syntax of the statement that is used to write data to multiple sinks, see INSERT INTO statement.

      The following list describes the parameters of the temporary tables.

      • connector
        Description: The type of the connector.
        Remarks: Set the value to kafka.

      • topic
        Description: The name of the topic.
        Remarks: Set the value to the topic of the table that is accessed by the Kafka JSON catalog.

      • properties.bootstrap.servers
        Description: The IP addresses or endpoints of the Kafka brokers.
        Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

      • scan.startup.mode
        Description: The start offset from which data is read from Kafka.
        Remarks: Valid values:
          • earliest-offset: reads data from the earliest offset of each Kafka partition.
          • latest-offset: reads data from the latest offset of each Kafka partition.
          • group-offsets: reads data from the offsets that are committed by the consumer group whose ID is specified by the properties.group.id parameter. This is the default value.
          • timestamp: reads data from the timestamp that is specified by the scan.startup.timestamp-millis parameter.
          • specific-offsets: reads data from the offsets that are specified by the scan.startup.specific-offsets parameter.
        Note: This parameter takes effect only when the deployment is started without states. When the deployment is restarted from a checkpoint or resumed from a specified state, the deployment preferentially starts to read data from the progress that is saved in the state data.

      • key.format
        Description: The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message.
        Remarks: Set the value to json.

      • key.fields
        Description: The fields in the source table or sink table that correspond to the key fields of Kafka messages.
        Remarks: Separate multiple field names with semicolons (;). Example: field1;field2.

      • key.fields-prefix
        Description: A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields or metadata fields.
        Remarks: Set the value to the value of the key.fields-prefix parameter of the Kafka JSON catalog.

      • value.format
        Description: The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message.
        Remarks: Set the value to json.

      • value.fields-prefix
        Description: A custom prefix for all value fields in Kafka messages. You can configure this parameter to prevent name conflicts with the key fields or metadata fields.
        Remarks: Set the value to the value of the value.fields-prefix parameter of the Kafka JSON catalog.

      • value.fields-include
        Description: The policy that is used to process the key fields when the value fields are parsed.
        Remarks: Set the value to EXCEPT_KEY. This way, the key fields are excluded when the value fields are parsed.

      • value.json.infer-schema.flatten-nested-columns.enable
        Description: Specifies whether to recursively expand nested columns in the JSON text when the value fields of a JSON-formatted Kafka message are parsed.
        Remarks: Set the value to the value of the infer-schema.flatten-nested-columns.enable parameter of the Kafka JSON catalog.

      • value.json.infer-schema.primitive-as-string
        Description: Specifies whether to infer all basic types as the STRING type when the value fields of a JSON-formatted Kafka message are parsed.
        Remarks: Set the value to the value of the infer-schema.primitive-as-string parameter of the Kafka JSON catalog.

    2. In the upper-right corner of the SQL editor, click Deploy.

    3. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.

  5. View the job result.

    1. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the deployment that you want to manage.

    2. Click the Logs tab. Then, click the Running Task Managers tab and view the jobs in the Path, ID column.

    3. In the left-side pane of the Logs tab, click Logs. Then, click the Logs tab and search for logs related to PrintSinkOutputWriter.

