This topic describes how to synchronize data from all tables in a MySQL database to Kafka. This reduces the load on the MySQL database caused by multiple jobs.
Background
In most cases, MySQL Change Data Capture (CDC) data tables are used to obtain data from MySQL tables and synchronize changes in the data tables to downstream systems in real time. MySQL CDC data tables are also used in complex computing scenarios. For example, you can use a MySQL CDC data table as a dimension table and join it with another data table. Multiple jobs may use the same MySQL table. In this case, the MySQL database must establish a separate connection for each job, which places a heavy load on the MySQL server and the network.
Solution architecture
To reduce the load on the MySQL source, use Kafka as an intermediate buffer between the source and the downstream systems: a single synchronization job writes the MySQL data to Kafka topics, and downstream jobs then read data from these Kafka topics by using the Kafka catalog.
Limitations
Each MySQL table from which data is synchronized must have a primary key. If a table does not have one, you can add a primary key before synchronization, as shown in the example after this list.
You can synchronize data from a MySQL database to a self-managed Kafka cluster, an E-MapReduce (EMR) Kafka cluster, or an ApsaraMQ for Kafka instance. If you synchronize data to an ApsaraMQ for Kafka instance, you can access the instance only by using the default endpoint.
The storage space of the Kafka cluster must be larger than the storage space of the source tables. Otherwise, data may be lost because of insufficient storage space. The Kafka topics that are created during database synchronization are compacted topics: for each key, only the most recent message is retained, and the data in the topics does not expire. As a result, a compacted topic stores approximately the same amount of data as the corresponding source table.
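For example, if a source table does not have a primary key, one way to satisfy the first limitation is to add a primary key in MySQL before you start the synchronization job. This is only a sketch; the database, table, and column names below are placeholders.

-- Hypothetical example: add a primary key to a source table that does not have one.
ALTER TABLE `<your_database>`.`<your_table>` ADD PRIMARY KEY (`id`);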
Example
Assume you want to analyze order comments from three tables in real time: user, order, and feedback. The following figure shows the data that is contained in each table.
If you want to display user order information or user comments, you must join the order table or the feedback table with the user table to obtain usernames, which are stored in the name field. The following sample code shows how to join the tables:
-- Join the order table and the user table to display the username and commodity name of each order.
SELECT `order`.id as order_id, product, user.name as user_name
FROM `order` LEFT JOIN user
ON `order`.user_id = user.id;

-- Join the feedback table and the user table to display the content and username of each comment.
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

The user table is used in both of the preceding jobs. When the jobs run, each of them reads both historical and incremental data from the MySQL database. Reading historical data requires MySQL database connections, and reading incremental data requires a binlog client. As the number of jobs increases, the number of MySQL database connections and binlog clients also increases, which places a heavy load on the upstream MySQL database. To reduce this load, you can synchronize the upstream MySQL data to Kafka in real time and let multiple downstream jobs consume the synchronized data from Kafka.
Prerequisites
A Realtime Compute for Apache Flink workspace is created. For more information, see Create a workspace.
An ApsaraMQ for Kafka instance is created. For more information, see Step 2: Purchase and deploy an instance.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an RDS for MySQL instance.
The Realtime Compute for Apache Flink workspace, ApsaraDB RDS for MySQL instance, and ApsaraMQ for Kafka instance reside in the same virtual private cloud (VPC). If they do not reside in the same VPC, you must establish connections between the VPCs or enable Realtime Compute for Apache Flink to access other services over the Internet. For more information, see How does Realtime Compute for Apache Flink access a service across VPCs? and How does Realtime Compute for Apache Flink access the Internet?
The RAM user or RAM role that you use has the permissions required to access the preceding resources.
Preparations
Create an ApsaraDB RDS for MySQL database and prepare source data
Create a database named order_dw for the ApsaraDB RDS for MySQL instance. For more information, see Create a database.
Prepare sample source data.
In the upper-right corner of the details page of the desired instance, click Log On to Database.
In the Log on to Database Instance dialog box, configure the Database Account and Database Password parameters and click Login.
After the logon is successful, double-click the order_dw database in the left-side navigation pane to switch to the database.
On the SQL Console tab, enter the DDL statements that create the three business tables in the order_dw database and insert data into the tables.
CREATE TABLE `user` (
  id bigint not null primary key,
  name varchar(50) not null
);

CREATE TABLE `order` (
  id bigint not null primary key,
  product varchar(50) not null,
  user_id bigint not null
);

CREATE TABLE `feedback` (
  id bigint not null primary key,
  user_id bigint not null,
  comment varchar(50) not null
);

-- Prepare data.
INSERT INTO `user` VALUES (1, 'Tom'), (2, 'Jerry');
INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1);
INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
Click Execute. On the panel that appears, click Execute.
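Optionally, you can run the following queries on the SQL Console tab to confirm that the sample data was inserted:

-- Optional check: each query should return two rows.
SELECT * FROM `user`;
SELECT * FROM `order`;
SELECT * FROM `feedback`;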
Procedure
Create a Flink CDC job that replicates data from MySQL to Kafka in real time.
This job automatically creates the Kafka topics. You can define custom Kafka topic names in the route module. The numbers of topic partitions and replicas use your Kafka cluster's default settings, and the cleanup.policy of the topics is set to compact.
Use default topic names
By default, a Kafka topic name is formed by concatenating the MySQL database name and table name with a period (.). This job creates three topics: order_dw.user, order_dw.order, and order_dw.feedback.
In the Development Console, navigate to , create a blank draft, and copy and paste the following code to the editor.
source:
  type: mysql
  name: MySQL Source
  hostname: #{hostname}
  port: 3306
  username: #{username}
  password: #{password}
  tables: order_dw.\.*
  server-id: 28601-28604

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
  # Ingesting data into ApsaraMQ for Kafka requires the following additional options.
  aliyun.kafka.accessKeyId: #{ak}
  aliyun.kafka.accessKeySecret: #{sk}
  aliyun.kafka.instanceId: #{instanceId}
  aliyun.kafka.endpoint: #{endpoint}
  aliyun.kafka.regionId: #{regionId}

Click Deploy.
In the left navigation menu, choose . Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.
Customize topic names separately
Specify the name of each topic in the route module. This job creates three topics with custom names: user1, order2, and feedback3.
In the Development Console, navigate to , create a blank draft, and copy and paste the following code to the editor.
source:
  type: mysql
  name: MySQL Source
  hostname: #{hostname}
  port: 3306
  username: #{username}
  password: #{password}
  tables: order_dw.\.*
  server-id: 28601-28604

route:
  - source-table: order_dw.user
    sink-table: user1
  - source-table: order_dw.order
    sink-table: order2
  - source-table: order_dw.feedback
    sink-table: feedback3

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
  # Ingesting data into ApsaraMQ for Kafka requires the following additional options.
  aliyun.kafka.accessKeyId: #{ak}
  aliyun.kafka.accessKeySecret: #{sk}
  aliyun.kafka.instanceId: #{instanceId}
  aliyun.kafka.endpoint: #{endpoint}
  aliyun.kafka.regionId: #{regionId}

Click Deploy.
In the left navigation menu, choose . Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.
Customize topic names in a batch
Use a pattern to define custom topic names in the route module. This job creates three topics: topic_user, topic_order, and topic_feedback.
In the Development Console, navigate to , create a blank draft, and copy and paste the following code to the editor.
source:
  type: mysql
  name: MySQL Source
  hostname: #{hostname}
  port: 3306
  username: #{username}
  password: #{password}
  tables: order_dw.\.*
  server-id: 28601-28604

route:
  - source-table: order_dw.\.*
    sink-table: topic_<>
    replace-symbol: <>

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
  # Ingesting data into ApsaraMQ for Kafka requires the following additional options.
  aliyun.kafka.accessKeyId: #{ak}
  aliyun.kafka.accessKeySecret: #{sk}
  aliyun.kafka.instanceId: #{instanceId}
  aliyun.kafka.endpoint: #{endpoint}
  aliyun.kafka.regionId: #{regionId}

Click Deploy.
In the left navigation menu, choose . Click Start in the Actions column of your job deployment. In the Start Job panel, select Initial Mode and click Start.
Consume Kafka data in real time.
The preceding job writes data to Kafka in the JSON format. By consuming data from the Kafka topics, one or more downstream systems can obtain the latest MySQL data. Choose one of the following consumption methods:
Consume data using the Kafka JSON catalog
You can use the Kafka JSON catalog to read a table's data directly from the Kafka topic that matches the table, without declaring the table schema manually.
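The SQL in this section assumes that a Kafka JSON catalog named kafka-catalog with a default database named kafka has already been created. If you have not created one, the following is a minimal sketch; verify the option names against the Kafka JSON catalog documentation for your Flink version.

-- Minimal sketch of creating a Kafka JSON catalog (assumed catalog name and options; adjust to your setup).
CREATE CATALOG `kafka-catalog` WITH (
  'type' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  'default-database' = 'kafka'
);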
In the left-side navigation pane, choose . On the page that appears, create an SQL stream draft and copy the following code to the SQL editor:
CREATE TEMPORARY TABLE print_user_product (
  order_id BIGINT,
  product STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

CREATE TEMPORARY TABLE print_user_feedback (
  feedback_id BIGINT,
  `comment` STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

BEGIN STATEMENT SET; -- Required when you write data to multiple sinks.

-- Join the order table and the user table in the Kafka JSON catalog to display the username and product name of each order.
INSERT INTO print_user_product
SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
FROM `kafka-catalog`.`kafka`.`order` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `order` -- Specify the consumer group and startup mode.
LEFT JOIN `kafka-catalog`.`kafka`.`user` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `user` -- Specify the consumer group and startup mode.
ON `order`.value_user_id = `user`.key_id;

-- Join the feedback table and the user table to display the content and username of each comment.
INSERT INTO print_user_feedback
SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
FROM `kafka-catalog`.`kafka`.feedback /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as feedback -- Specify the consumer group and startup mode.
LEFT JOIN `kafka-catalog`.`kafka`.`user` /*+ OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset') */ as `user` -- Specify the consumer group and startup mode.
ON feedback.value_user_id = `user`.key_id;

END; -- Required when you write data to multiple sinks.

In this example, the Print connector is used to directly display the results. You can also write the results to a sink table of your choice for further analysis and processing. For more information about the syntax of the statement that writes data to multiple sinks, see INSERT INTO.
Note: If the schema of a MySQL table is changed during data synchronization, the schema that the Kafka JSON catalog infers for the table may differ from the current schema of the MySQL table. The inferred schema contains the fields of the Kafka messages that are consumed. For example, if fields are deleted from the MySQL table during synchronization but messages that contain those fields have not yet expired from the compacted topic, the deleted fields still appear in the inferred schema and their values are null. In this case, no action is required.
In the upper-right corner of the SQL editor, click Deploy.
In the left navigation menu, choose . On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel that appears, select Initial Mode, and then click Start.
Consume data using a temporary table
Alternatively, you can declare temporary tables with explicit schemas to read data from the Kafka topics.
In the left navigation menu, choose . On the page that appears, create an SQL stream draft and copy the following code to the SQL editor:
CREATE TEMPORARY TABLE user_source (
  key_id BIGINT,
  value_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE order_source (
  key_id BIGINT,
  value_product STRING,
  value_user_id BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE feedback_source (
  key_id BIGINT,
  value_user_id BIGINT,
  value_comment STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'feedback',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY',
  'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
  'value.json.infer-schema.primitive-as-string' = 'false'
);

CREATE TEMPORARY TABLE print_user_product (
  order_id BIGINT,
  product STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

CREATE TEMPORARY TABLE print_user_feedback (
  feedback_id BIGINT,
  `comment` STRING,
  user_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

BEGIN STATEMENT SET; -- Required when you write data to multiple sinks.

-- Join the order table and the user table to display the username and product name of each order.
INSERT INTO print_user_product
SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
FROM order_source
LEFT JOIN user_source
ON order_source.value_user_id = user_source.key_id;

-- Join the feedback table and the user table to display the content and username of each comment.
INSERT INTO print_user_feedback
SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
FROM feedback_source
LEFT JOIN user_source
ON feedback_source.value_user_id = user_source.key_id;

END; -- Required when you write data to multiple sinks.

In this example, the Print connector is used to directly display the results. You can also write the results to a sink table of your choice for further analysis and processing. For more information about the syntax of the statement that writes data to multiple sinks, see INSERT INTO.
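For example, instead of the Print sink, you could write the joined order data to a Kafka result topic by using the Upsert Kafka connector. The following is only a sketch; the table name, result topic, and broker addresses are hypothetical placeholders.

-- Sketch: a hypothetical result table that can replace the print_user_product sink.
CREATE TEMPORARY TABLE user_product_sink (
  order_id BIGINT,
  product STRING,
  user_name STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_product_result',                       -- Hypothetical result topic.
  'properties.bootstrap.servers' = '<yourKafkaBrokers>', -- Replace with your broker endpoints.
  'key.format' = 'json',
  'value.format' = 'json'
);
-- In the statement set, change "INSERT INTO print_user_product" to "INSERT INTO user_product_sink".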
The following table describes the connector options of a temporary table.
Option: connector
Description: The type of the connector.
Remarks: Set the value to kafka.

Option: topic
Description: The name of the topic.
Remarks: Set the value to the topic of the table that is accessed by the Kafka JSON catalog.

Option: properties.bootstrap.servers
Description: The IP addresses or endpoints of the Kafka brokers.
Remarks: Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

Option: scan.startup.mode
Description: The start offset from which data is read from Kafka.
Remarks: Valid values:
- earliest-offset: reads data from the earliest offset of each Kafka partition.
- latest-offset: reads data from the latest offset of each Kafka partition.
- group-offsets: reads data from the offsets committed by the consumer group that is specified by the properties.group.id option. This is the default value.
- timestamp: reads data from the timestamp that is specified by the scan.startup.timestamp-millis option.
- specific-offsets: reads data from the offsets that are specified by the scan.startup.specific-offsets option.
Note: This option takes effect only when the deployment is started without states. If the deployment is restarted from a checkpoint or resumed from a specified state, it preferentially starts reading from the progress that is saved in the state data.

Option: key.format
Description: The format that the Flink Kafka connector uses to serialize or deserialize the key field in a Kafka message.
Remarks: Set the value to json.

Option: key.fields
Description: The key fields in the source table or sink table that correspond to the key fields of Kafka messages.
Remarks: Separate multiple field names with semicolons (;). Example: field1;field2.

Option: key.fields-prefix
Description: A custom prefix for all key fields in Kafka messages. You can configure this option to prevent name conflicts with the value fields or metadata fields.
Remarks: Set the value to the value of the key.fields-prefix option of the Kafka JSON catalog.

Option: value.format
Description: The format that the Flink Kafka connector uses to serialize or deserialize the value fields in a Kafka message.
Remarks: Set the value to json.

Option: value.fields-prefix
Description: A custom prefix for all value fields in Kafka messages. You can configure this option to prevent name conflicts with the key fields or metadata fields.
Remarks: Set the value to the value of the value.fields-prefix option of the Kafka JSON catalog.

Option: value.fields-include
Description: The policy that is used to process the key fields when the value fields are parsed.
Remarks: Set the value to EXCEPT_KEY. With this setting, the key fields are excluded when the value fields are parsed.

Option: value.json.infer-schema.flatten-nested-columns.enable
Description: Specifies whether to recursively expand nested columns in JSON text when the value fields of JSON-formatted Kafka messages are parsed.
Remarks: Set the value to the value of the infer-schema.flatten-nested-columns.enable option of the Kafka JSON catalog.

Option: value.json.infer-schema.primitive-as-string
Description: Specifies whether to infer all primitive types as the STRING type when the value fields of JSON-formatted Kafka messages are parsed.
Remarks: Set the value to the value of the infer-schema.primitive-as-string option of the Kafka JSON catalog.
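For example, the following sketch uses the timestamp startup mode described above to start reading from a specific point in time. The timestamp value is a placeholder, and the topic and broker addresses must be replaced with your own values.

-- Sketch: read the user topic starting from a given timestamp (milliseconds since the epoch).
CREATE TEMPORARY TABLE user_source_from_timestamp (
  key_id BIGINT,
  value_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user',                                      -- Replace with your topic name.
  'properties.bootstrap.servers' = '<yourKafkaBrokers>', -- Replace with your broker endpoints.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1700000000000',     -- Replace with your start time in milliseconds.
  'key.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.format' = 'json',
  'value.fields-prefix' = 'value_',
  'value.fields-include' = 'EXCEPT_KEY'
);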
In the upper-right corner of the SQL editor, click Deploy.
In the left-side navigation pane, choose . On the Deployments page, find your target deployment, and click Start in the Actions column. In the Start Job panel, select Initial Mode, and then click Start.
View the job result.
In the left-side navigation pane, choose . On the Deployments page, click the name of your job deployment.
Click the Logs tab. Then, click the Running Task Managers tab and click the value in the Path, ID column.
In the left-side pane of the Logs tab, click Logs. Then, click the Logs tab and search for logs related to PrintSinkOutputWriter.