This topic describes how to synchronize data from all tables in a MySQL database to Kafka. This reduces the load on the MySQL database caused by multiple deployments.
Background information
In most cases, MySQL Change Data Capture (CDC) data tables are used to obtain data from MySQL tables and synchronize changes in those tables to downstream systems in real time. MySQL CDC data tables are also used in complex computing scenarios. For example, you can use a MySQL CDC data table as a dimension table and join it with another data table. Multiple deployments may use the same MySQL table. In this case, multiple connections are established to the MySQL database, which causes a heavy load on the MySQL server and the network.
Solution architecture
To reduce the load on an upstream MySQL database, Realtime Compute for Apache Flink allows you to synchronize data from all tables in the MySQL database to Kafka. This solution uses Kafka as an intermediate layer for data synchronization. You execute the CREATE DATABASE AS or CREATE TABLE AS statement to synchronize data from all tables or a single table in the MySQL database to Kafka in real time in a single deployment. The data of each MySQL table is written to the related Kafka topic in upsert fashion. Downstream deployments then use the Kafka JSON catalog to read data from the topics instead of directly accessing the MySQL database. This effectively reduces the load on the MySQL database caused by multiple deployments.
Limits
The MySQL table from which data is synchronized must have a primary key.
You can synchronize data from a MySQL database to a self-managed Kafka cluster, an E-MapReduce (EMR) Kafka cluster, or an ApsaraMQ for Kafka instance. If you synchronize data to an ApsaraMQ for Kafka instance, you can access the instance only by using the default endpoint.
Upsert Kafka tables cannot be used as source tables for the CREATE TABLE AS and CREATE DATABASE AS statements. They can be used only as result tables of the synchronization that is performed by these statements.
The storage space of the Kafka cluster must be greater than the storage space of the source tables. Otherwise, data may be lost because of insufficient storage space. The Kafka topics that are created during database synchronization are compacted topics. For each key, a compacted topic retains only the most recent message, and the data in the topic does not expire. As a result, a compacted topic stores approximately the same amount of data as the related table in the source database.
Best practices
In this example, you want to analyze order comments from three tables in real time: the user table named user, the order table named order, and the comment table named feedback. The data that is contained in each table is shown below.
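The following sample data matches the records that are inserted in the Preparations section of this topic:

user
id | name
1  | Tom
2  | Jerry

order
id | product  | user_id
1  | Football | 2
2  | Basket   | 1

feedback
id | user_id | comment
1  | 1       | Good.
2  | 2       | Very good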
If you want to display user order information or user comments, you must join the order table or the feedback table with the user table to obtain usernames. The usernames are specified by the name field. The following sample code provides an example on how to join the tables:
-- Join the order table and the user table to display the username and product name of each order.
SELECT `order`.id as order_id, product, `user`.name as user_name
FROM `order` LEFT JOIN `user`
ON `order`.user_id = `user`.id;
-- Join the feedback table and the user table to display the content and username of each comment.
SELECT feedback.id as feedback_id, `comment`, `user`.name as user_name
FROM feedback LEFT JOIN `user`
ON feedback.user_id = `user`.id;
The user table is used in both of the preceding SQL deployments. When the deployments are running, each deployment reads full and incremental data from the MySQL database. Full data reading requires MySQL database connections, and incremental data reading requires a binlog client. As the number of deployments increases, the number of MySQL database connections and binlog clients also increases, which causes a heavy load on the upstream MySQL database. To reduce this load, you can execute the CREATE DATABASE AS or CREATE TABLE AS statement in a single deployment to synchronize upstream MySQL data to Kafka in real time. The synchronized data can then be consumed by multiple downstream deployments.
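For illustration, each of the preceding deployments declares its own MySQL CDC source table, similar to the following minimal sketch. The connector name and connection options in this sketch are assumptions with placeholder values; adjust them to your environment.

-- Hypothetical MySQL CDC source table that each deployment declares separately.
-- Each running deployment that uses such a table opens its own database connections
-- for the full scan and its own binlog client for incremental reads.
CREATE TEMPORARY TABLE `user` (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',             -- Assumption: the MySQL CDC connector name in your environment.
  'hostname' = '<yourRdsEndpoint>',  -- Placeholder.
  'port' = '3306',
  'username' = '<yourUserName>',     -- Placeholder.
  'password' = '<yourPassword>',     -- Placeholder.
  'database-name' = 'order_dw',
  'table-name' = 'user'
);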
Prerequisites
Realtime Compute for Apache Flink is activated. For more information, see Activate Realtime Compute for Apache Flink.
An ApsaraMQ for Kafka instance is created. For more information, see Step 2: Purchase and deploy an instance.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
The Realtime Compute for Apache Flink workspace, ApsaraDB RDS for MySQL instance, and ApsaraMQ for Kafka instance reside in the same virtual private cloud (VPC). If they do not reside in the same VPC, you must establish connections between the VPCs or enable Realtime Compute for Apache Flink to access other services over the Internet. For more information, see How does Realtime Compute for Apache Flink access a service across VPCs? and How does Realtime Compute for Apache Flink access the Internet?
The RAM user or RAM role has access to relevant resources.
Preparations
Create a database for the ApsaraDB RDS for MySQL instance and prepare a MySQL CDC data source
Create a database for the ApsaraDB RDS for MySQL instance. For more information, see Create a database.
In this example, create a database named order_dw.
Prepare a MySQL CDC data source.
In the upper-right corner of the details page of the desired instance, click Log On to Database.
In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.
After the logon is successful, double-click the order_dw database in the left-side navigation pane to switch to the database.
On the SQL Console tab, enter the DDL statements that are used to create three business tables in the order_dw database and insert data into the tables.
CREATE TABLE `user` (
  id bigint not null primary key,
  name varchar(50) not null
);

CREATE TABLE `order` (
  id bigint not null primary key,
  product varchar(50) not null,
  user_id bigint not null
);

CREATE TABLE `feedback` (
  id bigint not null primary key,
  user_id bigint not null,
  comment varchar(50) not null
);

-- Prepare data.
INSERT INTO `user` VALUES (1, 'Tom'), (2, 'Jerry');
INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1);
INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
Click Execute. On the page that appears, click Execute.
Create a topic and a group for the ApsaraMQ for Kafka instance
Create Kafka topics and a group. In this example, create three topics that have the same names as the tables in the MySQL database: user, order, and feedback. For more information, see Step 3: Create resources.
Procedure
Create a MySQL catalog. For more information, see Create a MySQL catalog.
In this example, a catalog named mysql-catalog is created, and the default database is order_dw.
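For reference, a MySQL catalog declaration might look like the following minimal sketch. The option names in this sketch are assumptions with placeholder values; follow Create a MySQL catalog for the exact syntax.

-- A minimal sketch of a MySQL catalog declaration (option names are assumptions).
CREATE CATALOG `mysql-catalog` WITH (
  'type' = 'mysql',
  'hostname' = '<yourRdsEndpoint>',  -- Placeholder.
  'port' = '3306',
  'username' = '<yourUserName>',     -- Placeholder.
  'password' = '<yourPassword>',     -- Placeholder.
  'default-database' = 'order_dw'
);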
Create a Kafka JSON catalog named kafka-catalog. For more information, see Create a Kafka JSON catalog.
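For reference, a Kafka JSON catalog declaration might look like the following minimal sketch. The option names in this sketch are assumptions with placeholder values; follow Create a Kafka JSON catalog for the exact syntax.

-- A minimal sketch of a Kafka JSON catalog declaration (option names are assumptions).
CREATE CATALOG `kafka-catalog` WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',  -- Placeholder.
  'format' = 'json',
  'default-database' = 'kafka'
);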
Create and start a data synchronization task by executing the CREATE DATABASE AS or CREATE TABLE AS statement. Upstream MySQL data is synchronized to Kafka in real time for consumption by multiple downstream deployments.
Data synchronization by executing the CREATE DATABASE AS statement
The name of the Kafka topic that is created in the database synchronization task is the same as the name of the related MySQL table. For the number of partitions and the number of replicas, the default configuration of the Kafka cluster is used. The cleanup.policy parameter is set to compact.
In the left-side navigation pane, go to the SQL development page. Create an SQL streaming draft and copy the following code to the SQL editor:

CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`order_dw`
INCLUDING ALL TABLES;
Note: Kafka does not involve databases. Therefore, you do not need to create a database. When you execute the CREATE DATABASE AS statement to synchronize data to Kafka, you must add IF NOT EXISTS to the statement to skip database creation.
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, go to the Deployments page. Find your target deployment and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode and click Start.
Data synchronization by executing the CREATE TABLE AS statement
In the left-side navigation pane, go to the SQL development page. Create an SQL streaming draft and copy the following code to the SQL editor:

BEGIN STATEMENT SET;

-- Synchronize the user table.
CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`user`
AS TABLE `mysql-catalog`.`order_dw`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Synchronize the order table.
CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`order`
AS TABLE `mysql-catalog`.`order_dw`.`order`
/*+ OPTIONS('server-id'='8001-8004') */;

-- Synchronize the feedback table.
CREATE TABLE IF NOT EXISTS `kafka-catalog`.`kafka`.`feedback`
AS TABLE `mysql-catalog`.`order_dw`.`feedback`
/*+ OPTIONS('server-id'='8001-8004') */;

END;
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, go to the Deployments page. Find your target deployment and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode and click Start.
Consume Kafka data in real time.
Data in the upstream MySQL database is written to Kafka in the JSON format. Data in one Kafka topic can be consumed by multiple downstream deployments to obtain the latest data in the database. Data in the tables that are synchronized to Kafka can be consumed by using one of the following methods.
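For example, after synchronization, the row (1, 'Tom') in the user table is expected to produce a message in the user topic that is similar to the following sketch. The exact field layout is an assumption based on the key_ and value_ prefixes and the EXCEPT_KEY policy used later in this topic; the value may additionally repeat the key field, which the EXCEPT_KEY setting ignores when the value fields are parsed.

Key:   {"id": 1}
Value: {"name": "Tom"}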
Consume data by using the Kafka JSON catalog
If a table is used as a source table, data in the table can be read from the Kafka topic that matches the table by using the Kafka JSON catalog.
In the left-side navigation pane, go to the SQL development page. Create an SQL streaming draft and copy the following code to the SQL editor:

CREATE TEMPORARY TABLE print_user_product (
  order_id BIGINT,
  product STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

CREATE TEMPORARY TABLE print_user_feedback (
  feedback_id BIGINT,
  `comment` STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

BEGIN STATEMENT SET; -- Required when you write data to multiple sinks.

-- Join the order table and the user table in the Kafka JSON catalog to display the username and product name of each order.
INSERT INTO print_user_product
SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
FROM `kafka-catalog`.`kafka`.`order` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `order` -- Specify the consumer group and the startup mode.
LEFT JOIN `kafka-catalog`.`kafka`.`user` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `user` -- Specify the consumer group and the startup mode.
ON `order`.value_user_id = `user`.key_id;

-- Join the feedback table and the user table to display the content and username of each comment.
INSERT INTO print_user_feedback
SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
FROM `kafka-catalog`.`kafka`.`feedback` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as feedback -- Specify the consumer group and the startup mode.
LEFT JOIN `kafka-catalog`.`kafka`.`user` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `user` -- Specify the consumer group and the startup mode.
ON feedback.value_user_id = `user`.key_id;

END; -- Required when you write data to multiple sinks.
In this example, the Print connector is used to directly display the results. You can also export the results to a result table of the connector for further analysis and processing. For more information about the syntax of the statement that is used to write data to multiple sinks, see INSERT INTO statement.
Note: If the schema of a MySQL table is changed during data synchronization, the schema that the Kafka JSON catalog returns for the table may differ from the schema of the MySQL table. For example, if specific fields are deleted from the MySQL table during data synchronization, the deleted fields may still appear in the schema that the Kafka JSON catalog returns, and the values of these fields may be null.
The schema that is returned by the Kafka JSON catalog contains the fields in the messages that are consumed. If specific fields that are deleted exist in a message that is consumed and the message is not expired, the deleted fields may appear in the schema that is returned after the table is parsed by the Kafka JSON catalog. In this case, the values of the fields are null. If this occurs, no action is required.
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, go to the Deployments page. Find your target deployment and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode and click Start.
Consume data by creating a temporary table
You can also create a temporary table with an explicitly specified schema to read data from the Kafka topics.
In the left-side navigation pane, go to the SQL development page. Create an SQL streaming draft and copy the following code to the SQL editor:

CREATE TEMPORARY TABLE user_source (
  key_id BIGINT,
  value_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE order_source (
  key_id BIGINT,
  value_product STRING,
  value_user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE feedback_source (
  key_id BIGINT,
  value_user_id BIGINT,
  value_comment STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'feedback',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE print_user_product (
  order_id BIGINT,
  product STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

CREATE TEMPORARY TABLE print_user_feedback (
  feedback_id BIGINT,
  `comment` STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

BEGIN STATEMENT SET; -- Required when you write data to multiple sinks.

-- Join the order table and the user table to display the username and product name of each order.
INSERT INTO print_user_product
SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
FROM order_source
LEFT JOIN user_source
ON order_source.value_user_id = user_source.key_id;

-- Join the feedback table and the user table to display the content and username of each comment.
INSERT INTO print_user_feedback
SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
FROM feedback_source
LEFT JOIN user_source
ON feedback_source.value_user_id = user_source.key_id;

END; -- Required when you write data to multiple sinks.
In this example, the Print connector is used to directly display the results. You can also export the results to a result table of the connector for further analysis and processing. For more information about the syntax of the statement that is used to write data to multiple sinks, see INSERT INTO statement.
The following table describes the parameters of a temporary table.
Parameter: connector
Description: The type of the connector.
Remarks: Set the value to kafka.

Parameter: topic
Description: The name of the topic.
Remarks: Set the value to the name of the Kafka topic that the Kafka JSON catalog accesses for the table.

Parameter: properties.bootstrap.servers
Description: The IP addresses or endpoints of the Kafka brokers.
Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

Parameter: scan.startup.mode
Description: The start offset from which data is read from Kafka.
Remarks: Valid values:
- earliest-offset: reads data from the earliest offset of Kafka.
- latest-offset: reads data from the latest offset of Kafka.
- group-offsets: reads data from the offset that is committed by the consumer group whose ID is specified by the properties.group.id parameter. This is the default value.
- timestamp: reads data from the timestamp that is specified by the scan.startup.timestamp-millis parameter.
- specific-offsets: reads data from the offsets that are specified by the scan.startup.specific-offsets parameter.
Note: This parameter takes effect only when the deployment is started without states. When the deployment is restarted from a checkpoint or resumed from a specified state, the deployment preferentially reads data from the progress that is saved in the state data.

Parameter: key.format
Description: The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message.
Remarks: Set the value to json.

Parameter: key.fields
Description: The key fields in the source table or sink table that correspond to the key fields of Kafka messages.
Remarks: Separate multiple field names with semicolons (;). Example: field1;field2.

Parameter: key.fields-prefix
Description: A custom prefix for all key fields in Kafka messages. You can configure this parameter to prevent name conflicts with the value fields or metadata fields.
Remarks: Set the value to the value of the key.fields-prefix parameter of the Kafka JSON catalog.

Parameter: value.format
Description: The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message.
Remarks: Set the value to json.

Parameter: value.fields-prefix
Description: A custom prefix for all value fields in Kafka messages. You can configure this parameter to prevent name conflicts with the key fields or metadata fields.
Remarks: Set the value to the value of the value.fields-prefix parameter of the Kafka JSON catalog.

Parameter: value.fields-include
Description: The policy that is used to process the key fields when value fields are parsed.
Remarks: Set the value to EXCEPT_KEY. If this parameter is set to EXCEPT_KEY, the key fields are excluded when value fields are parsed.

Parameter: value.json.infer-schema.flatten-nested-columns.enable
Description: Specifies whether to recursively expand nested columns in a JSON text when the value fields of a JSON-formatted Kafka message are parsed.
Remarks: Set the value to the value of the infer-schema.flatten-nested-columns.enable parameter of the Kafka JSON catalog.

Parameter: value.json.infer-schema.primitive-as-string
Description: Specifies whether to infer all basic types as the STRING type when the value fields of a JSON-formatted Kafka message are parsed.
Remarks: Set the value to the value of the infer-schema.primitive-as-string parameter of the Kafka JSON catalog.
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, go to the Deployments page. Find your target deployment and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode and click Start.
View the job result.
In the left-side navigation pane, go to the Deployments page. Click the name of the deployment that you want to manage.
Click the Logs tab. Then, click the Running Task Managers tab and view the jobs in the Path, ID column.
In the left-side pane of the Logs tab, click Logs. Then, click the Logs tab and search for logs related to PrintSinkOutputWriter.