The StarRocks connector integrates Realtime Compute for Apache Flink with StarRocks databases. StarRocks is a new-generation Massively Parallel Processing (MPP) data warehouse designed for extremely fast query performance across all scenarios. For sink tables, the connector buffers data and imports it in batches through Stream Load; for source tables, it reads data in batches. Use it to read from StarRocks source tables, enrich streams with StarRocks dimension tables, write to StarRocks sink tables via the SQL or DataStream API, and synchronize data and schema changes from upstream sources via the YAML API (Flink Change Data Capture (CDC)).
Supported capabilities
| Item | Description |
|---|---|
| Table type | Source table, dimension table, sink table, and data ingestion sink |
| Running mode | Streaming mode and batch mode |
| Data format | CSV |
| Metric | N/A |
| API type | DataStream API, SQL API, and YAML API for data ingestion |
| Data update or deletion in a result table | Supported |
Prerequisites
Before you begin, ensure that you have:
- A StarRocks cluster: either an E-MapReduce (EMR) StarRocks cluster or a self-managed StarRocks cluster hosted on Elastic Compute Service (ECS) instances.
Limitations
- The StarRocks connector supports only at-least-once and exactly-once semantics.
- Only Ververica Runtime (VVR) 11.1 or later supports lookup joins with StarRocks dimension tables.
- To avoid connection failures caused by network restrictions, allow the following ports of your StarRocks cluster in your security group or firewall: 9030, 8030, 8040, 9060, 8060, and 9020.
SQL statements
Use SQL statements to connect Flink source and sink tables to StarRocks. E-MapReduce (EMR) StarRocks also supports the CREATE TABLE AS (CTAS) and CREATE DATABASE AS (CDAS) statements: CTAS synchronizes the schema and data of a single table, while CDAS synchronizes an entire database or multiple tables within the same database. For more information, see Use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster.
Syntax
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
Connector options
General options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| connector | The connector type. | String | Yes | — | Set to starrocks. |
| jdbc-url | The Java Database Connectivity (JDBC) URL for connecting to the database. | String | Yes | — | Uses the frontend (FE) IP address and JDBC port. Format: jdbc:mysql://ip:port. |
| database-name | The name of the StarRocks database. | String | Yes | — | — |
| table-name | The name of the StarRocks table. | String | Yes | — | — |
| username | The username for connecting to the StarRocks database. | String | Yes | — | — |
| password | The password for connecting to the StarRocks database. | String | Yes | — | — |
| starrocks.create.table.properties | The properties for automatic table creation. | String | No | — | Specifies initial table properties such as engine and replica count. Example: 'starrocks.create.table.properties' = 'buckets 8', 'starrocks.create.table.properties' = 'replication_num=1'. |
Source-specific options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| scan-url | The URL for data scan. | String | No | — | Uses the FE IP address and HTTP port. Format: fe_ip:http_port;fe_ip:http_port. Separate multiple entries with semicolons (;). |
| scan.connect.timeout-ms | The connection timeout for the StarRocks connector. If the connection duration exceeds this value, an error is returned. | String | No | 1000 | Unit: milliseconds. |
| scan.params.keep-alive-min | The keep-alive duration of the query task. | String | No | 10 | — |
| scan.params.query-timeout-s | The query task timeout. If no result is returned within this period, the query stops. | String | No | 600 | Unit: seconds. |
| scan.params.mem-limit-byte | The maximum memory for a single query on a backend (BE) node. | String | No | 1073741824 (1 GB) | Unit: bytes. |
| scan.max-retries | The maximum number of retries when a query fails. If the retry count reaches this value, an error is returned. | String | No | 1 | — |
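As an illustration of how the source-specific options fit into a table definition, the following sketch sets three of them in the WITH clause of a source table. The table name orders_source, its columns, and the option values are hypothetical; the addresses and credentials are placeholders, and the values are examples rather than tuning recommendations.

```sql
-- Hypothetical source table; names, addresses, and credentials are placeholders.
CREATE TEMPORARY TABLE orders_source (
  order_id BIGINT,
  amount   DECIMAL(10, 2)
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'scan-url' = 'fe_ip:http_port',
  'database-name' = 'db_name',
  'table-name' = 'orders',
  'username' = 'xxx',
  'password' = 'xxx',
  'scan.connect.timeout-ms' = '3000',      -- return an error if connecting takes longer than 3 seconds
  'scan.params.query-timeout-s' = '1200',  -- stop a query that returns no result within 20 minutes
  'scan.max-retries' = '3'                 -- return an error once 3 retries have failed
);
```

Options that are not set keep the defaults listed in the table above.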
Sink-specific options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| load-url | The URL for data loading. | String | Yes | — | Uses the FE IP address and HTTP port. Format: fe_ip:http_port;fe_ip:http_port. Separate multiple entries with semicolons (;). |
| sink.semantic | The delivery semantics for writes. | String | No | at-least-once | Valid values: at-least-once, exactly-once. Controls flush behavior. See Flush policy. |
| sink.buffer-flush.max-bytes | The maximum amount of data buffered before a flush. | String | No | 94371840 (90 MB) | Valid values: 64 MB to 10 GB. Effective only when sink.semantic is at-least-once. |
| sink.buffer-flush.max-rows | The maximum number of rows buffered before a flush. | String | No | 500000 | Valid values: 64000 to 5000000. Effective only when sink.semantic is at-least-once. |
| sink.buffer-flush.interval-ms | The interval between automatic flushes. | String | No | 300000 | Valid values: 1000 to 3600000. Unit: milliseconds. Effective only when sink.semantic is at-least-once. |
| sink.max-retries | The maximum number of retries for failed writes. | String | No | 3 | Valid values: 0 to 10. |
| sink.connect.timeout-ms | The connection timeout for the StarRocks database. | String | No | 1000 | Valid values: 100 to 60000. Unit: milliseconds. |
| sink.properties.* | Additional Stream Load properties for the sink. | String | No | — | For example, sink.properties.format specifies the data format (CSV). For supported options, see Stream Load. |
Dimension table options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| lookup.cache.enabled | Specifies whether to cache dimension table data. | Boolean | No | true | Valid values: true (cache data, reduce I/O), false (fetch directly from the data source). Set to false when the dimension table is frequently updated or contains a large amount of data. Supported in VVR 11.1 or later. |
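A lookup join against a StarRocks dimension table can be sketched with the standard Flink SQL FOR SYSTEM_TIME AS OF syntax. The example below is a minimal sketch, not a reference configuration: the orders stream (generated by the built-in datagen connector), the dim_users table, and all column names are hypothetical. It also assumes that the general options are sufficient for a dimension table, which this page does not state explicitly.

```sql
-- Hypothetical stream of orders produced by the built-in datagen connector.
CREATE TEMPORARY TABLE orders (
  order_id  BIGINT,
  user_id   BIGINT,
  proc_time AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Hypothetical StarRocks dimension table; addresses and credentials are placeholders.
CREATE TEMPORARY TABLE dim_users (
  user_id   BIGINT,
  user_name STRING
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'database-name' = 'db_name',
  'table-name' = 'users',
  'username' = 'xxx',
  'password' = 'xxx',
  'lookup.cache.enabled' = 'false'  -- fetch directly from StarRocks instead of caching
);

-- Enrich each order with the user name looked up at processing time.
SELECT o.order_id, o.user_id, d.user_name
FROM orders AS o
LEFT JOIN dim_users FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.user_id;
```

Disabling the cache suits a dimension table that changes often; leaving the default (true) reduces I/O against StarRocks.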
Flush policy
The connector buffers data in memory and flushes it to StarRocks via Stream Load. Flush behavior differs by delivery semantics:
- at-least-once: A flush is triggered when any of the following conditions is met:
  - The buffered data size reaches sink.buffer-flush.max-bytes.
  - The buffered row count reaches sink.buffer-flush.max-rows.
  - The time since the last flush reaches sink.buffer-flush.interval-ms.
  - A Flink checkpoint is triggered.
- exactly-once: A flush happens only when a Flink checkpoint is triggered. The sink.buffer-flush.* options have no effect.
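The two flush policies can be contrasted with a pair of sink definitions. This is a sketch under stated assumptions: the table names, columns, and threshold values are hypothetical and were chosen only to fall inside the valid ranges listed under Sink-specific options; addresses and credentials are placeholders.

```sql
-- at-least-once: a flush happens when any threshold is reached, or on a checkpoint.
CREATE TEMPORARY TABLE sink_at_least_once (
  id   BIGINT NOT NULL,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'load-url' = 'fe_ip:http_port',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'at-least-once',
  'sink.buffer-flush.max-bytes' = '134217728',  -- 128 MB
  'sink.buffer-flush.max-rows' = '100000',
  'sink.buffer-flush.interval-ms' = '10000'     -- 10 seconds
);

-- exactly-once: a flush happens only on a checkpoint, so no buffer thresholds are set.
CREATE TEMPORARY TABLE sink_exactly_once (
  id   BIGINT NOT NULL,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'load-url' = 'fe_ip:http_port',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'exactly-once'
);
```

Because exactly-once flushes only on checkpoints, how soon written data becomes visible in StarRocks follows the job's checkpoint interval rather than the buffer settings.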
Data type mappings
The following table lists the data type mappings between StarRocks and Realtime Compute for Apache Flink.
| StarRocks type | Flink type | Remarks |
|---|---|---|
| NULL | NULL | — |
| BOOLEAN | BOOLEAN | — |
| TINYINT | TINYINT | — |
| SMALLINT | SMALLINT | — |
| INT | INT | — |
| BIGINT | BIGINT | — |
| BIGINT UNSIGNED | DECIMAL(20,0) | Supported in VVR 8.0.10 or later. |
| LARGEINT | DECIMAL(20,0) | — |
| FLOAT | FLOAT | — |
| DOUBLE | DOUBLE | — |
| DATE | DATE | — |
| DATETIME | TIMESTAMP | — |
| DECIMAL | DECIMAL | — |
| DECIMALV2 | DECIMAL | — |
| DECIMAL32 | DECIMAL | — |
| DECIMAL64 | DECIMAL | — |
| DECIMAL128 | DECIMAL | — |
| CHAR(m) | CHAR(n) | VVR 8.0.10: m = n × 3, n ≤ 85 (adapted for MySQL-StarRocks encoding differences). VVR 8.0.11 and later: m = n × 4, n ≤ 63. The maximum CHAR length in StarRocks is 255 bytes; columns that exceed this limit after automatic extension cannot be mapped to CHAR. |
| VARCHAR(m) | CHAR(n) | VVR 8.0.10: m = n × 4, n > 85. VVR 8.0.11 and later: m = n × 4, n > 63. CHAR columns whose length exceeds 255 bytes after extension are mapped to VARCHAR in StarRocks. |
| VARCHAR | STRING | — |
| VARBINARY | VARBINARY | Supported in VVR 8.0.10 or later. |
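To show how the mappings translate into a table definition, the following sketch declares a Flink source table whose column types follow the table above. The table typed_source and its columns are hypothetical; each comment names the StarRocks column type the Flink type is assumed to correspond to.

```sql
-- Hypothetical source table; addresses and credentials are placeholders.
CREATE TEMPORARY TABLE typed_source (
  id         BIGINT,          -- StarRocks BIGINT
  big_id     DECIMAL(20, 0),  -- StarRocks LARGEINT
  is_active  BOOLEAN,         -- StarRocks BOOLEAN
  price      DECIMAL(18, 2),  -- StarRocks DECIMAL
  created_at TIMESTAMP,       -- StarRocks DATETIME
  birthday   DATE,            -- StarRocks DATE
  note       STRING           -- StarRocks VARCHAR
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'scan-url' = 'fe_ip:http_port',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx'
);
```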
Sample code
The following example creates a source table and a sink table, then reads from the source and writes to the sink.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'username' = 'xxxxx',
'password' = 'xxxxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL,
PRIMARY KEY(`runoob_id`) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'username' = 'xxxx',
'password' = 'xxxxxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
StarRocks allows nullable primary key columns, but Flink requires primary keys to be non-nullable and unique for data consistency. A nullable primary key in StarRocks causes the error: Invalid primary key. Column 'xxx' is nullable. For more information, see Why do I get the error "Invalid primary key. Column 'xxx' is nullable." when writing to a table?.
Data ingestion
Use the StarRocks pipeline connector to write data records and table schema changes from upstream sources to StarRocks databases. Both open-source StarRocks and fully managed EMR Serverless StarRocks are supported.
The connector handles three operations automatically:
- Automatic table creation: If an upstream database or table does not exist in the downstream StarRocks instance, the connector creates it automatically. Use table.create.properties.* to control creation options.
- Schema change synchronization: The connector propagates CreateTableEvent, AddColumnEvent, and DropColumnEvent events to downstream databases. Starting from VVR 11.1, compatible column type changes are also supported. For more information, see ALTER TABLE in the StarRocks documentation.
- Data synchronization: The connector continuously writes data records from upstream sources to StarRocks.
Usage notes
- The pipeline connector supports only at-least-once semantics. It uses primary key tables to ensure idempotent writes, so duplicate records caused by retries are automatically deduplicated in StarRocks.
- Every source table must have a primary key. For tables without a primary key, specify one in the TRANSFORM statement block before writing to downstream databases:

      transform:
        - source-table: ...
          primary-keys: id, ...

- The bucket key of an automatically created table must match the primary key, and the table cannot contain partition keys.
- New columns are appended to the end of existing columns. Lenient mode is used by default for schema evolution: columns inserted at other positions are automatically moved to the end.
- StarRocks earlier than 2.5.7: explicitly set table.create.num-buckets. StarRocks 2.5.7 and later: the bucket count is determined automatically. For more information, see Data distribution.
- StarRocks 3.2 and later: set table.create.properties.fast_schema_evolution to true to speed up schema changes.
- Stream data issues may arise when ingesting data to EMR Serverless StarRocks using Flink CDC. To resolve this, use one of the following options:
  - Use SQL jobs and set sink.version=V1 in the job draft.
  - Continue to use Flink CDC, but enable emr_internal_redirect on the FE.
  - Use an EMR Serverless StarRocks instance with built-in Private Zone instead of Server Load Balancer (SLB) for load balancing.
Syntax
source:
  ...
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  sink.buffer-flush.interval-ms: 5000 # Flush buffered data every 5 seconds.
Connector options
Required options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| type | The connector name. | String | Yes | — | Set to starrocks. |
| jdbc-url | The JDBC URL for connecting to the database. | String | Yes | — | Supports multiple URLs separated by commas (,). Example: jdbc:mysql://fe_host1:port1,fe_host2:port2. |
| load-url | The HTTP URL for connecting to the FE node. | String | Yes | — | Supports multiple URLs separated by semicolons (;). Example: fe_host1:http_port1;fe_host2:http_port2. |
| username | The username for connecting to the StarRocks database. | String | Yes | — | The user must have SELECT and INSERT permissions on the destination table. Grant permissions using the StarRocks GRANT command. |
| password | The password for connecting to the StarRocks database. | String | Yes | — | — |
Optional options
| Option | Description | Type | Required | Default | Remarks |
|---|---|---|---|---|---|
| name | The display name of the sink. | String | No | — | — |
| sink.semantic | The delivery semantics for writes. | String | No | at-least-once | Valid values: at-least-once, exactly-once. |
| sink.label-prefix | The label prefix for Stream Load. | String | No | — | — |
| sink.connect.timeout-ms | The timeout for HTTP connections. | Integer | No | 30000 | Valid values: 100–60000. Unit: milliseconds. |
| sink.wait-for-continue.timeout-ms | The timeout for the client to wait for a 100 Continue response from the server. | Integer | No | 30000 | Valid values: 3000–600000. Unit: milliseconds. |
| sink.buffer-flush.max-bytes | The maximum size of data cached in memory before a write. | Long | No | 157286400 | Valid values: 64 MB–10 GB. Unit: bytes. The cache is shared across all tables; when the limit is reached, the connector flushes multiple tables. Increasing this value improves throughput but may increase latency. |
| sink.buffer-flush.max-rows | The maximum number of records cached in memory before a write. | Long | No | 500000 | Valid values: 64000–5000000. |
| sink.buffer-flush.interval-ms | The flush interval per table. | Long | No | 300000 | Unit: milliseconds. For smaller datasets, reduce this value to flush buffered data promptly. |
| sink.max-retries | The maximum number of retries. | Long | No | 3 | Valid values: 0–1000. |
| sink.scan-frequency.ms | The interval between checks to determine whether a flush is needed. | Long | No | 50 | Unit: milliseconds. |
| sink.io.thread-count | The number of threads for Stream Load data ingestion. | Integer | No | 2 | — |
| sink.at-least-once.use-transaction-stream-load | Specifies whether to use the Stream Load transaction interface. | Boolean | No | true | Takes effect only when a supported database is used. |
| sink.properties.* | Additional sink options. | String | No | — | See supported options in Stream Load mode. |
| table.create.num-buckets | The bucket count for automatically created tables. | Integer | No | — | Required for StarRocks 2.5.6 and earlier. Optional for StarRocks 2.5.7 and later (auto-determined). See Data distribution. |
| table.create.properties.* | Additional options for automatic table creation. | String | No | — | Example: 'table.create.properties.fast_schema_evolution' = 'true'. See the StarRocks documentation. |
| table.schema-change.timeout | The timeout for schema change operations. | Duration | No | 30 min | The value must be a whole number of seconds. If a schema change exceeds this duration, the deployment fails. |
| unicode-char.max-bytes | The number of bytes allocated per Unicode character. | Integer | No | 3 | In Flink CDC, VARCHAR length is measured in characters; in StarRocks, it is measured in bytes. Most characters, including common Chinese characters, take up to 3 bytes in UTF-8, but some characters, such as emoji, take 4 bytes. |
Data type mappings
StarRocks does not support all CDC YAML data types. Writing data of an unsupported type causes the job to fail. Use the built-in CAST function in the transform component to convert unsupported types, or use a projection statement to exclude them from the sink table. For more information, see Data ingestion with Flink CDC.
| CDC type | StarRocks type | Remarks |
|---|---|---|
| TINYINT | TINYINT | — |
| SMALLINT | SMALLINT | — |
| INT | INT | — |
| BIGINT | BIGINT | — |
| FLOAT | FLOAT | — |
| DOUBLE | DOUBLE | — |
| BOOLEAN | BOOLEAN | — |
| DATE | DATE | — |
| TIMESTAMP | DATETIME | — |
| TIMESTAMP_LTZ | DATETIME | — |
| DECIMAL(p, s) | DECIMAL(p, s) | StarRocks does not support DECIMAL as a primary key type. If an upstream primary key column is DECIMAL, its type in the synchronized StarRocks schema is automatically changed to VARCHAR. |
| CHAR(n) (n ≤ 85) | CHAR(n × 3) | CDC CHAR length is in characters; StarRocks CHAR length is in UTF-8 bytes. UTF-8 Chinese characters use up to 3 bytes. Use unicode-char.max-bytes to allocate more bytes per character. |
| CHAR(n) (n > 85) | VARCHAR(n × 3) | CHAR columns longer than 85 characters exceed StarRocks' 255-byte CHAR limit and are mapped to VARCHAR. Use unicode-char.max-bytes to allocate more bytes per character. |
| VARCHAR(n) | VARCHAR(n × 3) | CDC VARCHAR length is in characters; StarRocks VARCHAR length is in UTF-8 bytes. |
| BINARY(n) | BINARY(n+2) | Two padding bytes are added to prevent errors. |
| VARBINARY(n) | VARBINARY(n+1) | One padding byte is added to prevent errors. |
Schema evolution
As a data ingestion sink, the StarRocks connector supports the following schema evolution events:
- CREATE TABLE: If the table already exists, the connector skips creation. Make sure the existing table schema is compatible with the source schema.
- ADD COLUMN: StarRocks requires primary key columns to appear first. Newly added columns must follow this restriction.
- MODIFY COLUMN TYPE: For supported type conversions, see the StarRocks ALTER TABLE documentation.
- DROP COLUMN
- TRUNCATE TABLE
- DROP TABLE