本文為您介紹如何使用即時數倉Hologres連接器。
背景資訊
即時數倉Hologres是一站式即時資料倉庫引擎,支援海量資料即時寫入、即時更新、即時分析,支援標準SQL(相容PostgreSQL協議),支援PB級資料多維分析(OLAP)與即席分析(Ad Hoc),支援高並發低延遲的線上資料服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離線線上一體化全棧數倉解決方案。Hologres連接器支援的資訊如下。
類別 | 詳情 |
支援類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
資料格式 | 暫不支援 |
特有監控指標 | |
API種類 | Datastream和SQL |
是否支援更新或刪除結果表資料 | 是 |
特色功能
功能 | 詳情 |
支援以非Binlog或Binlog方式讀取Hologres資料,相容CDC與非CDC模式。 | |
支援全量、增量及全增量一體化消費。 | |
支援忽略新資料、整行替換或僅更新指定欄位。 | |
只更新修改部分列的資料,而非整行更新。 | |
支援消費物理分區表Binlog,單個作業即可監聽全部分區及新增分區。支援消費邏輯分區表Binlog。 | |
支援向分區表的父表寫入資料,並可自動建立對應的子分區表。 | |
即時同步單表或整庫 | 支援即時同步單表或整庫層級資料 ,具備以下關鍵特性:
|
使用限制與建議
使用限制
外部表格不支援:Hologres連接器不支援訪問Hologres外部表格(如MaxCompute外部表格)。
時間類型限制:暫不支援即時消費TIMESTAMP類型資料。建表時請統一使用TIMESTAMPTZ類型。
源表掃描模式(VVR 8及以下):預設以批模式讀取,僅掃描一次全表,後續新增資料不會被消費。
Watermark限制(VVR 8及以下):CDC模式暫不支援定義Watermark,如需視窗彙總,請改用非視窗彙總方案。
使用建議
儲存模式選擇:
維表點查:建議使用行存模式,並必須設定主鍵 + Clustering Key。
維表一對多查詢:建議使用列存模式,併合理設定Distribution Key(分布鍵)和Segment Key(Event Time Column)以最佳化效能。
高頻更新 + 分析查詢表:若表需同時支援 Binlog 即時消費與 OLAP 分析,強烈建議使用“行列共存”儲存模式。
重要Hologres建表時,若不指定儲存模式,預設為列存(column)。儲存模式一旦建表確定,後續無法修改!詳情請參見Hologres建立表與表格儲存體格式:列存、行存、行列共存。
作業並發設定:建議Flink作業並發數和Hologres Table的Shard個數保持一致。
# 您可以在Hologres控制台上,使用以下語句查看Table的Shard數,其中tablename為您的業務表名稱。 select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';版本與功能:請定期查閱Hologres Connector Release Note,瞭解已知缺陷、功能更新與版本相容性說明。
注意事項
Hologres 與 VVR 版本消費模式相容性及限制表
源表
VVR 8及以下通過指定
sdkMode參數值選擇消費模式。VVR 11及以上通過指定
source.binlog.read-mode參數值選擇消費模式。
VVR版本
Hologres版本
預設/推薦參數值
實際消費模式
備忘
≥ 6.0.7
< 2.0
自訂
holohub(預設)
推薦配置jdbc。
6.0.7 ~ 8.0.4
≥ 2.0
jdbc(自動切換,無需配置)
jdbc(強制)
Hologres 2.0及以上版本下線了holohub服務,自動切換為jdbc,可能存在許可權不足問題。許可權配置詳情請參見許可權問題。
≥ 8.0.5
≥ 2.1
jdbc(自動切換,無需配置)
jdbc(強制)
無許可權問題。Hologres 2.1.27及以上版本,切換為jdbc_fixed。
≥ 11.1
任何版本
AUTO(預設值)
根據Hologres版本自動選擇
2.1.27及以上版本,選擇jdbc模式,同時預設啟用輕量級串連,即connection.fixed.enabled參數預設設定為 true。
2.1.0~2.1.26版本,選擇jdbc模式。
2.0 及以下版本,選擇holohub模式。
重要VVR 11.1及以上版本預設消費Binlog資料,請確認已經開啟Binlog,否則可能會導致報錯。
結果表
VVR 8及以下通過指定
sdkMode參數值選擇消費模式。VVR 11及以上通過指定
sink.write-mode參數值選擇消費模式。
VVR版本
Hologres版本
rpc模式是否受影響
rpc實際消費模式
推薦/預設參數值
備忘
6.0.4 ~ 8.0.2
< 2.0
否
rpc
自訂
/
6.0.4 ~ 8.0.2
≥ 2.0
是
jdbc_fixed(自動切換)
自訂
可以通過設定'jdbcWriteBatchSize'='1'防止去重。
≥ 8.0.3
任何版本
是
jdbc_fixed(自動切換)
自訂
如果配置為rpc模式,將自動切換該參數值為jdbc_fixed且設定'jdbcWriteBatchSize'='1'防止去重。
≥ 8.0.5
任何版本
是
jdbc_fixed(自動切換)
自訂
如果配置為rpc模式,將自動切換該參數值為jdbc_fixed且設定'deduplication.enabled'='false'防止去重。
重要由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。
VVR 11.1及以上版本已經取消了rpc模式,推薦採用jdbc模式串連。
在高並發情境下寫入,建議使用
jdbc_copy/COPY_STREAM模式。
維表
VVR版本
Hologres版本
rpc模式是否受影響
rpc實際消費模式
推薦/預設參數值
備忘
6.0.4 ~ 8.0.2
< 2.0
否
rpc
自訂
/
6.0.4 ~ 8.0.2
≥ 2.0
是
jdbc_fixed(自動切換)
自訂
如果Hologres執行個體為2.0及以上版本,由於Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將採用您配置的參數值。
≥ 8.0.3
任何版本
是
jdbc_fixed(自動切換)
自訂
≥ 8.0.5
任何版本
是
jdbc_fixed(自動切換)
自訂
重要VVR 11.1及以上版本已經取消了rpc模式,預設採用jdbc模式串連,同時根據實際情況,是否開啟
connection.fixed.enabled參數,使用輕量級串連模式。JDBC模式Binlog源表支援讀取JSONB類型,需要資料庫層級開啟GUC。
--db層級開啟GUC,僅superuser可以執行,每個db只需要設定一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;UPDATE操作會產生兩條連續的Binlog記錄,記錄連續且舊資料(update_before)記錄在前,新資料(update_after)記錄在後。
不建議對Binlog源表進行TRUNCATE或其他重建表操作,詳情請參見常見問題。
請確保Flink與Hologres 的
DECIMAL類型精度一致,以避免錯誤,詳情請參見常見問題。在啟用Initial模式全增量一體消費源表資料時,不能保證全域有序性。如果您的下遊有依賴時間欄位進行計算的需求,請使用其他純Binlog消費模式讀取。
開啟Binlog
未建表
即時消費功能預設關閉,因此在Hologres控制台上建立表的DDL時,需要設定binlog.level和binlog.ttl參數,樣本如下。
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');--建立行存表test_table
call set_table_property('test_table', 'clustering_key', 'id');--在id列建立聚簇索引
call set_table_property('test_table', 'binlog.level', 'replica');--設定表屬性開啟Binlog功能
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl,Binlog的TTL,單位為秒
commit;已建表
在Hologres控制台上,可以使用以下語句對已有表開啟Binlog並設定Binlog TTL時間。table_name為開啟Binlog的表名稱。
-- 設定表屬性開啟Binlog功能
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- 設定表屬性,配置Binlog TTL時間,單位秒
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;WITH參數
從VVR 11開始,為了更好地支援Hologres,With參數有所調整,部分參數可能已經更名或進行移除,VVR 11 向下相容 VVR 8。請根據您的版本查閱對應的參數說明。
類型映射
Flink與Hologres的資料類型映射請參見Flink與Hologres的資料類型映射。
使用樣本
源表示例
Binlog Source表
CDC模式
該模式下Source消費的Binlog資料,將根據hg_binlog_event_type自動為每行資料設定準確的Flink RowKind類型,無需顯式聲明它們,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類型,這樣就能完成表的資料的鏡像同步,類似MySQL或Postgres的CDC功能。源表DDL程式碼範例如下。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', --讀取所有ChangeLog類型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
'retry-count'='10', --讀取Binlog資料出錯後的重試次數。
'retry-sleep-step-ms'='5000', --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
'source.binlog.batch-size'='512' --批量讀取Binlog的資料行數。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', --讀取Binlog資料出錯後的重試次數。
'binlogRetryIntervalMs' = '500', --讀取Binlog資料出錯後的重試時間間隔
'binlogBatchReadSize' = '100' --批量讀取Binlog的資料行數。
);非CDC模式
該模式下Source消費的Binlog資料是作為普通的Flink資料傳遞給下遊節點的,即所有資料都是INSERT類型的資料,您可以根據業務情況選擇如何處理特定hg_binlog_event_type類型的資料。源表DDL程式碼範例如下。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', --所有ChangeLog類型都當作INSERT處理。
'retry-count'='10', --讀取Binlog資料出錯後的重試次數。
'retry-sleep-step-ms'='5000', --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
'source.binlog.batch-size'='512' --批量讀取Binlog的資料行數。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', --讀取Binlog資料出錯後的重試次數。
'binlogRetryIntervalMs' = '500', --讀取Binlog資料出錯後的重試時間間隔
'binlogBatchReadSize' = '100' --批量讀取Binlog的資料行數。
);非Binlog Source表
VVR 11+
VVR 11.1及以上版本預設消費Binlog資料,請參見Binlog Source表。
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' --是否消費Binlog資料
);VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);結果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;維表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --AK/SK推薦使用變數管理防止密鑰泄露
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;特色功能詳解
全增量一體化消費
適用情境
僅適用於目標表有主鍵的情境,推薦在CDC模式下使用的全增量Hologres源表。
Hologres支援按需開啟Binlog,可以將已有歷史資料的表開啟Binlog。
程式碼範例
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', --先讀取歷史全量資料,再增量消費Binlog。
'retry-count'='10', --讀取Binlog資料出錯後的重試次數。
'retry-sleep-step-ms'='5000', --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
'source.binlog.batch-size'='512' --批量讀取Binlog的資料行數。
);source.binlog.startup-mode設定為INITIAL,可以先全量消費資料,再讀取Binlog開始增量消費。如果設定了
startTime參數,或者在啟動介面選擇了啟動時間,則binlogStartUpMode會強制使用timestamp模式消費,其他消費模式則不生效,startTime參數優先順序更高。
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先讀取歷史全量資料,再增量消費Binlog。
'binlogMaxRetryTimes' = '10', --讀取Binlog資料出錯後的重試次數。
'binlogRetryIntervalMs' = '500', --讀取Binlog資料出錯後的重試時間間隔
'binlogBatchReadSize' = '100' --批量讀取Binlog的資料行數。
);binlogStartUpMode設定為initial,可以先全量消費資料,再讀取Binlog開始增量消費。如果設定了
startTime參數,或者在啟動介面選擇了啟動時間,則binlogStartUpMode會強制使用timestamp模式消費,其他消費模式則不生效,startTime參數優先順序更高。
主鍵衝突處理策略
寫入Hologres時若存在重複主鍵的資料,連接器提供了三種處理策略。
VVR 11+
通過指定sink.on-conflict-action參數值,來實現不同的處理策略。
sink.on-conflict-action參數值 | 含義 |
INSERT_OR_IGNORE | 保留首次出現的資料,忽略後續重複資料。 |
INSERT_OR_REPLACE | 後續資料整行替換已有資料。 |
INSERT_OR_UPDATE(預設值) | 只更新sink中已提供的欄位,其他欄位保持不變。 |
VVR 8+
通過指定mutatetype參數值,來實現不同的處理策略。
mutatetype參數值 | 含義 |
insertorignore(預設值) | 保留首次出現的資料,忽略後續重複資料。 |
insertorreplace | 後續資料整行替換已有資料。 |
insertorupdate | 只更新sink中已提供的欄位,其他欄位保持不變。 |
假設表有欄位 a、b、c、d,其中 a 是主鍵。若結果表欄位僅提供 a 和 b,則配置INSERT_OR_UPDATE時,僅更新 b 欄位,c 和 d 保持不變。結果表欄位數量可以少於Hologres物理表,但缺失欄位必須允許為空白,否則將會導致寫入失敗。
分區表寫入
預設情況下,Hologres Sink 僅支援向單表匯入資料。如需匯入至分區表的父表 ,需開啟以下配置:
VVR 11+
sink.create-missing-partition設定為true,若未建立子分區表,可以自動建立。
VVR 11.1及以上版本預設支援寫入分區表,自動將資料路由到對應的子分區表。
tablename參數填寫父表的名稱。若未提前建立子表且未設定
sink.create-missing-partition=true,將導致寫入失敗。
VVR 8+
partitionRouter設定為true,自動將資料路由到對應的子分區表。createparttable設定為true,若未建立子分區表,可以自動建立。
tablename參數填寫父表的名稱。若未提前建立子表且未設定
createparttable=true,將導致寫入失敗。
多流寫入的寬表Merge與局部更新
在將多個資料流寫入同一張 Hologres 寬表的情境中,系統支援對主鍵相同的資料進行自動 Merge(合并),並可選擇性地僅更新發生變化的部分列,而非整行替換,從而提升寫入效率與資料一致性。
使用限制
寬表必須有主鍵。
每個資料流的資料都必須包含完整的主鍵欄位。
列存模式的寬表Merge情境在高RPS的情況下,CPU使用率會偏高,建議關閉表中欄位的Dictionary Encoding功能。
使用樣本
假設有兩個Flink資料流,一個資料流中包含a、b和c欄位,另一個資料流中包含a、d和e欄位,Hologres寬表WIDE_TABLE包含a、b、c、d和e欄位,其中a欄位為主鍵。
VVR 11+
// 已經定義的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個欄位
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- hologres寬表,包含a,b,c,d,e五個欄位
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- 根據主鍵更新資料部分列
'sink.delete-strategy'='IGNORE_DELETE', -- 撤回訊息的處理策略,IGNORE_DELETE適用於僅需插入或更新資料,而無需刪除資料的情境。
'sink.partial-insert.enabled'='true' -- 開啟部分列更新參數,將INSERT語句中定義的欄位下推給連接器,從而可以只對聲明的欄位進行更新或插入
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 聲明只插入a,b,c三個欄位
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 聲明只插入a,d,e三個欄位
END;VVR 8+
// 已經定義的source1和source2
CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個欄位
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- hologres寬表,包含a,b,c,d,e五個欄位
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- 根據主鍵更新資料部分列
'ignoredelete'='true', -- 忽略回撤訊息產生的Delete請求
'partial-insert.enabled'='true' -- 開啟部分列更新參數,支援僅更新INSERT語句中聲明的欄位
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 聲明只插入a,b,c三個欄位
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 聲明只插入a,d,e三個欄位
END;ignoredelete設定為true,忽略回撤訊息產生Delete請求。Realtime Compute引擎VVR 8.0.8及以上版本推薦使用sink.delete-strategy配置局部更新時的各種策略。
消費分區表Binlog(公測)
分區表有助於資料歸檔與查詢最佳化。Hologres Connector支援消費物理分區表和邏輯分區表的Binlog。關於物理分區表和邏輯分區表的區別參見CREATE LOGICAL PARTITION TABLE。
消費物理分區表Binlog
Hologres Connector 支援通過單個作業消費分區表 Binlog,並可動態監聽新增分區,顯著提升即時資料處理效率與易用性。
注意事項
僅Realtime Compute引擎VVR 8.0.11及以上版本,Hologres執行個體版本大於等於2.1.27,Binlog源表JDBC模式支援消費分區表。
分區名稱必須嚴格由父表名+底線+分區值組成,即
{parent_table}_{partition_value},非此格式的分區可能無法消費到,詳情請參見動態分區管理。重要對於DYNAMIC模式,VVR 8.0.11版本不支援帶
-分隔字元的分區欄位(如YYYY-MM-DD)。自VVR 11.1版本起,可以消費自訂Format格式的分區欄位。
寫入分區表則不受此格式限制。
Flink中聲明Hologres源表時,必須包含Hologres分區表的分區欄位。
對於DYNAMIC模式,要求分區表必須開啟動態分區管理。並且分區預建立參數
auto_partitioning.num_precreate必須大於1,否則,在嘗試消費最新分區時,作業將會拋出異常。DYNAMIC模式下,新增分區後將不再讀取舊分區的後續資料更新。
使用樣本
模式類型 | 特點 | 適用情境說明 |
DYNAMIC | 動態分區消費 | 自動監聽新增分區,按時間順序動態推進消費進度。適合即時資料流情境。 |
STATIC | 靜態分區消費 | 只消費已存在的分區(或手動指定的分區),不會自動探索新分區。適合固定範圍的歷史資料處理。 |
DYNAMIC模式
VVR 11+
假設Hologres存在如下的DDL分區表,並且已啟用Binlog以及動態分區。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 開啟動態分區
auto_partitioning_time_unit = 'DAY', -- 以天為時間單元,自動建立的分區名樣本:test_message_src1_20250512,test_message_src1_20250513
auto_partitioning_num_precreate = '2' -- 會提前建立兩個分區
);
-- 已經存在的分區表,也可以通過ALTER TABLE方式開啟動態分區在Flink中,使用以下SQL聲明對分區表test_message_src1進行DYNAMIC模式消費。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 開啟了動態分區的父表,
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 動態監聽最新的分區
'source.binlog.startup-mode' = 'initial' -- 先全量消費資料,再讀取Binlog開始增量消費。
);VVR 8.0.11
假設Hologres存在如下的DDL分區表,並且已啟用Binlog以及動態分區。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 開啟動態分區
auto_partitioning_time_unit = 'DAY', -- 以天為時間單元,自動建立的分區名樣本:test_message_src1_20241027,test_message_src1_20241028
auto_partitioning_num_precreate = '2' -- 會提前建立兩個分區
);
-- 已經存在的分區表,也可以通過ALTER TABLE方式開啟動態分區在Flink中,使用以下SQL聲明對分區表test_message_src1進行DYNAMIC模式消費。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 開啟了動態分區的父表,
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- 動態監聽最新的分區
'binlogstartUpMode' = 'initial', -- 先全量消費資料,再讀取Binlog開始增量消費。
'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免串連數限制
);STATIC模式
VVR 11+
假設Hologres存在如下的DDL分區表,並且已啟用Binlog。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
在Flink中,使用以下SQL聲明對分區表test_message_src2進行STATIC模式消費。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- 分區表
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- 消費固定的分區
'source.binlog.partition-values-to-read' = 'red,blue,green', -- 僅消費配置的3個分區,不會消費'black'分區;以後新增分區也不會消費.不設定則消費父表所有分區。
'source.binlog.startup-mode' = 'initial' -- 先全量消費資料,再讀取Binlog開始增量消費。
);VVR 8.0.11
假設Hologres存在如下的DDL分區表,並且已啟用Binlog。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
在Flink中,使用以下SQL聲明對分區表test_message_src2進行STATIC模式消費。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- hologres分區表的分區欄位
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- 分區表
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- 消費固定的分區
'partition-values-to-read' = 'red,blue,green', -- 僅消費配置的3個分區,不會消費'black'分區;以後新增分區也不會消費。不設定則消費父表所有分區。
'binlogstartUpMode' = 'initial', -- 先全量消費資料,再讀取Binlog開始增量消費。
'sdkMode' = 'jdbc_fixed' -- 使用此模式,避免串連數限制
);消費邏輯分區表Binlog
Hologres Connector 支援消費邏輯分區表Binlog,並可通過參數消費指定分區。
注意事項
僅Realtime Compute引擎VVR 11.0.0及以上版本,Hologres執行個體版本大於等於V3.1,支援消費邏輯分區表指定分區的Binlog。
消費邏輯分區表全部分區的Binlog,與消費普通Holo表沒有區別,方法參考源表示例。
使用樣本
參數名 | 說明 | 樣本 |
source.binlog.logical-partition-filter-column-names | 邏輯分區表消費Binlog指定分區的分區列名。分區列名必須用雙引號包裹,多個分區列用逗號分隔,若列名有雙引號前面添加雙引號進行轉義。 | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' 該分區列有兩個,分別是Pt和id。 |
source.binlog.logical-partition-filter-column-values | 邏輯分區表消費Binlog指定分區的分區值。每個分區可以由多個分區列的值指定,分區列之間用逗號分隔,分區列的值用雙引號包裹,如果分區列的值包含雙引號前面需要添加雙引號進行轉義。多個分區之間用分號分隔 | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' 指定消費兩個分區。分區列有兩個。第一個分區值是(20240910, 0),第二個分區值是(special"value, 9) |
假設Holo中已經建表如下
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);Flink中消費該表Binlog。
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', --讀取所有ChangeLog類型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。
'retry-count'='10', --讀取Binlog資料出錯後的重試次數。
'retry-sleep-step-ms'='5000', --重試累加等待時間。第一次重試等待5秒,第二次等待10秒,依此類推。。
'source.binlog.batch-size'='512' --批量讀取Binlog的資料行數。
);DataStream API
通過DataStream的方式讀寫資料時,需要使用對應的DataStream連接器串連Realtime ComputeFlink版,DataStream連接器設定方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了Hologres DataStream連接器。在本地調試時,需要使用相應的Uber JAR,詳見本地運行和調試包含連接器的作業。
Hologres源表
Binlog源表
VVR提供了Source的實作類別HologresBinlogSource來讀取Hologres Binlog資料。以下為構建Hologres Binlog Source的樣本。
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 設定或建立預設slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 構建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}Realtime Compute引擎低於8.0.5版本或Hologres低於2.1版本,請注意是否為SuperUser,或具有Replication Role許可權,詳情請參見Hologres許可權問題。
非Binlog源表
VVR提供了RichInputFormat的實作類別HologresBulkreadInputFormat來讀取Hologres表資料。以下為構建Hologres Source讀取表資料的樣本。
public class Sample {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}Maven依賴
Maven中央庫中已經放置了Hologres DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>Hologres結果表
VVR提供了OutputFormatSinkFunction的實作類別HologresSinkFunction來寫入資料。以下為構建Hologres Sink的樣本。
public class Sample {
public static void main(String[] args) throws Exception {
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的欄位匹配,可以只定義部分欄位。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入資料。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}中繼資料列
Realtime Compute引擎VVR 8.0.11及以上版本的Binlog源表支援中繼資料列。從此版本起,建議以中繼資料列的方式聲明hg_binlog_event_type等Binlog欄位。中繼資料列是SQL標準的擴充,通過中繼資料列可以訪問源表的庫名和表名,以及資料的變更類型,產生時間等特定資訊,您可以基於這些資訊自訂處理邏輯,例如過濾變更類型為DELETE的資料等。
欄位名 | 欄位類型 | 說明 |
db_name | STRING NOT NULL | 包含該行記錄的庫名。 |
table_name | STRING NOT NULL | 包含該行記錄的表名。 |
hg_binlog_lsn | BIGINT NOT NULL | Binlog系統欄位,表示Binlog序號,Shard內部單調遞增但不連續,不同Shard之間不保證唯一和有序。 |
hg_binlog_timestamp_us | BIGINT NOT NULL | 該行記錄在資料庫中的變更時間戳記,單位為微秒(us)。 |
hg_binlog_event_type | BIGINT NOT NULL | 該行記錄的變更類型。參數取值如下:
|
hg_shard_id | INT NOT NULL | 資料所在資料分區Shard。 Shard基本概念詳情請參見Table Group和Shard。 |
在DDL中,採用<meta_column_name> <datatype> METADATA VIRTUAL聲明中繼資料列。樣本如下:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);常見問題
相關文檔
Hologres Catalog的建立與使用,詳情請參見管理Hologres Catalog。
Hologres與Flink深度整合,能夠提供一體化的即時數倉聯合解決方案,詳情請參見Hologres即時數倉搭建。
Hologres 具備高效的資料更新與修正支援,適用於多流寫入情境下的寬表構建,詳情請參見MongoDB+Hologres使用者行為分析。