全部產品
Search
文件中心

Realtime Compute for Apache Flink:參數配置(VVR 11及以上版本)

更新時間:Jan 23, 2026

關於VVR 11及以上版本的With參數說明。

版本參數移除說明

為最佳化系統架構、提升維護效率,對VVR 8及以下版本中的部分歷史參數進行了調整與移除。您可以根據下方不同的列表,查詢已經移除的歷史版本參數與相應的替代方案。

已移除參數

原有參數

參數說明

替代方案/移除說明

jdbcRetrySleepInitMs

每次重試的固定等待時間。

使用遞增的等待時間。retry-sleep-step-ms

jdbcMetaAutoRefreshFactor

如果緩衝的剩餘時間小於觸發時間,則系統會自動重新整理緩衝。

配置緩衝時間參數meta-cache-ttl-ms即可,無需配置此參數。

type-mapping.timestamp-converting.legacy

Flink和Hologres之間是否進行時間類型的相互轉換。

支援TIMESTAMP_LTZ類型時為相容歷史表現引入的參數,當前無需使用。

property-version

Connector參數版本。

已最佳化常用參數預設值,該參數被移除。

field_delimiter

匯出資料時,不同行之間使用的分隔字元。

已最佳化讀取資料方式,該參數移除。

jdbcBinlogSlotName

JDBC模式的binlog源表的Slot名稱。

已最佳化讀取資料方式,該參數移除。

binlogMaxRetryTimes

讀取Binlog資料出錯後的重試次數。

可以使用retry-count參數進行配置。

cdcMode

是否採用CDC模式讀取Binlog資料。

預設採用CDC模式讀取,該參數移除。非CDC模式可以使用source.binlog.change-log-mode參數配置。

upsertSource

源表是否使用upsert類型的Changelog。

使用source.binlog.change-log-mode參數配置。

bulkload

是否採用bulkload寫入。

使用sink.write-mode參數配置。

useRpcMode

是否通過RPC方式使用Hologres連接器。

推薦使用jdbc串連,配置sink.deduplication.enabled參數選擇是否去重。

partitionrouter

是否寫入分區表。

已預設支援寫入分區表,該參數移除。

ignoredelete

是否忽略撤回訊息。

配置sink.delete-strategy參數。撤回訊息的處理策略。

sdkMode

SDK模式。

已最佳化此參數,請根據表類型參考source.binlog.read-modesink.write-mode參數進行配置。

jdbcReadBatchQueueSize

維表點查請求緩衝隊列大小。

當點查效能不理想時,建議配置connection.pool.size參數。

jdbcReadRetryCount

維表點查逾時時的重試次數。

已統一歸併通用重試機制配置項retry-count參數。

jdbcScanTransactionSessionTimeoutSeconds

掃描操作所在事務的逾時時間。

已統一歸併通用掃描逾時配置項source.scan.timeout-seconds參數。

更名參數

原有參數(VVR 8)

VVR 11

參數說明

jdbcRetryCount

retry-count

當串連故障時,寫入和查詢的重試次數。

jdbcRetrySleepStepMs

retry-sleep-step-ms

每次重試的累加等待時間。

jdbcConnectionMaxIdleMs

connection.max-idle-ms

JDBC串連的空閑時間。

jdbcMetaCacheTTL

meta-cache-ttl-ms

本機快取TableSchema資訊的到期時間。

binlog

source.binlog

是否消費Binlog資料。

sdkMode

source.binlog.read-mode

讀模數式

binlogRetryIntervalMs

source.binlog.request-timeout-ms

讀取Binlog資料出錯後的重試時間間隔。

binlogBatchReadSize

source.binlog.batch-size

批量讀取Binlog的資料行數。

binlogStartupMode

source.binlog.startup-mode

Binlog資料消費模式。

jdbcScanFetchSize

source.scan.fetch-size

掃描時攢批大小。

jdbcScanTimeoutSeconds

source.scan.timeout-seconds

掃描操作逾時時間。

enable_filter_push_down

source.scan.filter-push-down.enabled

全量讀取階段是否進行filter下推。

