全部產品
Search
文件中心

Realtime Compute for Apache Flink:即時數倉Hologres

更新時間:Oct 30, 2025

本文為您介紹如何使用即時數倉Hologres連接器。

背景資訊

即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線線上一體化全棧數倉解決方案。Hologres連接器支援的資訊如下。

類別

詳情

支援類型

源表、維表和結果表

運行模式

流模式和批模式

資料格式

暫不支援

特有監控指標

監控指標

  • 源表:

    • numRecordsIn

    • numRecordsInPerSecond

  • 結果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    說明

    指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支援更新或刪除結果表資料

特色功能

功能

詳情

即時消費Hologres

支援以非Binlog或Binlog方式讀取Hologres資料,相容CDC與非CDC模式。

全增量一體化消費

支援全量、增量及全增量一體化消費。

主鍵衝突處理策略

支援忽略新資料、整行替換或僅更新指定欄位。

多流寫入的寬表Merge與局部更新

只更新修改部分列的資料,而非整行更新。

消費分區表Binlog(公測)

支援消費物理分區表Binlog,單個作業即可監聽全部分區及新增分區。支援消費邏輯分區表Binlog。

分區表寫入

支援向分區表的父表寫入資料,並可自動建立對應的子分區表。

即時同步單表或整庫

支援即時同步單表或整庫層級資料 ,具備以下關鍵特性:

  • 自動感知上遊表結構變更 :當來源資料庫的表結構發生變化時,Hologres 能夠即時將這些變更同步到目標表中。

  • 自動處理結構變更:在新資料流入Hologres時,Flink會先觸發表結構修改操作,再寫入資料,整個過程您無需幹預 。

詳情請參見CREATE TABLE AS(CTAS)語句資料庫即時入倉快速入門

使用限制與建議

使用限制

  • 外部表格不支援:Hologres連接器不支援訪問Hologres外部表格(如MaxCompute外部表格)。

  • 時間類型限制:暫不支援即時消費TIMESTAMP類型資料。建表時請統一使用TIMESTAMPTZ類型

  • 源表掃描模式(VVR 8及以下):預設以批模式讀取,僅掃描一次全表,後續新增資料不會被消費。

  • Watermark限制(VVR 8及以下):CDC模式暫不支援定義Watermark,如需視窗彙總,請改用非視窗彙總方案

使用建議

  • 儲存模式選擇:

    • 維表點查:建議使用行存模式,並必須設定主鍵 + Clustering Key。

    • 維表一對多查詢:建議使用列存模式,併合理設定Distribution Key(分布鍵)Segment Key(Event Time Column)以最佳化效能。

    • 高頻更新 + 分析查詢表:若表需同時支援 Binlog 即時消費與 OLAP 分析,強烈建議使用“行列共存”儲存模式

    重要

    Hologres建表時,若不指定儲存模式,預設為列存(column)。儲存模式一旦建表確定,後續無法修改!詳情請參見Hologres建立表表格儲存體格式:列存、行存、行列共存

  • 作業並發設定:建議Flink作業並發數和Hologres Table的Shard個數保持一致。

    # 您可以在Hologres控制台上,使用以下語句查看Table的Shard數,其中tablename為您的業務表名稱。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • 版本與功能:請定期查閱Hologres Connector Release Note,瞭解已知缺陷、功能更新與版本相容性說明。

