This topic describes how to write data from Apache Flink to an AnalyticDB for MySQL cluster.

Prerequisites

  • An Apache Flink driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes. Download the driver that corresponds to your Apache Flink version from the JDBC SQL Connector page.

  • A MySQL driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes.
    Note The version of the MySQL driver must be 5.1.40 or later. To download MySQL drivers, go to the mysql/mysql-connector-java page.
  • The Apache Flink cluster is restarted after all JAR packages are deployed. For more information about how to start a cluster, see Step 2: Start a Cluster.
  • A database and a table are created in the destination AnalyticDB for MySQL cluster to store the data to be written. For more information about how to create a database and a table, see CREATE DATABASE and CREATE TABLE.
    Note
    • In this example, the tpch database is used. Execute the following statement to create the database:
      CREATE DATABASE IF NOT EXISTS tpch;
    • In this example, the person table is used. Execute the following statement to create the table:
      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
  • If the AnalyticDB for MySQL cluster is in elastic mode, you must turn on ENI in the Network Information section of the Cluster Information page.

Precautions

  • This topic describes only how to use Apache Flink SQL to create a table and write data to AnalyticDB for MySQL. For more information about how to use Apache Flink JDBC API to write data, see JDBC Connector.
  • The method described in this topic is applicable only to Apache Flink 1.11 and later. To write data of other Apache Flink versions to an AnalyticDB for MySQL cluster, you can use one of the following methods:
    • For information about how to write data of Apache Flink 1.10 and 1.9, see JDBC Connector.
    • For information about how to write data of Apache Flink 1.8 and earlier, see JDBCAppendTableSink.

Procedure

Note In this example, a CSV file is used as the source data to be written.
  • Step 1: Prepare data. Create a CSV file, write source data to the file, and then deploy the file to the /root directory of all Apache Flink nodes.
  • Step 2: Write data. Use SQL statements to create a source table and a result table in Apache Flink, and use these tables to write the source data to AnalyticDB for MySQL.
  • Step 3: Verify data. Log on to the destination AnalyticDB for MySQL database to check whether the source data is written.

Step 1: Prepare data

  1. In the root directory of an Apache Flink node, run the vim /root/data.csv command to create a CSV file named data.csv.
    The CSV file contains the following data. You can copy more rows of data to increase the amount of data to be written.
    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. After the CSV file is created, deploy the file to the /root directory of other Apache Flink nodes.
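Instead of copying rows by hand to enlarge the data set, you can generate them with a short script. The sketch below reproduces the same three-column format (user_id, user_name, age) as the sample data; the row count and output path are illustrative.

```python
def make_rows(n, start_age=20):
    # Mirrors the sample data: user_id counts up from 0, user_name is
    # "json" plus a two-digit index, and age counts up from start_age.
    return ["{0},json{0:02d},{1}".format(i, start_age + i) for i in range(n)]

def write_csv(path, n):
    # Write n generated rows to a CSV file, one row per line.
    with open(path, "w") as f:
        f.write("\n".join(make_rows(n)) + "\n")

# Example: write_csv("/root/data.csv", 100)
```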

Step 2: Write data

  1. Start and run the Apache Flink SQL application. For more information about this procedure, see Starting the SQL Client CLI.
  2. Execute the following statement to create a source table named csv_person:
    CREATE TABLE IF NOT EXISTS csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    Note
    • The column names and data types of the source table must be the same as those of the destination AnalyticDB for MySQL table.
    • The path option in the preceding statement specifies the location of the data.csv file. The file must be stored in the same path on all Apache Flink nodes. If your data.csv file is stored in a different location, specify the path based on your actual situation.

      For more information about other parameters in the preceding statement, see FileSystem SQL Connector.

  3. Execute the following statement to create a result table named mysql_person:
    CREATE TABLE mysql_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
    );
    Note
    • The column names and data types of the result table must be the same as those of the destination AnalyticDB for MySQL table.
    • The following table lists the parameters required to connect to an AnalyticDB for MySQL cluster. For more information about the optional parameters, see Connector Options.
    • connector: the connector type used by Apache Flink. Set the value to jdbc.
    • url: the JDBC URL of the AnalyticDB for MySQL cluster.
      Format: jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true.
      • endpoint: the endpoint of the destination AnalyticDB for MySQL cluster.
        Note If you want to use a public endpoint to connect to the cluster, you must first apply for one. For more information, see Apply for a public endpoint.
      • db_name: the name of the destination AnalyticDB for MySQL database.
      • useServerPrepStmts=false&rewriteBatchedStatements=true: the configuration required to batch write data to AnalyticDB for MySQL. This configuration improves write performance and reduces the load on the AnalyticDB for MySQL cluster.
      Example: jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true.
    • table-name: the name of the destination AnalyticDB for MySQL table that stores the data to be written. In this example, the destination table is named person.
    • username: the AnalyticDB for MySQL database account that has write permissions.
      Note
      • You can view the permissions of the current account by using the SHOW GRANTS statement.
      • You can grant permissions to an account by using the GRANT statement.
    • password: the password of the AnalyticDB for MySQL database account that has write permissions.
    • sink.buffer-flush.max-rows: the maximum number of rows that Apache Flink buffers before it batch writes them to AnalyticDB for MySQL. Apache Flink receives data in real time. When the number of buffered rows reaches this limit, the rows are batch written to the AnalyticDB for MySQL cluster. Valid values:
      • 0: data is batch written only when the maximum time interval specified by the sink.buffer-flush.interval parameter is reached.
      • A specific number of rows, such as 1000 or 2000.
      Note We recommend that you do not set this parameter to 0. If you do, write performance degrades and the load on the AnalyticDB for MySQL cluster increases during concurrent queries.
      If both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters are set to values other than 0, the batch write feature follows these rules:
      • If the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter before the time interval specified by the sink.buffer-flush.interval parameter elapses, Apache Flink batch writes the data to AnalyticDB for MySQL without waiting for the interval to elapse.
      • If the time interval specified by the sink.buffer-flush.interval parameter elapses before the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter, Apache Flink batch writes whatever data it has received so far.
    • sink.buffer-flush.interval: the maximum time interval at which Apache Flink batch writes data to AnalyticDB for MySQL, which is the maximum amount of time to wait before the next batch write. Valid values:
      • 0: data is batch written only when the number of rows specified by the sink.buffer-flush.max-rows parameter is reached.
      • A specific time interval, such as 1d, 1h, 1min, 1s, or 1ms.
      Note If only a small amount of source data is generated, we recommend that you do not set this parameter to 0, so that data is still written in a timely manner during off-peak hours.
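    The interaction between the two sink.buffer-flush parameters can be summarized as: a batch is flushed as soon as either threshold is reached. The sketch below models that trigger logic; it is a simplified illustration, not the connector's actual implementation.

```python
import time

class FlushBuffer:
    # Simplified model of the sink.buffer-flush.* behavior: rows are
    # buffered and flushed when either max_rows is reached or interval_s
    # seconds have elapsed since the last flush, whichever comes first.
    # A value of 0 disables the corresponding trigger.
    def __init__(self, max_rows, interval_s, flush_fn):
        self.max_rows = max_rows
        self.interval_s = interval_s
        self.flush_fn = flush_fn
        self.rows = []
        self.last_flush = time.monotonic()

    def add(self, row):
        # Row-count trigger: flush once max_rows rows are buffered.
        self.rows.append(row)
        if self.max_rows and len(self.rows) >= self.max_rows:
            self.flush()

    def tick(self):
        # Time trigger: called periodically by a timer in a real sink.
        if self.interval_s and time.monotonic() - self.last_flush >= self.interval_s:
            self.flush()

    def flush(self):
        # Hand the buffered rows to the writer and reset the buffer.
        if self.rows:
            self.flush_fn(self.rows)
            self.rows = []
        self.last_flush = time.monotonic()
```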
  4. Execute the following statement to write data to the AnalyticDB for MySQL cluster by using the source table and the result table:
    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
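    The rewriteBatchedStatements=true option in the JDBC URL lets the MySQL driver combine the buffered rows into a single multi-value INSERT statement instead of sending one statement per row, which is what makes the batch write efficient. The sketch below is a rough illustration of that rewrite; the real driver also handles escaping and parameter binding, which are omitted here.

```python
def rewrite_batched_insert(table, columns, rows):
    # Combine many single-row inserts into one multi-value INSERT, as the
    # MySQL driver does when rewriteBatchedStatements=true. Values are
    # rendered as plain Python literals for illustration only.
    values = ", ".join(
        "(" + ", ".join(repr(v) for v in row) + ")" for row in rows
    )
    return "INSERT INTO {} ({}) VALUES {}".format(
        table, ", ".join(columns), values
    )
```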

Step 3: Verify data

After the data is written, you can log on to the tpch database of the AnalyticDB for MySQL cluster and execute the following statement to check whether the source data is written to the person table:
SELECT * FROM person;