partition-binlog.mode

source.binlog.partition-binlog-mode

消費分區表Binlog模式。

partition-binlog-lateness-timeout-minutes

source.binlog.partition-binlog-lateness-timeout-minutes

在DYNAMIC模式下消費分區表,允許延遲的最大逾時時間。

partition-values-to-read

source.binlog.partition-values-to-read

在STATIC模式下消費分區表,指定所需消費的分區,分區值之間使用','進行分隔。

sdkMode

sink.write-mode

寫入模式。

mutatetype

sink.on-conflict-action

主鍵衝突處理策略。

createparttable

sink.create-missing-partition

當寫入分區表時,是否根據分區值自動建立不存在的分區表。

jdbcWriteBatchSize

sink.insert.batch-size

Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

jdbcWriteBatchByteSize

sink.insert.batch-byte-size

Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

jdbcWriteFlushInterval

sink.insert.flush-interval-ms

Hologres Sink節點資料攢批寫入Hologres的最長等待時間。

ignoreNullWhenUpdate

sink.ignore-null-when-update.enabled

當mutatetype='insertOrUpdate'時,是否忽略更新寫入資料中的Null值。

jdbcEnableDefaultForNotNullColumn

sink.default-for-not-null-column.enabled

如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。

remove-u0000-in-text.enabled

sink.remove-u0000-in-text.enabled

如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。

partial-insert.enabled

sink.partial-insert.enabled

是否只插入INSERT語句中定義的欄位。

deduplication.enabled

sink.deduplication.enabled

寫入攢批過程中,是否進行去重。

check-and-put.column

sink.insert.check-and-put.column

啟用條件更新能力,並指定檢查的欄位名。

check-and-put.operator

sink.insert.check-and-put.operator

條件更新操作的比較操作符。

check-and-put.null-as

sink.insert.check-and-put.null-as

當條件更新時,如果舊資料為null,則將該null值視為此參數配置的有效值。

aggressive.enabled

sink.aggressive-flush.enabled

是否啟用激進提交模式。

connectionSize

connection.pool.size

單個Flink維表任務所建立的JDBC串連池大小。

connectionPoolName

connection.pool.name

串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。

jdbcReadBatchSize

lookup.read.batch-size

點查維表時,攢批處理的最大條數。

jdbcReadTimeoutMs

lookup.read.timeout-ms

維表點查的逾時時間。

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為hologres

dbname

資料庫名稱。

String

可以通過在dbname參數後添加特定的尾碼來指定串連某個計算群組。例如某張維表希望串連特定的計算群組read_warehouse,可以通過'dbname' = 'db_test@read_warehouse' 方式指定。

tablename

表名稱。

String

如果Schema不為Public時,則tablename需要填寫為schema.tableName

username

  • 自訂帳號的使用者名稱,格式為BASIC$<user_name>

  • 阿里雲帳號或RAM使用者的AccessKey ID。

String

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

password

  • 自訂帳號的密碼。

  • 阿里雲帳號或RAM使用者的AccessKey Secret。

String

endpoint

Hologres服務地址。

String

詳情請參見訪問網域名稱

connection.pool.size

單個Flink 表在任務中所建立的JDBC串連池大小。

Integer

5

如果作業效能不足,建議您增加串連池大小。串連池大小和資料吞吐成正比。此參數僅對維表和結果表生效。

connection.pool.name

串連池名稱。同一個TaskManager中,配置相同名稱的串連池的表可以共用串連池。

String

'default'

預設取值為'default'。如果多個表設定相同的串連池,則以其中最大的 connection.pool.size參數配置為準。

您可以按需配置此參數,例如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量較大,單獨使用pool3。

說明
  • connection.pool.name 相同以共用串連池的前提是 endpoint、database 等串連資訊都相同。

  • 作業中表數量較多時串連數可能相對不足影響效能,這種情況下推薦為不同的表設定不同的connection.pool.name。

connection.fixed.enabled

是否使用輕量級串連模式

Boolean

Hologres 的串連數是有上限的,在 Hologres2.1 版本起,即時寫入支援使用不受串連數限制的輕量級串連。

