本文介绍如何将开源Flink中的数据写入AnalyticDB MySQL版集群。
前提条件
- 下载Flink驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。您可以根据Flink版本下载对应的驱动:
- Flink 1.11版本:flink-connector-jdbc_2.11-1.11.0.jar
- Flink 1.12版本:flink-connector-jdbc_2.11-1.12.0.jar
- Flink 1.13版本:flink-connector-jdbc_2.11-1.13.0.jar
如需其他版本的驱动,请前往JDBC SQL Connector 页面下载。
- 下载MySQL驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。
说明 MySQL驱动版本需为5.1.40或以上,请前往MySQL驱动下载页面下载。
- 部署所有的JAR包后请重启Flink集群。启动方式,请参见Start a Cluster。
- 已在目标AnalyticDB MySQL版集群中创建数据库和数据表,用于保存需要写入的数据。数据库和数据表的创建方法,请参见CREATE DATABASE和CREATE TABLE。
说明
- 本文示例中创建的数据库名称为
tpch
,建库语句如下:CREATE DATABASE IF NOT EXISTS tpch;
- 本文示例中创建的数据表名为
person
,建表语句如下:CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
- 本文示例中创建的数据库名称为
- 如果您的AnalyticDB MySQL集群是弹性模式,您需要在集群信息页面的网络信息区域,打开启用ENI网络的开关。
注意事项
- 本文仅介绍通过Flink SQL创建表并写入数据至AnalyticDB MySQL版的方法。通过Flink JDBC API写入数据的方法,请参见JDBC Connector。
- 本文介绍的方法仅适用于Flink1.11及以上版本。若您需要将其他版本的Flink数据写入AnalyticDB MySQL版集群,那么:
- 针对Flink1.10和Flink1.09版本,数据写入方法,请参见Flink 1.10 Documentation: Connect to External Systems。
- 针对Flink1.08及以下版本,数据写入方法,请参见Flink 1.08 Documentation: Connect to External Systems。
流程介绍
说明 本文示例以CSV格式的文件作为输入源介绍数据写入流程。
步骤 | 说明 |
---|---|
步骤一:数据准备 | 创建一个新的CSV文件并在文件中写入源数据,然后将新文件部署至Flink所有节点的/root下。 |
步骤二:数据写入 | 通过SQL语句在Flink中创建源表和结果表,并通过源表和结果表将数据写入AnalyticDB MySQL中。 |
步骤三:数据验证 | 登录AnalyticDB MySQL目标数据库,来查看并验证源数据是否成功导入。 |
步骤一:数据准备
步骤二:数据写入
步骤三:数据验证
导入完成后,您可以登录AnalyticDB MySQL集群的目标库
tpch
,执行如下语句查看并验证源数据是否成功导入至目标表person
中:SELECT * FROM person;