Use the Apache Flink JDBC SQL connector to write streaming or batch data into an AnalyticDB for MySQL Data Warehouse Edition cluster. This guide covers Flink 1.11 and later.
This guide uses Apache Flink SQL. To write data using the Flink Java Database Connectivity (JDBC) API instead, see JDBC Connector.
Prerequisites
Before you begin, ensure that you have:
- An AnalyticDB for MySQL Data Warehouse Edition cluster with a database and table ready to receive data. See CREATE DATABASE and CREATE TABLE.
- An Apache Flink cluster running version 1.11 or later.
- The Flink JDBC connector JAR and a MySQL driver JAR (version 5.1.40 or later) deployed to the `${Flink deployment directory}/lib` directory on all Flink nodes, with the cluster restarted after deployment.
- (Elastic mode clusters only) ENI enabled on the Cluster Information page under Network Information.
Enabling or disabling ENI interrupts all database connections for approximately 2 minutes, during which read and write operations are unavailable.
Download the required JAR files
Flink JDBC connector
Download the connector JAR that matches your Flink version. For versions not listed below, see JDBC SQL Connector.
| Flink version | JAR file |
|---|---|
| 1.11 | flink-connector-jdbc_2.11-1.11.0.jar |
| 1.12 | flink-connector-jdbc_2.11-1.12.0.jar |
| 1.13 | flink-connector-jdbc_2.11-1.13.0.jar |
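If you script your deployment, the version-to-JAR mapping above can be encoded in a small helper. This is purely illustrative (the function name and error handling are not part of any Flink tooling):

```python
# Map Flink minor versions to the matching JDBC connector JAR (from the table above).
CONNECTOR_JARS = {
    "1.11": "flink-connector-jdbc_2.11-1.11.0.jar",
    "1.12": "flink-connector-jdbc_2.11-1.12.0.jar",
    "1.13": "flink-connector-jdbc_2.11-1.13.0.jar",
}

def connector_jar(flink_version: str) -> str:
    """Return the connector JAR for a Flink version, e.g. '1.13.2' uses the 1.13 entry."""
    minor = ".".join(flink_version.split(".")[:2])
    try:
        return CONNECTOR_JARS[minor]
    except KeyError:
        raise ValueError(
            f"No connector listed for Flink {flink_version}; see the JDBC SQL Connector docs"
        )
```

For versions outside this table, consult the JDBC SQL Connector page rather than guessing a JAR name.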
MySQL driver
Download a MySQL driver (version 5.1.40 or later) from mysql/mysql-connector-java.
After placing both JARs in the lib directory on every Flink node, restart the cluster. For cluster startup instructions, see Step 2: Start a cluster.
Version compatibility for older Flink releases
For Flink versions earlier than 1.11:
- Flink 1.9 and 1.10: use the Flink v1.10 JDBC connector.
- Flink 1.8 and earlier: use the Flink v1.8 JDBCAppendTableSink.
Step 1: Prepare source data
This example uses a local CSV file as the data source.
On any Flink node, create `/root/data.csv` with the following content:

```
0,json00,20
1,json01,21
2,json02,22
3,json03,23
4,json04,24
5,json05,25
6,json06,26
7,json07,27
8,json08,28
9,json09,29
```

Copy the file to `/root/data.csv` on all other Flink nodes. The path must be identical on every node.
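The sample rows follow a simple pattern: a sequential `user_id`, a `user_name` of "json" plus the zero-padded id, and an `age` of 20 plus the id. A short script (a sketch; only the file layout comes from the step above) can generate the file instead of typing it by hand:

```python
# Generate sample rows in the "user_id,user_name,age" layout shown above.
def write_sample_csv(path, n=10):
    with open(path, "w") as f:
        for i in range(n):
            f.write(f"{i},json{i:02d},{20 + i}\n")

write_sample_csv("data.csv")  # then place the file at /root/data.csv on every Flink node
```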
Step 2: Write data to AnalyticDB for MySQL
All SQL in this step runs in the Flink SQL Client CLI. To start it, see Starting the SQL Client CLI.
Create the source table
Create a Flink source table that reads from the CSV file:
```sql
CREATE TABLE IF NOT EXISTS csv_person (
  `user_id` STRING,
  `user_name` STRING,
  `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///root/data.csv',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true'
);
```

Column names and data types must match the destination table in AnalyticDB for MySQL. The `path` value must point to the same absolute path on every Flink node. For other connector options, see FileSystem SQL Connector.
Create the result table
Create a Flink result table that maps to the destination table in AnalyticDB for MySQL:
```sql
CREATE TABLE mysql_person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<endpoint>:<port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
  'table-name' = '<table_name>',
  'username' = '<username>',
  'password' = '<password>',
  'sink.buffer-flush.max-rows' = '10',
  'sink.buffer-flush.interval' = '1s'
);
```

Replace the placeholders before running this statement:
| Placeholder | Description | Example |
|---|---|---|
| `<endpoint>` | Cluster endpoint. To use a public endpoint, first apply for one. | am-**********.ads.aliyuncs.com |
| `<port>` | Cluster port. | 3306 |
| `<db_name>` | Destination database name. | tpch |
| `<table_name>` | Destination table name. | person |
| `<username>` | Database account with write permissions. Run SHOW GRANTS to check permissions, or GRANT to assign them. | — |
| `<password>` | Password for the database account. | — |
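The `url` value is plain string assembly from these placeholders. The sketch below (the helper name is illustrative, not part of any API) shows the expected shape, including the two query parameters that enable batched writes:

```python
def jdbc_url(endpoint, port, db_name):
    """Build the connector 'url' value with the batching parameters recommended above."""
    params = "useServerPrepStmts=false&rewriteBatchedStatements=true"
    return f"jdbc:mysql://{endpoint}:{port}/{db_name}?{params}"

url = jdbc_url("am-**********.ads.aliyuncs.com", 3306, "tpch")
```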
Connector parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | — | Set to `jdbc`. |
| `url` | Yes | — | JDBC URL of the cluster. The query string `useServerPrepStmts=false&rewriteBatchedStatements=true` enables batch writes for better throughput and lower cluster load. |
| `table-name` | Yes | — | Name of the destination table. |
| `username` | Yes | — | Database account with write permissions. |
| `password` | Yes | — | Account password. |
| `sink.buffer-flush.max-rows` | No | — | Maximum number of rows per batch. Set to 0 to flush only on the time interval. Non-zero examples: 1000, 2000. Avoid setting it to 0, which degrades write throughput during concurrent queries. |
| `sink.buffer-flush.interval` | No | — | Maximum time between flushes. Set to 0 to flush only when max-rows is reached. Non-zero examples: 1d, 1h, 1min, 1s, 1ms. Avoid setting it to 0, because data may be delayed during low-traffic periods. |
For all available connector options, see Connector options.
Flush behavior
When both `sink.buffer-flush.max-rows` and `sink.buffer-flush.interval` are set to non-zero values, a flush is triggered as soon as either condition is met:

- The number of buffered rows reaches `sink.buffer-flush.max-rows`.
- The elapsed time since the last flush reaches `sink.buffer-flush.interval`.
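The two-condition trigger can be sketched as a tiny buffer model. This is illustrative only, not the connector's actual implementation (which, among other things, also flushes asynchronously):

```python
import time

class FlushBuffer:
    """Flush when either the row cap or the time interval is reached, whichever comes first."""

    def __init__(self, max_rows, interval_s, flush_fn):
        self.max_rows, self.interval_s, self.flush_fn = max_rows, interval_s, flush_fn
        self.rows, self.last_flush = [], time.monotonic()

    def add(self, row):
        self.rows.append(row)
        self._maybe_flush()

    def _maybe_flush(self):
        too_many = len(self.rows) >= self.max_rows
        too_old = time.monotonic() - self.last_flush >= self.interval_s
        if self.rows and (too_many or too_old):
            self.flush_fn(self.rows)  # e.g. one batched JDBC INSERT
            self.rows, self.last_flush = [], time.monotonic()
```

With `max_rows=10` and `interval_s=1`, as in the result table above, a burst of rows flushes every 10 rows, while a trickle still flushes at least once per second.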
Run the import
Execute the following statement to start writing data from the source table to AnalyticDB for MySQL:
```sql
INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
```

If the destination table has a primary key and the source data contains duplicate primary key values, INSERT INTO behaves like INSERT IGNORE INTO: duplicate rows are skipped rather than inserted again. See INSERT INTO.
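The INSERT IGNORE-like behavior described above (the first row with a given primary key wins; later duplicates are skipped) can be modeled as:

```python
def insert_ignore(rows, key_index=0):
    """Keep the first row for each primary-key value; skip later duplicates."""
    seen, kept = set(), []
    for row in rows:
        pk = row[key_index]
        if pk not in seen:
            seen.add(pk)
            kept.append(row)
    return kept
```

Note that the skipped duplicates do not overwrite the existing rows; if you need overwrite semantics, consult the AnalyticDB for MySQL INSERT documentation.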
Step 3: Verify the data
After the job completes, log in to the tpch database in your AnalyticDB for MySQL cluster and run:
```sql
SELECT * FROM person;
```