本文为您介绍如何使用StarRocks连接器。
背景信息
StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。StarRocks具有以下优势:
StarRocks兼容MySQL协议,可以使用MySQL客户端和常用BI工具对接StarRocks来分析数据。
StarRocks采用分布式架构:
对数据表进行水平划分并以多副本存储。
集群规模可以灵活伸缩,支持10 PB级别的数据分析。
支持MPP框架,并行加速计算。
支持多副本,具有弹性容错能力。
Flink连接器内部的结果表是通过缓存并批量由Stream Load导入实现,源表是通过批量读取数据实现。StarRocks连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | CSV |
特有监控指标 | 暂无 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
EMR的StarRocks支持CTAS&CDAS功能,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步,详情请参见基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks。
前提条件
已创建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。
使用限制
仅实时计算引擎VVR 6.0.5及以上版本支持StarRocks连接器。
StarRocks连接器仅支持at-least-once和exactly-once语义。
语法结构
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH参数
类型 | 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
通用 | connector | 表类型。 | String | 是 | 无 | 固定值为starrocks。 |
jdbc-url | JDBC连接的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和JDBC端口,格式为 | |
database-name | StarRocks数据库名称。 | String | 是 | 无 | 无。 | |
table-name | StarRocks表名称。 | String | 是 | 无 | 无。 | |
username | StarRocks连接用户名。 | String | 是 | 无 | 无。 | |
password | StarRocks连接密码。 | String | 是 | 无 | 无。 | |
starrocks.create.table.properties | StarRocks表属性。 | String | 否 | 无 | 设置数据表初始属性,如引擎、副本数等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。 | |
源表独有 | scan-url | 数据扫描的url。 | String | 否 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明 填写多个IP和端口号时,请使用半角分号(;)进行分隔。 |
scan.connect.timeout-ms | flink-connector-starrocks连接StarRocks的时间上限。 超过该时间上限,将报错。 | String | 否 | 1000 | 单位为毫秒。 | |
scan.params.keep-alive-min | 查询任务的保活时间。 | String | 否 | 10 | 无。 | |
scan.params.query-timeout-s | 查询任务的超时时间。 如果超过该时间,仍未返回查询结果,则停止查询任务。 | String | 否 | 600 | 单位为秒。 | |
scan.params.mem-limit-byte | BE节点中单个查询的内存上限。 | String | 否 | 1073741824(1 GB) | 单位为字节。 | |
scan.max-retries | 查询失败时的最大重试次数。 超过该数量上限,则将报错。 | String | 否 | 1 | 无。 | |
结果表独有 | load-url | 数据导入的URL。 | String | 是 | 无 | 指定FE(Front End)的IP和HTTP端口,格式为 说明 填写多个IP和端口号时,请使用半角分号(;)进行分隔。 |
sink.semantic | 数据写入语义。 | String | 否 | at-least-once | 取值如下:
| |
sink.buffer-flush.max-bytes | Buffer可容纳的最大数据量。 | String | 否 | 94371840(90 MB) | 取值范围为64 MB~10 GB。 | |
sink.buffer-flush.max-rows | Buffer可容纳的最大数据行数。 | String | 否 | 500000 | 取值范围为64,000~5000,000。 | |
sink.buffer-flush.interval-ms | Buffer刷新时间间隔。 | String | 否 | 300000 | 取值范围为1000毫秒~3600000毫秒。 | |
sink.max-retries | 最大重试次数。 | String | 否 | 3 | 取值范围为0~10。 | |
sink.connect.timeout-ms | 连接到starrocks的超时时间。 | String | 否 | 1000 | 取值范围为100~60000。单位为毫秒。 | |
sink.properties.* | 结果表属性。 | String | 否 | 无 | Stream Load的参数控制Stream Load导入行为。例如,参数 sink.properties.format表示Stream Load所导入的数据格式,如CSV。更多参数和解释,请参见Stream Load。 |
类型映射
StarRocks字段类型 | Flink字段类型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
代码示例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;