說明
  • 此參數的預設值受Hologres執行個體版本的影響,對維表和結果表,連接器會在hologres版本大於3.0.28時,自動選擇輕量級串連模式。

  • 維表輕量級串連不支援查詢JSONB和RoaringBitmap類型。

connection.max-idle-ms

JDBC串連的空閑時間。

Long

60000

超過這個空閑時間,串連就會斷開釋放掉,下次使用時自動建立。單位為毫秒。

connection.ssl.mode

是否啟用SSL(Secure Sockets Layer)傳輸加密,以及啟用採用何種模式。

String

disable

  • disable(預設值):不啟用傳輸加密。

  • require:啟用SSL,只對資料鏈路加密。

  • verify-ca:啟用SSL,加密資料鏈路,同時使用CA認證驗證Hologres服務端的真實性。

  • verify-full:啟用SSL,加密資料鏈路,使用CA認證驗證Hologres服務端的真實性,同時比對認證內的CN或DNS與串連時配置的Hologres串連地址是否一致。

說明
  • Hologres自2.1版本起新增支援verify-ca和verify-full模式。詳見傳輸加密

  • 當配置為verify-ca或者verify-full時,需要同時配置connection.ssl.root-cert.location參數。

connection.ssl.root-cert.location

當傳輸加密模式需要認證時,配置認證的路徑。

String

當connection.ssl.mode配置為verify-ca或者verify-full時,需要同時配置CA認證的路徑。認證可以使用Realtime Compute控制台的檔案管理功能上傳至平台,上傳後檔案存放在/flink/usrlib目錄下。例如,需要使用的CA認證檔案名稱為certificate.crt,則上傳後參數取值應該為 '/flink/usrlib/certificate.crt'

說明

CA認證擷取方式見傳輸加密-下載CA認證

retry-count

當串連故障時,寫入和查詢的重試次數。

Integer

10

無。

retry-sleep-step-ms

每次重試時遞增的等待時間。

Long

5000

單位為毫秒。例如,預設值 5000(即 5 秒)時,第一次重試等待 5 秒,第二次等待 10 秒,依此類推。

meta-cache-ttl-ms

本機快取TableSchema資訊的到期時間。

Long

600000

單位為毫秒。

serverless-computing.enabled

是否使用serverless資源

Boolean

false

設定為true時,表示使用Hologres的serverless資源來進行讀取和寫入,而不是使用者Hologres執行個體的資源。目前僅批量讀取和大量匯入支援此參數。消費Binlog、維表點查和即時寫入配置此參數無效。目前詳見Serverless Computing概述

說明
  • 批量讀取指source.binlog設定為false,或者source.binlog.startup-mode設定為INITIAL時的全量讀取階段。

  • 大量匯入指sink.write-mode設定為COPY_BULK_LOADCOPY_BULK_LOAD_ON_CONFLICT

說明

若需要執行大規模全量匯入或匯出,且希望避免影響Hologres執行個體上的其他查詢,建議開啟此參數,詳見Serverless Computing概述

源表專屬

參數

說明

資料類型

是否必填

預設值

備忘

source.binlog

是否消費Binlog資料。

Boolean

true

  • true(預設值):消費Binlog資料。

  • false:不消費Binlog資料,只進行批量讀取,讀取結束作業停止。

source.binlog.read-mode

讀模數式。

ENUM

AUTO

  • AUTO(預設值):根據執行個體版本自動選擇最佳模式

  • HOLOHUB:使用holohub模式消費binlog。

  • JDBC:使用jdbc模式消費binlog。

說明

AUTO 模式的自動選擇邏輯如下:

  • Hologres執行個體2.1.27及以上版本,選擇JDBC模式,同時預設啟用輕量級串連,即connection.fixed.enabled 參數預設設定為 true。

  • Hologres執行個體2.1.0~2.1.26版本,選擇JDBC模式。

  • Hologres執行個體2.0 及以下版本,選擇HOLOHUB模式。

source.binlog.change-log-mode

CDC 源表支援的ChangeLog 類型

