全部產品
Search
文件中心

E-MapReduce:Flink Connector

更新時間:Jul 30, 2025

StarRocks提供Apache Flink連接器(以下簡稱Flink Connector),可以通過Flink匯入資料至StarRocks表。相比於Flink內建的flink-connector-jdbc,StarRocks的Flink Connector效能更優越且穩定性更強,特別適合大規模資料匯入情境。

背景資訊

StarRocks Flink Connector通過在記憶體中緩衝小批次資料,並利用StarRocks的Stream Load功能進行大量匯入,支援DataStream API、Table API & SQL以及 Python API,能夠顯著提升資料匯入效率。

前提條件

  • 已建立包含Flink服務的叢集。

    本文以在EMR on ECS中建立包含Flink服務的DataFlow叢集(後續簡稱Flink叢集)為例,詳情請參見建立叢集

  • 已建立EMR Serverless StarRocks執行個體,詳情請參見建立執行個體

使用限制

  • 確保Flink所在機器能夠訪問StarRocks執行個體中FE節點的http_port連接埠(預設8030)和query_port連接埠(預設9030),以及BE節點的be_http_port連接埠(預設8040)。

  • 使用Flink Connector匯入資料至StarRocks需要目標表的SELECT和INSERT許可權。

  • Flink Connector版本與Java、Scala環境及Flink版本等相容性要求如下。

    Connector

    Flink

    StarRocks

    Java

    Scala

    1.2.9

    1.15~1.18

    2.1及以上

    8

    2.11、2.12

    1.2.8

    1.13~1.17

    2.1及以上

    8

    2.11、2.12

    1.2.7

    1.11~1.15

    2.1及以上

    8

    2.11、2.12

配置相關

這部分將為您介紹StarRocks的參數設定及其相應的資料類型映射。有關更詳細的資訊,請參見Continuously load data from Apache Flink® | StarRocks

參數說明

參數

是否必填

預設值

描述

connector

Yes

NONE

指定連接器為StarRocks,固定設定為starrocks

jdbc-url

Yes

NONE

用於在StarRocks中執行查詢操作。

例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com為EMR Serverless StarRocks執行個體FE節點的內網地址。

說明

關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情

load-url

Yes

NONE

指定FE節點的內網地址和HTTP連接埠,格式為EMR Serverless StarRocks執行個體FE節點的內網地址:8030

例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

database-name

Yes

NONE

StarRocks資料庫名。

table-name

Yes

NONE

StarRocks表名。

username

Yes

NONE

StarRocks執行個體的使用者名稱。例如,預設的admin。

使用Flink Connector匯入資料至StarRocks需要目標表的SELECT和INSERT許可權。如果您的使用者帳號沒有這些許可權,則需要為使用者授權,詳情請參見系統管理使用者及資料授權

password

Yes

NONE

StarRocks執行個體的使用者密碼。

sink.semantic

No

at-least-once

定義Sink的語義保證層級,用於確保資料寫入目標系統時的可靠性和一致性。取值如下:

  • at-least-once:保證資料至少被寫入一次,可能存在重複資料。

  • exactly-once:保證資料精確寫入一次,無重複、無丟失。

sink.version

No

AUTO

匯入資料的介面。此參數自Flink Connector 1.2.4開始支援。

  • V1:使用Stream Load介面匯入資料。1.2.4之前的Flink Connector僅支援此模式。

  • V2:使用Stream Load事務介面匯入資料。要求StarRocks版本大於等於2.4。建議選擇V2,因為其降低記憶體使用量,並提供了更穩定的exactly-once實現。

  • AUTO:如果StarRocks版本支援Stream Load事務介面,將自動選擇V2,否則選擇V1。

sink.label-prefix

No

NONE

指定Stream Load使用的label的首碼。 如果Flink Connector版本為1.2.8及以上,並且Sink保證exactly-once語義,則建議配置label首碼。

sink.buffer-flush.max-bytes

No

94371840(90M)

積攢在記憶體的資料大小,達到該閾值後資料通過Stream Load一次性匯入StarRocks。設定較大的值可以提高匯入效能,但可能導致更高的匯入延遲。

取值範圍:[64MB, 10GB]。

說明
  • 該參數只在sink.semanticat-least-once才會生效。 

  • sink.semanticexactly-once時,只有在Flink checkpoint觸發時,記憶體中的資料才會被重新整理。在這種情況下,sink.buffer-flush.max-bytes參數將不生效,因為資料不會因為達到閾值而自動重新整理。

sink.buffer-flush.max-rows

No

500000

積攢在記憶體的資料條數,達到該閾值後資料通過Stream Load一次性匯入StarRocks。

