
AnalyticDB for MySQL: Import data from Apache Flink

Last Updated: Jan 31, 2024

This topic describes how to import data from Apache Flink to AnalyticDB for MySQL Data Warehouse Edition (V3.0).

Prerequisites

  • An Apache Flink JDBC driver that corresponds to your Apache Flink version is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes. To download the driver, go to the JDBC SQL Connector page.

  • A MySQL driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes.

    Note

    The version of the MySQL driver must be 5.1.40 or later. To download MySQL drivers, go to the mysql/mysql-connector-java page.

  • The Apache Flink cluster is restarted after all JAR packages are deployed. For more information about how to start a cluster, see Step 2: Start a Cluster.

  • A database and a table are created in the destination AnalyticDB for MySQL cluster to store the data to be written. For more information about how to create a database and a table, see CREATE DATABASE and CREATE TABLE.

    Note
    • In this example, the tpch database is used. Execute the following statement to create the database:

      CREATE DATABASE IF NOT EXISTS tpch;
    • In this example, the person table is used. Execute the following statement to create the table:

      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);

      You can verify the schema by using the check shown after this list.
  • If the AnalyticDB for MySQL cluster is in elastic mode, you must turn on ENI in the Network Information section of the Cluster Information page.
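
Because the Apache Flink tables that you create later must use the same column names and data types as the destination table, you can verify the schema after creation. The following statement is a standard MySQL-compatible check that you can run on the AnalyticDB for MySQL cluster:

  -- Displays the column names and data types of the destination table.
  DESCRIBE tpch.person;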

Precautions

  • This topic describes only how to use Apache Flink SQL to create a table and write data to AnalyticDB for MySQL. For more information about how to use the Apache Flink JDBC API to write data, see JDBC Connector.

  • The method described in this topic is applicable only to Apache Flink 1.11 and later. To write data from other Apache Flink versions to an AnalyticDB for MySQL cluster, use one of the following methods:

    • For Apache Flink 1.10 and 1.9, see JDBC Connector.

    • For Apache Flink 1.8 and earlier, see JDBCAppendTableSink.

Procedure

Note

In this example, a CSV file is used as the source data to be written.

  • Step 1: Prepare data. Create a CSV file, write the source data to the file, and then deploy the file to the /root directory of all Apache Flink nodes.

  • Step 2: Write data. Use SQL statements to create a source table and a result table in Apache Flink, and then use these tables to write the source data to AnalyticDB for MySQL.

  • Step 3: Verify data. Log on to the destination AnalyticDB for MySQL database to check whether the source data is written.

Step 1: Prepare data

  1. In the root directory of an Apache Flink node, run the vim /root/data.csv command to create a CSV file named data.csv.

    The CSV file contains the following data. You can copy more rows of data to increase the amount of data to be written.

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. After the CSV file is created, deploy the file to the /root directory of other Apache Flink nodes.

Step 2: Write data

  1. Start the Apache Flink SQL client. For more information, see Starting the SQL Client CLI.

  2. Execute the following statement to create a source table named csv_person:

    CREATE TABLE IF NOT EXISTS csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    Note
    • The column names and data types of the source table must be the same as those of the destination AnalyticDB for MySQL table.

    • The path parameter in the preceding statement specifies the location of the data.csv file. The path must be the same on all Apache Flink nodes. If your data.csv file is stored in a different location, specify the path based on your actual situation.

      For more information about the other parameters in the preceding statement, see FileSystem SQL Connector.
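
    Before you create the result table, you can optionally verify that the source table reads the CSV file as expected. The following query is a simple spot check that assumes the Flink SQL client is still running; because the CSV source is bounded, the query returns the 10 sample rows and finishes:

      -- Run in the Flink SQL client; prints the rows read from /root/data.csv.
      SELECT * FROM csv_person;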

  3. Execute the following statement to create a result table named mysql_person:

    CREATE TABLE mysql_person (
      user_id STRING,
      user_name STRING,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
    );
    Note
    • The column names and data types of the result table must be the same as those of the destination AnalyticDB for MySQL table.

    • The following list describes the parameters required to connect to an AnalyticDB for MySQL cluster. For more information about the optional parameters, see the "Connector options" section of the JDBC SQL Connector topic.

      • connector: the connector type used by Apache Flink. Set the value to jdbc.

      • url: the JDBC URL of the AnalyticDB for MySQL cluster.

        Format: jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true.

        • endpoint: the endpoint of the destination AnalyticDB for MySQL cluster.

          Note

          If you want to use a public endpoint to connect to the cluster, you must first apply for a public endpoint. For more information, see Apply for or release a public endpoint.

        • db_name: the name of the destination AnalyticDB for MySQL database.

        • useServerPrepStmts=false&rewriteBatchedStatements=true: the configuration required to batch write data to AnalyticDB for MySQL. This configuration improves write performance and reduces the load on the AnalyticDB for MySQL cluster.

        Example: jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true.

      • table-name: the name of the destination AnalyticDB for MySQL table that stores the written data. In this example, the destination table is named person.

      • username: the AnalyticDB for MySQL database account that has write permissions. A sketch of how to create and authorize such an account follows this list.

        Note
        • You can use the SHOW GRANTS statement to view the permissions of the current account.

        • You can use the GRANT statement to grant permissions to an account.

      • password: the password of the AnalyticDB for MySQL database account that has write permissions.

      • sink.buffer-flush.max-rows: the maximum number of rows that Apache Flink buffers before it batch writes them to AnalyticDB for MySQL. Apache Flink receives data in real time. When the number of buffered rows reaches this limit, the rows are batch written to the AnalyticDB for MySQL cluster. Valid values:

        • 0: Data is batch written only when the time interval specified by the sink.buffer-flush.interval parameter elapses.

        • A specific number of rows. Examples: 1000 and 2000.

        Note

        We recommend that you do not set this parameter to 0. If you set this parameter to 0, write performance degrades and the load on the AnalyticDB for MySQL cluster increases during concurrent queries.

        If both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters are set to values other than 0, a batch write is triggered by whichever condition is met first:

        • If the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter before the time interval elapses, Apache Flink batch writes the data to AnalyticDB for MySQL without waiting for the interval.

        • If the time interval specified by the sink.buffer-flush.interval parameter elapses before the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter, Apache Flink batch writes whatever data it has buffered to AnalyticDB for MySQL.

      • sink.buffer-flush.interval: the maximum time interval for Apache Flink to batch write data to AnalyticDB for MySQL, that is, the maximum amount of time to wait before the next batch write. Valid values:

        • 0: Data is batch written only when the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter.

        • A specific time interval. Examples: 1d, 1h, 1min, 1s, and 1ms.

        Note

        If only a small amount of source data is generated, we recommend that you do not set this parameter to 0. Otherwise, data that arrives during off-peak hours may not be written in a timely manner.
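
    The following statements show one way to create and authorize an account for the username parameter. This is a minimal sketch: flink_writer is a hypothetical account name, and the exact syntax that your cluster supports is described in the GRANT topic.

      -- Create a hypothetical account for Apache Flink to use; replace the name and password with your own values.
      CREATE USER 'flink_writer' IDENTIFIED BY '<password>';
      -- Writing to the destination table requires at least the INSERT permission.
      GRANT INSERT ON tpch.person TO 'flink_writer';
      -- Confirm that the permission took effect.
      SHOW GRANTS FOR 'flink_writer';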

  4. Execute the following statement to write data to the AnalyticDB for MySQL cluster by using the source table and the result table:

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;

Step 3: Verify data

After the data is written, you can log on to the tpch database of the AnalyticDB for MySQL cluster and execute the following statement to check whether the source data is written to the person table:

SELECT * FROM person;
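
If you copied additional rows into the CSV file to increase the amount of data, a row count can be a quicker check than a full scan. The following queries return the total number of written rows and a small ordered sample:

-- Returns 10 for the sample data in this topic.
SELECT COUNT(*) FROM person;
-- Spot-check the first few rows.
SELECT * FROM person ORDER BY user_id LIMIT 5;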