ENUM

UPSERT

  • ALL:支援所有 ChangeLog 類型,包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER。

  • UPSERT(預設值):僅支援Upsert類型的ChangeLog,包括INSERT、DELETE和UPDATE_AFTER。

  • ALL_AS_APPEND_ONLY:所有 ChangeLog 類型都當作 INSERT 處理。

說明

如果下遊包含回撤運算元(例如使用ROW_NUMBER OVER WINDOW去重),則需要設定upsertSource為true,此時源表會以Upsert方式從Hologres中讀取資料。

source.binlog.startup-mode

Binlog資料消費模式。

ENUM

INITIAL

  • INITIAL(預設值):先全量消費資料,再讀取Binlog開始增量消費。

  • EARLIEST_OFFSET:從最早的Binlog開始消費。

  • TIMESTAMP:從設定的startTime開始消費Binlog。

說明

如果設定了startTime或者在啟動介面選擇了啟動時間,則binlogStartupMode強制使用timestamp模式,其他消費模式不生效,即startTime參數優先順序更高。

source.binlog.batch-size

讀取Binlog的資料每批的行數。

Integer

512

無。

source.binlog.request-timeout-ms

讀取Binlog資料逾時等待時間。

Long

300000

單位為毫秒。

說明

如果發生timeout,可能的原因是作業下遊運算元處理源表資料太慢導致反壓。

source.binlog.project-columns.enabled

讀取binlog資料時,是否唯讀取使用者表指定的欄位。

Boolean

指定的欄位是指CREATE TEMPORARY TABLE聲明的欄位,未聲明的欄位不會讀取。在表欄位較多但只希望消費部分欄位時,可以避免不必要的資料轉送和轉換,可以提高讀取效能,節省頻寬。

說明

僅Realtime Compute引擎VVR 11.3及以上版本,同時Hologres執行個體3.2及以上版本支援該參數。一般無需配置,連接器判斷版本滿足預設開啟。

source.binlog.compression.enabled

讀取binlog資料時,是否在傳輸過程中啟用資料壓縮。

Boolean

消費Binlog時,服務端返回通過LZ4演算法壓縮後的位元組流,可以提高讀取效能,節省頻寬。

說明

僅Realtime Compute引擎VVR 11.3及以上版本,同時Hologres執行個體3.2及以上版本支援該參數。一般無需配置,連接器判斷版本滿足預設開啟。

source.binlog.partition-binlog-mode

消費分區表Binlog模式。

Enum

DISABLE

  • DISABLE(預設值):源表是非分區表,如果指定的Hologres表為分區表,將拋出異常。

  • DYNAMIC:持續消費分區表的最新分區。分區表必須開啟動態分區管理,DYNAMIC模式會按照從舊到新的順序消費各個分區。當消費到次新分區時,會在新的單位時間到來時,開啟最新分區的消費。

  • STATIC:消費分區表的固定分區,可同時消費多個分區。分區在消費過程中無法新增或移除。預設消費此父表的所有分區。

source.binlog.partition-binlog-lateness-timeout-minutes

在DYNAMIC模式下消費分區表,允許延遲的最大逾時時間。

Boolean

60

  • 單位為分鐘,DYNAMIC模式會在新的單位時間到來時開啟目前時間對應的最新分區的消費,但不會立刻關閉前一個分區,而是會持續監聽以保證可以讀取到上一個分區的延遲資料。

例如:如果動態分區以DAY為單位,分區是20240920,允許資料最大延遲是1小時,對於這個分區,其消費會在2024-09-21 01:00:00關閉,而不是在2024-09-21 00:00:00停止消費。

  • lateness-timeout時間不允許超過分區的單位時間。

如果按天分區,其最大值為24 * 60 = 1440(min),DYNAMIC模式大多數時間只會同時消費一張表,在延遲時間內可能同時消費兩個分區。

source.binlog.partition-values-to-read

在STATIC模式下消費分區表,指定所需消費的分區,分區值之間使用','進行分隔。

