E-MapReduce:Flink Connector

Last Updated:Mar 26, 2026

The Flink StarRocks connector loads data into StarRocks in batches using Stream Load, offering significantly higher throughput than Apache Flink's built-in Java Database Connectivity (JDBC) connector (flink-connector-jdbc). The connector provided by Alibaba Cloud buffers incoming records and then imports each buffered batch into StarRocks in a single Stream Load request. This topic explains how to set up the connector and write data to StarRocks, with a complete example for synchronizing MySQL data in real time.

Prerequisites

Before you begin, ensure that you have:

  • Apache Flink 1.11 or later (1.13 recommended)

  • A StarRocks cluster with accessible frontend HTTP ports

Add the connector to your project

Download the connector source code from the starrocks-connector-for-apache-flink repository on GitHub.

Add the following dependency to your project's pom.xml:

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- Keep exactly one version element. -->
    <!-- For Flink 1.13: -->
    <version>x.x.x_flink-1.13</version>
    <!-- For Flink 1.11 and 1.12, use this value instead: x.x.x_flink-1.11 -->
</dependency>
Replace x.x.x with the latest version. Check the version information page for the current release.

Write data using the Flink connector

The connector provides two approaches for writing data to StarRocks. Use Method 1 (DataStream API) to sink a stream of raw JSON strings or a stream of custom objects that you map to table columns; use Method 2 (Flink SQL) to declare the sink as a table.

Method 1: DataStream API

// -------- sink with raw JSON string stream --------
fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
    StarRocksSink.sink(
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build()
    )
);


// -------- sink with stream transformation --------
class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        ......
    }
}
fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
).addSink(
    StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")
            .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);

Method 2: SQL table creation

Before running this method, add com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory to src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.

// create a table with `structure` and `properties`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
        "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.buffer-flush.max-rows' = '1000000'," +
        "'sink.buffer-flush.max-bytes' = '300000000'," +
        "'sink.buffer-flush.interval-ms' = '5000'," +
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        "'sink.max-retries' = '3'," +
        "'sink.properties.*' = 'xxx'" + // placeholder: replace with real Stream Load properties like `'sink.properties.columns' = 'k1, v1'`
    ")"
);

Sink parameters

The following table describes all parameters for the Flink sink connector.

| Parameter | Required | Default value | Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | - | String | The connector type. Set to starrocks. |
| jdbc-url | Yes | - | String | The JDBC URL for connecting to StarRocks and running queries. |
| load-url | Yes | - | String | The HTTP addresses of frontends, in the format fe_ip:http_port;fe_ip:http_port. Separate multiple frontends with semicolons (;). |
| database-name | Yes | - | String | The StarRocks database name. |
| table-name | Yes | - | String | The StarRocks table name. |
| username | Yes | - | String | The username for connecting to the StarRocks database. |
| password | Yes | - | String | The password for connecting to the StarRocks database. |
| sink.semantic | No | at-least-once | String | The delivery semantic. Valid values: at-least-once and exactly-once. |
| sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | The maximum buffer size, in bytes, before a flush is triggered. Valid range: 64 MB to 10 GB. |
| sink.buffer-flush.max-rows | No | 500000 | String | The maximum number of rows in the buffer before a flush is triggered. Valid range: 64000 to 5000000. |
| sink.buffer-flush.interval-ms | No | 300000 | String | The buffer flush interval in milliseconds. Valid range: 1000 to 3600000. |
| sink.max-retries | No | 1 | String | The maximum number of retry attempts on failure. Valid range: 0 to 10. |
| sink.connect.timeout-ms | No | 1000 | String | The connection timeout for the frontend HTTP addresses specified in load-url, in milliseconds. Valid range: 100 to 60000. |
| sink.properties.* | No | - | String | Stream Load properties passed directly to StarRocks. |
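
The ranges in the table can be checked before a job is submitted. The following is a minimal sketch that uses only the JDK; the class name SinkOptionCheck and its validate method are hypothetical helpers for illustration and are not part of the connector. It assumes that 1 MB means 1,048,576 bytes, which is consistent with the default of 94371840 bytes being listed as 90 MB.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check; not part of the Flink StarRocks connector.
public class SinkOptionCheck {

    private static final long MB = 1024L * 1024L;

    // Options that the parameter table marks as required for every sink.
    private static final List<String> REQUIRED = List.of(
            "jdbc-url", "load-url", "database-name", "table-name", "username", "password");

    // Inclusive {min, max} ranges copied from the parameter table.
    private static final Map<String, long[]> RANGES = new LinkedHashMap<>();
    static {
        RANGES.put("sink.buffer-flush.max-bytes", new long[] {64 * MB, 10 * 1024 * MB});
        RANGES.put("sink.buffer-flush.max-rows", new long[] {64_000L, 5_000_000L});
        RANGES.put("sink.buffer-flush.interval-ms", new long[] {1_000L, 3_600_000L});
        RANGES.put("sink.max-retries", new long[] {0L, 10L});
        RANGES.put("sink.connect.timeout-ms", new long[] {100L, 60_000L});
    }

    /** Returns one message per problem; an empty list means nothing was flagged. */
    public static List<String> validate(Map<String, String> options) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            // An empty value is allowed (the samples in this topic use an empty password).
            if (!options.containsKey(key)) {
                problems.add("missing required option: " + key);
            }
        }
        for (Map.Entry<String, long[]> range : RANGES.entrySet()) {
            String raw = options.get(range.getKey());
            if (raw == null) {
                continue; // optional: the connector default applies
            }
            long min = range.getValue()[0];
            long max = range.getValue()[1];
            try {
                long value = Long.parseLong(raw.trim());
                if (value < min || value > max) {
                    problems.add(range.getKey() + "=" + value + " is outside " + min + ".." + max);
                }
            } catch (NumberFormatException e) {
                problems.add(range.getKey() + " is not an integer: " + raw);
            }
        }
        String semantic = options.getOrDefault("sink.semantic", "at-least-once");
        if (!semantic.equals("at-least-once") && !semantic.equals("exactly-once")) {
            problems.add("sink.semantic must be at-least-once or exactly-once");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String key : REQUIRED) {
            options.put(key, "xxx"); // placeholders, as in the samples above
        }
        options.put("sink.buffer-flush.max-bytes", "300000000");
        options.put("sink.max-retries", "11"); // deliberately out of range
        validate(options).forEach(System.out::println);
        // prints: sink.max-retries=11 is outside 0..10
    }
}
```

The check only mirrors the documented ranges; the connector itself remains the authority on which values it accepts.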

Usage notes

Default format and delimiters

Data is imported in CSV format by default. To use custom delimiters, set:

  • sink.properties.row_delimiter — row delimiter (e.g., \\x02). Supported in StarRocks 1.15.0 and later.

  • sink.properties.column_separator — column delimiter (e.g., \\x01).

If you cannot find appropriate CSV delimiters for your data, switch to JSON format by setting sink.properties.format=json and sink.properties.strip_outer_array=true. Note that JSON import performance may deteriorate compared to CSV.

When creating tables and synchronizing data in a SQL client, escape the backslash (\):

'sink.properties.column_separator' = '\\x01'
'sink.properties.row_delimiter' = '\\x02'
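
The Java samples in this topic write the same delimiters as "\\x01" and "\\x02" inside Java string literals. The following minimal sketch, which uses only the JDK, shows what such a literal contains after compilation: the four characters \x01, not a single control character. The class and constant names are hypothetical, and how Flink SQL or the connector interprets the resulting text is outside the scope of this sketch.

```java
// Hypothetical demo class; it only illustrates Java string escaping.
public class DelimiterEscapes {

    // Java source "\\x01" is four characters: backslash, x, 0, 1.
    public static final String COLUMN_SEPARATOR = "\\x01";
    public static final String ROW_DELIMITER = "\\x02";

    // For contrast: a single control character (code point 1), which is different text.
    public static final String RAW_CONTROL_CHAR = "\u0001";

    public static void main(String[] args) {
        System.out.println(COLUMN_SEPARATOR + " has length " + COLUMN_SEPARATOR.length());
        System.out.println(ROW_DELIMITER + " has length " + ROW_DELIMITER.length());
        System.out.println("control character has length " + RAW_CONTROL_CHAR.length());
    }
}
```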

Exactly-once semantics

Important

To use sink.semantic=exactly-once, the external system must support the two-phase commit protocol. StarRocks does not natively support this protocol, so exactly-once delivery depends on Flink checkpointing. The flow works as follows:

  1. When Flink generates a checkpoint, a batch of data and its labels are cached as state.

  2. The system blocks until the cached data is written to StarRocks.

  3. Once the write completes, Flink starts the next checkpoint.

If StarRocks becomes unavailable, the Flink sink stream operators may be blocked for an extended period because the connection fails. This triggers an alert and forces the Flink import job to terminate.
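
The checkpoint flow described above can be illustrated with a toy, in-memory simulation. This is a sketch under stated assumptions and not the connector's implementation: every class and method name is hypothetical, and FakeTarget assumes that the target accepts a given label only once, which is what makes replaying a restored batch harmless.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the checkpoint flow; all names here are hypothetical.
public class CheckpointFlushSketch {

    /** Rows captured at one checkpoint plus the label that identifies them. */
    public static final class LabeledBatch {
        public final String label;
        public final List<String> rows;

        public LabeledBatch(String label, List<String> rows) {
            this.label = label;
            this.rows = List.copyOf(rows);
        }
    }

    /** In-memory stand-in for StarRocks; assumes a label is accepted only once. */
    public static final class FakeTarget {
        private final Set<String> acceptedLabels = new HashSet<>();
        public final List<String> stored = new ArrayList<>();

        /** Returns false when the label was already loaded, so a replay adds nothing. */
        public boolean write(LabeledBatch batch) {
            if (!acceptedLabels.add(batch.label)) {
                return false;
            }
            stored.addAll(batch.rows);
            return true;
        }
    }

    private final List<String> buffer = new ArrayList<>();
    private int checkpointCount = 0;

    /** Records that arrive between checkpoints are only buffered. */
    public void invoke(String row) {
        buffer.add(row);
    }

    /** Step 1: at a checkpoint, the buffered rows and their label become state. */
    public LabeledBatch snapshotState() {
        LabeledBatch batch = new LabeledBatch("checkpoint-" + checkpointCount++, buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        CheckpointFlushSketch sink = new CheckpointFlushSketch();
        FakeTarget target = new FakeTarget();

        sink.invoke("{\"score\": \"99\", \"name\": \"stephen\"}");
        sink.invoke("{\"score\": \"100\", \"name\": \"lebron\"}");

        LabeledBatch state = sink.snapshotState();
        // Step 2: this call stands for "block until the cached data is written".
        target.write(state);
        // A restart that restores the same state would replay the same label.
        boolean replayAccepted = target.write(state);
        // Step 3: only after the write completes would the next checkpoint start.
        System.out.println("rows stored: " + target.stored.size()
                + ", replay accepted: " + replayAccepted);
    }
}
```

Because the write in step 2 is synchronous in this model, it also shows why an unavailable target stalls the operator: nothing after the write can proceed until it returns.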

Connectivity troubleshooting

If data queries succeed but writes fail, verify that your machine can reach the HTTP ports of the StarRocks backends. When you submit an import job, frontends forward write operations to the internal IP addresses and HTTP ports of backends.

For example, suppose your machine reaches the cluster frontends and backends through public IP addresses, but the backends use internal IP addresses for intra-cluster communication. If you specify the public frontend addresses in load-url, the frontends still forward writes to the internal IP addresses of the backends. If your machine cannot reach those internal IP addresses, the write fails.
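
One way to confirm reachability from the machine that runs the job is a plain TCP connection test. The following is a minimal sketch that uses only the JDK; the class name LoadUrlProbe and its methods are hypothetical. It parses a value in the load-url format (host:port pairs separated by semicolons) and tries to connect to each address with a 1000 ms timeout, which matches the default of sink.connect.timeout-ms. Pass the frontend addresses and, as a second argument in the same format, the backend internal addresses and HTTP ports of your cluster.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical troubleshooting helper; not part of the connector.
public class LoadUrlProbe {

    /** Splits "host1:port1;host2:port2" into unresolved socket addresses. */
    public static List<InetSocketAddress> parse(String loadUrl) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String part : loadUrl.split(";")) {
            String hostPort = part.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("expected host:port, got " + hostPort);
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // Each argument is a list in load-url format.
        for (String arg : args) {
            for (InetSocketAddress address : parse(arg)) {
                boolean ok = canConnect(address.getHostString(), address.getPort(), 1000);
                System.out.println(address.getHostString() + ":" + address.getPort()
                        + (ok ? " reachable" : " NOT reachable"));
            }
        }
    }
}
```

A successful TCP connection only shows that the port is reachable; it does not verify credentials or that the endpoint accepts Stream Load requests.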

If an import job is unexpectedly stopped, increase the memory capacity of the job.

Synchronize MySQL data to StarRocks using CDC

Use the Flink change data capture (CDC) connector and StarRocks migration tools to synchronize a MySQL database to a StarRocks cluster in near real time. StarRocks migration tools automatically generate CREATE TABLE statements based on the MySQL schema and StarRocks cluster configuration.

The diagram and some information in this section are sourced from Realtime synchronization from MySQL in the open-source StarRocks documentation.

Prerequisites

Before you begin, ensure that you have:

  • Apache Flink 1.13 (recommended) or 1.11 / 1.12

  • Access to a MySQL database and a StarRocks cluster

Set up the environment

  1. Download Apache Flink. Apache Flink 1.13 is recommended. The earliest supported version is 1.11.

  2. Download the Flink CDC connector for MySQL. Select the package that matches your Flink version.

  3. Download the Flink StarRocks connector.

    Important

    The Flink StarRocks connector package for Apache Flink 1.13 differs from the package for Apache Flink 1.11 and 1.12. Download the package that matches your Flink version.

  4. Copy both flink-sql-connector-mysql-cdc-xxx.jar and flink-connector-starrocks-xxx.jar to the flink-xxx/lib/ directory.

  5. Download the smt.tar.gz package, decompress it, and edit the configuration file with your connection details. The configuration file uses the following sections and keys:

    • [db]: MySQL database connection information

    • be_num (in the [other] section): number of backends in the StarRocks cluster

    • [table-rule.N]: matching rules using regular expressions for database names and table names. Configure one section per rule set.

    • flink.starrocks.*: StarRocks cluster configuration

    Sample configuration:

    [db]
    host = 192.168.**.**
    port = 3306
    user = root
    password =
    
    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # file to save the converted DDL SQL
    output_dir = ./result
    
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000

Create tables and start synchronization

  1. Run starrocks-migrate-tool to generate CREATE TABLE statements. The output is written to the directory specified by output_dir, which is ./result in the sample configuration. Run ls result to verify the generated files:

    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
  2. Create the database and tables in StarRocks:

    mysql -h <IP address of a frontend> -P 9030 -u root -p < starrocks-create.1.sql
  3. Create Flink tables and start continuous data synchronization:

    Important

    If you are using Apache Flink earlier than 1.13, you cannot run the SQL script directly. Execute the SQL statements one by one instead. Make sure that binary logging is enabled for the MySQL database.

    bin/sql-client.sh -f flink-create.1.sql
  4. Check the status of running Flink jobs:

    bin/flink list -running

    View job details and status on the Flink web UI or in the log files under $FLINK_HOME/log.

Configure multiple rule sets

To synchronize multiple databases or tables with different properties, add one [table-rule.N] section per rule set. Each rule set carries its own flink.starrocks.* sink parameters, so you can give each rule set, for example, a different import frequency or data format.

Sample configuration with two rule sets:

[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000

[table-rule.2]
# pattern to match databases for setting properties
database = ^database2.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
# If you cannot select appropriate CSV delimiters, switch to JSON format.
# Note that JSON import performance may deteriorate compared to CSV.
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json

Consolidate sharded tables

After horizontal sharding, data from a large table may be split across multiple tables or databases. Configure a single rule set to synchronize multiple source tables into one StarRocks table.

For example, if edu_db_1 and edu_db_2 each contain course_1 and course_2 with the same schema, the following rule consolidates all four tables into a single StarRocks table:

[table-rule.3]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url= 192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000

After running starrocks-migrate-tool, a many-to-one synchronization relationship is automatically created. The generated StarRocks table is named course__auto_shard by default. To rename it, edit the generated SQL files before running them.
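
To see which source tables a rule like the one above selects, the two patterns can be tried against candidate names. The following minimal sketch uses java.util.regex; the class name ShardRuleMatch is hypothetical, and it assumes that the migration tool's regular expression engine treats these simple patterns the same way as Java does.

```java
import java.util.regex.Pattern;

// Hypothetical demo of the [table-rule.3] patterns; not part of the migration tool.
public class ShardRuleMatch {

    // Patterns copied from the [table-rule.3] section.
    private static final Pattern DATABASE = Pattern.compile("^edu_db_[0-9]*$");
    private static final Pattern TABLE = Pattern.compile("^course_[0-9]*$");

    /** Returns true when both the database name and the table name match the rule. */
    public static boolean matches(String database, String table) {
        return DATABASE.matcher(database).find() && TABLE.matcher(table).find();
    }

    public static void main(String[] args) {
        String[] databases = {"edu_db_1", "edu_db_2", "console_19321"};
        String[] tables = {"course_1", "course_2", "teacher_1"};
        for (String database : databases) {
            for (String table : tables) {
                System.out.println(database + "." + table + " -> " + matches(database, table));
            }
        }
    }
}
```

Note that [0-9]* also matches zero digits, so names such as edu_db_ and course_ would be selected as well. Use [0-9]+ if at least one digit is required.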