取值範圍:[64000, 5000000]。

說明

該參數僅在sink.versionV1sink.semanticat-least-once時才會生效。

sink.buffer-flush.interval-ms

No

300000

設定資料發送的時間間隔,控制資料寫入StarRocks的延遲。

取值範圍:[1000, 3600000]。

說明

該參數僅在sink.semantic設定為at-least-once時才會生效。

sink.max-retries

No

3

Stream Load失敗後的重試次數。超過該數量上限,則資料匯入任務報錯。

取值範圍:[0, 10]。

說明

該參數僅在sink.versionV1才會生效。

sink.connect.timeout-ms

No

30000

與FE建立HTTP串連的逾時時間。

取值範圍:[100, 60000]。

Flink Connector v1.2.9之前,預設值為1000

sink.socket.timeout-ms

No

-1

此參數自Flink connector 1.2.10開始支援。HTTP 用戶端等待資料的逾時時間。單位:毫秒。預設值-1表示沒有逾時時間。

sink.wait-for-continue.timeout-ms

No

10000

此參數自Flink Connector 1.2.7開始支援。等待FE HTTP 100-continue應答的逾時時間。

取值範圍:[3000, 60000]。

sink.ignore.update-before

No

TRUE

此參數自Flink Connector 1.2.8開始支援。將資料匯入到主鍵表時,是否忽略來自Flink的UPDATE_BEFORE記錄。如果將此參數設定為false,則將該記錄在主鍵表中視為DELETE操作。

sink.parallelism

No

NONE

寫入的並行度,僅適用於Flink SQL。如果未設定, Flink planner將決定並行度。在多並行度的情境中,使用者需要確保資料按正確順序寫入。

sink.properties.*

No

NONE

Stream Load的參數,控制Stream Load匯入行為。

sink.properties.format

No

csv

Stream Load匯入時的資料格式。Flink Connector會將記憶體的資料轉換為對應格式,然後通過Stream Load匯入至StarRocks。取值為CSV或JSON。

sink.properties.column_separator

No

\t

CSV資料的資料行分隔符號。

sink.properties.row_delimiter

No

\n

CSV資料的行分隔字元。

sink.properties.max_filter_ratio

No

0

匯入作業的最大容錯率,即匯入作業能夠容忍的因資料品質不合格而過濾掉的資料行所佔的最大比例。

取值範圍:0~1。

sink.properties.partial_update

No

false

是否使用部分更新。取值包括TRUEFALSE(預設值)。

sink.properties.partial_update_mode

No

row

指定部分更新的模式,取值如下:

  • row(預設值):指定使用行模式執行部分更新,比較適用於較多列且小批量的即時更新情境。

  • column:指定使用列模式執行部分更新,比較適用於少數列並且大量行的批處理更新情境。在此情境下,開啟列模式可顯著提升更新效能。

sink.properties.strict_mode

No

false

是否為Stream Load啟用strict 模式。strict 模式會在匯入資料中出現不合格行(如列值不一致)時影響匯入行為。

有效值:truefalse

sink.properties.compression

No

NONE

此參數自Flink Connector 1.2.10開始支援。指定用於Stream Load的壓縮演算法。當前僅支援JSON格式的壓縮。

有效值:lz4_frame

說明

僅StarRocks v3.2.7及更高版本支援JSON格式的壓縮。

資料類型映射

Flink資料類型

StarRocks資料類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

BINARY

INT

CHAR

STRING

VARCHAR

STRING

STRING

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE(N)

DATETIME

TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)

DATETIME

ARRAY<T>

ARRAY<T>

MAP<KT,VT>

JSON STRING

ROW<arg T...>

JSON STRING

準備工作