String

  • 不配置此參數時,STATIC模式會消費指定父表的所有分區,指定時則僅消費被指定的分區。

  • 此參數僅需要填寫分區值,不需要完整的分區名稱,多個分區值使用,分割。目前不支援通過Regex配置。

startTime

啟動位點的時間。

String

格式為yyyy-MM-dd hh:mm:ss。如果沒有設定該參數,且作業沒有從State恢複,則從最早的Binlog開始消費Hologres資料。

source.scan.fetch-size

批量讀取時攢批大小。

Integer

512

無。

source.scan.timeout-seconds

批量讀取時逾時時間。

Integer

60

單位為秒。

source.scan.filter-push-down.enabled

批量讀取是否進行filter下推。

Boolean

false

  • false(預設值):不進行filter下推。

  • true:批量讀取時,將支援的過濾條件下推到Hologres執行。

說明
  • source.binlog.filter-push-down.enabled參數不能同時開啟。

  • 兩種情況下生效:

    • source.binlog設定為false,表示進行批量讀取,filter下推生效。

    • source.binlog設定為true,source.binlog.startup-mode設定為INITIAL,表示進行全增量讀取,全量階段filter下推生效。

source.binlog.filter-push-down.enabled

消費binlog是否進行filter下推。

Boolean

false

  • false(預設值):不進行filter下推。

  • true:消費Binlog時,將支援的過濾條件下推到Hologres執行。

說明
  • 僅Realtime Compute引擎VVR 11.3及以上版本,同時Hologres執行個體4.0及以上版本支援該參數。與source.scan.filter-push-down.enabled參數不能同時開啟。

  • source.binlog設定為true時,filter下推都會生效。例如當source.binlog.startup-mode設定為INITIAL時,filter下推對全量和增量階段都有效。

scan.prefer.physical-column.over.metadata-column

在物理列和中繼資料列重名時,是否優先讀取物理列的資料。

Boolean

false

僅Realtime Compute引擎VVR 11.5及更高版本支援此配置。較早版本總是優先讀取中繼資料列。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

sink.write-mode

寫入模式。

ENUM

INSERT

  • INSERT:預設值,表示使用jdbc,通過 INSERT 方式進行寫入。

  • COPY_STREAM:是否使用流式的 fixed copy方式寫入。fixed copy是一種高效能的流式寫入方式,適用於對吞吐和延遲要求高的情境。但此模式暫不支援delete資料,也不支援寫入分區父表,不支援ignoreNullWhenUpdate參數。

  • COPY_BULK_LOAD:表示使用COPY協議進行批量寫入,COPY_BULK_LOAD目前僅適用於無主鍵表(主鍵重複會拋出異常),相比COPY_STREAM,寫入使用更少的Hologres資源。

  • COPY_BULK_LOAD_ON_CONFLICT:表示使用COPY協議進行批量寫入,支援寫入有主鍵表並處理主鍵衝突的情況。

說明
  • COPY_BULK_LOAD_ON_CONFLICT僅Realtime Compute引擎VVR 11.3及以上版本支援該模式。此模式要求Hologres執行個體版本3.1及以上,實現原理為在Flink側根據Hologres結果表的DistributionKey對資料進行Reshuffle,從而使相同Shard的資料在同一個Flink Task寫入,將大量匯入的表鎖降低為Shard粒度,實現不同Shard的並發寫入。因此建議作業並發與Hologres結果表的shard數一致。

  • COPY_BULK_LOADCOPY_BULK_LOAD_ON_CONFLICT模式寫入時,資料在Checkpoint成功之後可見,適合對資料可見度要求不高,或者歷史資料的大量匯入情境。

sink.on-conflict-action

主鍵衝突處理策略。

ENUM

INSERT_OR_UPDATE

  • INSERT_OR_IGNORE:保留首次出現的資料,忽略後續所有資料。

  • INSERT_OR_REPLACE:後出現的資料整行替換已有資料。

  • INSERT_OR_UPDATE(預設值):更新已有資料的部分列。

    例如一張表有a、b、c和d四個欄位,a是主鍵,若結果表欄位僅提供a和b兩個欄位,在主鍵重複的情況下,系統只會更新b欄位,c和d保持不變。

