全部产品
Search
文档中心

实时计算Flink版:StarRocks

更新时间:Sep 12, 2024

本文为您介绍如何使用StarRocks连接器。

背景信息

StarRocks是新一代极速全场景MPP(Massively Parallel Processing)数据仓库,致力于构建极速和统一分析体验。StarRocks具有以下优势:

  • StarRocks兼容MySQL协议,可以使用MySQL客户端和常用BI工具对接StarRocks来分析数据。

  • StarRocks采用分布式架构:

    • 对数据表进行水平划分并以多副本存储。

    • 集群规模可以灵活伸缩,支持10 PB级别的数据分析。

    • 支持MPP框架,并行加速计算。

    • 支持多副本,具有弹性容错能力。

Flink连接器内部的结果表是通过缓存并批量由Stream Load导入实现,源表是通过批量读取数据实现。StarRocks连接器支持的信息如下。

类别

详情

支持类型

源表和结果表

运行模式

流模式和批模式

数据格式

CSV

特有监控指标

暂无

API种类

Datastream和SQL

是否支持更新或删除结果表数据

特色功能

EMR的StarRocks支持CTAS&CDAS功能,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步,详情请参见基于实时计算Flink使用CTAS&CDAS功能同步MySQL数据至StarRocks

前提条件

已创建StarRocks集群,包括EMR的StarRocks或基于ECS的云上自建StarRocks。

使用限制

  • 仅实时计算引擎VVR 6.0.5及以上版本支持StarRocks连接器。

  • StarRocks连接器仅支持at-least-once和exactly-once语义。

语法结构

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

WITH参数

类型

参数

说明

数据类型

是否必填

默认值

备注

通用

connector

表类型。

String

固定值为starrocks。

jdbc-url

JDBC连接的URL。

String

指定FE(Front End)的IP和JDBC端口,格式为jdbc:mysql://ip:port

database-name

StarRocks数据库名称。

String

无。

table-name

StarRocks表名称。

String

无。

username

StarRocks连接用户名。

String

无。

password

StarRocks连接密码。

String

无。

starrocks.create.table.properties

StarRocks表属性。

String

设置数据表初始属性,如引擎、副本数等。例如,'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'。

源表独有

scan-url

数据扫描的url。

String

指定FE(Front End)的IP和HTTP端口,格式为fe_ip:http_port;fe_ip:http_port

说明

填写多个IP和端口号时,请使用半角分号(;)进行分隔。

scan.connect.timeout-ms

flink-connector-starrocks连接StarRocks的时间上限。

超过该时间上限,将报错。

String

1000

单位为毫秒。

scan.params.keep-alive-min

查询任务的保活时间。

String

10

无。

scan.params.query-timeout-s

查询任务的超时时间。

如果超过该时间,仍未返回查询结果,则停止查询任务。

String

600

单位为秒。

scan.params.mem-limit-byte

BE节点中单个查询的内存上限。

String

1073741824(1 GB)

单位为字节。

scan.max-retries

查询失败时的最大重试次数。

超过该数量上限,则将报错。

String

1

无。

结果表独有

load-url

数据导入的URL。

String

指定FE(Front End)的IP和HTTP端口,格式为fe_ip:http_port;fe_ip:http_port

说明

填写多个IP和端口号时,请使用半角分号(;)进行分隔。

sink.semantic

数据写入语义。

String

at-least-once

取值如下:

  • at-least-once(默认值):至少一次。

  • exactly-once:恰好一次。

sink.buffer-flush.max-bytes

Buffer可容纳的最大数据量。

String

94371840(90 MB)

取值范围为64 MB~10 GB。

sink.buffer-flush.max-rows

Buffer可容纳的最大数据行数。

String

500000

取值范围为64,000~5000,000。

sink.buffer-flush.interval-ms

Buffer刷新时间间隔。

String

300000

取值范围为1000毫秒~3600000毫秒。

sink.max-retries

最大重试次数。

String

3

取值范围为0~10。

sink.connect.timeout-ms

连接到starrocks的超时时间。

String

1000

取值范围为100~60000。单位为毫秒。

sink.properties.*

结果表属性。

String

Stream Load的参数控制Stream Load导入行为。例如,参数 sink.properties.format表示Stream Load所导入的数据格式,如CSV。更多参数和解释,请参见Stream Load

类型映射

StarRocks字段类型

Flink字段类型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

LARGEINT

STRING

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR

CHAR

VARCHAR

STRING

代码示例

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;