注意事項

  • Hologres 與 VVR 版本消費模式相容性及限制表

    源表

    • VVR 8及以下通過指定sdkMode參數值選擇消費模式。

    • VVR 11及以上通過指定source.binlog.read-mode參數值選擇消費模式。

    VVR版本

    Hologres版本

    預設/推薦參數值

    實際消費模式

    備忘

    ≥ 6.0.7

    < 2.0

    自訂

    holohub(預設)

    推薦配置jdbc。

    6.0.7 ~ 8.0.4

    ≥ 2.0

    jdbc(自動切換,無需配置)

    jdbc(強制)

    Hologres 2.0及以上版本下線了holohub服務,自動切換為jdbc,可能存在許可權不足問題。許可權配置詳情請參見許可權問題

    ≥ 8.0.5

    ≥ 2.1

    jdbc(自動切換,無需配置)

    jdbc(強制)

    無許可權問題。Hologres 2.1.27及以上版本,切換為jdbc_fixed。

    ≥ 11.1

    任何版本

    AUTO(預設值)

    根據Hologres版本自動選擇

    • 2.1.27及以上版本,選擇jdbc模式,同時預設啟用輕量級串連,即connection.fixed.enabled參數預設設定為 true。

    • 2.1.0~2.1.26版本,選擇jdbc模式。

    • 2.0 及以下版本,選擇holohub模式。

    重要

    VVR 11.1及以上版本預設消費Binlog資料,請確認已經開啟Binlog,否則可能會導致報錯。

    許可權問題

    如果使用者不是SuperUser,使用JDBC模式消費Binlog需要進行許可權配置。

    user_name為阿里雲帳號ID或RAM使用者,詳情請參見帳號概述

    -- 專家許可權模型下為使用者授予CREATE許可權,以及賦予使用者執行個體的Replication Role許可權
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- 如果Database開啟了簡單許可權模型(SLMP),無法執行GRANT語句,使用spm_grant為使用者授予DB的Admin許可權,也可以在Holoweb中直接賦權
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    結果表

    • VVR 8及以下通過指定sdkMode參數值選擇消費模式。

    • VVR 11及以上通過指定sink.write-mode參數值選擇消費模式。

    VVR版本

    Hologres版本

    rpc模式是否受影響

    rpc實際消費模式

    推薦/預設參數值

    備忘

    6.0.4 ~ 8.0.2

    < 2.0

    rpc

    自訂

    /

    6.0.4 ~ 8.0.2

    ≥ 2.0

    jdbc_fixed(自動切換)

    自訂

    可以通過設定'jdbcWriteBatchSize'='1'防止去重。

    ≥ 8.0.3

    任何版本

    jdbc_fixed(自動切換)

    自訂

    如果配置為rpc模式,將自動切換該參數值為jdbc_fixed且設定'jdbcWriteBatchSize'='1'防止去重。

    ≥ 8.0.5

    任何版本

    jdbc_fixed(自動切換)

    自訂

    如果配置為rpc模式,將自動切換該參數值為jdbc_fixed且設定'deduplication.enabled'='false'防止去重。

    重要
    • 由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。

    • VVR 11.1及以上版本已經取消了rpc模式,推薦採用jdbc模式串連。

    • 在高並發情境下寫入,建議使用jdbc_copy/COPY_STREAM模式。

    維表

    VVR版本

    Hologres版本

    rpc模式是否受影響

    rpc實際消費模式

    推薦/預設參數值

    備忘

    6.0.4 ~ 8.0.2

    < 2.0

    rpc

    自訂

    /

    6.0.4 ~ 8.0.2

    ≥ 2.0

    jdbc_fixed(自動切換)

    自訂

    如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。

    ≥ 8.0.3

    任何版本

    jdbc_fixed(自動切換)

    自訂

    ≥ 8.0.5

    任何版本

    jdbc_fixed(自動切換)

    自訂

    重要

    VVR 11.1及以上版本已經取消了rpc模式,預設採用jdbc模式串連,同時根據實際情況,是否開啟connection.fixed.enabled參數,使用輕量級串連模式。

  • JDBC模式Binlog源表支援讀取JSONB類型,需要資料庫層級開啟GUC。

    --db層級開啟GUC,僅superuser可以執行,每個db只需要設定一次。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • UPDATE操作會產生兩條連續的Binlog記錄,記錄連續且舊資料(update_before)記錄在前,新資料(update_after)記錄在後。

  • 不建議對Binlog源表進行TRUNCATE或其他重建表操作,詳情請參見常見問題

  • 請確保Flink與Hologres 的DECIMAL類型精度一致,以避免錯誤,詳情請參見常見問題

  • 在啟用Initial模式全增量一體消費源表資料時,不能保證全域有序性。如果您的下遊有依賴時間欄位進行計算的需求,請使用其他純Binlog消費模式讀取。

開啟Binlog

未建表

