E-MapReduce:Flink Connector

Last Updated:Mar 26, 2026

The StarRocks Flink Connector streams data from Apache Flink into StarRocks tables using Stream Load. It buffers records in memory and flushes them in batches, delivering higher throughput than the built-in flink-connector-jdbc for large-scale data ingestion. It supports the DataStream API, Table API & SQL, and Python API.

Prerequisites

Before you begin, make sure you have:

Limitations

  • The machine running Flink must be able to reach the following ports on your StarRocks instance:

    • FE node: http_port (default 8030) and query_port (default 9030)

    • BE node: be_http_port (default 8040)

  • The StarRocks user account must have SELECT and INSERT permissions on the target table.

  • Connector version compatibility:

    Connector | Flink     | StarRocks     | Java | Scala
    1.2.9     | 1.15–1.18 | 2.1 and later | 8    | 2.11, 2.12
    1.2.8     | 1.13–1.17 | 2.1 and later | 8    | 2.11, 2.12
    1.2.7     | 1.11–1.15 | 2.1 and later | 8    | 2.11, 2.12
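If you pin versions in a build script, you can encode the compatibility table as data. The following Java sketch is illustrative only. ConnectorCompat and connectorsFor are hypothetical names, and the sketch includes only the three rows listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that encodes the compatibility table above; not part of the connector.
class ConnectorCompat {
    // One table row: connector version and the inclusive range of supported Flink 1.x minor versions.
    static final class Row {
        final String connector;
        final int flinkMinorMin;
        final int flinkMinorMax;

        Row(String connector, int flinkMinorMin, int flinkMinorMax) {
            this.connector = connector;
            this.flinkMinorMin = flinkMinorMin;
            this.flinkMinorMax = flinkMinorMax;
        }
    }

    // Rows copied from the table; all three list StarRocks 2.1 and later and Java 8.
    static final List<Row> ROWS = Arrays.asList(
            new Row("1.2.9", 15, 18),
            new Row("1.2.8", 13, 17),
            new Row("1.2.7", 11, 15));

    // Returns the connector versions, newest first, that list Flink 1.<flinkMinor> as supported.
    static List<String> connectorsFor(int flinkMinor) {
        List<String> result = new ArrayList<>();
        for (Row row : ROWS) {
            if (flinkMinor >= row.flinkMinorMin && flinkMinor <= row.flinkMinorMax) {
                result.add(row.connector);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(connectorsFor(17)); // [1.2.9, 1.2.8]
    }
}
```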

Get the Flink Connector JAR

Choose one of the following methods to get the JAR, then upload it to your Flink cluster.

Method 1: Download from Maven Central

Download the JAR directly from the Maven Central Repository.

JAR naming format:

  • Flink 1.15 and later: flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar (for example, flink-connector-starrocks-1.2.8_flink-1.17.jar)

  • Flink before 1.15: flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar (for example, flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar)

Method 2: Add as a Maven dependency

Add the connector to your pom.xml:

  • Flink 1.15 and later:

    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <version>${connector_version}_flink-${flink_version}</version>
    </dependency>
  • Flink before 1.15:

    <dependency>
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
    </dependency>
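Both naming rules follow the same pattern. The Maven version is ${connector_version}_flink-${flink_version}, with _${scala_version} appended before Flink 1.15. The JAR file name is the artifact ID, a hyphen, and that version. The following Java sketch derives both strings. ConnectorArtifact and its methods are hypothetical helpers, and the sketch assumes Flink versions in the 1.x form.

```java
// Hypothetical helper that derives the Maven version string and the JAR file name
// from the naming rules above.
class ConnectorArtifact {
    static final String ARTIFACT_ID = "flink-connector-starrocks";

    // scalaVersion is only used before Flink 1.15 and may be null otherwise.
    static String mavenVersion(String connectorVersion, String flinkVersion, String scalaVersion) {
        String base = connectorVersion + "_flink-" + flinkVersion;
        if (flinkMinor(flinkVersion) >= 15) {
            return base;
        }
        if (scalaVersion == null) {
            throw new IllegalArgumentException("Flink before 1.15 needs a Scala version");
        }
        return base + "_" + scalaVersion;
    }

    // Maven artifacts are named <artifactId>-<version>.jar.
    static String jarName(String connectorVersion, String flinkVersion, String scalaVersion) {
        return ARTIFACT_ID + "-" + mavenVersion(connectorVersion, flinkVersion, scalaVersion) + ".jar";
    }

    // Parses "1.17" into 17; assumes a "1.<minor>" version string.
    private static int flinkMinor(String flinkVersion) {
        return Integer.parseInt(flinkVersion.split("\\.")[1]);
    }

    public static void main(String[] args) {
        System.out.println(jarName("1.2.8", "1.17", null));   // flink-connector-starrocks-1.2.8_flink-1.17.jar
        System.out.println(jarName("1.2.7", "1.14", "2.12")); // flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar
    }
}
```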

Method 3: Build from source

  1. Clone the Flink Connector source code.

  2. Build the JAR for your Flink version:

    sh build.sh <flink_version>

    For example, for Flink 1.17:

    sh build.sh 1.17
  3. Find the generated JAR in the target/ directory, for example flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar.

    Unofficial releases include a SNAPSHOT suffix in the file name.

Upload the JAR to your Flink cluster

Upload the JAR to the flink-{flink_version}/lib directory on the Flink cluster.

For EMR clusters running EMR-5.19.0, place the JAR in /opt/apps/FLINK/flink-current/lib.

Start the Flink cluster

  1. Log on to the master node of the Flink cluster. See Log on to a cluster.

  2. Start the cluster:

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh

Configuration

For the full parameter reference, see Continuously load data from Apache Flink® | StarRocks.

Parameters

Required parameters

Parameter | Description
connector | Fixed to starrocks.
jdbc-url | The JDBC URL for connecting to StarRocks. Format: jdbc:mysql://<FE internal address>:9030. Example: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. To get the FE internal address, see View instance list and details.
load-url | The FE HTTP address for Stream Load. Format: <FE internal address>:8030. Example: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.
database-name | The StarRocks database name.
table-name | The StarRocks table name.
username | The StarRocks user account. The account needs SELECT and INSERT permissions on the target table. To grant permissions, see Manage users and data authorization.
password | The password for the StarRocks user account.
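jdbc-url and load-url point to the same FE node on different ports. jdbc-url uses the MySQL-protocol query port (9030 by default), and load-url uses the HTTP port that Stream Load uses (8030 by default). The following Java sketch builds both values from one FE address. StarRocksUrls is a hypothetical helper, and the host in main is a placeholder.

```java
// Hypothetical helper that builds the jdbc-url and load-url values for one FE node.
class StarRocksUrls {
    static final int DEFAULT_QUERY_PORT = 9030; // FE query_port (MySQL protocol)
    static final int DEFAULT_HTTP_PORT = 8030;  // FE http_port (Stream Load)

    // jdbc-url format: jdbc:mysql://<FE address>:<query_port>
    static String jdbcUrl(String feHost, int queryPort) {
        return "jdbc:mysql://" + feHost + ":" + queryPort;
    }

    // load-url format from the table above: <FE address>:<http_port>, with no scheme
    static String loadUrl(String feHost, int httpPort) {
        return feHost + ":" + httpPort;
    }

    public static void main(String[] args) {
        String fe = "fe-example-internal.starrocks.aliyuncs.com"; // placeholder FE address
        System.out.println(jdbcUrl(fe, DEFAULT_QUERY_PORT)); // jdbc:mysql://fe-example-internal.starrocks.aliyuncs.com:9030
        System.out.println(loadUrl(fe, DEFAULT_HTTP_PORT));  // fe-example-internal.starrocks.aliyuncs.com:8030
    }
}
```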

Optional parameters

Parameter | Default | Valid values | Description
sink.semantic | at-least-once | at-least-once, exactly-once | The delivery semantic guarantee.
sink.version | AUTO | V1, V2, AUTO | The Stream Load interface version. Supported from Flink Connector 1.2.4. V2 uses the Stream Load transaction interface (requires StarRocks 2.4 or later), which reduces memory usage and provides a more stable exactly-once implementation. AUTO selects V2 if StarRocks supports the Stream Load transaction interface, and V1 otherwise.
sink.label-prefix | - | - | The label prefix for Stream Load requests. For Flink Connector 1.2.8 and later with exactly-once semantics, set this parameter so that the connector can clean up transactions left open by Flink failures during checkpointing.
sink.buffer-flush.max-bytes | 94371840 (90 MB) | 64 MB–10 GB | The maximum buffer size before a flush is triggered. Applies only to at-least-once.
sink.buffer-flush.max-rows | 500000 | 64,000–5,000,000 | The maximum number of buffered rows before a flush is triggered. Applies only to sink version V1 with at-least-once.
sink.buffer-flush.interval-ms | 300000 | 1,000–3,600,000 | The maximum interval between flushes, in milliseconds. Applies only to at-least-once.
sink.max-retries | 3 | 0–10 | The number of retries after a Stream Load failure. Applies only to sink version V1.
sink.connect.timeout-ms | 30000 | 100–60,000 | The HTTP connection timeout to the FE, in milliseconds. Before Flink Connector 1.2.9, the default was 1000.
sink.socket.timeout-ms | -1 | - | The time the HTTP client waits for data, in milliseconds. -1 means no timeout. Supported from Flink Connector 1.2.10.
sink.wait-for-continue.timeout-ms | 10000 | 3,000–60,000 | The timeout for waiting for the FE HTTP 100-continue response, in milliseconds. Supported from Flink Connector 1.2.7.
sink.ignore.update-before | true | true, false | Whether to ignore UPDATE_BEFORE records when writing to a primary key table. If set to false, UPDATE_BEFORE records are written as DELETE operations. Supported from Flink Connector 1.2.8.
sink.parallelism | - | - | The write parallelism for Flink SQL. If not set, Flink determines the parallelism.
sink.properties.* | - | - | Stream Load parameters that control import behavior, such as the ones listed below.
sink.properties.format | csv | csv, json | The data format for Stream Load.
sink.properties.column_separator | \t | - | The column separator for CSV data.
sink.properties.row_delimiter | \n | - | The row delimiter for CSV data.
sink.properties.max_filter_ratio | 0 | 0–1 | The maximum proportion of rows that can be filtered out due to data quality issues.
sink.properties.partial_update | false | true, false | Whether to enable partial update.
sink.properties.partial_update_mode | row | row, column | The partial update mode. row suits real-time updates with many columns and small batches. column suits batch updates with few columns and many rows, and can significantly improve performance in those scenarios.
sink.properties.strict_mode | false | true, false | Whether to enable strict mode for Stream Load. Strict mode controls how rows with unqualified values are handled.
sink.properties.compression | NONE | lz4_frame | The compression algorithm for Stream Load JSON data. Requires StarRocks 3.2.7 or later. Supported from Flink Connector 1.2.10.
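Several numeric options have documented valid ranges, and checking them before you submit a job catches typos early. The following Java sketch is a hypothetical pre-submit check, not a connector feature. It copies the ranges from the table and assumes that MB and GB are binary units (1 MB = 1,048,576 bytes), which matches the 94371840-byte (90 MB) default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-submit check for the numeric ranges in the table above.
class SinkOptionCheck {
    private static final long MB = 1024L * 1024L; // assumption: binary units (90 MB = 94371840 bytes)
    private static final long GB = 1024L * MB;

    // Inclusive {min, max} ranges copied from the Valid values column.
    private static final Map<String, long[]> RANGES = new HashMap<>();
    static {
        RANGES.put("sink.buffer-flush.max-bytes", new long[]{64 * MB, 10 * GB});
        RANGES.put("sink.buffer-flush.max-rows", new long[]{64_000L, 5_000_000L});
        RANGES.put("sink.buffer-flush.interval-ms", new long[]{1_000L, 3_600_000L});
        RANGES.put("sink.max-retries", new long[]{0L, 10L});
        RANGES.put("sink.connect.timeout-ms", new long[]{100L, 60_000L});
        RANGES.put("sink.wait-for-continue.timeout-ms", new long[]{3_000L, 60_000L});
    }

    // Returns an error message, or null if the value is valid or the option has no range here.
    static String check(String key, String value) {
        long[] range = RANGES.get(key);
        if (range == null) {
            return null;
        }
        long v;
        try {
            v = Long.parseLong(value.trim());
        } catch (NumberFormatException e) {
            return key + ": not an integer: " + value;
        }
        if (v < range[0] || v > range[1]) {
            return key + ": " + v + " is outside [" + range[0] + ", " + range[1] + "]";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check("sink.max-retries", "20")); // sink.max-retries: 20 is outside [0, 10]
    }
}
```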

Data type mapping

Flink type | StarRocks type
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INTEGER | INTEGER
BIGINT | BIGINT
FLOAT | FLOAT
DOUBLE | DOUBLE
DECIMAL | DECIMAL
BINARY | INT
CHAR | STRING
VARCHAR | STRING
STRING | STRING
DATE | DATE
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME
ARRAY<T> | ARRAY<T>
MAP<KT,VT> | JSON STRING
ROW<arg T...> | JSON STRING
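The following Java sketch turns the mapping table into a lookup. You can use it to check a Flink DDL against the target StarRocks table. TypeMapping is a hypothetical name. The sketch maps type names only and does not carry over lengths, precision, or scale.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical lookup built from the data type mapping table above.
class TypeMapping {
    private static final Map<String, String> FLINK_TO_STARROCKS = new HashMap<>();
    static {
        String[][] rows = {
                {"BOOLEAN", "BOOLEAN"}, {"TINYINT", "TINYINT"}, {"SMALLINT", "SMALLINT"},
                {"INTEGER", "INTEGER"}, {"BIGINT", "BIGINT"}, {"FLOAT", "FLOAT"},
                {"DOUBLE", "DOUBLE"}, {"DECIMAL", "DECIMAL"}, {"BINARY", "INT"},
                {"CHAR", "STRING"}, {"VARCHAR", "STRING"}, {"STRING", "STRING"},
                {"DATE", "DATE"},
                {"TIMESTAMP_WITHOUT_TIME_ZONE", "DATETIME"},
                {"TIMESTAMP_WITH_LOCAL_TIME_ZONE", "DATETIME"},
                {"MAP", "JSON STRING"}, {"ROW", "JSON STRING"},
        };
        for (String[] row : rows) {
            FLINK_TO_STARROCKS.put(row[0], row[1]);
        }
    }

    // Returns the StarRocks type for a Flink type string, or null if the table has no entry.
    static String starRocksTypeFor(String flinkType) {
        String t = flinkType.trim().toUpperCase(Locale.ROOT);
        // ARRAY<T> maps to ARRAY<T>, so the element type is mapped recursively.
        if (t.startsWith("ARRAY<") && t.endsWith(">")) {
            String element = starRocksTypeFor(t.substring("ARRAY<".length(), t.length() - 1));
            return element == null ? null : "ARRAY<" + element + ">";
        }
        // Drop suffixes such as "(3)", "(10, 2)", or "<KT,VT>" to get the type name.
        int cut = t.length();
        int paren = t.indexOf('(');
        int angle = t.indexOf('<');
        if (paren >= 0) cut = Math.min(cut, paren);
        if (angle >= 0) cut = Math.min(cut, angle);
        return FLINK_TO_STARROCKS.get(t.substring(0, cut).trim());
    }

    public static void main(String[] args) {
        System.out.println(starRocksTypeFor("VARCHAR(255)"));  // STRING
        System.out.println(starRocksTypeFor("ARRAY<BIGINT>")); // ARRAY<BIGINT>
    }
}
```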

Usage notes

Flush policy

The connector buffers records in memory and flushes them to StarRocks via Stream Load. When a flush is triggered depends on the delivery semantic:

For `at-least-once`, a flush is triggered when any of the following conditions is met:

  • The buffered data size reaches sink.buffer-flush.max-bytes.

  • The number of buffered rows reaches sink.buffer-flush.max-rows (sink version V1 only).

  • The time since the last flush reaches sink.buffer-flush.interval-ms.

  • A Flink checkpoint is triggered.

For `exactly-once`, data in memory is flushed only when a Flink checkpoint is triggered, so sink.buffer-flush.max-bytes and sink.buffer-flush.interval-ms have no effect.
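The following Java sketch models these rules as a single decision function. It is a simplified illustration, not the connector's implementation. FlushPolicy and shouldFlush are hypothetical names. Following the upstream StarRocks documentation, the sketch treats a checkpoint as a flush trigger under both semantics.

```java
// Simplified model of the flush triggers described above; not the connector's implementation.
class FlushPolicy {
    enum Semantic { AT_LEAST_ONCE, EXACTLY_ONCE }
    enum SinkVersion { V1, V2 } // AUTO resolves to one of these at runtime

    private final Semantic semantic;
    private final SinkVersion version;
    private final long maxBytes;   // sink.buffer-flush.max-bytes
    private final long maxRows;    // sink.buffer-flush.max-rows
    private final long intervalMs; // sink.buffer-flush.interval-ms

    FlushPolicy(Semantic semantic, SinkVersion version, long maxBytes, long maxRows, long intervalMs) {
        this.semantic = semantic;
        this.version = version;
        this.maxBytes = maxBytes;
        this.maxRows = maxRows;
        this.intervalMs = intervalMs;
    }

    // Decides whether the in-memory buffer should be sent to StarRocks now.
    boolean shouldFlush(long bufferedBytes, long bufferedRows, long msSinceLastFlush, boolean checkpointTriggered) {
        if (checkpointTriggered) {
            return true; // both semantics flush on a checkpoint
        }
        if (semantic == Semantic.EXACTLY_ONCE) {
            return false; // size, row, and interval thresholds are ignored
        }
        if (bufferedBytes >= maxBytes) {
            return true;
        }
        if (version == SinkVersion.V1 && bufferedRows >= maxRows) {
            return true; // the row limit applies to V1 only
        }
        return msSinceLastFlush >= intervalMs;
    }
}
```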

Exactly-once semantics

From Flink Connector 1.2.4, exactly-once is implemented using the Stream Load transaction interface introduced in StarRocks 2.4. This approach uses less memory and reduces checkpoint overhead compared to the earlier non-transactional implementation.

Set sink.label-prefix when using exactly-once with Flink Connector 1.2.8 or later. The connector uses this prefix to identify and clean up transactions left open when a Flink job fails during a checkpoint.
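Identifying a job's transactions by label prefix comes down to a prefix match over open transaction labels. The following Java sketch illustrates only that matching step. How the connector lists and aborts transactions is not shown, and LabelPrefixFilter, labelsToAbort, and the labels in main are hypothetical. The second call in main shows that two jobs that share a prefix would match each other's transactions, so a prefix unique to each job is a sensible choice. That advice is an assumption, not a documented requirement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: select open transaction labels that start with this job's
// sink.label-prefix. It does not contact StarRocks; labels are plain strings.
class LabelPrefixFilter {
    static List<String> labelsToAbort(List<String> openLabels, String labelPrefix) {
        List<String> matches = new ArrayList<>();
        if (labelPrefix == null || labelPrefix.isEmpty()) {
            return matches; // without a prefix, this job's transactions cannot be told apart
        }
        for (String label : openLabels) {
            if (label.startsWith(labelPrefix)) {
                matches.add(label);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Hypothetical labels left open by two jobs
        List<String> open = Arrays.asList("orders-a-1", "orders-a-2", "orders-b-1");
        System.out.println(labelsToAbort(open, "orders-a-")); // [orders-a-1, orders-a-2]
        System.out.println(labelsToAbort(open, "orders-"));   // [orders-a-1, orders-a-2, orders-b-1]
    }
}
```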

Examples

Write data using Flink SQL

  1. Create a database named test in StarRocks and create a primary key table score_board:

    CREATE DATABASE test;
    
    CREATE TABLE test.score_board(
        id int(11) NOT NULL COMMENT "",
        name varchar(65533) NULL DEFAULT "" COMMENT "",
        score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. Log on to the master node of the Flink cluster. See Log on to a cluster.

  3. Start the Flink SQL client:

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. Create a Flink table mapped to the StarRocks table and insert data:

    When the target is a StarRocks primary key table, you must explicitly define the primary key in the Flink table DDL, as in the following statement. For other table types, such as Duplicate Key tables, the primary key definition is optional.

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>'
    );
    
    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

Write data using the Flink DataStream API

Choose the approach based on your record format. All examples write to the same score_board table used in the Flink SQL example.

Write CSV string records

If your records are CSV-formatted strings, set sink.properties.format to csv and specify the column separator. See the full example on GitHub.

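// Imports used by this example (place them at the top of the source file):
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// The following statements run inside main(String[] args) throws Exception.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String jdbcUrl = "jdbc:mysql://<FE internal address>:9030";
String loadUrl = "<FE internal address>:8030";

/**
 * Generate CSV-format records. Each record has three values separated by "\t".
 * These values map to the columns `id`, `name`, and `score` in the StarRocks table.
 */
String[] records = new String[]{
        "1\tstarrocks-csv\t100",
        "2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "csv")
        .withProperty("sink.properties.column_separator", "\t")
        .build();

SinkFunction<String> starRocksSink = StarRocksSink.sink(options);
source.addSink(starRocksSink);
env.execute("LoadCsvRecords");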
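When you build CSV records from typed values, no field can contain the column separator or the row delimiter, or the columns of that row shift. The following Java sketch is a hypothetical helper for producing records like the ones above. It rejects such values instead of escaping them, and it writes a Java null as \N, assuming the StarRocks convention that \N represents NULL in CSV data.

```java
import java.util.StringJoiner;

// Hypothetical helper that joins typed values into one CSV record for Stream Load.
class CsvRecords {
    static final String ROW_DELIMITER = "\n"; // default of sink.properties.row_delimiter

    static String toRecord(String columnSeparator, Object... fields) {
        StringJoiner joiner = new StringJoiner(columnSeparator);
        for (Object field : fields) {
            // Assumption: StarRocks reads \N as NULL in CSV data.
            String text = field == null ? "\\N" : String.valueOf(field);
            // A separator or row delimiter inside a value would shift the columns,
            // so reject it instead of writing a misaligned row.
            if (text.contains(columnSeparator) || text.contains(ROW_DELIMITER)) {
                throw new IllegalArgumentException("value contains a delimiter: " + text);
            }
            joiner.add(text);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Same layout as the records above: id, name, and score separated by a tab.
        System.out.println(toRecord("\t", 1, "starrocks-csv", 100));
    }
}
```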

Write JSON string records

If your records are JSON-formatted strings, set sink.properties.format to json and enable strip_outer_array to strip the outermost array. See the full example on GitHub.

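// Imports used by this example (place them at the top of the source file):
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// The following statements run inside main(String[] args) throws Exception.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String jdbcUrl = "jdbc:mysql://<FE internal address>:9030";
String loadUrl = "<FE internal address>:8030";

/**
 * Generate JSON-format records.
 * Each record has three key-value pairs mapping to columns id, name, and score.
 */
String[] records = new String[]{
        "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
        "{\"id\":2, \"name\":\"flink-json\", \"score\":100}"
};
DataStream<String> source = env.fromElements(records);

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        .build();

SinkFunction<String> starRocksSink = StarRocksSink.sink(options);
source.addSink(starRocksSink);
env.execute("LoadJsonRecords");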
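The JSON records above are hand-written string literals. The following Java sketch builds the same kind of record with the string escaping that JSON requires. It also shows the array shape that strip_outer_array refers to: when strip_outer_array is true, Stream Load treats each element of a top-level JSON array as one row. JsonRecords and its methods are hypothetical, and the sketch does not show whether the connector sends data in exactly this shape.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helpers that build JSON records for the score_board table without a JSON library.
class JsonRecords {
    // Quotes a string value and escapes the characters JSON requires.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // One record whose keys match the columns id, name, and score.
    static String scoreRecord(int id, String name, int score) {
        String nameJson = name == null ? "null" : quote(name);
        return "{\"id\":" + id + ",\"name\":" + nameJson + ",\"score\":" + score + "}";
    }

    // Several records as one top-level array; strip_outer_array=true makes each element a row.
    static String asArray(List<String> records) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String record : records) {
            joiner.add(record);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(asArray(Arrays.asList(
                scoreRecord(1, "starrocks-json", 100),
                scoreRecord(2, "flink-json", 100))));
        // [{"id":1,"name":"starrocks-json","score":100},{"id":2,"name":"flink-json","score":100}]
    }
}
```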

Write custom Java objects

If your records are custom Java objects, define a schema that matches the StarRocks table and implement a StarRocksSinkRowBuilder to convert each object to an Object[]. See the full example on GitHub.

Define the POJO:

public static class RowData {
    public int id;
    public String name;
    public int score;

    public RowData() {}

    public RowData(int id, String name, int score) {
        this.id = id;
        this.name = name;
        this.score = score;
    }
}

Set up the sink:

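// Imports used by this example (place them at the top of the source file):
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// The following statements run inside main(String[] args) throws Exception.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String jdbcUrl = "jdbc:mysql://<FE internal address>:9030";
String loadUrl = "<FE internal address>:8030";

RowData[] records = new RowData[]{
        new RowData(1, "starrocks-rowdata", 100),
        new RowData(2, "flink-rowdata", 100)
};
DataStream<RowData> source = env.fromElements(records);

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .build();

// Define the schema to match the StarRocks table layout.
TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT().notNull())  // notNull() required for primary key columns
        .field("name", DataTypes.STRING())
        .field("score", DataTypes.INT())
        .primaryKey("id")
        .build();

RowDataTransformer transformer = new RowDataTransformer();
SinkFunction<RowData> starRocksSink = StarRocksSink.sink(schema, options, transformer);
source.addSink(starRocksSink);
env.execute("LoadCustomJavaRecords");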

Implement the transformer:

private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

    /**
     * Map each field of RowData to the corresponding position in the Object[].
     * The array layout must match the schema defined above.
     */
    @Override
    public void accept(Object[] internalRow, RowData rowData) {
        internalRow[0] = rowData.id;
        internalRow[1] = rowData.name;
        internalRow[2] = rowData.score;
        // For primary key tables, set the last element to indicate UPSERT or DELETE.
        internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
    }
}

Synchronize data using Flink CDC 3.0

The Flink CDC 3.0 framework lets you build streaming ELT pipelines from CDC sources such as MySQL and Kafka to StarRocks. Through this pipeline, you can:

  • Automatically create databases and tables in StarRocks

  • Synchronize full and incremental data

  • Propagate schema changes

From Flink Connector v1.2.9, the connector is integrated into Flink CDC 3.0 as the StarRocks Pipeline Connector. Use it with StarRocks v3.2.1 or later to take advantage of the fast_schema_evolution feature, which speeds up column additions and removals and reduces resource consumption.

Best practices

Import to primary key tables

Primary key tables support partial updates and conditional updates for fine-grained write control.

Set up the StarRocks table

CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);

INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

Start the Flink SQL client:

/opt/apps/FLINK/flink-current/bin/sql-client.sh

Partial update

Partial update lets you update specific columns without touching others.

  1. Create the Flink table with sink.properties.partial_update enabled:

    • sink.properties.partial_update: Enables partial update mode.

    • sink.properties.columns: For Flink Connector 1.2.7 and earlier, specify the columns to update and append __op at the end. The __op field indicates UPSERT or DELETE and is set automatically by the connector. Not required for Flink Connector 1.2.8 and later.

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.partial_update' = 'true',
        -- Required for Flink Connector version 1.2.7 and earlier only
        'sink.properties.columns' = 'id,name,__op'
    );
  2. Insert the update data. Only the name column changes; score is left untouched:

    INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');
  3. Verify the result in SQL Editor:

    SELECT * FROM `test`.`score_board`;

    The name column reflects the new values, while score remains unchanged.

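You can model the effect of partial update with an in-memory map keyed by the primary key. In the following Java sketch, which is illustrative and does not reflect how StarRocks stores data, only the columns present in an update overwrite stored values. As a result, score keeps the value 100 after the name-only update in step 2. PartialUpdateDemo is a hypothetical name, and starting a new key from the column defaults is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory model of partial update on a primary key table; not how StarRocks stores data.
class PartialUpdateDemo {
    // Column defaults from the score_board DDL: name DEFAULT "", score DEFAULT "0".
    static Map<String, Object> defaultRow() {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "");
        row.put("score", 0);
        return row;
    }

    // Writes only the columns present in `changes`; every other column keeps its stored value.
    // Assumption: a key that does not exist yet starts from the column defaults.
    static void partialUpsert(Map<Integer, Map<String, Object>> table, int id, Map<String, Object> changes) {
        table.computeIfAbsent(id, key -> defaultRow()).putAll(changes);
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Object>> table = new HashMap<>();
        Map<String, Object> full = new HashMap<>();
        full.put("name", "starrocks");
        full.put("score", 100);
        partialUpsert(table, 1, full);

        Map<String, Object> nameOnly = new HashMap<>();
        nameOnly.put("name", "starrocks-update");
        partialUpsert(table, 1, nameOnly);
        System.out.println(table.get(1)); // {name=starrocks-update, score=100}
    }
}
```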

Conditional update

Conditional update writes a row only when the incoming value meets a specified condition relative to the existing row.

This example updates a row only when the incoming score value is greater than or equal to the current value in the table.

  1. Create the Flink table with sink.properties.merge_condition set to score:

    • sink.properties.merge_condition: The column used as the update condition. Here, score means a row is updated only when the incoming score is greater than or equal to the current score.

    • sink.version: Must be set to V1 for conditional update.

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.merge_condition' = 'score',
        'sink.version' = 'V1'
    );
  2. Insert two rows with the same primary keys as the existing data. The first row has a lower score (99 < 100), and the second has a higher score (101 > 100):

    INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
  3. Verify the result in SQL Editor:

    SELECT * FROM `test`.`score_board`;

    Only the second row is updated. The first row remains unchanged because its incoming score (99) is less than the existing value (100).

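The following Java sketch models the same rule with an in-memory map: the incoming row replaces the stored row only when its score is greater than or equal to the stored score. ConditionalUpdateDemo is a hypothetical name, and inserting rows whose key does not exist yet is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of sink.properties.merge_condition = score; not StarRocks' implementation.
class ConditionalUpdateDemo {
    static final class Row {
        final String name;
        final int score;

        Row(String name, int score) {
            this.name = name;
            this.score = score;
        }
    }

    // Replaces the stored row only when incoming.score >= stored.score; returns true if applied.
    // Assumption: a key that does not exist yet is inserted.
    static boolean upsertIfScoreNotLower(Map<Integer, Row> table, int id, Row incoming) {
        Row current = table.get(id);
        if (current == null || incoming.score >= current.score) {
            table.put(id, incoming);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Integer, Row> table = new HashMap<>();
        table.put(1, new Row("starrocks", 100));
        table.put(2, new Row("flink", 100));
        upsertIfScoreNotLower(table, 1, new Row("starrocks-update", 99)); // rejected: 99 < 100
        upsertIfScoreNotLower(table, 2, new Row("flink-update", 101));    // applied: 101 >= 100
        System.out.println(table.get(1).name + " " + table.get(2).name);  // starrocks flink-update
    }
}
```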

Import to Bitmap columns

Bitmap columns accelerate exact deduplication counts, such as calculating unique visitors (UV). Because Flink does not have a native Bitmap type, use a BIGINT column in the Flink table and convert it with the to_bitmap() function.

  1. Create an Aggregate table with a Bitmap column in SQL Editor:

    CREATE TABLE `test`.`page_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` datetime NOT NULL COMMENT 'access time',
      `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);

    page_id and visit_date are the Aggregate Keys. visit_users uses BITMAP_UNION as the aggregate function.

  2. Create the corresponding Flink table, using sink.properties.columns to convert visit_user_id (BIGINT) to Bitmap via to_bitmap():

    CREATE TABLE `page_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'page_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Insert data in the Flink SQL client:

    INSERT INTO `page_uv` VALUES
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
       (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
       (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Query the UV count per page in SQL Editor:

    SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;

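To see why this query returns exact counts, the following Java sketch replays the load and the query with java.util.BitSet standing in for the BITMAP type. Rows that share the aggregate key (page_id, visit_date) are merged with a bitwise OR, as BITMAP_UNION does. The query then ORs the bitmaps for each page and counts the set bits. StarRocks uses its own bitmap format, BitmapUvDemo is a hypothetical name, and the sketch assumes user IDs fit in a non-negative int. For the five rows inserted in step 3, the sketch stores three aggregated rows and counts 3 distinct users (13, 23, 33) for page 1 and 1 for page 2.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.TreeMap;

// Replays the page_uv example with java.util.BitSet standing in for BITMAP.
class BitmapUvDemo {
    static final class Visit {
        final int pageId;
        final String visitDate;
        final long userId;

        Visit(int pageId, String visitDate, long userId) {
            this.pageId = pageId;
            this.visitDate = visitDate;
            this.userId = userId;
        }
    }

    // Load: rows sharing the aggregate key (page_id, visit_date) are merged, like BITMAP_UNION.
    static Map<String, BitSet> load(Visit... visits) {
        Map<String, BitSet> stored = new TreeMap<>();
        for (Visit v : visits) {
            String key = v.pageId + "|" + v.visitDate;
            // to_bitmap(visit_user_id); assumes the ID fits in a non-negative int.
            stored.computeIfAbsent(key, k -> new BitSet()).set(Math.toIntExact(v.userId));
        }
        return stored;
    }

    // Query: SELECT page_id, COUNT(DISTINCT visit_users) ... GROUP BY page_id.
    static Map<Integer, Integer> uvPerPage(Map<String, BitSet> stored) {
        Map<Integer, BitSet> perPage = new TreeMap<>();
        for (Map.Entry<String, BitSet> e : stored.entrySet()) {
            int pageId = Integer.parseInt(e.getKey().substring(0, e.getKey().indexOf('|')));
            perPage.computeIfAbsent(pageId, k -> new BitSet()).or(e.getValue());
        }
        Map<Integer, Integer> uv = new TreeMap<>();
        for (Map.Entry<Integer, BitSet> e : perPage.entrySet()) {
            uv.put(e.getKey(), e.getValue().cardinality());
        }
        return uv;
    }

    public static void main(String[] args) {
        Map<String, BitSet> stored = load(
                new Visit(1, "2020-06-23 01:30:30", 13),
                new Visit(1, "2020-06-23 01:30:30", 23),
                new Visit(1, "2020-06-23 01:30:30", 33),
                new Visit(1, "2020-06-23 02:30:30", 13),
                new Visit(2, "2020-06-23 01:30:30", 23));
        System.out.println(stored.size() + " stored rows, UV per page: " + uvPerPage(stored));
        // 3 stored rows, UV per page: {1=3, 2=1}
    }
}
```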

Import to HLL columns

HLL (HyperLogLog) columns support approximate deduplication counting at scale with lower storage overhead than Bitmap. The setup is similar: use a BIGINT column in Flink and convert it with hll_hash().

  1. Create an Aggregate table with an HLL column in SQL Editor:

    CREATE TABLE `test`.`hll_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` DATETIME NOT NULL COMMENT 'access time',
      `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Create the corresponding Flink table, using sink.properties.columns to convert visit_user_id (BIGINT) to HLL via hll_hash():

    CREATE TABLE `hll_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'hll_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. Insert data in the Flink SQL client:

    INSERT INTO `hll_uv` VALUES
       (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
       (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
       (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Query the UV count per page in SQL Editor:

    SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;

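HLL trades exactness for a fixed-size state. The following Java sketch is a textbook HyperLogLog with linear counting for small cardinalities. Each value is hashed, the leading bits of the hash pick a register, and the register keeps the longest run of leading zeros seen. Merging two sketches takes the per-register maximum, which is how HLL-style aggregation can combine rows without keeping the raw IDs. The sketch is not StarRocks' implementation: the SplitMix64 mixer stands in for hll_hash(), and the precision p and the name HllSketch are assumptions.

```java
// Textbook HyperLogLog for illustration; not StarRocks' HLL implementation.
class HllSketch {
    private final int p;            // index bits; the sketch has 2^p registers
    private final int m;            // number of registers
    private final byte[] registers; // longest observed run of leading zeros, plus one

    HllSketch(int p) {
        if (p < 7 || p > 16) {
            throw new IllegalArgumentException("p must be in [7, 16]"); // alpha below assumes m >= 128
        }
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    // SplitMix64 finalizer, used here as a stand-in for hll_hash().
    static long hash(long x) {
        x += 0x9E3779B97F4A7C15L;
        x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
        x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
        return x ^ (x >>> 31);
    }

    void add(long value) {
        long h = hash(value);
        int index = (int) (h >>> (64 - p)); // top p bits select a register
        long rest = h << p;                 // remaining 64 - p bits
        int rank = Math.min(Long.numberOfLeadingZeros(rest), 64 - p) + 1;
        if (rank > registers[index]) {
            registers[index] = (byte) rank;
        }
    }

    // Per-register maximum: the merged sketch equals a sketch of the union of both inputs.
    void merge(HllSketch other) {
        if (other.p != p) {
            throw new IllegalArgumentException("precision mismatch");
        }
        for (int i = 0; i < m; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    double estimate() {
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1 + 1.079 / m);
        double raw = alpha * m * m / sum;
        // Small-range correction: use linear counting while empty registers remain.
        if (raw <= 2.5 * m && zeros > 0) {
            return m * Math.log((double) m / zeros);
        }
        return raw;
    }

    public static void main(String[] args) {
        HllSketch sketch = new HllSketch(14);
        for (long user = 0; user < 10_000; user++) {
            sketch.add(user);
            sketch.add(user); // duplicates do not change the registers
        }
        System.out.println(Math.round(sketch.estimate()));
    }
}
```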