E-MapReduce: Flink Connector

Last Updated: Dec 01, 2025

StarRocks provides an Apache Flink connector (Flink Connector), which allows you to import data into StarRocks tables through Flink. Compared to Flink's built-in flink-connector-jdbc, StarRocks' Flink Connector offers superior performance and stability, making it particularly suitable for large-scale data import scenarios.

Background

The StarRocks Flink Connector significantly improves data import efficiency by buffering small batches of data in memory and importing each batch into StarRocks through the Stream Load feature. It supports the DataStream API, Table API & SQL, and Python API.

Prerequisites

  • You have created a cluster that includes Flink services.

    This topic uses a DataFlow cluster with Flink services (hereinafter referred to as a Flink cluster) created in EMR on ECS as an example. For more information, see Create a cluster.

  • You have created an EMR Serverless StarRocks instance. For more information, see Create an instance.

Limitations

  • Ensure that the machine on which Flink runs can access the http_port (default: 8030) and query_port (default: 9030) of the FE nodes of the StarRocks instance, and the be_http_port (default: 8040) of the BE nodes.

  • Importing data into StarRocks by using the Flink Connector requires the SELECT and INSERT permissions on the target table. A sample GRANT statement is provided after the compatibility table below.

  • The following table describes the compatibility between Flink Connector versions and the Flink, StarRocks, Java, and Scala versions.

    | Connector | Flink        | StarRocks     | Java | Scala      |
    |-----------|--------------|---------------|------|------------|
    | 1.2.9     | 1.15 to 1.18 | 2.1 and later | 8    | 2.11, 2.12 |
    | 1.2.8     | 1.13 to 1.17 | 2.1 and later | 8    | 2.11, 2.12 |
    | 1.2.7     | 1.11 to 1.15 | 2.1 and later | 8    | 2.11, 2.12 |
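
If the account that you plan to use does not yet have the SELECT and INSERT permissions, the following statements are a minimal sketch of how to grant them, assuming StarRocks 3.x privilege syntax. The user name flink_user and the target table test.score_board are placeholders; adjust them to your environment.

    -- Sketch only: replace the user, host, password, and target table with your own values.
    CREATE USER 'flink_user'@'%' IDENTIFIED BY '<password>';
    -- Grant the SELECT and INSERT permissions that the Flink Connector requires on the target table.
    GRANT SELECT, INSERT ON TABLE test.score_board TO USER 'flink_user'@'%';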

Configuration

This section describes the parameters of the Flink Connector and the data type mappings between Flink and StarRocks. For more information, see Continuously load data from Apache Flink® | StarRocks.

Parameters

Each parameter is described together with whether it is required and its default value.

connector
  Required: Yes. Default value: NONE.
  The connector to use. Set the value to the fixed string starrocks.

jdbc-url
  Required: Yes. Default value: NONE.
  The JDBC URL that is used to connect to StarRocks and run queries in StarRocks.
  Example: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030. Here, fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com is the internal address of the FE node of the EMR Serverless StarRocks instance.
  Note: For information about how to obtain the internal address of the FE node of an EMR Serverless StarRocks instance, see View instance list and details.

load-url
  Required: Yes. Default value: NONE.
  The internal address and HTTP port of the FE node, in the format of <internal address of the FE node of the EMR Serverless StarRocks instance>:8030.
  Example: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030.

database-name
  Required: Yes. Default value: NONE.
  The name of the StarRocks database.

table-name
  Required: Yes. Default value: NONE.
  The name of the StarRocks table.

username
  Required: Yes. Default value: NONE.
  The username of the StarRocks instance, for example, the default user admin.
  Importing data into StarRocks through the Flink Connector requires the SELECT and INSERT permissions on the target table. If your account does not have these permissions, grant them first. For more information, see Manage users and data authorization.

password
  Required: Yes. Default value: NONE.
  The password of the StarRocks instance user.

sink.semantic
  Required: No. Default value: at-least-once.
  The semantic guarantee that the sink provides when data is written to StarRocks. Valid values:
  • at-least-once: Data is written at least once. Duplicate records may occur.
  • exactly-once: Data is written exactly once, with no duplicates and no loss.

sink.version
  Required: No. Default value: AUTO.
  The interface that is used to import data. This parameter is supported since Flink Connector 1.2.4. Valid values:
  • V1: Uses the Stream Load interface to import data. Flink Connector versions earlier than 1.2.4 support only this mode.
  • V2: Uses the Stream Load transaction interface to import data. This mode requires StarRocks 2.4 or later. V2 is recommended because it reduces memory usage and provides a more stable exactly-once implementation.
  • AUTO: Automatically selects V2 if the StarRocks version supports the Stream Load transaction interface; otherwise, selects V1.

sink.label-prefix
  Required: No. Default value: NONE.
  The prefix of the labels that are used by Stream Load. If the Flink Connector version is 1.2.8 or later and the sink guarantees exactly-once semantics, we recommend that you configure a label prefix.
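
For example, the following Flink SQL statement is a minimal sketch of an exactly-once sink configuration that combines sink.semantic, sink.version, and sink.label-prefix. The FE addresses, database, table, credentials, and label prefix are placeholders, and the sketch assumes that Flink checkpointing is enabled, because data is flushed only when a checkpoint is triggered in this mode.

    CREATE TABLE `score_board_sink` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-internal-address>:9030',
        'load-url' = '<fe-internal-address>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
        'sink.semantic' = 'exactly-once',   -- flush data only when a Flink checkpoint is triggered
        'sink.version' = 'V2',              -- Stream Load transaction interface, requires StarRocks 2.4 or later
        'sink.label-prefix' = 'flink_demo'  -- recommended for exactly-once with Flink Connector 1.2.8 or later
    );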

sink.buffer-flush.max-bytes
  Required: No. Default value: 94371840 (90 MB).
  The maximum size of data that can be accumulated in memory before the data is imported into StarRocks at a time through Stream Load. A larger value can improve import performance but may lead to higher import latency.
  Valid values: [64MB, 10GB].
  Note:
  • This parameter is effective only when sink.semantic is set to at-least-once.
  • When sink.semantic is set to exactly-once, data in memory is flushed only when a Flink checkpoint is triggered. In this case, this parameter does not take effect, because data is not automatically flushed when the threshold is reached.

sink.buffer-flush.max-rows
  Required: No. Default value: 500000.
  The maximum number of rows that can be accumulated in memory before the data is imported into StarRocks at a time through Stream Load.
  Valid values: [64000, 5000000].
  Note: This parameter is effective only when sink.version is set to V1 and sink.semantic is set to at-least-once.

sink.buffer-flush.interval-ms
  Required: No. Default value: 300000.
  The interval at which data is sent, which controls the latency of data written to StarRocks. Unit: milliseconds.
  Valid values: [1000, 3600000].
  Note: This parameter is effective only when sink.semantic is set to at-least-once.
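
Under at-least-once semantics, the three buffering parameters above determine when buffered data is flushed to StarRocks. The following Flink SQL statement is an illustrative sketch, with placeholder addresses, table names, and credentials, that lowers the flush thresholds to roughly 64 MB of data, 500,000 rows, and 10 seconds.

    CREATE TABLE `events_sink` (
        `id` INT,
        `payload` STRING
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-internal-address>:9030',
        'load-url' = '<fe-internal-address>:8030',
        'database-name' = 'test',
        'table-name' = 'events',
        'username' = 'admin',
        'password' = '<password>',
        'sink.semantic' = 'at-least-once',
        'sink.buffer-flush.max-bytes' = '67108864',  -- flush after about 64 MB of buffered data
        'sink.buffer-flush.max-rows' = '500000',     -- takes effect only when sink.version is V1
        'sink.buffer-flush.interval-ms' = '10000'    -- flush at least every 10 seconds
    );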

sink.max-retries
  Required: No. Default value: 3.
  The number of retries after a Stream Load failure. If this number is exceeded, the data import task reports an error.
  Valid values: [0, 10].
  Note: This parameter is effective only when sink.version is set to V1.

sink.connect.timeout-ms
  Required: No. Default value: 30000.
  The timeout period for establishing an HTTP connection with the FE. Unit: milliseconds.
  Valid values: [100, 60000].
  Before Flink Connector v1.2.9, the default value was 1000.

sink.socket.timeout-ms
  Required: No. Default value: -1.
  This parameter is supported starting from Flink Connector 1.2.10. The timeout period for the HTTP client to wait for data. Unit: milliseconds. The default value -1 indicates no timeout.

sink.wait-for-continue.timeout-ms
  Required: No. Default value: 10000.
  This parameter is supported starting from Flink Connector 1.2.7. The timeout period for waiting for the HTTP 100-continue response from the FE. Unit: milliseconds.
  Valid values: [3000, 60000].

sink.ignore.update-before
  Required: No. Default value: TRUE.
  This parameter is supported starting from Flink Connector 1.2.8. Specifies whether to ignore UPDATE_BEFORE records from Flink when data is imported into a Primary Key table. If this parameter is set to false, the record is treated as a DELETE operation on the Primary Key table.

sink.parallelism
  Required: No. Default value: NONE.
  The parallelism of writing. This parameter applies only to Flink SQL. If this parameter is not set, the Flink planner determines the parallelism. In multi-parallelism scenarios, you must ensure that data is written in the correct order.

sink.properties.*
  Required: No. Default value: NONE.
  Parameters that are passed through to Stream Load to control the Stream Load import behavior.

sink.properties.format
  Required: No. Default value: csv.
  The data format for Stream Load imports. The Flink Connector converts the data in memory to this format and then imports it into StarRocks through Stream Load. Valid values: csv and json.

sink.properties.column_separator
  Required: No. Default value: \t.
  The column separator for CSV data.

sink.properties.row_delimiter
  Required: No. Default value: \n.
  The row delimiter for CSV data.

sink.properties.max_filter_ratio
  Required: No. Default value: 0.
  The maximum tolerance rate of the import job, which is the maximum proportion of data rows that can be filtered out due to data quality issues.
  Valid values: [0, 1].

sink.properties.partial_update
  Required: No. Default value: false.
  Specifies whether to use partial updates. Valid values: true and false (default).

sink.properties.partial_update_mode
  Required: No. Default value: row.
  The mode for partial updates. Valid values:
  • row (default): Performs partial updates in row mode, which is more suitable for real-time update scenarios that involve many columns and small batches.
  • column: Performs partial updates in column mode, which is more suitable for batch update scenarios that involve few columns and many rows. In such scenarios, enabling column mode can significantly improve update performance.

sink.properties.strict_mode
  Required: No. Default value: false.
  Specifies whether to enable strict mode for Stream Load. Strict mode affects the import behavior when unqualified rows, such as rows with inconsistent column values, appear in the imported data. Valid values: true and false.

sink.properties.compression
  Required: No. Default value: NONE.
  This parameter is supported starting from Flink Connector 1.2.10. The compression algorithm for Stream Load. Currently, only compression of JSON-format data is supported. Valid value: lz4_frame.
  Note: Only StarRocks v3.2.7 and later support compression of JSON-format data.
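
For example, the following Flink SQL sketch, with placeholder connection values, switches the Stream Load format to JSON and enables lz4_frame compression. Per the notes above, this assumes Flink Connector 1.2.10 or later and StarRocks 3.2.7 or later.

    CREATE TABLE `score_board_json_sink` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-internal-address>:9030',
        'load-url' = '<fe-internal-address>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.format' = 'json',           -- convert buffered data to JSON before Stream Load
        'sink.properties.compression' = 'lz4_frame'  -- compress the JSON payload; requires StarRocks 3.2.7 or later
    );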

Data type mapping

| Data type of Flink | Data type of StarRocks |
|--------------------|------------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T...> | JSON STRING |
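
As an illustration of this mapping, the following hedged sketch pairs a StarRocks table with a Flink sink table whose column types follow the table above (Flink BIGINT to BIGINT, STRING to STRING, DATE to DATE, and TIMESTAMP to DATETIME). The table name, column names, and connection values are placeholders.

    -- StarRocks side, for example created in SQL Editor
    CREATE TABLE `test`.`orders_demo` (
        `order_id`   BIGINT   NOT NULL COMMENT "",
        `user_name`  STRING   NULL COMMENT "",
        `order_date` DATE     NULL COMMENT "",
        `created_at` DATETIME NULL COMMENT ""
    ) ENGINE=OLAP
    DUPLICATE KEY(`order_id`)
    DISTRIBUTED BY HASH(`order_id`);

    -- Flink SQL side
    CREATE TABLE `orders_demo` (
        `order_id`   BIGINT,
        `user_name`  STRING,
        `order_date` DATE,
        `created_at` TIMESTAMP
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-internal-address>:9030',
        'load-url' = '<fe-internal-address>:8030',
        'database-name' = 'test',
        'table-name' = 'orders_demo',
        'username' = 'admin',
        'password' = '<password>'
    );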

Preparations

Get the Flink Connector JAR and upload it to the Flink cluster

  1. You can obtain the Flink Connector JAR package through the following methods.

    Method 1: Direct download

    Get different versions of Flink Connector JAR files from the Maven Central Repository.

    Method 2: Maven dependency

    In the pom.xml file of your Maven project, add the Flink Connector as a dependency using the following format.

    • For Flink Connector applicable to Flink version 1.15 and later.

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}</version>
      </dependency>
    • For Flink Connector applicable to Flink versions before 1.15.

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
      </dependency>

    Method 3: Manual compilation

    1. Download the Flink Connector code.

    2. Run the following command to compile the source code of the Flink Connector into a JAR file.

      sh build.sh <flink_version>

      For example, if the Flink version in your environment is 1.17, you need to run the following command.

      sh build.sh 1.17
    3. After compilation is complete, find the generated JAR file in the target/ directory.

      For example, the file name is typically in the format of flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar.

      Note

      Unofficial releases of Flink Connector versions will have a SNAPSHOT suffix.

    The naming format of Flink Connector JAR files is as follows:

    • For Flink Connector applicable to Flink version 1.15 and later, the naming format is flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar. For example, if you have installed Flink 1.17 and want to use Flink Connector version 1.2.8, you can use flink-connector-starrocks-1.2.8_flink-1.17.jar.

    • For Flink Connector applicable to Flink versions before 1.15, the naming format is flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar. For example, if you have installed Flink 1.14 and Scala 2.12, and you want to use Flink Connector version 1.2.7, you can use flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar.

      Note

      Please replace the following information according to your actual situation:

      • flink_version: The version number of Flink.

      • scala_version: The version number of Scala.

      • connector_version: The version number of the Flink Connector.

  2. Upload the obtained Flink Connector JAR file to the flink-{flink_version}/lib directory of the Flink cluster.

    For example, if you are using an EMR cluster with version EMR-5.19.0, the JAR file should be placed in the /opt/apps/FLINK/flink-current/lib directory.

Start the Flink cluster

  1. Log on to the master node of the Flink cluster. For more information, see Log on to a cluster.

  2. Run the following command to start the Flink cluster.

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh

Examples

Write data using Flink SQL

  1. Create a database named test in StarRocks, and create a primary key table named score_board in it.

    CREATE DATABASE test;
    
    CREATE TABLE test.score_board(
        id int(11) NOT NULL COMMENT "",
        name varchar(65533) NULL DEFAULT "" COMMENT "",
        score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. Log on to the master node of the Flink cluster. For more information, see Log on to a cluster.

  3. Run the following command to start Flink SQL:

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. Run the following command to create a table named score_board and insert data into it.

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>'
    );
    
    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    If you import data into a StarRocks Primary Key table, you must explicitly specify the primary key in the DDL of the Flink table. For other types of StarRocks tables, such as Duplicate Key tables, defining a primary key is optional.
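
    After the INSERT statement finishes, you can query the destination table in StarRocks, for example in SQL Editor, to confirm that the two rows were written. A minimal check, assuming the table created in step 1:

    SELECT * FROM `test`.`score_board`;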

Write data using Flink DataStream

Write corresponding Flink DataStream jobs based on different types of input records.

  • Write string data in CSV format

    If the input records are strings in CSV format, the main code of the corresponding Flink DataStream job is shown below. For the complete code, see LoadCsvRecords.

    /**
     * Generate CSV-format records. Each record has three values separated by "\t". 
     * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
     */
    String[] records = new String[]{
            "1\tstarrocks-csv\t100",
            "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);
    
    /**
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
     * to tell the Flink connector the input records are CSV-format, and the column separator is "\t".
     * You can also use other column separators in the CSV-format records,
     * but remember to modify the "sink.properties.column_separator" correspondingly.
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "csv")
            .withProperty("sink.properties.column_separator", "\t")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Write string data in JSON format

    If the input records are strings in JSON format, the main code of the corresponding Flink DataStream job is shown below. For the complete code, see LoadJsonRecords.

    /**
     * Generate JSON-format records. 
     * Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table.
     */
    String[] records = new String[]{
            "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
            "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);
    
    /** 
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
     * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. 
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • Write custom Java object data

    If the input records are custom Java objects, the main code of the corresponding Flink DataStream job is shown below. For the complete code, see LoadCustomJavaRecords.

    • In this example, a simple POJO class RowData is defined to represent each record.

      public static class RowData {
              public int id;
              public String name;
              public int score;
        
              public RowData() {}
        
              public RowData(int id, String name, int score) {
                  this.id = id;
                  this.name = name;
                  this.score = score;
              }
          }
      
    • The main code is shown below.

      // Generate records which use RowData as the container.
      RowData[] records = new RowData[]{
              new RowData(1, "starrocks-rowdata", 100),
              new RowData(2, "flink-rowdata", 100),
          };
      DataStream<RowData> source = env.fromElements(records);
      
      // Configure the Flink connector with the required properties.
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
              .withProperty("jdbc-url", jdbcUrl)
              .withProperty("load-url", loadUrl)
              .withProperty("database-name", "test")
              .withProperty("table-name", "score_board")
              .withProperty("username", "root")
              .withProperty("password", "")
              .build();
      
      /**
       * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
       * and each element is the value for a column.
       * You need to define the schema of the Object[] which matches that of the StarRocks table.
       */
      TableSchema schema = TableSchema.builder()
              .field("id", DataTypes.INT().notNull())
              .field("name", DataTypes.STRING())
              .field("score", DataTypes.INT())
              // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
              .primaryKey("id")
              .build();
      // Transform the RowData to the Object[] according to the schema.
      RowDataTransformer transformer = new RowDataTransformer();
      // Create the sink with the schema, options, and transformer.
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
      

      The RowDataTransformer is defined as follows.

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {
      
          /**
           * Set each element of the object array according to the input RowData.
           * The schema of the array matches that of the StarRocks table.
           */
          @Override
          public void accept(Object[] internalRow, RowData rowData) {
              internalRow[0] = rowData.id;
              internalRow[1] = rowData.name;
              internalRow[2] = rowData.score;
              // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
              internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
          }
      }  

Synchronize data using Flink CDC 3.0

The Flink CDC 3.0 framework allows you to easily build streaming ELT pipelines from CDC data sources (such as MySQL, Kafka) to StarRocks. Through this pipeline, you can achieve the following functions:

  • Automatic creation of databases and tables

  • Synchronization of full and incremental data

  • Synchronization of Schema Changes

Starting from StarRocks Flink Connector v1.2.9, the connector is integrated into the Flink CDC 3.0 framework as the StarRocks Pipeline Connector. The Pipeline Connector provides all of the preceding capabilities. We recommend that you use it with StarRocks v3.2.1 or later to take advantage of the fast_schema_evolution feature, which speeds up adding and removing columns and reduces resource consumption.

Best practices

Import to primary key tables

  1. Create a database named test in StarRocks, and create a primary key table score_board in it.

    CREATE DATABASE `test`;
    
    CREATE TABLE `test`.`score_board`
    (
        `id` int(11) NOT NULL COMMENT "",
        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`);
  2. Insert data into the StarRocks table.

    INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
  3. Run the following command to start the Flink SQL client.

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. Update data.

    Partial update

    Partial update allows you to update only specified columns (such as name) without affecting other columns (such as score).

    1. Create a table score_board in the Flink SQL client and enable the partial update feature.

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.partial_update' = 'true',
          -- only for Flink connector version <= 1.2.7
          'sink.properties.columns' = 'id,name,__op'
      ); 
      • sink.properties.partial_update: Enables partial update.

      • sink.properties.columns: Specifies the columns to be updated. If the Flink Connector version is less than or equal to 1.2.7, you also need to set the option sink.properties.columns to id,name,__op to tell the Flink connector which columns need to be updated. Note that you need to append the field __op at the end. The field __op indicates whether the import is an UPSERT or DELETE operation, and its value is automatically set by the Flink connector.

    2. Insert update data.

      Insert two rows of data with the same primary keys as the existing data, but with modified values in the name column.

      INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');
    3. Query the StarRocks table in SQL Editor.

      SELECT * FROM `test`.`score_board`;

      You will see that only the values in the name column have changed, while the score column remains unchanged.


    Conditional update

    This example shows how to perform conditional updates based on the value of the score column. A data row is updated only when the score column value in the imported data is greater than or equal to the current value in the StarRocks table.

    1. Create a table score_board in the Flink SQL client as follows.

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          `score` INT,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.merge_condition' = 'score',
          'sink.version' = 'V1'
      );
      • sink.properties.merge_condition: Set to score, indicating that when data is written, the Flink Connector will use the score column as the update condition.

      • sink.version: Set to V1, indicating that the Flink Connector uses the Stream Load interface to import data.

    2. Insert two rows of data into the table in the Flink SQL client.

      The data rows have the same primary keys as the rows in the StarRocks table. The first data row has a smaller value in the score column, while the second data row has a larger value in the score column.

      INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
    3. Query the StarRocks table in SQL Editor.

      SELECT * FROM `test`.`score_board`;

      You will see that only the second row of data has changed, while the first row of data remains unchanged.


Import to Bitmap columns

The Bitmap type is commonly used to accelerate exact deduplication counting scenarios, such as calculating unique visitors (UV). Below is a complete example showing how to import data into a Bitmap column in a StarRocks table through Flink SQL and query the UV count in StarRocks.

  1. Create a StarRocks aggregate table in SQL Editor.

    Create an aggregate table named page_uv in the test database, where:

    • The visit_users column is defined as a BITMAP type and configured with the BITMAP_UNION aggregate function.

    • page_id and visit_date serve as aggregate keys (AGGREGATE KEY) for grouping and deduplication.

    CREATE TABLE `test`.`page_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` datetime NOT NULL COMMENT 'access time',
      `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Create a table in the Flink SQL client.

    Since Flink does not support the Bitmap type, column mapping and type conversion need to be implemented as follows:

    • In the Flink table, define the visit_user_id column as BIGINT type to represent the visit_users column in the StarRocks table.

    • Use the sink.properties.columns configuration to convert the data in the visit_user_id column to Bitmap type through the to_bitmap function.

    CREATE TABLE `page_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'page_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. Insert data in the Flink SQL client.

    Insert multiple rows of data into the page_uv table, simulating different users accessing pages at different times.

    The visit_user_id column is of the BIGINT type. During loading, the to_bitmap function specified in sink.properties.columns converts the values into the BITMAP type.

    INSERT INTO `page_uv` VALUES
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
       (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
       (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. Query the UV count in SQL Editor.

    Use StarRocks' aggregation capability to calculate the unique visitor count (UV) for each page using COUNT(DISTINCT visit_users).

    SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;

    The query returns the deduplicated unique visitor count for each page_id.
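
    Because visit_users is a BITMAP column, the following query is a sketch of an equivalent formulation that uses the BITMAP_UNION_COUNT aggregate function on the same table.

    SELECT page_id, BITMAP_UNION_COUNT(visit_users) FROM `test`.`page_uv` GROUP BY page_id;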

Import to HLL columns

HLL (HyperLogLog) is a data type used for approximate deduplication counting, suitable for calculating unique visitors (UV) in large-scale data scenarios. Below is a complete example showing how to import data into an HLL column in a StarRocks table through Flink SQL and query the UV count in StarRocks.

  1. Create a StarRocks aggregate table in SQL Editor.

    Create an aggregate table named hll_uv in the test database, where:

    • The visit_users column is defined as an HLL type and configured with the HLL_UNION aggregate function.

    • page_id and visit_date serve as aggregate keys (AGGREGATE KEY) for grouping and deduplication.

    CREATE TABLE `test`.`hll_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` DATETIME NOT NULL COMMENT 'access time',
      `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. Create a table in the Flink SQL client.

    Since Flink does not support the HLL type, column mapping and type conversion need to be implemented as follows:

    • In the Flink table, define the visit_user_id column as BIGINT type to represent the visit_users column in the StarRocks table.

    • Use the sink.properties.columns configuration for column mapping, and convert the BIGINT type visit_user_id data to HLL type through the hll_hash function.

    CREATE TABLE `hll_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'hll_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. Insert data in the Flink SQL client.

    Insert multiple rows of data into the hll_uv table, simulating different users accessing pages at different times.

    The visit_user_id column is of the BIGINT type. During loading, the hll_hash function specified in sink.properties.columns converts the values into the HLL type.

    INSERT INTO `hll_uv` VALUES
       (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
       (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
       (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. Query the UV count in SQL Editor.

    Use StarRocks' aggregation capability to calculate the unique visitor count (UV) for each page using COUNT(DISTINCT visit_users).

    SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;

    The query returns the approximate unique visitor count for each page_id.
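
    Because visit_users is an HLL column, the following query is a sketch of an equivalent formulation that uses the HLL_UNION_AGG aggregate function; note that both forms return approximate counts.

    SELECT page_id, HLL_UNION_AGG(visit_users) FROM `test`.`hll_uv` GROUP BY page_id;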