sink.create-missing-partition

當寫入分區表時,是否根據分區值自動建立不存在的分區表。

Boolean

false

  • 使用Date類型做分區鍵時,如果開啟了動態分區管理,自動建立的分區表名格式預設與動態分區一致。

  • 請確保分區值不會出現髒資料,否則會建立錯誤的分區表導致Failover,建議慎用該參數。

  • 當sink.write-mode不是INSERT時,不支援寫入分區父表。

sink.delete-strategy

撤回訊息的處理策略。

String

CHANGELOG_STANDARD

  • IGNORE_DELETE:忽略Update Before和Delete訊息。適用於僅需插入或更新資料,而無需刪除資料的情境。

  • NON_PK_FIELD_TO_NULL:忽略Update Before訊息,並將Delete訊息執行為將非主鍵欄位更新為NULL。適用於希望在局部更新操作中執行刪除操作而不影響其他列的情境。

  • DELETE_ROW_ON_PK:忽略Update Before訊息,並將Delete訊息執行為根據主鍵刪除整行。適用於在局部更新過程中,希望執行刪除整行操作,從而影響其他列的情境。

  • CHANGELOG_STANDARD:Flink架構按照 Flink SQL Changelog的工作原理運行,不忽略刪除操作,並通過先刪除資料再插入的方式執行更新操作,以確保資料準確性。適用於不涉及局部更新的情境

說明

啟用NON_PK_FIELD_TO_NULL選項可能會導致記錄中只有主鍵,其他所有列都為null。

sink.ignore-null-when-update.enabled

當sink.on-conflict-action='INSERT_OR_UPDATE'時,是否忽略更新寫入資料中的Null值。

Boolean

false

  • false(預設值):將Null值寫到Hologres結果表裡。

  • true:忽略更新寫入資料中的Null值。

說明

sink.write-mode設定為INSERT時支援此參數。

sink.ignore-null-when-update-by-expr.enabled

當sink.on-conflict-action='INSERT_OR_UPDATE'時,是否用運算式方式忽略更新寫入資料中的Null值。

Boolean

false

效能優於sink.ignore-null-when-update.enabled。

  • false(預設值):

    • sink.ignore-null-when-update.enabled開啟則忽略更新寫入資料中的Null值。

    • sink.ignore-null-when-update.enabled關閉則將Null值寫到Hologres結果表裡。

  • true:無論sink.ignore-null-when-update.enabled是否開啟,均會忽略更新寫入資料中的Null值。

說明
  • sink.write-mode設定為INSERT時支援此參數。

  • Hologres版本需>=4.0

sink.default-for-not-null-column.enabled

如果將Null值寫入Hologres表中Not Null且無預設值的欄位,是否允許連接器協助填充一個預設值。

Boolean

true

  • true(預設值):允許連接器填充預設值並寫入,規則如下。

    • 如果欄位是String類型,則預設寫為空白("")。

    • 如果欄位是Number類型,則預設寫為0。

    • 如果是Date、timestamp或timestamptz時間類型欄位,則預設寫為1970-01-01 00:00:00

  • false:不填充預設值,寫Null到Not Null欄位時,會拋出異常。

說明

sink.write-mode設定為INSERT、且sink.on-conflict-action設定為除INSERT_OR_UPDATE之外的選項時支援此參數。

sink.remove-u0000-in-text.enabled

如果寫入時字串類型包含\u0000非法字元,是否允許連接器協助去除。

Boolean

true

  • false:連接器不對資料進行操作,但碰到髒資料時寫入可能拋出如下異常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

    此時需要在源表提前處理髒資料,或者在SQL中定義髒資料處理邏輯。

  • true(預設值):連接器會協助去除字串類型中的\u0000,防止寫入拋出異常。

sink.partial-insert.enabled

是否只插入INSERT語句中定義的欄位。

Boolean

false

  • false(預設值):無論INSERT語句中聲明了哪些欄位,都會更新結果表DDL中定義的所有欄位,對於未在INSERT語句中聲明的欄位,會被更新為null。

  • true:將INSERT語句中定義的欄位下推給連接器,從而可以只對聲明的欄位進行更新或插入。

