The StarRocks Flink Connector streams data from Apache Flink into StarRocks tables using Stream Load. It buffers records in memory and flushes them in batches, delivering higher throughput than the built-in flink-connector-jdbc for large-scale data ingestion. It supports the DataStream API, Table API & SQL, and Python API.
Prerequisites
Before you begin, make sure you have:
- A DataFlow cluster with Flink services running on EMR on ECS. See Create a cluster.
- An EMR Serverless StarRocks instance. See Create an instance.
Limitations
- The machine running Flink must be able to reach the following ports on your StarRocks instance:
  - FE node: `http_port` (default 8030) and `query_port` (default 9030)
  - BE node: `be_http_port` (default 8040)
- The StarRocks user account must have SELECT and INSERT permissions on the target table.
- Connector version compatibility:

| Connector | Flink | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.2.9 | 1.15–1.18 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.8 | 1.13–1.17 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.7 | 1.11–1.15 | 2.1 and later | 8 | 2.11, 2.12 |
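If you provision Flink clusters with scripts, you can encode the compatibility table as data so that an unsupported combination fails early. The following is a minimal sketch under that assumption: the class `ConnectorCompatibility` is hypothetical, covers only the three connector versions listed in the table, and treats each Flink range as inclusive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the compatibility table encoded as data.
// Only the connector versions listed in the table are included.
class ConnectorCompatibility {
    // Connector version -> inclusive range of supported Flink 1.x minor versions.
    private static final Map<String, int[]> FLINK_MINOR_RANGES = new LinkedHashMap<>();

    static {
        FLINK_MINOR_RANGES.put("1.2.9", new int[] {15, 18});
        FLINK_MINOR_RANGES.put("1.2.8", new int[] {13, 17});
        FLINK_MINOR_RANGES.put("1.2.7", new int[] {11, 15});
    }

    /** Returns the listed connector versions that support the given Flink version, newest first. */
    static List<String> connectorsFor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2 || !parts[0].equals("1")) {
            throw new IllegalArgumentException("Expected a Flink 1.x version: " + flinkVersion);
        }
        int minor = Integer.parseInt(parts[1]);
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, int[]> entry : FLINK_MINOR_RANGES.entrySet()) {
            int[] range = entry.getValue();
            if (minor >= range[0] && minor <= range[1]) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(connectorsFor("1.17")); // [1.2.9, 1.2.8]
        System.out.println(connectorsFor("1.13")); // [1.2.8, 1.2.7]
    }
}
```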
Get the Flink Connector JAR
Choose one of the following methods to get the JAR, then upload it to your Flink cluster.
Method 1: Download from Maven Central
Download the JAR directly from the Maven Central Repository.
JAR naming format:
- Flink 1.15 and later: `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. Example: `flink-connector-starrocks-1.2.8_flink-1.17.jar`
- Flink before 1.15: `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. Example: `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`
Method 2: Add as a Maven dependency
Add the connector to your pom.xml:
- Flink 1.15 and later:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency>
- Flink before 1.15:
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
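The JAR file name in Method 1 and the Maven `<version>` value in Method 2 follow the same rule: Flink 1.15 and later omits the Scala suffix, and earlier Flink versions include it. A minimal sketch of that rule follows. The class `ConnectorArtifact` is hypothetical and assumes Flink 1.x version strings such as `1.17`.

```java
// Hypothetical helper that applies the artifact naming rule.
class ConnectorArtifact {
    // Flink 1.15 and later artifacts carry no Scala suffix.
    static boolean omitsScalaSuffix(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2 || !parts[0].equals("1")) {
            throw new IllegalArgumentException("Expected a Flink 1.x version: " + flinkVersion);
        }
        return Integer.parseInt(parts[1]) >= 15;
    }

    /** Value for the Maven version element. */
    static String mavenVersion(String connectorVersion, String flinkVersion, String scalaVersion) {
        String base = connectorVersion + "_flink-" + flinkVersion;
        return omitsScalaSuffix(flinkVersion) ? base : base + "_" + scalaVersion;
    }

    /** File name of the JAR on Maven Central. */
    static String jarName(String connectorVersion, String flinkVersion, String scalaVersion) {
        return "flink-connector-starrocks-" + mavenVersion(connectorVersion, flinkVersion, scalaVersion) + ".jar";
    }

    public static void main(String[] args) {
        System.out.println(jarName("1.2.8", "1.17", "2.12")); // flink-connector-starrocks-1.2.8_flink-1.17.jar
        System.out.println(jarName("1.2.7", "1.14", "2.12")); // flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar
    }
}
```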
Method 3: Build from source
- Clone the Flink Connector source code.
- Build the JAR for your Flink version:
sh build.sh <flink_version>
For example, for Flink 1.17:
sh build.sh 1.17
- Find the generated JAR in the `target/` directory. The file name follows the format `flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar`. Unofficial releases include a `SNAPSHOT` suffix in the file name.
Upload the JAR to your Flink cluster
Upload the JAR to the flink-{flink_version}/lib directory on the Flink cluster.
For EMR clusters running EMR-5.19.0, place the JAR in /opt/apps/FLINK/flink-current/lib.
Start the Flink cluster
- Log on to the master node of the Flink cluster. See Log on to a cluster.
- Start the cluster:
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
Configuration
For the full parameter reference, see Continuously load data from Apache Flink® | StarRocks.
Parameters
Required parameters
| Parameter | Description |
|---|---|
| `connector` | Fixed to `starrocks`. |
| `jdbc-url` | The JDBC URL for connecting to StarRocks. Format: `jdbc:mysql://<FE internal address>:9030`. Example: `jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030`. To get the FE internal address, see View instance list and details. |
| `load-url` | The FE HTTP address for Stream Load. Format: `<FE internal address>:8030`. Example: `fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030`. |
| `database-name` | The StarRocks database name. |
| `table-name` | The StarRocks table name. |
| `username` | The StarRocks user account. The account needs SELECT and INSERT permissions on the target table. To grant permissions, see Manage users and data authorization. |
| `password` | The password for the StarRocks user account. |
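`jdbc-url` and `load-url` point at the same FE address on different ports: `jdbc-url` uses the MySQL protocol port (9030), and `load-url` uses the HTTP port (8030) with no scheme. The following sketch assembles both from one FE internal address. The class `StarRocksUrls` and the host `fe.example.internal` are hypothetical placeholders.

```java
// Hypothetical helper that builds jdbc-url and load-url from one FE internal address.
class StarRocksUrls {
    static final int DEFAULT_QUERY_PORT = 9030; // FE query_port, used by jdbc-url
    static final int DEFAULT_HTTP_PORT = 8030;  // FE http_port, used by load-url

    static String jdbcUrl(String feHost, int queryPort) {
        return "jdbc:mysql://" + feHost + ":" + queryPort;
    }

    // load-url is host:port only, matching the format in the table.
    static String loadUrl(String feHost, int httpPort) {
        return feHost + ":" + httpPort;
    }

    public static void main(String[] args) {
        String feHost = "fe.example.internal"; // placeholder; use your FE internal address
        System.out.println(jdbcUrl(feHost, DEFAULT_QUERY_PORT)); // jdbc:mysql://fe.example.internal:9030
        System.out.println(loadUrl(feHost, DEFAULT_HTTP_PORT));  // fe.example.internal:8030
    }
}
```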
Optional parameters
| Parameter | Default | Valid values | Description |
|---|---|---|---|
| `sink.semantic` | at-least-once | at-least-once, exactly-once | The delivery semantic guarantee. |
| `sink.version` | AUTO | V1, V2, AUTO | The Stream Load interface version. Supported from Flink Connector 1.2.4. V1 uses the Stream Load interface. V2 uses the Stream Load transaction interface (requires StarRocks 2.4 or later), which reduces memory usage and provides a more stable exactly-once implementation. AUTO selects V2 if StarRocks supports the transaction interface and V1 otherwise. |
| `sink.label-prefix` | - | - | The label prefix for Stream Load requests. For Flink Connector 1.2.8 and later with exactly-once semantics, set this parameter so that the connector can clean up transactions left open when Flink fails during a checkpoint. |
| `sink.buffer-flush.max-bytes` | 94371840 (90 MB) | 64 MB–10 GB | The maximum size of buffered data before a flush is triggered. Applies only to at-least-once. |
| `sink.buffer-flush.max-rows` | 500000 | 64,000–5,000,000 | The maximum number of buffered rows before a flush is triggered. Applies only to sink version V1 with at-least-once. |
| `sink.buffer-flush.interval-ms` | 300000 | 1,000–3,600,000 | The maximum interval between flushes, in milliseconds. Applies only to at-least-once. |
| `sink.max-retries` | 3 | 0–10 | The number of retries after a Stream Load failure. Applies only to sink version V1. |
| `sink.connect.timeout-ms` | 30000 | 100–60,000 | The HTTP connection timeout to the FE, in milliseconds. Before Flink Connector 1.2.9, the default was 1000. |
| `sink.socket.timeout-ms` | -1 | - | The time the HTTP client waits for data, in milliseconds. -1 means no timeout. Supported from Flink Connector 1.2.10. |
| `sink.wait-for-continue.timeout-ms` | 10000 | 3,000–60,000 | The timeout for waiting for the FE HTTP 100-continue response, in milliseconds. Supported from Flink Connector 1.2.7. |
| `sink.ignore.update-before` | true | true, false | Whether to ignore UPDATE_BEFORE records when writing to a primary key table. If set to false, UPDATE_BEFORE records are treated as DELETE operations. Supported from Flink Connector 1.2.8. |
| `sink.parallelism` | - | - | The write parallelism for Flink SQL. If not set, Flink determines the parallelism. |
| `sink.properties.*` | - | - | Stream Load parameters that control load behavior. Commonly used ones are listed in the following rows. |
| `sink.properties.format` | csv | csv, json | The data format for Stream Load. |
| `sink.properties.column_separator` | `\t` | - | The column separator for CSV data. |
| `sink.properties.row_delimiter` | `\n` | - | The row delimiter for CSV data. |
| `sink.properties.max_filter_ratio` | 0 | 0–1 | The maximum proportion of rows that can be filtered out because of data quality issues. |
| `sink.properties.partial_update` | false | true, false | Whether to enable partial update. |
| `sink.properties.partial_update_mode` | row | row, column | The partial update mode. row suits real-time updates with many columns and small batches. column suits batch updates with few columns and many rows, and can significantly improve performance in those scenarios. |
| `sink.properties.strict_mode` | false | true, false | Whether to enable strict mode for Stream Load. Strict mode controls how rows with unqualified values are handled. |
| `sink.properties.compression` | NONE | lz4_frame | The compression algorithm for Stream Load JSON data. Requires StarRocks 3.2.7 or later. Supported from Flink Connector 1.2.10. |
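If you generate connector options from configuration files, a pre-flight check against the ranges in the optional parameter table can catch typos before a job is submitted. The sketch below is hypothetical and not part of the connector. It covers only the numeric options that have a listed range, and it assumes MB and GB are binary units, which matches the documented default of 94371840 bytes for 90 MB.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-flight check for the numeric ranges in the parameter table.
class SinkOptionRanges {
    private static final long MB = 1024L * 1024;
    private static final Map<String, long[]> RANGES = new HashMap<>();

    static {
        RANGES.put("sink.buffer-flush.max-bytes", new long[] {64 * MB, 10 * 1024 * MB});
        RANGES.put("sink.buffer-flush.max-rows", new long[] {64_000, 5_000_000});
        RANGES.put("sink.buffer-flush.interval-ms", new long[] {1_000, 3_600_000});
        RANGES.put("sink.max-retries", new long[] {0, 10});
        RANGES.put("sink.connect.timeout-ms", new long[] {100, 60_000});
        RANGES.put("sink.wait-for-continue.timeout-ms", new long[] {3_000, 60_000});
    }

    /** Returns null if the value is acceptable, otherwise a description of the problem. */
    static String check(String key, String value) {
        long[] range = RANGES.get(key);
        if (range == null) {
            return null; // no numeric range is listed for this option
        }
        long parsed;
        try {
            parsed = Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            return key + " must be an integer, got: " + value;
        }
        if (parsed < range[0] || parsed > range[1]) {
            return key + "=" + parsed + " is outside [" + range[0] + ", " + range[1] + "]";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check("sink.buffer-flush.max-bytes", "94371840")); // null (the 90 MB default)
        System.out.println(check("sink.buffer-flush.interval-ms", "500"));    // below the 1,000 ms minimum
    }
}
```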
Data type mapping
| Flink type | StarRocks type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY\<T\> | ARRAY\<T\> |
| MAP\<KT,VT\> | JSON STRING |
| ROW\<arg T...\> | JSON STRING |
Usage notes
Flush policy
The connector buffers records in memory and flushes them to StarRocks via Stream Load. When a flush is triggered depends on the delivery semantic:
For `at-least-once`, a flush is triggered when any of the following conditions is met:
- The buffered data size reaches `sink.buffer-flush.max-bytes`.
- The number of buffered rows reaches `sink.buffer-flush.max-rows` (sink version `V1` only).
- The time since the last flush reaches `sink.buffer-flush.interval-ms`.
For `exactly-once`, buffered data is flushed only when a Flink checkpoint is triggered. Reaching a size or time threshold does not cause a flush, so `sink.buffer-flush.max-bytes` and `sink.buffer-flush.interval-ms` have no effect.
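The flush triggers for both semantics can be summarized as a single predicate. The following is a hypothetical model, not the connector's implementation: the class `FlushPolicy` is invented for illustration, and its thresholds are the defaults from the parameter table.

```java
// Hypothetical model of the flush triggers; not the connector's code.
class FlushPolicy {
    enum Semantic { AT_LEAST_ONCE, EXACTLY_ONCE }

    private final Semantic semantic;
    private final boolean sinkVersionV1;
    private final long maxBytes;
    private final long maxRows;
    private final long intervalMs;

    FlushPolicy(Semantic semantic, boolean sinkVersionV1, long maxBytes, long maxRows, long intervalMs) {
        this.semantic = semantic;
        this.sinkVersionV1 = sinkVersionV1;
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMs = intervalMs;
    }

    // Defaults from the parameter table.
    static FlushPolicy withDefaults(Semantic semantic, boolean sinkVersionV1) {
        return new FlushPolicy(semantic, sinkVersionV1, 94_371_840L, 500_000L, 300_000L);
    }

    /** Whether buffered data should be flushed without waiting for a checkpoint. */
    boolean shouldFlush(long bufferedBytes, long bufferedRows, long msSinceLastFlush) {
        if (semantic == Semantic.EXACTLY_ONCE) {
            return false; // exactly-once flushes only when a Flink checkpoint is triggered
        }
        return bufferedBytes >= maxBytes
                || (sinkVersionV1 && bufferedRows >= maxRows) // row limit applies to V1 only
                || msSinceLastFlush >= intervalMs;
    }

    public static void main(String[] args) {
        FlushPolicy v2 = withDefaults(Semantic.AT_LEAST_ONCE, false);
        System.out.println(v2.shouldFlush(1_024, 600_000, 1_000)); // false: row limit ignored outside V1
        System.out.println(v2.shouldFlush(1_024, 10, 300_000));    // true: interval reached
        FlushPolicy eo = withDefaults(Semantic.EXACTLY_ONCE, false);
        System.out.println(eo.shouldFlush(200_000_000L, 10, 600_000)); // false: waits for a checkpoint
    }
}
```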
Exactly-once semantics
From Flink Connector 1.2.4, exactly-once is implemented using the Stream Load transaction interface introduced in StarRocks 2.4. This approach uses less memory and reduces checkpoint overhead compared to the earlier non-transactional implementation.
Set sink.label-prefix when using exactly-once with Flink Connector 1.2.8 or later. The connector uses this prefix to identify and clean up transactions left open when a Flink job fails during a checkpoint.
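The following sketch collects the exactly-once settings in one place. It assumes you assemble sink options as a plain key/value map before passing them to the connector, for example through `StarRocksSinkOptions.builder().withProperty(...)` or a SQL `WITH` clause. The helper `ExactlyOnceProps` and the label prefix value are hypothetical. Because exactly-once data is flushed only on checkpoints, the Flink job also needs checkpointing enabled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: adds exactly-once settings to an existing option map.
class ExactlyOnceProps {
    static Map<String, String> withExactlyOnce(Map<String, String> base, String labelPrefix) {
        if (labelPrefix == null || labelPrefix.trim().isEmpty()) {
            // Connector 1.2.8 and later uses the prefix to find and clean up lingering transactions.
            throw new IllegalArgumentException("sink.label-prefix should be set for exactly-once");
        }
        Map<String, String> props = new LinkedHashMap<>(base);
        props.put("sink.semantic", "exactly-once");
        props.put("sink.label-prefix", labelPrefix);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("database-name", "test");
        base.put("table-name", "score_board");
        System.out.println(withExactlyOnce(base, "score_board_job"));
        // {database-name=test, table-name=score_board, sink.semantic=exactly-once, sink.label-prefix=score_board_job}
    }
}
```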
Examples
Write data using Flink SQL
- Create a database named `test` in StarRocks and create a primary key table named `score_board`:
CREATE DATABASE test;
CREATE TABLE test.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);
- Log on to the master node of the Flink cluster. See Log on to a cluster.
- Start the Flink SQL client:
/opt/apps/FLINK/flink-current/bin/sql-client.sh
- Create a Flink table mapped to the StarRocks table and insert data:
CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'admin',
'password' = '<password>'
);
INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
When the target is a StarRocks primary key table, you must explicitly define the primary key in the Flink table DDL, as in the statement above. For other table types, such as Duplicate Key tables, the primary key definition is optional.
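If you generate these DDL statements from code, for example to create one Flink table per StarRocks table, quote option values the way Flink SQL expects: a single quote inside a string literal is written as two single quotes. The helper `StarRocksDdl` below is a hypothetical sketch of that approach, not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders a Flink SQL CREATE TABLE ... WITH (...) statement.
class StarRocksDdl {
    // Flink SQL string literal: wrap in single quotes and double any embedded quote.
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    static String createTable(String table, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "(\n  ", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            with.add(quote(option.getKey()) + " = " + quote(option.getValue()));
        }
        return "CREATE TABLE `" + table + "` (" + columns + ") WITH " + with + ";";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "starrocks");
        options.put("database-name", "test");
        options.put("table-name", "score_board");
        System.out.println(createTable("score_board",
                "`id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED", options));
    }
}
```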
Write data using the Flink DataStream API
Choose the approach based on your record format. All examples write to the same score_board table used in the Flink SQL example.
Write CSV string records
If your records are CSV-formatted strings, set sink.properties.format to csv and specify the column separator. See the full example on GitHub.
/**
* Generate CSV-format records. Each record has three values separated by "\t".
* These values map to the columns `id`, `name`, and `score` in the StarRocks table.
*/
String[] records = new String[]{
"1\tstarrocks-csv\t100",
"2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "csv")
.withProperty("sink.properties.column_separator", "\t")
.build();
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
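Each CSV record must list its values in the column order of the target table, separated by the configured `column_separator`. The hypothetical helper below builds such records and rejects values that contain the separator or the default row delimiter, because those would split a record. It writes `\N` for NULL, assuming StarRocks' usual CSV convention for null values.

```java
import java.util.StringJoiner;

// Hypothetical helper that builds tab-separated records for test.score_board (id, name, score).
class CsvRecords {
    static final String COLUMN_SEPARATOR = "\t"; // must match sink.properties.column_separator
    static final String ROW_DELIMITER = "\n";    // default sink.properties.row_delimiter
    static final String NULL_MARKER = "\\N";     // assumed NULL representation in StarRocks CSV

    static String toRecord(Object... values) {
        StringJoiner record = new StringJoiner(COLUMN_SEPARATOR);
        for (Object value : values) {
            if (value == null) {
                record.add(NULL_MARKER);
                continue;
            }
            String text = value.toString();
            if (text.contains(COLUMN_SEPARATOR) || text.contains(ROW_DELIMITER)) {
                throw new IllegalArgumentException("Value contains a separator or delimiter: " + text);
            }
            record.add(text);
        }
        return record.toString();
    }

    public static void main(String[] args) {
        System.out.println(toRecord(1, "starrocks-csv", 100).equals("1\tstarrocks-csv\t100")); // true
        System.out.println(toRecord(2, null, 100)); // the name column is NULL
    }
}
```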
Write JSON string records
If your records are JSON-formatted strings, set sink.properties.format to json and set sink.properties.strip_outer_array to true so that StarRocks strips the outermost JSON array. See the full example on GitHub.
/**
* Generate JSON-format records.
* Each record has three key-value pairs mapping to columns id, name, and score.
*/
String[] records = new String[]{
"{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
"{\"id\":2, \"name\":\"flink-json\", \"score\":100}"
};
DataStream<String> source = env.fromElements(records);
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build();
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
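`strip_outer_array` matters because the sink, as assumed here, sends each flushed batch of JSON records as a single JSON array; with `strip_outer_array` set to `true`, StarRocks loads each array element as one row. The hypothetical helpers below build one record with basic string escaping and frame a batch the way this assumption describes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers for JSON records that map to test.score_board (id, name, score).
class JsonRecords {
    // Escapes quotes, backslashes, and control characters for a JSON string value.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    static String record(int id, String name, int score) {
        return "{\"id\":" + id + ",\"name\":\"" + escape(name) + "\",\"score\":" + score + "}";
    }

    // Assumed batch framing: records joined into one JSON array.
    static String frameBatch(List<String> records) {
        return "[" + String.join(",", records) + "]";
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(record(1, "starrocks-json", 100), record(2, "flink-json", 100));
        System.out.println(frameBatch(batch));
        // [{"id":1,"name":"starrocks-json","score":100},{"id":2,"name":"flink-json","score":100}]
    }
}
```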
Write custom Java objects
If your records are custom Java objects, define a schema that matches the StarRocks table and implement a StarRocksSinkRowBuilder to convert each object to an Object[]. See the full example on GitHub.
Define the POJO:
public static class RowData {
public int id;
public String name;
public int score;
public RowData() {}
public RowData(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
Set up the sink:
RowData[] records = new RowData[]{
new RowData(1, "starrocks-rowdata", 100),
new RowData(2, "flink-rowdata", 100)
};
DataStream<RowData> source = env.fromElements(records);
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.build();
// Define the schema to match the StarRocks table layout.
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT().notNull()) // notNull() required for primary key columns
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
.primaryKey("id")
.build();
RowDataTransformer transformer = new RowDataTransformer();
SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
source.addSink(starRockSink);
Implement the transformer:
private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {
/**
* Map each field of RowData to the corresponding position in the Object[].
* The array layout must match the schema defined above.
*/
@Override
public void accept(Object[] internalRow, RowData rowData) {
internalRow[0] = rowData.id;
internalRow[1] = rowData.name;
internalRow[2] = rowData.score;
// For primary key tables, set the last element to indicate UPSERT or DELETE.
internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
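The transformer writes the operation to `internalRow.length - 1` because the last slot of the array carries the row operation. The following self-contained sketch models that layout, assuming the connector allocates one slot per schema column plus one operation slot for a primary key table. `SinkOp` is a hypothetical stand-in for `StarRocksSinkOP`, and its constant order (UPSERT, then DELETE) is an assumption.

```java
import java.util.Arrays;

// Hypothetical stand-in for StarRocksSinkOP; the constant order is an assumption.
enum SinkOp { UPSERT, DELETE }

// Models the Object[] a row builder fills for a primary key table:
// schema columns first, then one trailing slot for the operation.
class RowLayout {
    static final int COLUMN_COUNT = 3; // id, name, score

    static Object[] buildRow(int id, String name, int score, SinkOp op) {
        Object[] internalRow = new Object[COLUMN_COUNT + 1]; // assumed extra slot for the operation
        internalRow[0] = id;
        internalRow[1] = name;
        internalRow[2] = score;
        internalRow[internalRow.length - 1] = op.ordinal();
        return internalRow;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(buildRow(1, "starrocks-rowdata", 100, SinkOp.UPSERT)));
        // [1, starrocks-rowdata, 100, 0]
        System.out.println(Arrays.toString(buildRow(2, "flink-rowdata", 100, SinkOp.DELETE)));
        // [2, flink-rowdata, 100, 1]
    }
}
```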
Synchronize data using Flink CDC 3.0
The Flink CDC 3.0 framework lets you build streaming ELT pipelines from CDC sources such as MySQL and Kafka to StarRocks. Through these pipelines, you can:
- Automatically create databases and tables in StarRocks
- Synchronize full and incremental data
- Propagate schema changes
From Flink Connector 1.2.9, the connector is integrated into Flink CDC 3.0 as the StarRocks Pipeline Connector. Use it with StarRocks 3.2.1 or later to take advantage of the fast_schema_evolution feature, which speeds up adding and dropping columns and reduces resource consumption.
Best practices
Import to primary key tables
Primary key tables support partial updates and conditional updates for fine-grained write control.
Set up the StarRocks table
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
Start the Flink SQL client:
/opt/apps/FLINK/flink-current/bin/sql-client.sh
Partial update
Partial update lets you update specific columns without touching others.
- Create the Flink table with `sink.properties.partial_update` enabled:
  - `sink.properties.partial_update`: Enables partial update mode.
  - `sink.properties.columns`: For Flink Connector 1.2.7 and earlier, specify the columns to update and append `__op` at the end. The `__op` field indicates UPSERT or DELETE and is set automatically by the connector. Not required for Flink Connector 1.2.8 and later.
CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'admin',
'password' = '<password>',
'sink.properties.partial_update' = 'true',
-- Required for Flink Connector 1.2.7 and earlier only
'sink.properties.columns' = 'id,name,__op'
);
- Insert the update data. Only the `name` column changes; `score` is left untouched:
INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');
- Verify the result in SQL Editor:
SELECT * FROM `test`.`score_board`;
The `name` column reflects the new values, while `score` remains unchanged.
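The effect of the partial update can be modeled in a few lines: each incoming row carries only the primary key and the changed columns, and the stored row keeps every other column. The class `PartialUpdateModel` below is a hypothetical toy model of those semantics, not how StarRocks stores or merges data, and it deliberately handles only keys that already exist.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model of partial update on a primary key table.
class PartialUpdateModel {
    // primary key -> (column name -> value)
    final Map<Integer, Map<String, Object>> rows = new LinkedHashMap<>();

    void insertFull(int id, String name, int score) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", id);
        row.put("name", name);
        row.put("score", score);
        rows.put(id, row);
    }

    void partialUpdate(int id, Map<String, Object> changedColumns) {
        Map<String, Object> row = rows.get(id);
        if (row == null) {
            throw new IllegalStateException("This toy model only updates existing rows: " + id);
        }
        row.putAll(changedColumns); // columns not in changedColumns keep their values
    }

    public static void main(String[] args) {
        PartialUpdateModel table = new PartialUpdateModel();
        table.insertFull(1, "starrocks", 100);
        table.insertFull(2, "flink", 100);
        Map<String, Object> update1 = new LinkedHashMap<>();
        update1.put("name", "starrocks-update");
        table.partialUpdate(1, update1);
        Map<String, Object> update2 = new LinkedHashMap<>();
        update2.put("name", "flink-update");
        table.partialUpdate(2, update2);
        System.out.println(table.rows);
        // {1={id=1, name=starrocks-update, score=100}, 2={id=2, name=flink-update, score=100}}
    }
}
```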
Conditional update
Conditional update writes a row only when the incoming value meets a specified condition relative to the existing row.
This example updates a row only when the incoming score value is greater than or equal to the current value in the table.
- Create the Flink table with `sink.properties.merge_condition` set to `score`:
  - `sink.properties.merge_condition`: The column used as the update condition. Here, `score` means a row is updated only when the incoming score is greater than or equal to the current score.
  - `sink.version`: Must be set to `V1` for conditional update.
CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'admin',
'password' = '<password>',
'sink.properties.merge_condition' = 'score',
'sink.version' = 'V1'
);
- Insert two rows with the same primary keys as the existing data. The first row has a lower `score` (99 < 100), and the second has a higher `score` (101 > 100):
INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
- Verify the result in SQL Editor:
SELECT * FROM `test`.`score_board`;
Only the second row is updated. The first row remains unchanged because its incoming `score` (99) is less than the existing value (100).
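The merge condition reduces to one comparison per incoming row. The class `ConditionalUpdateModel` below is a hypothetical toy model of `merge_condition = 'score'` that replays the rows from this example; it assumes rows with new keys are inserted unconditionally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of merge_condition = 'score': an incoming row replaces the stored
// row only if its score is greater than or equal to the stored score.
class ConditionalUpdateModel {
    static final class Row {
        final String name;
        final int score;

        Row(String name, int score) {
            this.name = name;
            this.score = score;
        }

        @Override
        public String toString() {
            return "(" + name + ", " + score + ")";
        }
    }

    final Map<Integer, Row> rows = new LinkedHashMap<>();

    void upsert(int id, String name, int score) {
        Row current = rows.get(id);
        // Assumed: new keys are inserted; existing keys are replaced only if the condition holds.
        if (current == null || score >= current.score) {
            rows.put(id, new Row(name, score));
        }
    }

    public static void main(String[] args) {
        ConditionalUpdateModel table = new ConditionalUpdateModel();
        table.upsert(1, "starrocks", 100);
        table.upsert(2, "flink", 100);
        table.upsert(1, "starrocks-update", 99); // 99 < 100: ignored
        table.upsert(2, "flink-update", 101);    // 101 >= 100: applied
        System.out.println(table.rows); // {1=(starrocks, 100), 2=(flink-update, 101)}
    }
}
```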
Import to Bitmap columns
Bitmap columns accelerate exact deduplication counts, such as calculating unique visitors (UV). Because Flink does not have a native Bitmap type, use a BIGINT column in the Flink table and convert it with the to_bitmap() function.
- Create an Aggregate table with a Bitmap column in SQL Editor:
CREATE TABLE `test`.`page_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` datetime NOT NULL COMMENT 'access time',
`visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
`page_id` and `visit_date` are the Aggregate Keys. `visit_users` uses `BITMAP_UNION` as the aggregate function.
- Create the corresponding Flink table, using `sink.properties.columns` to convert `visit_user_id` (BIGINT) to Bitmap with `to_bitmap()`:
CREATE TABLE `page_uv` (
`page_id` INT,
`visit_date` TIMESTAMP,
`visit_user_id` BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
'database-name' = 'test',
'table-name' = 'page_uv',
'username' = 'admin',
'password' = '<password>',
'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
);
- Insert data in the Flink SQL client:
INSERT INTO `page_uv` VALUES
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
(1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
(1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
(2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
- Query the UV count per page in SQL Editor:
SELECT page_id, COUNT(DISTINCT visit_users) FROM `test`.`page_uv` GROUP BY page_id;
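The aggregation can be modeled with ordinary bit sets: rows that share the aggregate key (`page_id`, `visit_date`) are merged by bitmap union, and the per-page UV count is the cardinality of the union of that page's bitmaps. The sketch below is a toy model that uses `java.util.BitSet` in place of StarRocks' bitmap type and replays the five inserted rows. It shows why page 1 counts three distinct visitors even though visitor 13 appears under two dates.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.TreeMap;

// Toy model of test.page_uv. BitSet stands in for the BITMAP type and only
// supports non-negative int IDs.
class BitmapUvModel {
    // Aggregate key "page_id|visit_date" -> bitmap of visitor IDs.
    final Map<String, BitSet> rows = new TreeMap<>();

    void insert(int pageId, String visitDate, int visitUserId) {
        // to_bitmap(visit_user_id) yields a one-element bitmap; BITMAP_UNION ORs it into the row.
        rows.computeIfAbsent(pageId + "|" + visitDate, key -> new BitSet()).set(visitUserId);
    }

    // Models SELECT page_id, COUNT(DISTINCT visit_users) ... GROUP BY page_id.
    Map<Integer, Integer> uvByPage() {
        Map<Integer, BitSet> perPage = new TreeMap<>();
        for (Map.Entry<String, BitSet> row : rows.entrySet()) {
            String key = row.getKey();
            int pageId = Integer.parseInt(key.substring(0, key.indexOf('|')));
            perPage.computeIfAbsent(pageId, id -> new BitSet()).or(row.getValue());
        }
        Map<Integer, Integer> uv = new TreeMap<>();
        perPage.forEach((pageId, visitors) -> uv.put(pageId, visitors.cardinality()));
        return uv;
    }

    public static void main(String[] args) {
        BitmapUvModel table = new BitmapUvModel();
        table.insert(1, "2020-06-23 01:30:30", 13);
        table.insert(1, "2020-06-23 01:30:30", 23);
        table.insert(1, "2020-06-23 01:30:30", 33);
        table.insert(1, "2020-06-23 02:30:30", 13);
        table.insert(2, "2020-06-23 01:30:30", 23);
        System.out.println(table.rows.size()); // 3 aggregated rows
        System.out.println(table.uvByPage());  // {1=3, 2=1}
    }
}
```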
Import to HLL columns
HLL (HyperLogLog) columns support approximate deduplication counting at scale with lower storage overhead than Bitmap. The setup is similar: use a BIGINT column in Flink and convert it with hll_hash().
- Create an Aggregate table with an HLL column in SQL Editor:
CREATE TABLE `test`.`hll_uv` (
`page_id` INT NOT NULL COMMENT 'page ID',
`visit_date` DATETIME NOT NULL COMMENT 'access time',
`visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
- Create the corresponding Flink table, using `sink.properties.columns` to convert `visit_user_id` (BIGINT) to HLL with `hll_hash()`:
CREATE TABLE `hll_uv` (
`page_id` INT,
`visit_date` TIMESTAMP,
`visit_user_id` BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
'database-name' = 'test',
'table-name' = 'hll_uv',
'username' = 'admin',
'password' = '<password>',
'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
);
- Insert data in the Flink SQL client:
INSERT INTO `hll_uv` VALUES
(3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
(4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
(3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
- Query the UV count per page in SQL Editor:
SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `test`.`hll_uv` GROUP BY `page_id`;
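Unlike a bitmap, an HLL value keeps only a fixed array of small registers, so the count is an estimate. The sketch below is a textbook HyperLogLog with linear counting for small cardinalities, written only to show why HLL columns are approximate yet mergeable. StarRocks' own HLL encoding, hash function, and register count differ, and the precision `p = 14` is an arbitrary choice for this example.

```java
// Textbook HyperLogLog with linear counting for small cardinalities.
// Illustration only: StarRocks' HLL encoding and hash function differ.
class HyperLogLogSketch {
    private final int p;            // index bits
    private final int m;            // register count, 2^p
    private final byte[] registers;

    HyperLogLogSketch(int p) {
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    // SplitMix64 finalizer: spreads small sequential IDs over all 64 bits.
    static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
        z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
        return z ^ (z >>> 31);
    }

    void add(long value) {
        long hash = mix(value);
        int index = (int) (hash >>> (64 - p)); // top p bits choose a register
        long rest = hash << p;                 // remaining 64 - p bits, left-aligned
        int rank = Math.min(Long.numberOfLeadingZeros(rest), 64 - p) + 1;
        if (rank > registers[index]) {
            registers[index] = (byte) rank;
        }
    }

    // Register-wise max: merging two sketches equals sketching the union.
    void merge(HyperLogLogSketch other) {
        for (int i = 0; i < m; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    double estimate() {
        double sum = 0;
        int zeroRegisters = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) {
                zeroRegisters++;
            }
        }
        double alpha = 0.7213 / (1 + 1.079 / m);
        double raw = alpha * m * m / sum;
        if (raw <= 2.5 * m && zeroRegisters > 0) {
            return m * Math.log((double) m / zeroRegisters); // linear counting
        }
        return raw;
    }

    public static void main(String[] args) {
        // Visitors of page 3 from the INSERT above (78 and 674).
        HyperLogLogSketch page3 = new HyperLogLogSketch(14);
        page3.add(78);
        page3.add(674);
        System.out.printf("page 3 estimate: %.2f%n", page3.estimate());
    }
}
```

Because merging is a register-wise maximum, partial sketches from different rows or tablets can be combined in any order, which is what lets `HLL_UNION` aggregate them.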