即時消費功能預設關閉,因此在Hologres控制台上建立表的DDL時,需要設定binlog.levelbinlog.ttl參數,樣本如下。

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--建立行存表test_table
call set_table_property('test_table', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_table', 'binlog.level', 'replica');--設定表屬性開啟Binlog功能
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,單位為秒
commit;

已建表

Hologres控制台上,可以使用以下語句對已有表開啟Binlog並設定Binlog TTL時間。table_name為開啟Binlog的表名稱。

-- 設定表屬性開啟Binlog功能
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- 設定表屬性,配置Binlog TTL時間,單位秒
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

WITH參數

從VVR 11開始,為了更好地支援Hologres,With參數有所調整,部分參數可能已經更名或進行移除,VVR 11 向下相容 VVR 8。請根據您的版本查閱對應的參數說明。

類型映射

Flink與Hologres的資料類型映射請參見Flink與Hologres的資料類型映射

使用樣本

源表示例

Binlog Source表

CDC模式

該模式下Source消費的Binlog資料,將根據hg_binlog_event_type自動為每行資料設定準確的Flink RowKind類型,無需顯式聲明它們,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類型,這樣就能完成表的資料的鏡像同步,類似MySQL或Postgres的CDC功能。源表DDL程式碼範例如下。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            --AK/SK推薦使用變數管理防止密鑰泄露 
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --讀取所有ChangeLog類型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
  'retry-count'='10',                     --讀取Binlog資料出錯後的重試次數。
  'retry-sleep-step-ms'='5000',           --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
  'source.binlog.batch-size'='512'        --批量讀取Binlog的資料行數。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --讀取Binlog資料出錯後的重試次數。
  'binlogRetryIntervalMs' = '500',  --讀取Binlog資料出錯後的重試時間間隔
  'binlogBatchReadSize' = '100'     --批量讀取Binlog的資料行數。
);

非CDC模式

該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是INSERT類型的資料,您可以根據業務情況選擇如何處理特定hg_binlog_event_type類型的資料。源表DDL程式碼範例如下。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --所有ChangeLog類型都當作INSERT處理。
  'retry-count'='10',                     --讀取Binlog資料出錯後的重試次數。
  'retry-sleep-step-ms'='5000',           --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
  'source.binlog.batch-size'='512'        --批量讀取Binlog的資料行數。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --讀取Binlog資料出錯後的重試次數。
  'binlogRetryIntervalMs' = '500',  --讀取Binlog資料出錯後的重試時間間隔
  'binlogBatchReadSize' = '100'     --批量讀取Binlog的資料行數。
);

非Binlog Source表

VVR 11+

重要

VVR 11.1及以上版本預設消費Binlog資料,請參見Binlog Source表

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      --是否消費Binlog資料
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

結果表示例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

維表示例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --AK/SK推薦使用變數管理防止密鑰泄露   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

特色功能詳解

全增量一體化消費

適用情境

  • 僅適用於目標表有主鍵的情境,推薦在CDC模式下使用的全增量Hologres源表。

  • Hologres支援按需開啟Binlog,可以將已有歷史資料的表開啟Binlog

程式碼範例

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --先讀取歷史全量資料,再增量消費Binlog。
  'retry-count'='10',                         --讀取Binlog資料出錯後的重試次數。
  'retry-sleep-step-ms'='5000',               --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
  'source.binlog.batch-size'='512'            --批量讀取Binlog的資料行數。
  );
說明
  • source.binlog.startup-mode設定為INITIAL,可以先全量消費資料,再讀取Binlog開始增量消費。

  • 如果設定了startTime參數,或者在啟動介面選擇了啟動時間,則binlogStartUpMode會強制使用timestamp模式消費,其他消費模式則不生效,startTime參數優先順序更高。

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
  'binlogMaxRetryTimes' = '10',     --讀取Binlog資料出錯後的重試次數。
  'binlogRetryIntervalMs' = '500',  --讀取Binlog資料出錯後的重試時間間隔
  'binlogBatchReadSize' = '100'     --批量讀取Binlog的資料行數。
  );
說明
  • binlogStartUpMode設定為initial,可以先全量消費資料,再讀取Binlog開始增量消費。

  • 如果設定了startTime參數,或者在啟動介面選擇了啟動時間,則binlogStartUpMode會強制使用timestamp模式消費,其他消費模式則不生效,startTime參數優先順序更高。

主鍵衝突處理策略

寫入Hologres時若存在重複主鍵的資料,連接器提供了三種處理策略。

VVR 11+

通過指定sink.on-conflict-action參數值,來實現不同的處理策略。

sink.on-conflict-action參數值

含義

INSERT_OR_IGNORE

保留首次出現的資料,忽略後續重複資料。

INSERT_OR_REPLACE

後續資料整行替換已有資料。

INSERT_OR_UPDATE(預設值)