說明
  • 此參數僅在sink.on-conflict-action參數配置為INSERT_OR_UPDATE時生效。

sink.deduplication.enabled

寫入攢批過程中,是否進行去重。

Boolean

true

  • true(預設值):如果一批資料中有主鍵相同的資料,預設進行去重,只保留最後一條到達的資料。以兩個欄位,其中第一個欄位為主鍵的資料舉例:

    • INSERT (1,'a')INSERT (1,'b')兩條記錄先後到達,去重之後只保留後到達的(1,'b')寫入Hologres結果表中。

    • Hologres結果表中已經存在記錄(1,'a'),此時DELETE (1,'a')INSERT (1,'b')兩條記錄先後到達,只保留後到達的(1,'b')寫入hologres中,表現為直接更新,而不是先刪除再插入。

  • false:在攢批過程中不進行去重,如果發現新到的資料和目前攢批的資料中存在主鍵相同的情況,先將攢批資料寫入,寫入完成之後再繼續寫入新到的資料。

說明
  • sink.write-mode設定為INSERT時支援此參數。

  • 不允許攢批去重時,極端情況下(例如所有資料的主鍵都相同)寫入會退化為不攢批的單條寫入,對效能有一定影響。

sink.aggressive-flush.enabled

是否啟用激進提交模式。

Boolean

false

設定為true時,即便攢批未達到預期條數,串連在空閑時將會被強制提交。在流量較小時,可以有效減少資料寫入的延時。

說明

sink.write-mode配置為 INSERTCOPY_STREAM時支援該參數。

sink.insert.check-and-put.column

啟用條件更新能力,並指定檢查的欄位名。

String

參數取值必須設定為Hologres表存在的欄位名。

重要
  • sink.write-mode配置為 INSERT 時支援該參數。

  • 結果表必須有主鍵,sink.on-conflict-action參數值必須是INSERT_OR_UPDATE或者INSERT_OR_REPLACE

  • 由於需要反查,建議結果表建立為行存表或者行列混存表。

  • 在資料重複較多的情況下,check-and-put操作會退化為單條寫入,這將導致寫入效能的降低。

sink.insert.check-and-put.operator

條件更新操作的比較操作符。

String

GREATER

比較新record的check欄位與表中舊值,符合條件判斷操作符時進行更新。目前支援配置為GREATER、GREATER_OR_EQUAL、EQUAL、NOT_EQUAL、LESS、LESS_OR_EQUAL、IS_NULL、IS_NOT_NULL。

sink.insert.check-and-put.null-as

當條件更新時,如果舊資料為null,則將該null值視為此參數配置的有效值。

String

由於在PostgreSQL中,任何值與NULL進行比較的結果均為FALSE,因此當表中的原有資料為NULL時,進行更新操作時需要設定一個NULL-AS作為參數,相當於SQL中的COALESCE函數。

sink.insert.batch-size

INSERT模式,Hologres Sink節點資料攢批條數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

Integer

512

sink.insert.batch-sizesink.insert.batch-byte-sizesink.insert.flush-interval-ms三者之間為或的關係。如果同時設定了這三個參數,則滿足其中一個,就進行寫入結果資料。

sink.insert.batch-byte-size

INSERT模式,Hologres Sink節點資料攢批位元組數(不是來一條資料處理一條,而是攢一批再處理)的最大值。

Long

2*1024*1024位元組,即2 MB

sink.insert.flush-interval-ms

INSERT模式,Hologres Sink節點資料攢批寫入Hologres的最長等待時間。

Long

10000

sink.copy.format

COPY模式使用的傳輸格式

String

  • COPY_STREAM 模式預設binary

  • COPY_BULK_LOADCOPY_BULK_LOAD_ON_CONFLICT預設text

COPY_STREAM模式支援:

  • binary

  • text

  • binaryrow(Hologres引擎版本>=4.1.0)