擷取Flink Connector JAR並上傳至Flink叢集

  1. 您可以通過以下方式擷取Flink Connector JAR包。

    方式1:直接下載

    Maven Central Repository擷取不同版本的Flink Connector JAR檔案。

    方式2:Maven依賴

    在Maven專案的pom.xml檔案中,根據以下格式將Flink Connector添加為依賴項。

    • 適用於Flink 1.15版本及以後的Flink Connector。

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}</version>
      </dependency>
    • 適用於Flink 1.15版本之前的Flink Connector。

      <dependency>
          <groupId>com.starrocks</groupId>
          <artifactId>flink-connector-starrocks</artifactId>
          <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
      </dependency>

    方式3:手動編譯

    1. 下載Flink Connector代碼

    2. 執行以下命令,將Flink Connector的原始碼編譯成一個JAR檔案。

      sh build.sh <flink_version>

      例如,如果您的環境中的Flink版本為1.17,您需要執行以下命令。

      sh build.sh 1.17
    3. 編譯完成後,在 target/ 目錄下找到產生的JAR檔案。

      例如,檔案名稱通常為flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar格式。

      說明

      非正式發布的Flink Connector版本會帶有SNAPSHOT尾碼。

    Flink Connector JAR檔案的命名格式如下:

    • 適用於Flink 1.15版本及以後的Flink Connector命名格式為flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如您安裝了Flink 1.17,並且想要使用1.2.8版本的Flink Connector,則您可以使用flink-connector-starrocks-1.2.8_flink-1.17.jar

    • 適用於Flink 1.15版本之前的Flink Connector命名格式為flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如您安裝了Flink 1.14和Scala 2.12,並且您想要使用1.2.7版本的Flink Connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar

      說明

      請根據實際情況替換以下資訊:

      • flink_version:Flink的版本號碼。

      • scala_version:Scala的版本號碼。

      • connector_version:Flink Connector的版本號碼。

  2. 將擷取到的Flink Connector JAR檔案上傳至Flink叢集的flink-{flink_version}/lib目錄下。

    例如,如果您使用的是EMR叢集,且叢集版本為EMR-5.19.0,則JAR檔案應放置於/opt/apps/FLINK/flink-current/lib目錄下。

啟動Flink叢集

  1. 登入Flink叢集的Master節點,詳情請參見登入叢集

  2. 執行以下命令,啟動Flink叢集。

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh

使用樣本