只更新sink中已提供的欄位,其他欄位保持不變。

VVR 8+

通過指定mutatetype參數值,來實現不同的處理策略。

mutatetype參數值

含義

insertorignore(預設值)

保留首次出現的資料,忽略後續重複資料。

insertorreplace

後續資料整行替換已有資料。

insertorupdate

只更新sink中已提供的欄位,其他欄位保持不變。

假設表有欄位 a、b、c、d,其中 a 是主鍵。若結果表欄位僅提供 a 和 b,則配置INSERT_OR_UPDATE時,僅更新 b 欄位,c 和 d 保持不變。
說明

結果表欄位數量可以少於Hologres物理表,但缺失欄位必須允許為空白,否則將會導致寫入失敗。

分區表寫入

預設情況下,Hologres Sink 僅支援向單表匯入資料。如需匯入至分區表的父表 ,需開啟以下配置:

VVR 11+

sink.create-missing-partition設定為true,若未建立子分區表,可以自動建立。

說明
  • VVR 11.1及以上版本預設支援寫入分區表,自動將資料路由到對應的子分區表。

  • tablename 參數填寫父表的名稱。

  • 若未提前建立子表且未設定 sink.create-missing-partition=true,將導致寫入失敗。

VVR 8+

  • partitionRouter設定為true,自動將資料路由到對應的子分區表。

  • createparttable設定為true,若未建立子分區表,可以自動建立。

說明
  • tablename 參數填寫父表的名稱。

  • 若未提前建立子表且未設定 createparttable=true,將導致寫入失敗。

多流寫入的寬表Merge與局部更新

在將多個資料流寫入同一張 Hologres 寬表的情境中,系統支援對主鍵相同的資料進行自動 Merge(合并),並可選擇性地僅更新發生變化的部分列,而非整行替換,從而提升寫入效率與資料一致性。

使用限制

  • 寬表必須有主鍵。

  • 每個資料流的資料都必須包含完整的主鍵欄位。

  • 列存模式的寬表Merge情境在高RPS的情況下,CPU使用率會偏高,建議關閉表中欄位的Dictionary Encoding功能。

使用樣本

假設有兩個Flink資料流,一個資料流中包含a、b和c欄位,另一個資料流中包含a、d和e欄位,Hologres寬表WIDE_TABLE包含a、b、c、d和e欄位,其中a欄位為主鍵。

VVR 11+

// 已經定義的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個欄位
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- hologres寬表,包含a,b,c,d,e五個欄位
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- 根據主鍵更新資料部分列
  'sink.delete-strategy'='IGNORE_DELETE',         -- 撤回訊息的處理策略,IGNORE_DELETE適用於僅需插入或更新資料,而無需刪除資料的情境。
  'sink.partial-insert.enabled'='true'            -- 開啟部分列更新參數,將INSERT語句中定義的欄位下推給連接器,從而可以只對聲明的欄位進行更新或插入
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 聲明只插入a,b,c三個欄位
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 聲明只插入a,d,e三個欄位
END;

VVR 8+

// 已經定義的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個欄位
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- hologres寬表,包含a,b,c,d,e五個欄位
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- 根據主鍵更新資料部分列
  'ignoredelete'='true',            -- 忽略回撤訊息產生的Delete請求
  'partial-insert.enabled'='true'   -- 開啟部分列更新參數,支援僅更新INSERT語句中聲明的欄位
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 聲明只插入a,b,c三個欄位
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 聲明只插入a,d,e三個欄位
END;
說明

ignoredelete設定為true,忽略回撤訊息產生Delete請求。Realtime Compute引擎VVR 8.0.8及以上版本推薦使用sink.delete-strategy配置局部更新時的各種策略。

消費分區表Binlog(公測)

分區表有助於資料歸檔與查詢最佳化。Hologres Connector支援消費物理分區表和邏輯分區表的Binlog。關於物理分區表和邏輯分區表的區別參見CREATE LOGICAL PARTITION TABLE

消費物理分區表Binlog

Hologres Connector 支援通過單個作業消費分區表 Binlog,並可動態監聽新增分區,顯著提升即時資料處理效率與易用性。

