本文为您介绍如何使用云数据库RDS MySQL版连接器。

RDS MySQL基于阿里巴巴的MySQL源码分支,经过双十一高并发、大数据量的考验,拥有优良的性能。RDS MySQL支持实例管理、账号管理、数据库管理、备份恢复、白名单、透明数据加密以及数据迁移等基本功能。RDS MySQL详情请参见RDS MySQL云数据库
重要 后续将计划不再支持云数据库RDS MySQL版连接器,建议您直接使用MySQL连接器,使用MySQL连接器请参见MySQL
RDS MySQL连接器支持的信息如下。
类别详情
支持类型结果表和维表
运行模式流模式与批模式
数据格式暂不适用
特有监控指标
  • 维表:无
  • 结果表
    • numRecordsOut
    • numRecordsOutPerSecond
    • numBytesOut
    • numBytesOutPerSecond
    • currentSendTime
    • numRecordsOutErrors
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类SQL
是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持RDS MySQL连接器。
  • 仅支持阿里云RDS MySQL云数据库
  • 语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。
  • 推荐您使用最新版本的Flink(例如6.x以上),以获取最新的性能与稳定性优化。

注意事项

RDS MySQL连接器后续会逐步下线,建议您在功能满足的前提下使用MySQL连接器,详情请参见MySQL

语法结构

  • 结果表
    CREATE TABLE rds_sink (
      id INT,
      num BIGINT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'rds',
      'tableName' = 'your-table-name',
      'userName' = 'your-user-name',
      'password' = 'your-password',
      'url' = 'your-url'
    );
    说明
    • Flink RDS连接器写入数据库结果表原理:针对Flink Sink输出数据,拼接成一行SQL语句,然后执行。对于没有主键的结果表,会执行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);语句。对于包含主键的结果表,会执行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;语句。
    • 如果在RDS MySQL云数据库定义了自增主键,在Flink DDL中不应该声明该自增字段。数据写入过程中,数据库会自动填补该自增字段。Flink RDS连接器仅支持写入和删除带自增字段的数据,不支持更新。
  • 维表
    CREATE TABLE rds_dim (
      id1 INT,
      id2 VARCHAR
    ) WITH (
      'connector' = 'rds',
      'tableName' = 'your-table-name',
      'userName' = 'your-user-name',
      'password' = 'your-password',
      'url' = 'your-url'
      'cache' = 'NONE'
    );

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector表类型。String无。固定值为rds。
    tableName表名。String无。无。
    userName用户名。String无。无。
    password密码。String无。无。
    url表地址。String无。RDS MySQL云数据库专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口

    URL的格式为:jdbc:mysql://<内网地址>/<数据库名称>

    说明 对于结果表,建议在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
    maxRetryTimes查询维表或者写数据到结果表失败后,最多重试次数。Integer
    • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为10。
    • 在Flink计算引擎VVR 4.0.6及以下版本,该参数默认值为3。
    无。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    batchSize一次批量写入的条数。Integer
    • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为4096。
    • 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为5000。
    • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
    无。
    bufferSize内存中缓存的最大数据条数。batchSize或 bufferSize任一到达阈值都会触发数据写操作。Integer10000
    • 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。
    • 需指定主键后,该参数才生效。
    flushIntervalMsflush内存缓冲区的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件(batchSize或bufferSize),系统统会自动写出缓存中的所有数据到结果表。Integer
    • 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为2000。
    • 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为0。
    • 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
    在默认值为0的版本中,如果不配置该参数,可能导致少量数据永远无法写出到结果表。建议您采用更高版本的Flink。
    ignoreDelete是否忽略数据Delete操作。BooleanfalseFlink SQL可能会生成数据Delete操作,在多个输出节点根据主键同时更新同一张结果表的不同字段的场景下,可能导致数据结果不正确。

    例如一个任务在删除了一条数据后,另一个任务又只更新了这条数据的部分字段,其余未被更新的字段由于被删除,其值会变成null或默认值。通过将ignoreDelete设置为true,可以避免数据删除操作。

    connectionMaxActive数据库连接池大小Integer40
    • 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。
    • 如果出现获取连接超时的问题,可能是连接池不够用,可适当增大连接池的大小。
    • 如果数据库能支持的最大并发连接比较小,可适当减小连接池大小或者减小作业节点并行度。
  • 维表独有
    参数说明数据类型是否必填默认值备注
    cache维表缓存策略。String
    • 在Flink计算引擎VVR 4.0.6之前版本,缓存策略默认值为None。
    • 在Flink计算引擎VVR 4.0.6及以上版本,缓存策略的默认值为ALL。
    数据缓存的目的是加速维表的查询,但缓存的数据无法保证是最新的。Flink RDS MySQL连接器支持三种维表数据缓存策略:
    • None:无缓存,每条数据都会查询数据库维表。
    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。这个全量的Cache有一个过期时间,过期后后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。

    cacheSize缓存大小。Integer100000
    • 当选择LRU缓存策略后,需要设置缓存大小。
    • 当选择None或ALL缓存策略时,不必设置缓存大小。
    cacheTTLMs缓存超时时间。Long
    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
    单位为毫秒。
    maxJoinRows主表中每一条数据查询维表时,匹配后最多返回的结果数。Integer1024进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。

    如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。

类型映射

Flink字段类型RDS MySQL字段类型
BOOLEANBOOLEAN
TINYINTTINYINT
TINYINT(1)
说明 仅维表支持该映射
BOOLEAN
SMALLINTSMALLINT
SMALLINTTINYINT UNSIGNED
INTINT
INTSMALLINT UNSIGNED
BIGINTBIGINT
BIGINTINT UNSIGNED
DECIMAL(20,0)BIGINT UNSIGNED
FLOATFLOAT
DECIMALDECIMAL
DOUBLEDOUBLE
DATEDATE
TIMETIME
IMESTAMPIMESTAMP
VARCHARVARCHAR
VARBINARYVARBINARY

使用示例

  • 结果表
    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE rds_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'rds',
      'password' = 'your-password',
      'tableName' = 'your-tablename',
      'url' = 'your-url',
      'userName' = 'your-username'
    );
    
    INSERT INTO rds_sink
    SELECT  * FROM datagen_source;
  • 维表
    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE rds_dim (
      a INT, 
      b VARCHAR, 
      c VARCHAR
    ) WITH (
      'connector' = 'rds',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>',
      'url' = 'jdbc:mysql://xxx',
      'userName' = '<yourUsername>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    INSERT INTO blackhole_sink 
    SELECT T.a,H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;

常见问题