使用Flink SQL寫入資料

  1. 在StarRocks中建立一個名為test的資料庫,並在其中建立一張名為score_board的主鍵表。

    CREATE DATABASE test;
    
    CREATE TABLE test.score_board(
        id int(11) NOT NULL COMMENT "",
        name varchar(65533) NULL DEFAULT "" COMMENT "",
        score int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(id)
    DISTRIBUTED BY HASH(id);
  2. 登入Flink叢集的Master節點,詳情請參見登入叢集

  3. 執行以下命令,啟動Flink SQL。

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. 執行以下命令,建立一個名為score_board的表,並向其中插入資料。

    CREATE TABLE `score_board` (
        `id` INT,
        `name` STRING,
        `score` INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'score_board',
        'username' = 'admin',
        'password' = '<password>',
    );
    
    INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

    如果目標是將資料匯入StarRocks主鍵表,則必須在Flink表的DDL中明確指定主鍵。對於其他類型的StarRocks表(如Duplicate Key表),定義主鍵則為可選項。

使用Flink DataStream寫入資料

根據輸入記錄(input records)的不同類型,編寫對應Flink DataStream作業。

  • 寫入CSV格式的字串資料

    如果輸入記錄為CSV格式的字串,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadCsvRecords

    /**
     * Generate CSV-format records. Each record has three values separated by "\t". 
     * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
     */
    String[] records = new String[]{
            "1\tstarrocks-csv\t100",
            "2\tflink-csv\t100"
    };
    DataStream<String> source = env.fromElements(records);
    
    /**
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
     * to tell the Flink connector the input records are CSV-format, and the column separator is "\t".
     * You can also use other column separators in the CSV-format records,
     * but remember to modify the "sink.properties.column_separator" correspondingly.
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "csv")
            .withProperty("sink.properties.column_separator", "\t")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • 寫入JSON格式的字串資料

    如果輸入記錄為JSON格式的字串,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadJsonRecords

    /**
     * Generate JSON-format records. 
     * Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table.
     */
    String[] records = new String[]{
            "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
            "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
    };
    DataStream<String> source = env.fromElements(records);
    
    /** 
     * Configure the Flink connector with the required properties.
     * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
     * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. 
     */
    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", jdbcUrl)
            .withProperty("load-url", loadUrl)
            .withProperty("database-name", "test")
            .withProperty("table-name", "score_board")
            .withProperty("username", "root")
            .withProperty("password", "")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build();
    // Create the sink with the options.
    SinkFunction<String> starRockSink = StarRocksSink.sink(options);
    source.addSink(starRockSink);
  • 寫入自訂Java對象資料

    如果輸入記錄為自訂的Java對象,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadCustomJavaRecords

    • 本樣本中,定義一個簡單的POJO類 RowData,用於表示每條記錄。

      public static class RowData {
              public int id;
              public String name;
              public int score;
        
              public RowData() {}
        
              public RowData(int id, String name, int score) {
                  this.id = id;
                  this.name = name;
                  this.score = score;
              }
          }
      
    • 主要代碼如下所示。

      // Generate records which use RowData as the container.
      RowData[] records = new RowData[]{
              new RowData(1, "starrocks-rowdata", 100),
              new RowData(2, "flink-rowdata", 100),
          };
      DataStream<RowData> source = env.fromElements(records);
      
      // Configure the Flink connector with the required properties.
      StarRocksSinkOptions options = StarRocksSinkOptions.builder()
              .withProperty("jdbc-url", jdbcUrl)
              .withProperty("load-url", loadUrl)
              .withProperty("database-name", "test")
              .withProperty("table-name", "score_board")
              .withProperty("username", "root")
              .withProperty("password", "")
              .build();
      
      /**
       * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
       * and each element is the value for a column.
       * You need to define the schema of the Object[] which matches that of the StarRocks table.
       */
      TableSchema schema = TableSchema.builder()
              .field("id", DataTypes.INT().notNull())
              .field("name", DataTypes.STRING())
              .field("score", DataTypes.INT())
              // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
              .primaryKey("id")
              .build();
      // Transform the RowData to the Object[] according to the schema.
      RowDataTransformer transformer = new RowDataTransformer();
      // Create the sink with the schema, options, and transformer.
      SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
      source.addSink(starRockSink);
      

      其中RowDataTransformer定義如下所示。

      private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {
      
          /**
           * Set each element of the object array according to the input RowData.
           * The schema of the array matches that of the StarRocks table.
           */
          @Override
          public void accept(Object[] internalRow, RowData rowData) {
              internalRow[0] = rowData.id;
              internalRow[1] = rowData.name;
              internalRow[2] = rowData.score;
              // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
              internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
          }
      }  

使用Flink CDC 3.0同步資料

Flink CDC 3.0架構能夠輕鬆構建從CDC資料來源(如MySQL、Kafka)到StarRocks的流式ELT管道。通過該管道,您可以實現以下功能:

  • 自動建立資料庫和表

  • 同步全量和增量資料

  • 同步Schema Change

自StarRocks Flink Connector v1.2.9版本起,該連接器已整合至Flink CDC 3.0架構中,並被命名為StarRocks Pipeline Connector。該連接器具備上述所有功能,建議與StarRocks v3.2.1及以上版本配合使用,以充分利用fast_schema_evolution特性,進一步提升列的增減速度並降低資源消耗。

最佳實務

匯入至主鍵表

  1. 在StarRocks中建立一個名為test的資料庫,並在其中建立一個主鍵表score_board

    CREATE DATABASE `test`;
    
    CREATE TABLE `test`.`score_board`
    (
        `id` int(11) NOT NULL COMMENT "",
        `name` varchar(65533) NULL DEFAULT "" COMMENT "",
        `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`);
  2. 向StarRocks表中插入資料。

    INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
  3. 執行以下命令,啟動Flink SQL用戶端。

    /opt/apps/FLINK/flink-current/bin/sql-client.sh
  4. 更新資料。

    部分更新

    部分更新允許您僅更新指定列(如name),而不影響其他列(如score)。

    1. 在Flink SQL用戶端建立表score_board,並啟用部分更新功能。

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.partial_update' = 'true',
          -- only for Flink connector version <= 1.2.7
          'sink.properties.columns' = 'id,name,__op'
      ); 
      • sink.properties.partial_update:啟用部分更新。

      • sink.properties.columns:指定需要更新的列。如果 Flink Connector版本小於等於1.2.7,則還需要將選項sink.properties.columns設定為id,name,__op,以告訴 Flink connector 需要更新的列。請注意,您需要在末尾附加欄位__op。欄位__op表示匯入是 UPSERT 還是 DELETE 操作,其值由 Flink connector 自動化佈建。

    2. 插入更新資料。

      插入兩行資料,主鍵與現有資料相同,但name列的值被修改。

      INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');
    3. 在SQL Editor中查詢StarRocks表。

      SELECT * FROM `test`.`score_board`;

      您會看到只有name列的值發生了變化,而score列保持不變。

      image

    條件更新

    本樣本展示如何根據score列的值進行條件更新。只有匯入的資料行中score列值大於等於StarRocks表當前值時,該資料行才會更新。

    1. 在Flink SQL用戶端按照以下方式建立表score_board

      CREATE TABLE `score_board` (
          `id` INT,
          `name` STRING,
          `score` INT,
          PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
          'connector' = 'starrocks',
          'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
          'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
          'database-name' = 'test',
          'table-name' = 'score_board',
          'username' = 'admin',
          'password' = '<password>',
          'sink.properties.merge_condition' = 'score',
          'sink.version' = 'V1'
      );
      • sink.properties.merge_condition:設定為score,表示在資料寫入時,Flink Connector會以 score 列作為更新條件。

      • sink.version:設定為V1,表示Flink Connector使用Stream Load介面匯入資料。

    2. 在Flink SQL用戶端插入兩行資料到表中。

      資料行的主鍵與StarRocks表中的行相同。第一行資料score列中具有較小的值,而第二行資料score列中具有較大的值。

      INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);
    3. 在SQL Editor中查詢StarRocks表。

      SELECT * FROM `test`.`score_board`;

      您會看到只有第二行資料發生了變化,而第一行資料未發生變化。

      image