注意事項

  • 僅Realtime Compute引擎VVR 8.0.11及以上版本,Hologres執行個體版本大於等於2.1.27,Binlog源表JDBC模式支援消費分區表。

  • 分區名稱必須嚴格由父表名+底線+分區值組成,即{parent_table}_{partition_value},非此格式的分區可能無法消費到,詳情請參見動態分區管理

    重要
    • 對於DYNAMIC模式,VVR 8.0.11版本不支援帶-分隔字元的分區欄位(如YYYY-MM-DD)。

    • 自VVR 11.1版本起,可以消費自訂Format格式的分區欄位。

    • 寫入分區表則不受此格式限制。

  • Flink中聲明Hologres源表時,必須包含Hologres分區表的分區欄位。

  • 對於DYNAMIC模式,要求分區表必須開啟動態分區管理。並且分區預建立參數auto_partitioning.num_precreate必須大於1,否則,在嘗試消費最新分區時,作業將會拋出異常。

  • DYNAMIC模式下,新增分區後將不再讀取舊分區的後續資料更新。

使用樣本

模式類型

特點

適用情境說明

DYNAMIC

動態分區消費

自動監聽新增分區,按時間順序動態推進消費進度。適合即時資料流情境。

STATIC

靜態分區消費

只消費已存在的分區(或手動指定的分區),不會自動探索新分區。適合固定範圍的歷史資料處理。

DYNAMIC模式

VVR 11+

假設Hologres存在如下的DDL分區表,並且已啟用Binlog以及動態分區。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 開啟動態分區
    auto_partitioning_time_unit = 'DAY',  -- 以天為時間單元,自動建立的分區名樣本:test_message_src1_20250512,test_message_src1_20250513
    auto_partitioning_num_precreate = '2' -- 會提前建立兩個分區
);
-- 已經存在的分區表,也可以通過ALTER TABLE方式開啟動態分區

在Flink中,使用以下SQL聲明對分區表test_message_src1進行DYNAMIC模式消費。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- hologres分區表的分區欄位
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 開啟了動態分區的父表,
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 動態監聽最新的分區
  'source.binlog.startup-mode' = 'initial'           -- 先全量消費資料,再讀取Binlog開始增量消費。
);

VVR 8.0.11

假設Hologres存在如下的DDL分區表,並且已啟用Binlog以及動態分區。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 開啟動態分區
    auto_partitioning_time_unit = 'DAY',  -- 以天為時間單元,自動建立的分區名樣本:test_message_src1_20241027,test_message_src1_20241028
    auto_partitioning_num_precreate = '2' -- 會提前建立兩個分區
);

-- 已經存在的分區表,也可以通過ALTER TABLE方式開啟動態分區

在Flink中,使用以下SQL聲明對分區表test_message_src1進行DYNAMIC模式消費。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- hologres分區表的分區欄位
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 開啟了動態分區的父表,
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- 動態監聽最新的分區
  'binlogstartUpMode' = 'initial',      -- 先全量消費資料,再讀取Binlog開始增量消費。
  'sdkMode' = 'jdbc_fixed'              -- 使用此模式,避免串連數限制
);

STATIC模式

VVR 11+

假設Hologres存在如下的DDL分區表,並且已啟用Binlog。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

在Flink中,使用以下SQL聲明對分區表test_message_src2進行STATIC模式消費。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- hologres分區表的分區欄位
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- 分區表
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- 消費固定的分區
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- 僅消費配置的3個分區,不會消費'black'分區;以後新增分區也不會消費.不設定則消費父表所有分區。
  'source.binlog.startup-mode' = 'initial'  -- 先全量消費資料,再讀取Binlog開始增量消費。
);

VVR 8.0.11

假設Hologres存在如下的DDL分區表,並且已啟用Binlog。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

在Flink中,使用以下SQL聲明對分區表test_message_src2進行STATIC模式消費。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- hologres分區表的分區欄位
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- 分區表
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- 消費固定的分區
  'partition-values-to-read' = 'red,blue,green',  -- 僅消費配置的3個分區,不會消費'black'分區;以後新增分區也不會消費。不設定則消費父表所有分區。
  'binlogstartUpMode' = 'initial',  -- 先全量消費資料,再讀取Binlog開始增量消費。
  'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免串連數限制
);

消費邏輯分區表Binlog

Hologres Connector 支援消費邏輯分區表Binlog,並可通過參數消費指定分區。

注意事項

  • 僅Realtime Compute引擎VVR 11.0.0及以上版本,Hologres執行個體版本大於等於V3.1,支援消費邏輯分區表指定分區的Binlog。

  • 消費邏輯分區表全部分區的Binlog,與消費普通Holo表沒有區別,方法參考源表示例

