This topic describes how to write data from Apache Flink to an AnalyticDB for MySQL cluster.
Prerequisites
- An Apache Flink driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes. You can download the driver that corresponds to your Apache Flink version. The following list shows the drivers that correspond to different Apache Flink versions:
- Apache Flink 1.11: flink-connector-jdbc_2.11-1.11.0.jar
- Apache Flink 1.12: flink-connector-jdbc_2.11-1.12.0.jar
- Apache Flink 1.13: flink-connector-jdbc_2.11-1.13.0.jar
To download drivers of other Apache Flink versions, go to the JDBC SQL Connector page.
- A MySQL driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes.
Note The version of the MySQL driver must be 5.1.40 or later. To download MySQL drivers, go to the mysql/mysql-connector-java page.
- The Apache Flink cluster is restarted after all JAR packages are deployed. For more information about how to start a cluster, see Step 2: Start a Cluster.
- A database and a table are created in the destination AnalyticDB for MySQL cluster to store the data to be written. For more information about how to create a database and a table, see CREATE DATABASE and CREATE TABLE.
Note
- In this example, the tpch database is used. Execute the following statement to create the database:
CREATE DATABASE IF NOT EXISTS tpch;
- In this example, the person table is used. Execute the following statement to create the table:
CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
- If the AnalyticDB for MySQL cluster is in elastic mode, you must turn on ENI in the Network Information section of the Cluster Information page.
Precautions
- This topic describes only how to use Apache Flink SQL to create a table and write data to AnalyticDB for MySQL. For more information about how to use Apache Flink JDBC API to write data, see JDBC Connector.
- The method described in this topic is applicable only to Apache Flink 1.11 and later.
To write data from other Apache Flink versions to an AnalyticDB for MySQL cluster, you can use one of the following methods:
- For information about how to write data from Apache Flink 1.10 and 1.9, see JDBC Connector.
- For information about how to write data from Apache Flink 1.8 and earlier, see JDBCAppendTableSink.
Procedure
Note In this example, a CSV file is used as the source data to be written.
Step | Description
---|---
Step 1: Prepare data | Create a CSV file, write the source data to the file, and then deploy the file to the /root directory of all Apache Flink nodes.
Step 2: Write data | Use SQL statements to create a source table and a result table in Apache Flink, and use these tables to write the source data to AnalyticDB for MySQL.
Step 3: Verify data | Log on to the destination AnalyticDB for MySQL database to check whether the source data has been written.
Step 1: Prepare data
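Create a CSV file that contains the source data and deploy it to the /root directory of all Apache Flink nodes. The following content is a minimal sketch of such a file, assuming it is saved as /root/data.csv; the file name and the sample records are placeholders for illustration only, and each row maps to the user_id, user_name, and age columns of the person table:
0,Jay,23
1,Edison,31
2,Poppy,27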
Step 2: Write data
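The following Flink SQL statements are a minimal sketch of this step, assuming the CSV file from Step 1 is stored at /root/data.csv and that <endpoint>, <username>, and <password> are placeholders for the connection information of your AnalyticDB for MySQL cluster. The statements use the filesystem and jdbc connectors available in Apache Flink 1.11 and later; adjust the table names and options to match your environment:
-- Source table that reads the CSV file prepared in Step 1
CREATE TABLE csv_person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///root/data.csv',
  'format' = 'csv'
);
-- Result table that maps to the person table in the tpch database of the AnalyticDB for MySQL cluster
CREATE TABLE adb_person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<endpoint>:3306/tpch',
  'table-name' = 'person',
  'username' = '<username>',
  'password' = '<password>'
);
-- Write the source data to AnalyticDB for MySQL
INSERT INTO adb_person SELECT user_id, user_name, age FROM csv_person;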
Step 3: Verify data
After the data is written, you can log on to the tpch database of the AnalyticDB for MySQL cluster and execute the following statement to check whether the source data has been written to the person table:
SELECT * FROM person;