匯入至Bitmap列

Bitmap類型常用於加速精確去重計數情境,例如計算獨立訪客數(UV)。以下是一個完整的樣本,展示如何通過Flink SQL將資料匯入到StarRocks表的Bitmap列中,並在StarRocks中查詢UV數。

  1. 在SQL Editor中建立StarRocks彙總表。

    在資料庫test中建立一個彙總表page_uv,其中:

    • visit_users列被定義為 BITMAP 類型,並配置彙總函式BITMAP_UNION。

    • page_idvisit_date作為彙總鍵(AGGREGATE KEY),用於分組和去重。

    CREATE TABLE `test`.`page_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` datetime NOT NULL COMMENT 'access time',
      `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. 在Flink SQL用戶端中建立表。

    由於Flink不支援Bitmap類型,需要通過以下方式實現列映射和類型轉換:

    • 在Flink表中,將visit_user_id列定義為BIGINT類型,以代表StarRocks表中的visit_users列。

    • 使用sink.properties.columns配置,將visit_user_id列的資料通過to_bitmap函數轉換為Bitmap類型。

    CREATE TABLE `page_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'page_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
    );
  3. 在Flink SQL用戶端中插入資料。

    向表page_uv插入多行資料,類比不同使用者在不同時間訪問頁面的行為。

    visit_user_id是BIGINT類型,Flink會將其自動轉換為Bitmap類型。
    INSERT INTO `page_uv` VALUES
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
       (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
       (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
       (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
  4. 在SQL Editor中查詢UV數。

    通過StarRocks的彙總能力,使用COUNT(DISTINCT visit_users)計算每個頁面的獨立訪客數(UV)。

    SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;

    返回結果如下所示。

    image

匯入至HLL列

HLL(HyperLogLog)是一種用於近似去重計數的資料類型,適合處理大規模資料的獨立訪客數(UV)計算。以下是一個完整的樣本,展示如何通過Flink SQL將資料匯入到StarRocks表的HL 列中,並在StarRocks中查詢UV數。

  1. 在SQL Editor中建立StarRocks彙總表。

    在資料庫test中建立一個彙總表hll_uv,其中:

    • visit_users列被定義為HLL類型,並配置彙總函式HLL_UNION。

    • page_idvisit_date作為彙總鍵(AGGREGATE KEY),用於分組和去重。

    CREATE TABLE `test`.`hll_uv` (
      `page_id` INT NOT NULL COMMENT 'page ID',
      `visit_date` DATETIME NOT NULL COMMENT 'access time',
      `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
    ) ENGINE=OLAP
    AGGREGATE KEY(`page_id`, `visit_date`)
    DISTRIBUTED BY HASH(`page_id`);
  2. 在Flink SQL用戶端中建立表。

    由於Flink不支援HLL類型,需要通過以下方式實現列映射和類型轉換:

    • 在Flink表中,將visit_user_id列定義為BIGINT類型,以代表StarRocks表中的visit_users列。

    • 使用sink.properties.columns配置列映射,並通過hll_hash函數將BIGINT類型的visit_user_id資料轉換為HLL類型。

    CREATE TABLE `hll_uv` (
        `page_id` INT,
        `visit_date` TIMESTAMP,
        `visit_user_id` BIGINT
    ) WITH (
        'connector' = 'starrocks',
        'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030',
        'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030',
        'database-name' = 'test',
        'table-name' = 'hll_uv',
        'username' = 'admin',
        'password' = '<password>',
        'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
    );
  3. 在Flink SQL用戶端中插入資料。

    向表hll_uv插入多行資料,類比不同使用者在不同時間訪問頁面的行為。

    visit_user_id是BIGINT類型,Flink會將其自動轉換為HLL類型。
    INSERT INTO `hll_uv` VALUES
       (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
       (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
       (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
  4. 在SQL Editor中查詢UV數。

    通過StarRocks的彙總能力,使用COUNT(DISTINCT visit_users)計算每個頁面的獨立訪客數(UV)。

    SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;

    返回結果如下所示。

    image