使用樣本

參數名

說明

樣本

source.binlog.logical-partition-filter-column-names

邏輯分區表消費Binlog指定分區的分區列名。分區列名必須用雙引號包裹,多個分區列用逗號分隔,若列名有雙引號前面添加雙引號進行轉義。

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

該分區列有兩個,分別是Pt和id。

source.binlog.logical-partition-filter-column-values

邏輯分區表消費Binlog指定分區的分區值。每個分區可以由多個分區列的值指定,分區列之間用逗號分隔,分區列的值用雙引號包裹,如果分區列的值包含雙引號前面需要添加雙引號進行轉義。多個分區之間用分號分隔

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

指定消費兩個分區。分區列有兩個。第一個分區值是(20240910, 0),第二個分區值是(special"value, 9)

假設Holo中已經建表如下

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Flink中消費該表Binlog。

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  --讀取所有ChangeLog類型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
  'retry-count'='10',                     --讀取Binlog資料出錯後的重試次數。
  'retry-sleep-step-ms'='5000',           --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
  'source.binlog.batch-size'='512'        --批量讀取Binlog的資料行數。
);

DataStream API

重要

通過DataStream的方式讀寫資料時,需要使用對應的DataStream連接器串連Realtime ComputeFlink版,DataStream連接器設定方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了Hologres DataStream連接器。在本地調試時,需要使用相應的Uber JAR,詳見本地運行和調試包含連接器的作業

Hologres源表

Binlog源表

VVR提供了Source的實作類別HologresBinlogSource來讀取Hologres Binlog資料。以下為構建Hologres Binlog Source的樣本。

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相關參數。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 構建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 構建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相關參數。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 構建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 構建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Hologres的相關參數。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // 構建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 設定或建立預設slotname
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // 構建Binlog Record Converter。
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // 構建Hologres Binlog Source。
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
重要

Realtime Compute引擎低於8.0.5版本或Hologres低於2.1版本,請注意是否為SuperUser,或具有Replication Role許可權,詳情請參見Hologres許可權問題

非Binlog源表

VVR提供了RichInputFormat的實作類別HologresBulkreadInputFormat來讀取Hologres表資料。以下為構建Hologres Source讀取表資料的樣本。

public class Sample {
    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres的相關參數。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // 構建JDBC Options。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Maven依賴

Maven中央庫中已經放置了Hologres DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres結果表

VVR提供了OutputFormatSinkFunction的實作類別HologresSinkFunction來寫入資料。以下為構建Hologres Sink的樣本。

public class Sample {
    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Hologres的相關參數。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // 構建Hologres Writer,以RowData的方式寫入資料。
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // 構建Hologres Sink。
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

中繼資料列

Realtime Compute引擎VVR 8.0.11及以上版本的Binlog源表支援中繼資料列。從此版本起,建議以中繼資料列的方式聲明hg_binlog_event_type等Binlog欄位。中繼資料列是SQL標準的擴充,通過中繼資料列可以訪問源表的庫名和表名,以及資料的變更類型,產生時間等特定資訊,您可以基於這些資訊自訂處理邏輯,例如過濾變更類型為DELETE的資料等。

欄位名

欄位類型

說明

db_name

STRING NOT NULL

包含該行記錄的庫名。

table_name

STRING NOT NULL

包含該行記錄的表名。

hg_binlog_lsn

BIGINT NOT NULL

Binlog系統欄位,表示Binlog序號,Shard內部單調遞增但不連續,不同Shard之間不保證唯一和有序。

hg_binlog_timestamp_us

BIGINT NOT NULL

該行記錄在資料庫中的變更時間戳記,單位為微秒(us)。

hg_binlog_event_type

BIGINT NOT NULL

該行記錄的變更類型。參數取值如下:

  • 5:表示INSERT訊息。

  • 2:表示DELETE訊息。

  • 3:表示UPDATE_BEFORE訊息。

  • 7:表示UPDATE_AFTER訊息。

hg_shard_id

INT NOT NULL

資料所在資料分區Shard。 Shard基本概念詳情請參見Table Group和Shard

在DDL中,採用<meta_column_name> <datatype> METADATA VIRTUAL聲明中繼資料列。樣本如下:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

常見問題

相關文檔