COPY_BULK_LOADCOPY_BULK_LOAD_ON_CONFLICT僅支援:text

說明

sink.write-mode配置為 COPY_STREAMCOPY_BULK_LOADCOPY_BULK_LOAD_ON_CONFLICT時支援該參數。

sink.insert.conflict-update-set

主鍵衝突時更新的Hologres運算式

String

等價於insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set>,可填寫holo運算式/函數。

例如:該配置項值為col1=old.col1+excluded.col1,col2=excluded.col2時,表示當主鍵衝突時將col1的值更新為舊值與新值相加,col2更新為新值。

  • 當此項不填時,預設為更新傳入的所有欄位為新值。

  • 當更新運算式有狀態時,比如col=old.col+excluded.col結果依賴舊值的狀態,為保證failover恢複後的資料正確性,請確保有一個可以作為行的版本號碼的欄位,並配置sink.insert.conflict-where為excluded.seq>old.seq,保證資料回放時資料的正確性。

說明

sink.write-mode配置為 INSERT 時支援該參數。

sink.insert.conflict-where

主鍵衝突時觸發更新的Hologres過濾條件

String

等價於insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set> where <conflict-where>,可填寫holo運算式/函數。

例如: 該配置項值為excluded.col1>old.col1時,表示當主鍵衝突時,只有col1的新值大於舊值,才會觸發更新。

說明
  • sink.write-mode配置為 INSERT 時支援該參數。

  • 該配置項與sink.insert.check-and-put*衝突,當同時配置時會發生異常報錯

維表專屬

參數

說明

資料類型

是否必填

預設值

備忘

lookup.read.batch-size

點查Hologres維表時,攢批處理的最大條數。

Integer

256

無。

lookup.read.timeout-ms

維表點查的逾時時間。

Long

預設值為0,表示不會逾時

無。

lookup.read.column-table.enabled

是否使用列存表做維表

Boolean

false

使用列存表做維表時效能較差,推薦使用行存表或者行列混存表。 此參數開啟,且使用列存表時,會列印warn日誌提示。

lookup.insert-if-not-exists

是否插入不存在的資料

Boolean

false

如果點查發現維表中不存在當前資料,則插入當前資料。

cache

緩衝策略。

String

None

Hologres僅支援None和LRU兩種緩衝策略。

cacheSize

緩衝大小。

Integer

10000

選擇LRU緩衝策略後,可以設定緩衝大小。單位為條。

cacheTTLMs

緩衝更新時間間隔。

Long

見備忘列。

單位為毫秒。cacheTTLMs預設值和cache的配置有關:

  • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

  • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

cacheEmpty

是否緩衝join結果為空白的資料。

Boolean

true

  • true(預設值):緩衝join結果為空白的資料。

  • false:不緩衝join結果為空白的資料。

    但當join語句中AND前麵條件符合而後麵條件不符合時,依然會緩衝join結果為空白的資料。程式碼範例如下。

    LEFT JOIN latest_emergency FOR SYSTEM_TIME AS OF PROCTIME() AS t2
     ON t1.alarm_id = t2.alarm_id -- 如果發現是動態警示,則匹配時加入動態警示id,否則無需考慮動態警示id欄位。
     AND CASE
     WHEN alarm_type = 2 THEN t1.dynamic_id = t2.dynamic_alarm_id
     ELSE true
     END
重要

請根據實際業務情境決定是否啟用此開關。若您希望在作業運行時關聯新插入維表中的記錄,請關閉此選項,或是將cacheTTLMs設定得足夠短,以免空值記錄被緩衝,導致後續維表關聯失敗。

async

是否非同步返回資料。

Boolean

false

  • true:表示非同步返回資料。

  • false(預設值):表示不進行非同步返回資料。

說明

非同步返回資料是無序的。

lookup.filter-push-down.enabled

是否將維表過濾條件下推到Hologres服務端。

Boolean

false

目前僅支援列與常量之間的比較運算,僅對等值運算子及大小比較子(如 <, <=, >, >=)執行下推操作。

說明

僅Realtime Compute引擎VVR 11.4及以上版本支援配置此參數。