StarRocks提供Apache Flink連接器(以下簡稱Flink Connector),可以通過Flink匯入資料至StarRocks表。相比於Flink內建的flink-connector-jdbc,StarRocks的Flink Connector效能更優越且穩定性更強,特別適合大規模資料匯入情境。
背景資訊
StarRocks Flink Connector通過在記憶體中緩衝小批次資料,並利用StarRocks的Stream Load功能進行大量匯入,支援DataStream API、Table API & SQL以及 Python API,能夠顯著提升資料匯入效率。
前提條件
使用限制
確保Flink所在機器能夠訪問StarRocks執行個體中FE節點的http_port連接埠(預設
8030)和query_port連接埠(預設9030),以及BE節點的be_http_port連接埠(預設8040)。使用Flink Connector匯入資料至StarRocks需要目標表的SELECT和INSERT許可權。
Flink Connector版本與Java、Scala環境及Flink版本等相容性要求如下。
Connector
Flink
StarRocks
Java
Scala
1.2.9
1.15~1.18
2.1及以上
8
2.11、2.12
1.2.8
1.13~1.17
2.1及以上
8
2.11、2.12
1.2.7
1.11~1.15
2.1及以上
8
2.11、2.12
配置相關
這部分將為您介紹StarRocks的參數設定及其相應的資料類型映射。有關更詳細的資訊,請參見Continuously load data from Apache Flink® | StarRocks。
參數說明
參數 | 是否必填 | 預設值 | 描述 |
| Yes | NONE | 指定連接器為StarRocks,固定設定為 |
| Yes | NONE | 用於在StarRocks中執行查詢操作。 例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中, 說明 關於如何擷取EMR Serverless StarRocks執行個體FE節點的內網地址,請參見查看執行個體列表與詳情。 |
| Yes | NONE | 指定FE節點的內網地址和HTTP連接埠,格式為 例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。 |
| Yes | NONE | StarRocks資料庫名。 |
| Yes | NONE | StarRocks表名。 |
| Yes | NONE | StarRocks執行個體的使用者名稱。例如,預設的admin。 使用Flink Connector匯入資料至StarRocks需要目標表的SELECT和INSERT許可權。如果您的使用者帳號沒有這些許可權,則需要為使用者授權,詳情請參見系統管理使用者及資料授權。 |
| Yes | NONE | StarRocks執行個體的使用者密碼。 |
| No | at-least-once | 定義Sink的語義保證層級,用於確保資料寫入目標系統時的可靠性和一致性。取值如下:
|
| No | AUTO | 匯入資料的介面。此參數自Flink Connector 1.2.4開始支援。
|
| No | NONE | 指定Stream Load使用的label的首碼。 如果Flink Connector版本為1.2.8及以上,並且Sink保證exactly-once語義,則建議配置label首碼。 |
| No | 94371840(90M) | 積攢在記憶體的資料大小,達到該閾值後資料通過Stream Load一次性匯入StarRocks。設定較大的值可以提高匯入效能,但可能導致更高的匯入延遲。 取值範圍:[64MB, 10GB]。 說明
|
sink.buffer-flush.max-rows | No | 500000 | 積攢在記憶體的資料條數,達到該閾值後資料通過Stream Load一次性匯入StarRocks。 取值範圍:[64000, 5000000]。 說明 該參數僅在 |
sink.buffer-flush.interval-ms | No | 300000 | 設定資料發送的時間間隔,控制資料寫入StarRocks的延遲。 取值範圍:[1000, 3600000]。 說明 該參數僅在 |
sink.max-retries | No | 3 | Stream Load失敗後的重試次數。超過該數量上限,則資料匯入任務報錯。 取值範圍:[0, 10]。 說明 該參數僅在 |
sink.connect.timeout-ms | No | 30000 | 與FE建立HTTP串連的逾時時間。 取值範圍:[100, 60000]。 Flink Connector v1.2.9之前,預設值為 |
sink.socket.timeout-ms | No | -1 | 此參數自Flink connector 1.2.10開始支援。HTTP 用戶端等待資料的逾時時間。單位:毫秒。預設值 |
sink.wait-for-continue.timeout-ms | No | 10000 | 此參數自Flink Connector 1.2.7開始支援。等待FE HTTP 100-continue應答的逾時時間。 取值範圍:[3000, 60000]。 |
sink.ignore.update-before | No | TRUE | 此參數自Flink Connector 1.2.8開始支援。將資料匯入到主鍵表時,是否忽略來自Flink的UPDATE_BEFORE記錄。如果將此參數設定為false,則將該記錄在主鍵表中視為DELETE操作。 |
sink.parallelism | No | NONE | 寫入的並行度,僅適用於Flink SQL。如果未設定, Flink planner將決定並行度。在多並行度的情境中,使用者需要確保資料按正確順序寫入。 |
sink.properties.* | No | NONE | Stream Load的參數,控制Stream Load匯入行為。 |
sink.properties.format | No | csv | Stream Load匯入時的資料格式。Flink Connector會將記憶體的資料轉換為對應格式,然後通過Stream Load匯入至StarRocks。取值為CSV或JSON。 |
sink.properties.column_separator | No | \t | CSV資料的資料行分隔符號。 |
sink.properties.row_delimiter | No | \n | CSV資料的行分隔字元。 |
sink.properties.max_filter_ratio | No | 0 | 匯入作業的最大容錯率,即匯入作業能夠容忍的因資料品質不合格而過濾掉的資料行所佔的最大比例。 取值範圍:0~1。 |
sink.properties.partial_update | No | false | 是否使用部分更新。取值包括 |
sink.properties.partial_update_mode | No | row | 指定部分更新的模式,取值如下:
|
sink.properties.strict_mode | No | false | 是否為Stream Load啟用strict 模式。strict 模式會在匯入資料中出現不合格行(如列值不一致)時影響匯入行為。 有效值: |
sink.properties.compression | No | NONE | 此參數自Flink Connector 1.2.10開始支援。指定用於Stream Load的壓縮演算法。當前僅支援JSON格式的壓縮。 有效值: 說明 僅StarRocks v3.2.7及更高版本支援JSON格式的壓縮。 |
資料類型映射
Flink資料類型 | StarRocks資料類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
準備工作
擷取Flink Connector JAR並上傳至Flink叢集
您可以通過以下方式擷取Flink Connector JAR包。
方式1:直接下載
從Maven Central Repository擷取不同版本的Flink Connector JAR檔案。
方式2:Maven依賴
在Maven專案的
pom.xml檔案中,根據以下格式將Flink Connector添加為依賴項。適用於Flink 1.15版本及以後的Flink Connector。
<dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <version>${connector_version}_flink-${flink_version}</version> </dependency>適用於Flink 1.15版本之前的Flink Connector。
<dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <version>${connector_version}_flink-${flink_version}_${scala_version}</version> </dependency>
方式3:手動編譯
執行以下命令,將Flink Connector的原始碼編譯成一個JAR檔案。
sh build.sh <flink_version>例如,如果您的環境中的Flink版本為1.17,您需要執行以下命令。
sh build.sh 1.17編譯完成後,在
target/目錄下找到產生的JAR檔案。例如,檔案名稱通常為
flink-connector-starrocks-1.2.7_flink-1.17-SNAPSHOT.jar格式。說明非正式發布的Flink Connector版本會帶有
SNAPSHOT尾碼。
Flink Connector JAR檔案的命名格式如下:
適用於Flink 1.15版本及以後的Flink Connector命名格式為
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar。例如您安裝了Flink 1.17,並且想要使用1.2.8版本的Flink Connector,則您可以使用flink-connector-starrocks-1.2.8_flink-1.17.jar。適用於Flink 1.15版本之前的Flink Connector命名格式為
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar。例如您安裝了Flink 1.14和Scala 2.12,並且您想要使用1.2.7版本的Flink Connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar。說明請根據實際情況替換以下資訊:
flink_version:Flink的版本號碼。scala_version:Scala的版本號碼。connector_version:Flink Connector的版本號碼。
將擷取到的Flink Connector JAR檔案上傳至Flink叢集的
flink-{flink_version}/lib目錄下。例如,如果您使用的是EMR叢集,且叢集版本為EMR-5.19.0,則JAR檔案應放置於
/opt/apps/FLINK/flink-current/lib目錄下。
啟動Flink叢集
登入Flink叢集的Master節點,詳情請參見登入叢集。
執行以下命令,啟動Flink叢集。
/opt/apps/FLINK/flink-current/bin/start-cluster.sh
使用樣本
使用Flink SQL寫入資料
在StarRocks中建立一個名為
test的資料庫,並在其中建立一張名為score_board的主鍵表。CREATE DATABASE test; CREATE TABLE test.score_board( id int(11) NOT NULL COMMENT "", name varchar(65533) NULL DEFAULT "" COMMENT "", score int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(id) DISTRIBUTED BY HASH(id);登入Flink叢集的Master節點,詳情請參見登入叢集。
執行以下命令,啟動Flink SQL。
/opt/apps/FLINK/flink-current/bin/sql-client.sh執行以下命令,建立一個名為
score_board的表,並向其中插入資料。CREATE TABLE `score_board` ( `id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', ); INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);如果目標是將資料匯入StarRocks主鍵表,則必須在Flink表的DDL中明確指定主鍵。對於其他類型的StarRocks表(如Duplicate Key表),定義主鍵則為可選項。
使用Flink DataStream寫入資料
根據輸入記錄(input records)的不同類型,編寫對應Flink DataStream作業。
寫入CSV格式的字串資料
如果輸入記錄為CSV格式的字串,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadCsvRecords。
/** * Generate CSV-format records. Each record has three values separated by "\t". * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table. */ String[] records = new String[]{ "1\tstarrocks-csv\t100", "2\tflink-csv\t100" }; DataStream<String> source = env.fromElements(records); /** * Configure the Flink connector with the required properties. * You also need to add properties "sink.properties.format" and "sink.properties.column_separator" * to tell the Flink connector the input records are CSV-format, and the column separator is "\t". * You can also use other column separators in the CSV-format records, * but remember to modify the "sink.properties.column_separator" correspondingly. */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .withProperty("sink.properties.format", "csv") .withProperty("sink.properties.column_separator", "\t") .build(); // Create the sink with the options. SinkFunction<String> starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink);寫入JSON格式的字串資料
如果輸入記錄為JSON格式的字串,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadJsonRecords。
/** * Generate JSON-format records. * Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table. */ String[] records = new String[]{ "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}", "{\"id\":2, \"name\":\"flink-json\", \"score\":100}", }; DataStream<String> source = env.fromElements(records); /** * Configure the Flink connector with the required properties. * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array" * to tell the Flink connector the input records are JSON-format and to strip the outermost array structure. */ StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") .build(); // Create the sink with the options. SinkFunction<String> starRockSink = StarRocksSink.sink(options); source.addSink(starRockSink);寫入自訂Java對象資料
如果輸入記錄為自訂的Java對象,對應的Flink DataStream作業的主要代碼如下所示,完整代碼請參見LoadCustomJavaRecords。
本樣本中,定義一個簡單的POJO類
RowData,用於表示每條記錄。public static class RowData { public int id; public String name; public int score; public RowData() {} public RowData(int id, String name, int score) { this.id = id; this.name = name; this.score = score; } }主要代碼如下所示。
// Generate records which use RowData as the container. RowData[] records = new RowData[]{ new RowData(1, "starrocks-rowdata", 100), new RowData(2, "flink-rowdata", 100), }; DataStream<RowData> source = env.fromElements(records); // Configure the Flink connector with the required properties. StarRocksSinkOptions options = StarRocksSinkOptions.builder() .withProperty("jdbc-url", jdbcUrl) .withProperty("load-url", loadUrl) .withProperty("database-name", "test") .withProperty("table-name", "score_board") .withProperty("username", "root") .withProperty("password", "") .build(); /** * The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table, * and each element is the value for a column. * You need to define the schema of the Object[] which matches that of the StarRocks table. */ TableSchema schema = TableSchema.builder() .field("id", DataTypes.INT().notNull()) .field("name", DataTypes.STRING()) .field("score", DataTypes.INT()) // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`. .primaryKey("id") .build(); // Transform the RowData to the Object[] according to the schema. RowDataTransformer transformer = new RowDataTransformer(); // Create the sink with the schema, options, and transformer. SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer); source.addSink(starRockSink);其中
RowDataTransformer定義如下所示。private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> { /** * Set each element of the object array according to the input RowData. * The schema of the array matches that of the StarRocks table. */ @Override public void accept(Object[] internalRow, RowData rowData) { internalRow[0] = rowData.id; internalRow[1] = rowData.name; internalRow[2] = rowData.score; // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation. internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); } }
使用Flink CDC 3.0同步資料
Flink CDC 3.0架構能夠輕鬆構建從CDC資料來源(如MySQL、Kafka)到StarRocks的流式ELT管道。通過該管道,您可以實現以下功能:
自動建立資料庫和表
同步全量和增量資料
同步Schema Change
自StarRocks Flink Connector v1.2.9版本起,該連接器已整合至Flink CDC 3.0架構中,並被命名為StarRocks Pipeline Connector。該連接器具備上述所有功能,建議與StarRocks v3.2.1及以上版本配合使用,以充分利用fast_schema_evolution特性,進一步提升列的增減速度並降低資源消耗。
最佳實務
匯入至主鍵表
在StarRocks中建立一個名為
test的資料庫,並在其中建立一個主鍵表score_board。CREATE DATABASE `test`; CREATE TABLE `test`.`score_board` ( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "" ) ENGINE=OLAP PRIMARY KEY(`id`) COMMENT "OLAP" DISTRIBUTED BY HASH(`id`);向StarRocks表中插入資料。
INSERT INTO `test`.`score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);執行以下命令,啟動Flink SQL用戶端。
/opt/apps/FLINK/flink-current/bin/sql-client.sh更新資料。
部分更新
部分更新允許您僅更新指定列(如
name),而不影響其他列(如score)。在Flink SQL用戶端建立表
score_board,並啟用部分更新功能。CREATE TABLE `score_board` ( `id` INT, `name` STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', 'sink.properties.partial_update' = 'true', -- only for Flink connector version <= 1.2.7 'sink.properties.columns' = 'id,name,__op' );sink.properties.partial_update:啟用部分更新。sink.properties.columns:指定需要更新的列。如果 Flink Connector版本小於等於1.2.7,則還需要將選項sink.properties.columns設定為id,name,__op,以告訴 Flink connector 需要更新的列。請注意,您需要在末尾附加欄位__op。欄位__op表示匯入是 UPSERT 還是 DELETE 操作,其值由 Flink connector 自動化佈建。
插入更新資料。
插入兩行資料,主鍵與現有資料相同,但
name列的值被修改。INSERT INTO score_board VALUES (1, 'starrocks-update'), (2, 'flink-update');在SQL Editor中查詢StarRocks表。
SELECT * FROM `test`.`score_board`;您會看到只有
name列的值發生了變化,而score列保持不變。
條件更新
本樣本展示如何根據
score列的值進行條件更新。只有匯入的資料行中score列值大於等於StarRocks表當前值時,該資料行才會更新。在Flink SQL用戶端按照以下方式建立表
score_board。CREATE TABLE `score_board` ( `id` INT, `name` STRING, `score` INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'score_board', 'username' = 'admin', 'password' = '<password>', 'sink.properties.merge_condition' = 'score', 'sink.version' = 'V1' );sink.properties.merge_condition:設定為score,表示在資料寫入時,Flink Connector會以score列作為更新條件。sink.version:設定為V1,表示Flink Connector使用Stream Load介面匯入資料。
在Flink SQL用戶端插入兩行資料到表中。
資料行的主鍵與StarRocks表中的行相同。第一行資料
score列中具有較小的值,而第二行資料score列中具有較大的值。INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'flink-update', 101);在SQL Editor中查詢StarRocks表。
SELECT * FROM `test`.`score_board`;您會看到只有第二行資料發生了變化,而第一行資料未發生變化。

匯入至Bitmap列
Bitmap類型常用於加速精確去重計數情境,例如計算獨立訪客數(UV)。以下是一個完整的樣本,展示如何通過Flink SQL將資料匯入到StarRocks表的Bitmap列中,並在StarRocks中查詢UV數。
在SQL Editor中建立StarRocks彙總表。
在資料庫
test中建立一個彙總表page_uv,其中:visit_users列被定義為 BITMAP 類型,並配置彙總函式BITMAP_UNION。page_id和visit_date作為彙總鍵(AGGREGATE KEY),用於分組和去重。
CREATE TABLE `test`.`page_uv` ( `page_id` INT NOT NULL COMMENT 'page ID', `visit_date` datetime NOT NULL COMMENT 'access time', `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID' ) ENGINE=OLAP AGGREGATE KEY(`page_id`, `visit_date`) DISTRIBUTED BY HASH(`page_id`);在Flink SQL用戶端中建立表。
由於Flink不支援Bitmap類型,需要通過以下方式實現列映射和類型轉換:
在Flink表中,將
visit_user_id列定義為BIGINT類型,以代表StarRocks表中的visit_users列。使用
sink.properties.columns配置,將visit_user_id列的資料通過to_bitmap函數轉換為Bitmap類型。
CREATE TABLE `page_uv` ( `page_id` INT, `visit_date` TIMESTAMP, `visit_user_id` BIGINT ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'page_uv', 'username' = 'admin', 'password' = '<password>', 'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)' );在Flink SQL用戶端中插入資料。
向表
page_uv插入多行資料,類比不同使用者在不同時間訪問頁面的行為。visit_user_id是BIGINT類型,Flink會將其自動轉換為Bitmap類型。INSERT INTO `page_uv` VALUES (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13), (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23), (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33), (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13), (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);在SQL Editor中查詢UV數。
通過StarRocks的彙總能力,使用
COUNT(DISTINCT visit_users)計算每個頁面的獨立訪客數(UV)。SELECT page_id, COUNT(DISTINCT visit_users) FROM page_uv GROUP BY page_id;返回結果如下所示。

匯入至HLL列
HLL(HyperLogLog)是一種用於近似去重計數的資料類型,適合處理大規模資料的獨立訪客數(UV)計算。以下是一個完整的樣本,展示如何通過Flink SQL將資料匯入到StarRocks表的HL 列中,並在StarRocks中查詢UV數。
在SQL Editor中建立StarRocks彙總表。
在資料庫
test中建立一個彙總表hll_uv,其中:visit_users列被定義為HLL類型,並配置彙總函式HLL_UNION。page_id和visit_date作為彙總鍵(AGGREGATE KEY),用於分組和去重。
CREATE TABLE `test`.`hll_uv` ( `page_id` INT NOT NULL COMMENT 'page ID', `visit_date` DATETIME NOT NULL COMMENT 'access time', `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID' ) ENGINE=OLAP AGGREGATE KEY(`page_id`, `visit_date`) DISTRIBUTED BY HASH(`page_id`);在Flink SQL用戶端中建立表。
由於Flink不支援HLL類型,需要通過以下方式實現列映射和類型轉換:
在Flink表中,將
visit_user_id列定義為BIGINT類型,以代表StarRocks表中的visit_users列。使用
sink.properties.columns配置列映射,並通過hll_hash函數將BIGINT類型的visit_user_id資料轉換為HLL類型。
CREATE TABLE `hll_uv` ( `page_id` INT, `visit_date` TIMESTAMP, `visit_user_id` BIGINT ) WITH ( 'connector' = 'starrocks', 'jdbc-url' = 'jdbc:mysql://<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:9030', 'load-url' = '<fe-{srClusterId}-internal.starrocks.aliyuncs.com>:8030', 'database-name' = 'test', 'table-name' = 'hll_uv', 'username' = 'admin', 'password' = '<password>', 'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)' );在Flink SQL用戶端中插入資料。
向表
hll_uv插入多行資料,類比不同使用者在不同時間訪問頁面的行為。visit_user_id是BIGINT類型,Flink會將其自動轉換為HLL類型。INSERT INTO `hll_uv` VALUES (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78), (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2), (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);在SQL Editor中查詢UV數。
通過StarRocks的彙總能力,使用
COUNT(DISTINCT visit_users)計算每個頁面的獨立訪客數(UV)。SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;返回結果如下所示。
