
Realtime Compute for Apache Flink: StarRocks connector

Last Updated: Mar 26, 2026

The StarRocks connector integrates Realtime Compute for Apache Flink with StarRocks databases. StarRocks is a new-generation Massively Parallel Processing (MPP) data warehouse designed for fast analytical queries. When writing, the connector buffers data and imports it into StarRocks in batches through Stream Load; when reading, it scans StarRocks tables in batches. Use it to read from StarRocks source tables, enrich streams with StarRocks dimension tables, write to StarRocks sink tables through the SQL or DataStream API, and synchronize data and schema changes from upstream sources through the YAML API (Flink Change Data Capture (CDC)).

Supported capabilities

| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, sink table, and data ingestion sink |
| Running mode | Streaming mode and batch mode |
| Data format | CSV |
| Metric | N/A |
| API type | DataStream API, SQL API, and YAML API for data ingestion |
| Data update or deletion in a result table | Supported |

Prerequisites

Before you begin, ensure that you have:

  • A StarRocks cluster — either an E-MapReduce (EMR) StarRocks cluster or a self-managed StarRocks cluster hosted on Elastic Compute Service (ECS) instances

Limitations

  • The StarRocks connector supports only at-least-once and exactly-once semantics.

  • Only Ververica Runtime (VVR) 11.1 or later supports lookup joins with StarRocks dimension tables.

  • To avoid connectivity issues, allow access to the following StarRocks ports in your security group or firewall: 9030 (FE query port), 8030 (FE HTTP port), 8040 (BE HTTP port), 9060 (BE Thrift port), 8060 (BE bRPC port), and 9020 (FE RPC port).

SQL statements

Use SQL statements to connect Flink source and sink tables to StarRocks. E-MapReduce (EMR) StarRocks also supports the CREATE TABLE AS (CTAS) and CREATE DATABASE AS (CDAS) statements: CTAS synchronizes the schema and data of a single table, while CDAS synchronizes an entire database or multiple tables within the same database. For more information, see Use the CREATE TABLE AS and CREATE DATABASE AS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster.

Syntax

CREATE TABLE USER_RESULT(
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
  'load-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);

Connector options

General options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The connector type. | String | Yes | | Set to starrocks. |
| jdbc-url | The Java Database Connectivity (JDBC) URL for connecting to the database. | String | Yes | | Uses the frontend (FE) IP address and JDBC port. Format: jdbc:mysql://ip:port. |
| database-name | The name of the StarRocks database. | String | Yes | | |
| table-name | The name of the StarRocks table. | String | Yes | | |
| username | The username for connecting to the StarRocks database. | String | Yes | | |
| password | The password for connecting to the StarRocks database. | String | Yes | | |
| starrocks.create.table.properties | The properties for automatic table creation. | String | No | | Specifies initial table properties such as the engine and replica count. Examples: 'starrocks.create.table.properties' = 'buckets 8' and 'starrocks.create.table.properties' = 'replication_num=1'. |

Source-specific options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| scan-url | The URL for data scans. | String | No | | Uses the FE IP address and HTTP port. Format: fe_ip:http_port;fe_ip:http_port. Separate multiple entries with semicolons (;). |
| scan.connect.timeout-ms | The connection timeout for the StarRocks connector. If a connection is not established within this time, an error is returned. | String | No | 1000 | Unit: milliseconds. |
| scan.params.keep-alive-min | The keep-alive duration of the query task. | String | No | 10 | Unit: minutes. |
| scan.params.query-timeout-s | The query task timeout. If no result is returned within this period, the query stops. | String | No | 600 | Unit: seconds. |
| scan.params.mem-limit-byte | The maximum memory for a single query on a backend (BE) node. | String | No | 1073741824 (1 GB) | Unit: bytes. |
| scan.max-retries | The maximum number of retries when a query fails. If the retry count reaches this value, an error is returned. | String | No | 1 | |
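The following sketch shows a source table that overrides several of the scan options above. The table name, columns, host placeholder (fe_ip), ports, and option values are illustrative assumptions rather than recommended settings.

```sql
-- Hypothetical source table; replace every placeholder with your own values.
CREATE TEMPORARY TABLE orders_source (
  order_id BIGINT NOT NULL,
  amount   DECIMAL(10, 2),
  order_ts TIMESTAMP
) WITH (
  'connector'                   = 'starrocks',
  'jdbc-url'                    = 'jdbc:mysql://fe_ip:9030',
  'scan-url'                    = 'fe_ip:8030',
  'database-name'               = 'db_name',
  'table-name'                  = 'orders',
  'username'                    = 'xxx',
  'password'                    = 'xxx',
  -- Allow up to 5 seconds to establish a connection (default: 1000 ms).
  'scan.connect.timeout-ms'     = '5000',
  -- Stop a scan query that returns no result within 1200 seconds (default: 600 s).
  'scan.params.query-timeout-s' = '1200',
  -- Cap memory per query on each BE node at 2 GB (2147483648 bytes; default: 1 GB).
  'scan.params.mem-limit-byte'  = '2147483648',
  -- Retry a failed query up to 3 times (default: 1).
  'scan.max-retries'            = '3'
);
```

All option values are passed as strings, matching the String type listed for each scan option.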

Sink-specific options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| load-url | The URL for data loading. | String | Yes | | Uses the FE IP address and HTTP port. Format: fe_ip:http_port;fe_ip:http_port. Separate multiple entries with semicolons (;). |
| sink.semantic | The delivery semantics for writes. | String | No | at-least-once | Valid values: at-least-once and exactly-once. Determines when buffered data is flushed. See Flush policy. |
| sink.buffer-flush.max-bytes | The maximum amount of data buffered before a flush. | String | No | 94371840 (90 MB) | Valid values: 64 MB to 10 GB. Unit: bytes. Takes effect only when sink.semantic is at-least-once. |
| sink.buffer-flush.max-rows | The maximum number of rows buffered before a flush. | String | No | 500000 | Valid values: 64000 to 5000000. Takes effect only when sink.semantic is at-least-once. |
| sink.buffer-flush.interval-ms | The interval between automatic flushes. | String | No | 300000 | Valid values: 1000 to 3600000. Unit: milliseconds. Takes effect only when sink.semantic is at-least-once. |
| sink.max-retries | The maximum number of retries for failed writes. | String | No | 3 | Valid values: 0 to 10. |
| sink.connect.timeout-ms | The timeout for connecting to the StarRocks database. | String | No | 1000 | Valid values: 100 to 60000. Unit: milliseconds. |
| sink.properties.* | Additional Stream Load properties for the sink. | String | No | | For example, sink.properties.format specifies the data format (CSV). For supported properties, see Stream Load. |
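As a sketch of the sink options above, the following table writes with exactly-once semantics and passes a Stream Load property through sink.properties.*. The table name, columns, and placeholders are hypothetical. Because exactly-once data is flushed only on checkpoints (see Flush policy), this assumes checkpointing is enabled for the deployment.

```sql
-- Hypothetical exactly-once sink table.
-- Buffered data is flushed only when a Flink checkpoint is triggered, so the
-- deployment must have checkpointing enabled; otherwise data is not written.
CREATE TEMPORARY TABLE orders_sink (
  order_id BIGINT NOT NULL,
  amount   DECIMAL(10, 2),
  order_ts TIMESTAMP,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'              = 'starrocks',
  'jdbc-url'               = 'jdbc:mysql://fe_ip:9030',
  'load-url'               = 'fe_ip:8030',
  'database-name'          = 'db_name',
  'table-name'             = 'orders',
  'username'               = 'xxx',
  'password'               = 'xxx',
  'sink.semantic'          = 'exactly-once',
  -- Forwarded to Stream Load as its "format" property.
  'sink.properties.format' = 'csv',
  -- Retry failed writes up to 5 times (allowed range: 0 to 10).
  'sink.max-retries'       = '5'
);
```

The sink.buffer-flush.* options are omitted here because they have no effect in exactly-once mode.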

Dimension table options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| lookup.cache.enabled | Specifies whether to cache dimension table data. | Boolean | No | true | Valid values: true (cache data to reduce I/O) and false (query the data source directly). Set to false when the dimension table is frequently updated or contains a large amount of data. Supported in VVR 11.1 or later. |
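The following sketch shows a lookup join against a StarRocks dimension table with caching disabled (VVR 11.1 or later). It uses Flink's built-in datagen connector as a stand-in stream; the table names, columns, and placeholders are hypothetical, and it assumes the general options are sufficient for a dimension table. In a deployment, write the query result to a sink with INSERT INTO.

```sql
-- Hypothetical stream of clicks generated by the built-in datagen connector.
CREATE TEMPORARY TABLE clicks (
  user_id  BIGINT,
  url      STRING,
  proctime AS PROCTIME()              -- processing-time attribute for the lookup join
) WITH (
  'connector'          = 'datagen',
  'rows-per-second'    = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- Hypothetical StarRocks dimension table.
CREATE TEMPORARY TABLE users_dim (
  id   BIGINT NOT NULL,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'            = 'starrocks',
  'jdbc-url'             = 'jdbc:mysql://fe_ip:9030',
  'database-name'        = 'db_name',
  'table-name'           = 'users',
  'username'             = 'xxx',
  'password'             = 'xxx',
  -- Query StarRocks on every lookup because the table is updated frequently.
  'lookup.cache.enabled' = 'false'
);

-- Enrich each click with the user name that is current at processing time.
SELECT c.user_id, c.url, u.name
FROM clicks AS c
JOIN users_dim FOR SYSTEM_TIME AS OF c.proctime AS u
  ON c.user_id = u.id;
```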

Flush policy

The connector buffers data in memory and flushes it to StarRocks via Stream Load. Flush behavior differs by delivery semantics:

  • at-least-once: A flush is triggered when any of the following conditions is met:

    • The buffered data size reaches sink.buffer-flush.max-bytes.

    • The buffered row count reaches sink.buffer-flush.max-rows.

    • The time since the last flush reaches sink.buffer-flush.interval-ms.

    • A Flink checkpoint is triggered.

  • exactly-once: Flush only happens when a Flink checkpoint is triggered. The sink.buffer-flush.* options have no effect.
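To illustrate the at-least-once rules above, the following hypothetical sink tightens all three thresholds. With these values, a flush happens as soon as 64 MB or 100,000 rows are buffered, 10 seconds pass since the last flush, or a checkpoint is triggered, whichever comes first. The table name, columns, and values are assumptions chosen only for illustration.

```sql
-- Hypothetical at-least-once sink with tightened flush thresholds.
CREATE TEMPORARY TABLE events_sink (
  event_id BIGINT NOT NULL,
  payload  STRING,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'connector'                     = 'starrocks',
  'jdbc-url'                      = 'jdbc:mysql://fe_ip:9030',
  'load-url'                      = 'fe_ip:8030',
  'database-name'                 = 'db_name',
  'table-name'                    = 'events',
  'username'                      = 'xxx',
  'password'                      = 'xxx',
  'sink.semantic'                 = 'at-least-once',
  -- 64 MB (64 x 1024 x 1024 bytes), the smallest allowed buffer size.
  'sink.buffer-flush.max-bytes'   = '67108864',
  -- 100,000 rows (allowed range: 64000 to 5000000).
  'sink.buffer-flush.max-rows'    = '100000',
  -- Flush at least every 10 seconds (default: 300000 ms).
  'sink.buffer-flush.interval-ms' = '10000'
);
```

Smaller thresholds lower end-to-end latency at the cost of more, smaller Stream Load jobs.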

Data type mappings

The following table lists the data type mappings between StarRocks and Realtime Compute for Apache Flink.

| StarRocks type | Flink type | Remarks |
| --- | --- | --- |
| NULL | NULL | |
| BOOLEAN | BOOLEAN | |
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| BIGINT UNSIGNED | DECIMAL(20,0) | Supported in VVR 8.0.10 or later. |
| LARGEINT | DECIMAL(20,0) | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| DATE | DATE | |
| DATETIME | TIMESTAMP | |
| DECIMAL | DECIMAL | |
| DECIMALV2 | DECIMAL | |
| DECIMAL32 | DECIMAL | |
| DECIMAL64 | DECIMAL | |
| DECIMAL128 | DECIMAL | |
| CHAR(m) | CHAR(n) | VVR 8.0.10: m = n × 3, n ≤ 85 (adapted for encoding differences between MySQL and StarRocks). VVR 8.0.11 and later: m = n × 4, n ≤ 63. The maximum CHAR length in StarRocks is 255 bytes; columns that exceed this limit after automatic extension cannot be mapped to CHAR. |
| VARCHAR(m) | CHAR(n) | VVR 8.0.10: m = n × 4, n > 85. VVR 8.0.11 and later: m = n × 4, n > 63. CHAR columns whose length exceeds 255 bytes after extension are mapped to VARCHAR in StarRocks. |
| VARCHAR | STRING | |
| VARBINARY | VARBINARY | Supported in VVR 8.0.10 or later. |
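As a sketch of how the mappings above apply, the following hypothetical Flink source table reads a StarRocks table whose columns are LARGEINT, BOOLEAN, DATETIME, VARCHAR, and DECIMAL(10, 2). Each Flink column is declared with the mapped type; the table name and placeholders are assumptions, and the DECIMAL column assumes precision and scale carry over unchanged.

```sql
-- Hypothetical Flink table for a StarRocks table db_name.type_demo.
CREATE TEMPORARY TABLE type_demo_source (
  id         DECIMAL(20, 0) NOT NULL,  -- StarRocks LARGEINT
  flag       BOOLEAN,                  -- StarRocks BOOLEAN
  created_at TIMESTAMP,                -- StarRocks DATETIME
  note       STRING,                   -- StarRocks VARCHAR
  price      DECIMAL(10, 2)            -- StarRocks DECIMAL(10, 2)
) WITH (
  'connector'     = 'starrocks',
  'jdbc-url'      = 'jdbc:mysql://fe_ip:9030',
  'scan-url'      = 'fe_ip:8030',
  'database-name' = 'db_name',
  'table-name'    = 'type_demo',
  'username'      = 'xxx',
  'password'      = 'xxx'
);
```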

Sample code

The following example creates a source table and a sink table, then reads from the source and writes to the sink.

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id`      BIGINT  NOT NULL,
  `runoob_title`   STRING  NOT NULL,
  `runoob_author`  STRING  NOT NULL,
  `submission_date` DATE   NULL
) WITH (
  'connector'     = 'starrocks',
  'jdbc-url'      = 'jdbc:mysql://ip:9030',
  'scan-url'      = 'ip:18030',
  'database-name' = 'db_name',
  'table-name'    = 'table_name',
  'username'      = 'xxxxx',
  'password'      = 'xxxxxxx'
);

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id`      BIGINT  NOT NULL,
  `runoob_title`   STRING  NOT NULL,
  `runoob_author`  STRING  NOT NULL,
  `submission_date` DATE   NULL,
  PRIMARY KEY(`runoob_id`) NOT ENFORCED
) WITH (
  'connector'                    = 'starrocks',
  'jdbc-url'                     = 'jdbc:mysql://ip:9030',
  'load-url'                     = 'ip:18030',
  'database-name'                = 'db_name',
  'table-name'                   = 'table_name',
  'username'                     = 'xxxx',
  'password'                     = 'xxxxxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;

Note: StarRocks allows nullable primary key columns, but Flink requires primary key columns to be non-nullable and unique to guarantee data consistency. If a primary key column is nullable, the job fails with the error Invalid primary key. Column 'xxx' is nullable. For more information, see Why do I get the error "Invalid primary key. Column 'xxx' is nullable." when writing to a table?
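One way to keep both sides consistent is to declare the key column NOT NULL in StarRocks as well. The following sketch creates a StarRocks Primary Key table that matches the sample sink table above; run it in StarRocks (for example, through a MySQL client connected to the FE query port). The VARCHAR lengths are assumptions, and the bucket count is omitted on the assumption that StarRocks 2.5.7 or later determines it automatically.

```sql
-- Run in StarRocks, not in Flink. db_name and table_name are placeholders.
CREATE TABLE db_name.table_name (
  runoob_id       BIGINT       NOT NULL,  -- key column declared NOT NULL
  runoob_title    VARCHAR(255) NOT NULL,
  runoob_author   VARCHAR(255) NOT NULL,
  submission_date DATE         NULL
)
PRIMARY KEY (runoob_id)
DISTRIBUTED BY HASH (runoob_id);
```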

Data ingestion

Use the StarRocks pipeline connector to write data records and table schema changes from upstream sources to StarRocks databases. Both open-source StarRocks and fully managed EMR Serverless StarRocks are supported.

The connector handles three operations automatically:

  • Automatic table creation: If an upstream database or table does not exist in the downstream StarRocks instance, the connector creates it automatically. Use table.create.properties.* to control creation options.

  • Schema change synchronization: The connector propagates CreateTableEvent, AddColumnEvent, and DropColumnEvent events to downstream databases. Starting from VVR 11.1, compatible column type changes are also supported. For more information, see ALTER TABLE in the StarRocks documentation.

  • Data synchronization: The connector continuously writes data records from upstream sources to StarRocks.

Usage notes

  • The pipeline connector supports only at-least-once semantics. It uses primary key tables to ensure idempotent writes, so duplicate records caused by retries are automatically deduplicated in StarRocks.

  • Every source table must have a primary key. For tables without a primary key, define one in the transform block of the pipeline definition before the data is written downstream:

    transform:
      - source-table: ...
        primary-keys: id, ...
  • The bucket key of an automatically created table must match the primary key, and the table cannot contain partition keys.

  • New columns are appended to the end of existing columns. Lenient mode is used by default for schema evolution: columns inserted at other positions are automatically moved to the end.

  • StarRocks earlier than 2.5.7: explicitly set table.create.num-buckets. StarRocks 2.5.7 and later: the bucket count is determined automatically. For more information, see Data distribution.

  • StarRocks 3.2 and later: set table.create.properties.fast_schema_evolution to true to speed up schema changes.

  • Write issues may occur when Flink CDC ingests data into EMR Serverless StarRocks. To resolve them, use one of the following options:

    • Use SQL jobs and set sink.version=V1 in the job draft.

    • Continue to use Flink CDC, but enable the emr_internal_redirect parameter on the FE.

    • Use an EMR Serverless StarRocks instance with built-in Private Zone instead of Server Load Balancer (SLB) for load balancing.
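To check that an automatically created table follows the notes above (primary key table, bucket key equal to the primary key, no partition keys, expected bucket count and properties), you can inspect its definition in StarRocks. This is a minimal sketch; db_name and table_name are placeholders.

```sql
-- Run in StarRocks after the pipeline has created the table.
-- The output includes the PRIMARY KEY clause, the DISTRIBUTED BY HASH bucket
-- key and bucket count, and the PROPERTIES set on the table.
SHOW CREATE TABLE db_name.table_name;
```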

Syntax

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass
  sink.buffer-flush.interval-ms: 5000  # Flush buffered data every 5 seconds.

Connector options

Required options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| type | The connector name. | String | Yes | | Set to starrocks. |
| jdbc-url | The JDBC URL for connecting to the database. | String | Yes | | Supports multiple URLs separated by commas (,). Example: jdbc:mysql://fe_host1:port1,fe_host2:port2. |
| load-url | The HTTP URL for connecting to the FE node. | String | Yes | | Supports multiple URLs separated by semicolons (;). Example: fe_host1:http_port1;fe_host2:http_port2. |
| username | The username for connecting to the StarRocks database. | String | Yes | | The user must have SELECT and INSERT permissions on the destination table. Grant permissions with the StarRocks GRANT statement. |
| password | The password for connecting to the StarRocks database. | String | Yes | | |
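The following sketch creates a dedicated user and grants the SELECT and INSERT permissions mentioned for username. It assumes the privilege syntax of StarRocks 3.0 and later (earlier versions use a different GRANT syntax); flink_user, your_password, db_name, and table_name are hypothetical placeholders. Automatic table creation and schema changes may require further privileges; this sketch grants only the two listed above.

```sql
-- Run in StarRocks as a user that can manage privileges.
CREATE USER 'flink_user'@'%' IDENTIFIED BY 'your_password';

-- Grant access to a single destination table.
GRANT SELECT, INSERT ON TABLE db_name.table_name TO USER 'flink_user'@'%';

-- Alternatively, grant access to all tables in the database.
GRANT SELECT, INSERT ON ALL TABLES IN DATABASE db_name TO USER 'flink_user'@'%';
```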

Optional options

| Option | Description | Type | Required | Default | Remarks |
| --- | --- | --- | --- | --- | --- |
| name | The display name of the sink. | String | No | | |
| sink.semantic | The delivery semantics for writes. | String | No | at-least-once | Valid values: at-least-once and exactly-once. |
| sink.label-prefix | The label prefix for Stream Load. | String | No | | |
| sink.connect.timeout-ms | The timeout for HTTP connections. | Integer | No | 30000 | Valid values: 100–60000. Unit: milliseconds. |
| sink.wait-for-continue.timeout-ms | The timeout for the client to wait for a 100 Continue response from the server. | Integer | No | 30000 | Valid values: 3000–600000. Unit: milliseconds. |
| sink.buffer-flush.max-bytes | The maximum size of data buffered in memory before a write. | Long | No | 157286400 (150 MB) | Valid values: 64 MB–10 GB. Unit: bytes. The buffer is shared by all tables; when the limit is reached, the connector flushes data for multiple tables. A larger value improves throughput but may increase latency. |
| sink.buffer-flush.max-rows | The maximum number of records buffered in memory before a write. | Long | No | 500000 | Valid values: 64000–5000000. |
| sink.buffer-flush.interval-ms | The flush interval for each table. | Long | No | 300000 | Unit: milliseconds. For smaller datasets, reduce this value so that buffered data is flushed promptly. |
| sink.max-retries | The maximum number of retries. | Long | No | 3 | Valid values: 0–1000. |
| sink.scan-frequency.ms | The interval at which the connector checks whether a flush is needed. | Long | No | 50 | Unit: milliseconds. |
| sink.io.thread-count | The number of threads used for Stream Load data ingestion. | Integer | No | 2 | |
| sink.at-least-once.use-transaction-stream-load | Specifies whether to use the Stream Load transaction interface. | Boolean | No | true | Takes effect only when the StarRocks version supports the Stream Load transaction interface. |
| sink.properties.* | Additional sink options. | String | No | | See the options supported in Stream Load mode. |
| table.create.num-buckets | The number of buckets for automatically created tables. | Integer | No | | Required for StarRocks 2.5.6 and earlier. Optional for StarRocks 2.5.7 and later, where the bucket count is determined automatically. See Data distribution. |
| table.create.properties.* | Additional options for automatic table creation. | String | No | | Example: table.create.properties.fast_schema_evolution: true. See the StarRocks documentation. |
| table.schema-change.timeout | The timeout for schema change operations. | Duration | No | 30 min | Value must be an integer. Unit: seconds. If a schema change exceeds this duration, the deployment fails. |
| unicode-char.max-bytes | The number of bytes allocated per Unicode character. | Integer | No | 3 | In Flink CDC, VARCHAR length is measured in characters; in StarRocks, it is measured in bytes. In UTF-8, most characters, including most Chinese characters, take up to 3 bytes, but some characters, such as emoji, take 4 bytes. |

Data type mappings

StarRocks does not support all CDC YAML data types. Writing data of an unsupported type causes the job to fail. Use the built-in CAST function in the transform component to convert unsupported types, or use a projection statement to exclude them from the sink table. For more information, see Data ingestion with Flink CDC.

| CDC type | StarRocks type | Remarks |
| --- | --- | --- |
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP | DATETIME | |
| TIMESTAMP_LTZ | DATETIME | |
| DECIMAL(p, s) | DECIMAL(p, s) | StarRocks does not support DECIMAL as a primary key type. If an upstream primary key column is DECIMAL, its type in the synchronized StarRocks schema is automatically changed to VARCHAR. |
| CHAR(n) (n ≤ 85) | CHAR(n × 3) | CDC CHAR length is measured in characters; StarRocks CHAR length is measured in UTF-8 bytes. Most Chinese characters take 3 bytes in UTF-8. Use unicode-char.max-bytes to allocate more bytes per character. |
| CHAR(n) (n > 85) | VARCHAR(n × 3) | CHAR columns longer than 85 characters exceed the 255-byte CHAR limit of StarRocks and are mapped to VARCHAR. Use unicode-char.max-bytes to allocate more bytes per character. |
| VARCHAR(n) | VARCHAR(n × 3) | CDC VARCHAR length is measured in characters; StarRocks VARCHAR length is measured in UTF-8 bytes. |
| BINARY(n) | BINARY(n+2) | Two padding bytes are added to prevent errors. |
| VARBINARY(n) | VARBINARY(n+1) | One padding byte is added to prevent errors. |
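As a worked example of the length rules above, the following sketch shows the StarRocks table that a hypothetical upstream table would roughly translate to with the default unicode-char.max-bytes of 3. The table and column names are placeholders, and the exact DDL the connector generates may differ.

```sql
-- Illustrative only. Upstream (CDC) column  ->  StarRocks column
--   code     CHAR(10)       ->  CHAR(30)        10 x 3 = 30 bytes
--   title    CHAR(100)      ->  VARCHAR(300)    100 > 85, so mapped to VARCHAR
--   name     VARCHAR(50)    ->  VARCHAR(150)    50 x 3 = 150 bytes
--   digest   BINARY(16)     ->  BINARY(18)      2 padding bytes
--   raw_data VARBINARY(16)  ->  VARBINARY(17)   1 padding byte
CREATE TABLE db_name.mapping_demo (
  id       BIGINT NOT NULL,
  code     CHAR(30),
  title    VARCHAR(300),
  name     VARCHAR(150),
  digest   BINARY(18),
  raw_data VARBINARY(17)
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id);
```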

Schema evolution

As a data ingestion sink, the StarRocks connector supports the following schema evolution events:

  • CREATE TABLE: If the table already exists, the connector skips creation. Make sure the existing table schema is compatible with the source schema.

  • ADD COLUMN: StarRocks requires primary key columns to appear first. Newly added columns must follow this restriction.

  • MODIFY COLUMN TYPE: For supported type conversions, see the StarRocks ALTER TABLE documentation.

  • DROP COLUMN

  • TRUNCATE TABLE

  • DROP TABLE
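For reference, the following statements sketch the StarRocks operations that correspond to the events above. They are illustrative and not meant to run as one script; db_name.table_name and the column names are placeholders, and the exact DDL the connector issues may differ. Column changes that are not applied through fast schema evolution may run as asynchronous jobs in StarRocks, which is why the last statement, which lists those jobs, can help when a change approaches table.schema-change.timeout.

```sql
-- ADD COLUMN: new columns are appended after the existing columns.
ALTER TABLE db_name.table_name ADD COLUMN new_col VARCHAR(150);

-- MODIFY COLUMN TYPE: for example, widening INT to BIGINT.
ALTER TABLE db_name.table_name MODIFY COLUMN qty BIGINT;

-- DROP COLUMN
ALTER TABLE db_name.table_name DROP COLUMN old_col;

-- TRUNCATE TABLE
TRUNCATE TABLE db_name.table_name;

-- DROP TABLE
DROP TABLE db_name.table_name;

-- List schema change jobs in the database and their states.
SHOW ALTER TABLE COLUMN FROM db_name;
```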