Realtime Compute for Apache Flink: Synchronize data from all tables in a MySQL database to Kafka

Last Updated: Dec 05, 2023

This topic describes how to synchronize data from all tables in a MySQL database to Kafka. This reduces the load on the MySQL database caused by multiple deployments.

Background information

In most cases, MySQL Change Data Capture (CDC) data tables are used to obtain data from MySQL tables and synchronize changes in those tables to downstream systems in real time. MySQL CDC data tables are also used in complex computing scenarios. For example, you can use a MySQL CDC data table as a dimension table and join it with another data table. Multiple deployments may use the same MySQL table. In that case, the MySQL database establishes multiple connections, which causes a heavy load on the MySQL server and the network.
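
For reference, the following is a minimal sketch of a MySQL CDC source table declared in Flink SQL. The connector name and connection options follow the open-source Flink MySQL CDC connector and are placeholders; the exact connector name and options in fully managed Flink may differ, and the table and column names are examples only.

CREATE TEMPORARY TABLE orders_cdc (
  id BIGINT,
  user_id BIGINT,
  product STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  -- Placeholder connector name and connection settings.
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flink_user',
  'password' = '******',
  'database-name' = 'database',
  'table-name' = 'order'
);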

To reduce the load on an upstream MySQL database, Realtime Compute for Apache Flink allows you to synchronize data from all tables in the MySQL database to Kafka. This way, Kafka is used as the intermediate layer for data synchronization. You can execute the CREATE DATABASE AS statement to synchronize data from all tables in a MySQL database to Kafka. You can also execute the CREATE TABLE AS statement to synchronize data from a single table in a MySQL database to Kafka.

When you execute the CREATE DATABASE AS statement or the CREATE TABLE AS statement, data is synchronized from the upstream MySQL database to Kafka in real time in a single deployment. After the MySQL database synchronization task starts, the Kafka JSON catalog creates a Kafka topic for each MySQL table, and data from each MySQL table is written to the corresponding topic in upsert fashion. Downstream deployments then use the tables in the Kafka JSON catalog instead of the tables in the MySQL database, which reduces the load on the MySQL database caused by multiple deployments.
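
For example, once the synchronization deployment is running, a downstream deployment can read the synchronized data through the Kafka JSON catalog instead of the MySQL database. The catalog, database, and topic names below are placeholders that match the syntax used later in this topic.

SELECT * FROM `kafka-catalog`.`kafka`.`order`;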

Figure: mysql2kafka

Limits

  • The MySQL table from which data is synchronized must contain a primary key.

  • You can synchronize data from a MySQL database to a self-managed Kafka cluster, an E-MapReduce (EMR) Kafka cluster, or an ApsaraMQ for Kafka cluster. If you synchronize data from a MySQL database to an ApsaraMQ for Kafka cluster, you can access the cluster only by using the default endpoint.

  • Upsert Kafka tables cannot be used as source tables of the CREATE TABLE AS and CREATE DATABASE AS statements. They can be used only as result tables of these statements.

  • The storage space of the Kafka cluster must be greater than the storage space of the source table. Otherwise, data may be lost because of insufficient storage space. The Kafka topic that is created during database synchronization is a compacted topic: for each key, only the most recent message is retained, and the data in the topic does not expire. As a result, the compacted topic stores roughly the same amount of data as the table in the source database.

Procedure

  1. Create a MySQL catalog and a Kafka JSON catalog.

    For more information, see Manage MySQL catalogs and Manage Kafka JSON catalogs.

  2. Create and start a data synchronization task that executes the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize data from one or all tables in a MySQL database to Kafka.

    • Syntax of the CREATE DATABASE AS statement

      CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
      AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
      Note

      Kafka does not have the concept of a database, so no database needs to be created. When you execute the CREATE DATABASE AS statement to synchronize data to Kafka, you must add IF NOT EXISTS to the statement to skip database creation.

    • Syntax of the CREATE TABLE AS statement

      CREATE TABLE `kafka-catalog`.`kafka`.`topic`
      AS TABLE `mysql-catalog`.`db`.`table`;
  3. Access the tables that store the synchronized data in Kafka.

    The name of the Kafka topic that is created by the database synchronization task is the same as the name of the corresponding MySQL table. The numbers of partitions and replicas follow the default configuration of the Kafka cluster, and the cleanup.policy property of the topic is set to compact.

    Use the Kafka JSON catalog to access the tables in Kafka that correspond to the tables in the MySQL database. Downstream deployments can consume data from these topics to obtain the latest data of the database tables. You can use the tables in Kafka in one of the following ways:

    • Use the tables by using the Kafka JSON catalog

      For more information, see Use a Kafka JSON catalog.

      Note

      If the schema of a MySQL table changes during data synchronization, the schema that the Kafka JSON catalog infers for the table may differ from the current schema of the MySQL table. For example, if specific fields are deleted from the MySQL table during data synchronization, the deleted fields may still appear in the inferred schema, and their values may be null.

      This happens because the Kafka JSON catalog infers the schema from the messages that it consumes. If a message that still contains the deleted fields has not expired from the topic, those fields appear in the inferred schema and their values are null. If this occurs, no action is required.

    • Use the tables by creating a temporary table

      If you use the tables in Kafka by creating a temporary table, you can specify a custom schema. On the Schemas tab in the console of fully managed Flink, you can view and copy the configuration in the WITH clause. The parameters in the WITH clause are described in the table that follows the sample statement, and a query sketch that uses the temporary table is provided after that table. Sample statement:

      CREATE TEMPORARY TABLE tempOrder (
        `key_order_id` BIGINT NOT NULL,
        `value_product` STRING,
        PRIMARY KEY (key_order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = 'xxxx',
        'key.format' = 'json',
        'key.fields-prefix' = 'key_',
        'value.format' = 'json',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );

      Parameter | Description | Remarks
      --- | --- | ---
      connector | The type of the connector. | Set the value to upsert-kafka.
      topic | The name of the topic. | Set the value to the topic of the table that is accessed by the Kafka JSON catalog.
      properties.bootstrap.servers | The IP addresses or endpoints of Kafka brokers. | Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).
      key.format | The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message. | Set the value to json.
      key.fields-prefix | A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields or metadata fields. | Set the value to the value of the key.fields-prefix parameter of the Kafka JSON catalog.
      value.format | The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message. | Set the value to json.
      value.fields-prefix | A custom prefix for all value fields in Kafka messages. You can configure this parameter to prevent name conflicts with the key fields or metadata fields. | Set the value to the value of the value.fields-prefix parameter of the Kafka JSON catalog.
      value.fields-include | The policy that is used to process the key field when value fields are parsed. | Set the value to EXCEPT_KEY. With this setting, the key field is excluded when value fields are parsed.
      value.json.infer-schema.flatten-nested-columns.enable | Specifies whether to recursively expand nested columns in a JSON text when the value fields in a JSON-formatted Kafka message are parsed. | Set the value to the value of the infer-schema.flatten-nested-columns.enable parameter of the Kafka JSON catalog.
      value.json.infer-schema.primitive-as-string | Specifies whether to infer all basic types as the STRING type when the value fields in a JSON-formatted Kafka message are parsed. | Set the value to the value of the infer-schema.primitive-as-string parameter of the Kafka JSON catalog.
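
      As a usage sketch, a downstream deployment can query the temporary table like any other table. The table and field names come from the sample statement above; note that the columns carry the configured key_ and value_ prefixes.

      -- Read the synchronized order data from the compacted topic.
      SELECT key_order_id, value_product
      FROM tempOrder;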

Examples

In this example, you want to analyze order comments from three tables in real time. The tables are the user table named user, the order table named order, and the comment table named feedback. The following figure shows the data that is contained in each table.

Figure: mysql database
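
As a rough sketch of the three tables, the following MySQL DDL statements show plausible schemas. The column names are taken from the queries below; the column types are assumptions.

CREATE TABLE `user` (
  id BIGINT PRIMARY KEY,
  name VARCHAR(255)
);

CREATE TABLE `order` (
  id BIGINT PRIMARY KEY,
  user_id BIGINT,
  product VARCHAR(255)
);

CREATE TABLE feedback (
  id BIGINT PRIMARY KEY,
  user_id BIGINT,
  `comment` VARCHAR(255)
);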

If you want to display user order information or user comments, you must join the order table or the feedback table with the user table to obtain usernames, which are stored in the name field. The following sample code provides an example on how to join the tables.

-- Join the order table and the user table to display the username and commodity name of each order.
SELECT `order`.id AS order_id, product, `user`.name AS user_name
FROM `order` LEFT JOIN `user`
ON `order`.user_id = `user`.id;

-- Join the feedback table and the user table to display the content and username of each comment.
SELECT feedback.id AS feedback_id, `comment`, `user`.name AS user_name
FROM feedback LEFT JOIN `user`
ON feedback.user_id = `user`.id;

The user table is used in both of the preceding SQL deployments. When the deployments are running, each deployment reads full and incremental data from the MySQL database. Full data reading requires MySQL database connections, and incremental data reading requires a binlog client. As the number of deployments increases, the number of MySQL database connections and binlog clients also increases, which places a heavy load on the upstream MySQL database.

To reduce the load on the upstream MySQL database, you can execute the CREATE TABLE AS statement or the CREATE DATABASE AS statement to synchronize upstream MySQL data to Kafka in real time in a single deployment, and then provide the data to multiple downstream deployments for consumption. Sample statement:

CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
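
If only the user table needs to be shared across deployments, you can instead execute the CREATE TABLE AS statement to synchronize that single table. The following sketch reuses the catalog and database names from the statement above.

CREATE TABLE `kafka-catalog`.`kafka`.`user`
AS TABLE `mysql-catalog`.`database`.`user`;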

After a data synchronization task starts, data in the upstream MySQL database is written in the JSON format to Kafka. Data in one Kafka topic can be consumed by multiple downstream deployments. This prevents multiple MySQL CDC sources from accessing the database. The following sample code provides an example on how to join the tables.

-- Join the order table and the user table in the Kafka JSON catalog to display the username and commodity name of each order.
SELECT `order`.id AS order_id, product, `user`.value_name AS user_name
FROM `order` LEFT JOIN `kafka-catalog`.`kafka`.`user` AS `user`
ON `order`.user_id = `user`.key_id;

-- Join the feedback table and the user table in the Kafka JSON catalog to display the content and username of each comment.
SELECT feedback.id AS feedback_id, `comment`, `user`.value_name AS user_name
FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` AS `user`
ON feedback.user_id = `user`.key_id;

References