You can use the Flink StarRocks connector to buffer data and then import it into StarRocks in batches by using Stream Load. This topic describes how to use the Flink StarRocks connector and provides examples.

Background information

The Flink Java Database Connectivity (JDBC) connector provided by Apache Flink cannot meet the import performance requirements of StarRocks. Therefore, Alibaba Cloud provides the Flink StarRocks connector, which buffers data and then imports it into StarRocks in batches by using Stream Load.

Use the Flink connector

You can download the source code of the Flink StarRocks connector for testing from the starrocks-connector-for-apache-flink page on GitHub.

Add the following code to the pom.xml file of your project:
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- Specify exactly one version. For flink-1.11 and flink-1.12: -->
    <version>x.x.x_flink-1.11</version>
    <!-- For flink-1.13, use this version instead: -->
    <!-- <version>x.x.x_flink-1.13</version> -->
</dependency>
Note You can view the latest version of the Flink StarRocks connector on the version information page and replace x.x.x in the preceding code with the latest version.
Sample code:
  • Method 1
    // -------- sink with raw json string stream --------
    fromElements(new String[]{
        "{\"score\": \"99\", \"name\": \"stephen\"}",
        "{\"score\": \"100\", \"name\": \"lebron\"}"
    }).addSink(
        StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()
        )
    );
    
    
    // -------- sink with stream transformation --------
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
            ......
        }
    }
    fromElements(
        new RowData[]{
            new RowData(99, "stephen"),
            new RowData(100, "lebron")
        }
    ).addSink(
        StarRocksSink.sink(
            // the table structure
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
  • Method 2
    // create a table with `structure` and `properties`
    // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
    tEnv.executeSql(
        "CREATE TABLE USER_RESULT(" +
            "name VARCHAR," +
            "score BIGINT" +
        ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
            "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
            "'database-name' = 'xxx'," +
            "'table-name' = 'xxx'," +
            "'username' = 'xxx'," +
            "'password' = 'xxx'," +
            "'sink.buffer-flush.max-rows' = '1000000'," +
            "'sink.buffer-flush.max-bytes' = '300000000'," +
            "'sink.buffer-flush.interval-ms' = '5000'," +
            "'sink.properties.column_separator' = '\\x01'," +
            "'sink.properties.row_delimiter' = '\\x02'," +
            "'sink.max-retries' = '3'," + // the comma separates this option from the next one
            "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
        ")"
    );
The following table describes the parameters of a Flink sink.
Parameter | Required | Default value | Type | Description
connector | Yes | No default value | String | The type of the connector. Set the value to starrocks.
jdbc-url | Yes | No default value | String | The JDBC URL that is used to connect to StarRocks and perform queries in StarRocks.
load-url | Yes | No default value | String | The IP addresses and HTTP ports of frontends, in the format of fe_ip:http_port;fe_ip:http_port. Separate the IP addresses and HTTP ports of multiple frontends with semicolons (;).
database-name | Yes | No default value | String | The name of the StarRocks database.
table-name | Yes | No default value | String | The name of the StarRocks table.
username | Yes | No default value | String | The username that is used to connect to the StarRocks database.
password | Yes | No default value | String | The password that is used to connect to the StarRocks database.
sink.semantic | No | at-least-once | String | The semantics of the sink. Valid values: at-least-once and exactly-once.
sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | The maximum amount of data that is allowed in the buffer. Valid values: 64 MB to 10 GB.
sink.buffer-flush.max-rows | No | 500000 | String | The maximum number of data rows that are allowed in the buffer. Valid values: 64000 to 5000000.
sink.buffer-flush.interval-ms | No | 300000 | String | The interval at which the buffer is flushed. Valid values: 1000 to 3600000. Unit: milliseconds.
sink.max-retries | No | 1 | String | The maximum number of retries. Valid values: 0 to 10.
sink.connect.timeout-ms | No | 1000 | String | The timeout period for connecting to the IP addresses and HTTP ports of frontends specified by the load-url parameter. Valid values: 100 to 60000. Unit: milliseconds.
sink.properties.* | No | No default value | String | The properties of the sink.
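
The three sink.buffer-flush.* parameters can be read as independent thresholds on the buffer. The following Java sketch is not the connector's implementation. It is a simplified model that assumes a flush is triggered as soon as any one threshold is reached, and that the interval is checked only when a row arrives (a real sink would more likely use a timer). The class name FlushBufferSketch and all of its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a row buffer with row, byte, and time thresholds. */
final class FlushBufferSketch {
    private final long maxRows;     // models sink.buffer-flush.max-rows
    private final long maxBytes;    // models sink.buffer-flush.max-bytes
    private final long intervalMs;  // models sink.buffer-flush.interval-ms
    private final List<String> rows = new ArrayList<>();
    private long bufferedBytes = 0;
    private long lastFlushMs;
    private int flushCount = 0;

    FlushBufferSketch(long maxRows, long maxBytes, long intervalMs, long nowMs) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Buffers one row and returns true if this call triggered a flush. */
    boolean add(String row, long nowMs) {
        rows.add(row);
        bufferedBytes += row.getBytes(StandardCharsets.UTF_8).length;
        boolean due = rows.size() >= maxRows
                || bufferedBytes >= maxBytes
                || nowMs - lastFlushMs >= intervalMs;
        if (due) {
            flush(nowMs);
        }
        return due;
    }

    /** Assumption: a real sink would send the batch to StarRocks here. */
    private void flush(long nowMs) {
        rows.clear();
        bufferedBytes = 0;
        lastFlushMs = nowMs;
        flushCount++;
    }

    int flushCount() { return flushCount; }

    int bufferedRows() { return rows.size(); }

    public static void main(String[] args) {
        // Tiny thresholds so that the behavior is visible with three rows.
        FlushBufferSketch buffer = new FlushBufferSketch(3, 1000, 5000, 0);
        for (String row : new String[]{"a", "b", "c"}) {
            System.out.println("flushed=" + buffer.add(row, 10));
        }
    }
}
```

In this model, larger thresholds mean fewer and larger batches at the cost of higher latency before data becomes visible in StarRocks.
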
Important
  • To provide exactly-once semantics, a Flink sink normally requires the external system to support the two-phase commit protocol. StarRocks does not provide such a mechanism, so exactly-once semantics depend on the checkpointing feature of Flink. Each time Flink generates a checkpoint, a batch of data and its labels are cached as state. After the checkpoint is generated, the sink blocks until the cached data in the state is written to StarRocks. Only then does Flink start to generate the next checkpoint. As a consequence, if StarRocks breaks down, the stream operators of your Flink sink may be blocked for an extended period of time because of the connection failure. This can trigger an alert and cause the Flink import job to be forcibly terminated.
  • By default, the data is imported in the CSV format. You can set the sink.properties.row_delimiter parameter to \\x02 to customize a row delimiter and set the sink.properties.column_separator parameter to \\x01 to customize a column delimiter. The sink.properties.row_delimiter parameter is supported in StarRocks 1.15.0 and later.
  • If your import job is unexpectedly stopped, you can increase the memory capacity of the job.
  • If the code runs properly and data can be queried but data fails to be written, check whether your machine can access the HTTP ports of the backends. As a first step, check whether the IP addresses of the backends respond to ping requests from your machine.

    For example, assume that your machine has a public IP address and an internal IP address, the frontends or backends of a cluster can be accessed by using public IP addresses and HTTP ports, and internal IP addresses are used to access backends inside the cluster. If you specify the public IP addresses and HTTP ports of frontends by using the load-url parameter when you submit an import job, the frontends forward the write operation to the internal IP addresses and HTTP ports of backends. If your machine cannot reach the internal IP addresses of the backends, the write operation fails.
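
A ping test shows only that a host responds, not that its HTTP port accepts connections. As a complementary check, the following Java sketch parses a value in the load-url format (host:port pairs separated by semicolons) and attempts a TCP connection to each address. The class name LoadUrlCheck and its methods are hypothetical, and the addresses must be supplied by you. To test the backends, pass the backend IP addresses and HTTP ports that the frontends forward to.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Parses "host:port;host:port" and probes each address over TCP. */
final class LoadUrlCheck {

    /** Splits a load-url style value into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String loadUrl) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String part : loadUrl.split(";")) {
            String hostPort = part.trim();
            if (hostPort.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            int idx = hostPort.lastIndexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("expected host:port, got: " + hostPort);
            }
            String host = hostPort.substring(0, idx);
            int port = Integer.parseInt(hostPort.substring(idx + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java LoadUrlCheck host:port;host:port");
            return;
        }
        for (InetSocketAddress address : parse(args[0])) {
            boolean ok = canConnect(address.getHostString(), address.getPort(), 1000);
            System.out.println(address.getHostString() + ":" + address.getPort() + " reachable=" + ok);
        }
    }
}
```

Run the check from the same machine that submits the Flink job, because reachability depends on the network that the job runs in.
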

Example: Synchronize data from a MySQL database by using the Flink StarRocks connector

Basic principles

You can synchronize data from a MySQL database to a StarRocks cluster within seconds by using the Flink change data capture (CDC) connector and StarRocks migration tools.
Note The images and some information in this topic are from Real-time synchronization from MySQL of open source StarRocks.

StarRocks migration tools can automatically generate the CREATE TABLE statements for the source table and sink table based on the information about and table schemas of the MySQL database and StarRocks cluster.

Procedure

  1. Make preparations.
    • Download the installation package of Flink.

      We recommend that you use Apache Flink 1.13. The earliest version that is supported is Apache Flink 1.11.

    • Download the package of the Flink CDC connector.

      Download a package of the Flink CDC connector for MySQL based on the version of Apache Flink that you use.

    • Download the package of the Flink StarRocks connector.
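      Important The package of the Flink StarRocks connector differs based on the version of Apache Flink. One package is provided for Apache Flink 1.11 and 1.12, and another for Apache Flink 1.13. Download the package that matches your version.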
  2. Copy the downloaded flink-sql-connector-mysql-cdc-xxx.jar and flink-connector-starrocks-xxx.jar packages to the flink-xxx/lib/ directory.
  3. Download the smt.tar.gz package, decompress the package, and then modify the configuration file.
    Modify the following parameters in the configuration file:
    • db: the connection information of the MySQL database.
    • be_num: the number of backends in the StarRocks cluster.
    • [table-rule.1]: the matching rules. You can use regular expressions to match the database names and table names that are used to create the CREATE TABLE statements. You can also configure multiple sets of rules.
    • flink.starrocks.*: the configuration information of the StarRocks cluster.
    Sample code:
    [db]
    host = 192.168.**.**
    port = 3306
    user = root
    password =
    
    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # file to save the converted DDL SQL
    output_dir = ./result
    
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
  4. Run the starrocks-migrate-tool command to generate all the CREATE TABLE statements in the ./result directory, which is specified by the output_dir parameter.
    You can run the ls result command to display the files in the directory.
    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
  5. Run the following command to create a database and a table in the StarRocks cluster:
    mysql -h <IP address of a frontend> -P 9030 -u root -p < starrocks-create.1.sql
  6. Run the following command to generate tables in Flink and continuously synchronize data:
    bin/sql-client.sh -f flink-create.1.sql
    Important If you use a version of Apache Flink earlier than 1.13, you may not be able to directly run the SQL script. In this case, execute the SQL statements in the script one by one. You must also enable binary logging for the MySQL database.
  7. Run the following command to query the status of Flink jobs:
    bin/flink list -running

    You can view the details and status of Flink jobs on the web UI of Flink or in the log files in the $FLINK_HOME/log directory.
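
The database and table patterns in a [table-rule.N] section of step 3 are regular expressions. The following Java sketch shows how such a pair of patterns selects tables, using the patterns from the sample configuration. It assumes that a rule applies only when both patterns match the whole name, and it uses Java regular expressions, which may differ in detail from the ones the migration tool uses. The class name TableRuleMatch and the database and table names are hypothetical.

```java
import java.util.regex.Pattern;

/** Checks whether a database/table pair is selected by a table rule. */
final class TableRuleMatch {

    /** Returns true only if both names fully match their patterns. */
    static boolean matches(String databasePattern, String tablePattern,
                           String database, String table) {
        return Pattern.matches(databasePattern, database)
                && Pattern.matches(tablePattern, table);
    }

    public static void main(String[] args) {
        String databasePattern = "^console_19321.*$"; // from [table-rule.1]
        String tablePattern = "^.*$";                 // from [table-rule.1]

        // Hypothetical names used only to exercise the patterns.
        String[][] candidates = {
            {"console_19321_prod", "orders"},
            {"console_1", "orders"},
        };
        for (String[] c : candidates) {
            System.out.println(c[0] + "." + c[1] + " selected="
                    + matches(databasePattern, tablePattern, c[0], c[1]));
        }
    }
}
```

Testing patterns this way before you generate the statements can help you confirm that a rule does not select more databases or tables than intended.
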

When you run the sample code provided in this topic, take note of the following items:
  • If you need to configure multiple sets of rules, you must configure the rules used to match the database names and table names, and configure the Flink StarRocks connector for each set of rules.
    Sample code:
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    
    [table-rule.2]
    # pattern to match databases for setting properties
    database = ^database2.*$
    # pattern to match tables for setting properties
    table = ^.*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    # If you cannot select appropriate delimiters to import data in the CSV format, you can import data in the JSON format. However, the import performance may deteriorate. To import data in the JSON format, replace the flink.starrocks.sink.properties.column_separator and flink.starrocks.sink.properties.row_delimiter parameters with the following parameters: 
    flink.starrocks.sink.properties.strip_outer_array=true
    flink.starrocks.sink.properties.format=json
    Note You can use the flink.starrocks.sink parameters to configure properties for each set of rules, such as the import frequency.
  • After sharding is performed, data in a large table may be split into multiple tables or even distributed to multiple databases. In this case, you can configure a set of rules to synchronize data from multiple tables to one table.
    For example, both the edu_db_1 and edu_db_2 databases have two tables, course_1 and course_2. All the tables have the same schema. You can use the following configuration to synchronize data from these tables to a single table in StarRocks for further analysis:
    [table-rule.3]
    # pattern to match databases for setting properties
    database = ^edu_db_[0-9]*$
    # pattern to match tables for setting properties
    table = ^course_[0-9]*$
    
    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
    flink.starrocks.load-url= 192.168.**.**:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=5000

    After you run the tool with this configuration, a many-to-one synchronization relationship is automatically generated. By default, the name of the table in StarRocks is course__auto_shard. You can modify the table name in the generated SQL files.

  • If you need to create tables and synchronize data on the CLI of an SQL client, you must escape the backslash (\).
    Sample code:
    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'
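
The same doubling appears in the Java samples in this topic: the Java literal "\\x01" is the four-character text \x01, not a control character. The following Java sketch illustrates the difference and shows what a CSV payload looks like when the bytes 0x01 and 0x02 are used as delimiters. It assumes that the \xNN text is eventually decoded to the single character with that hexadecimal code. The class name DelimiterEscapes and its helper methods are hypothetical and are not part of the connector.

```java
/** Illustrates escaped delimiter text versus the characters it denotes. */
final class DelimiterEscapes {

    /** Decodes text such as \x01 to the single character with that hex code. */
    static String decodeHexEscape(String escaped) {
        if (escaped.length() == 4 && escaped.startsWith("\\x")) {
            return String.valueOf((char) Integer.parseInt(escaped.substring(2), 16));
        }
        return escaped; // plain delimiters such as "," pass through unchanged
    }

    /** Joins columns with colSep and terminates every row with rowDelim. */
    static String toCsv(String[][] rows, String colSep, String rowDelim) {
        StringBuilder sb = new StringBuilder();
        for (String[] row : rows) {
            sb.append(String.join(colSep, row)).append(rowDelim);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String escaped = "\\x01"; // what the Java samples pass to the connector
        System.out.println("escaped length=" + escaped.length());
        System.out.println("decoded length=" + decodeHexEscape(escaped).length());

        String[][] rows = {{"99", "stephen"}, {"100", "lebron"}};
        String payload = toCsv(rows, decodeHexEscape("\\x01"), decodeHexEscape("\\x02"));
        System.out.println("payload length=" + payload.length());
    }
}
```

Control characters such as 0x01 and 0x02 are a common choice for delimiters because they rarely occur in text columns, which avoids ambiguity with commas or line breaks inside the data.
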