Realtime Compute for Apache Flink: Hologres connector options (VVR 11+)

Last Updated: Mar 26, 2026

This topic describes the WITH clause parameters for the Hologres connector in Ververica Runtime (VVR) 11 and later, including general connection parameters and parameters specific to source tables, sink tables, and dimension tables.

Parameter changes from VVR 8

VVR 11 removes or renames several parameters that existed in VVR 8 and earlier to improve the connector's architecture and maintainability. The following tables list the changes.

Removed parameters

| Parameter | Description | Replacement |
| --- | --- | --- |
| jdbcRetrySleepInitMs | Fixed wait time per retry. | Use retry-sleep-step-ms to set an incremental wait time instead. |
| jdbcMetaAutoRefreshFactor | Triggered cache refresh when remaining TTL fell below a threshold. | No longer needed. Use meta-cache-ttl-ms to set the cache TTL directly. |
| type-mapping.timestamp-converting.legacy | Controlled time type conversion between Flink and Hologres. | No longer needed. It was added for backward compatibility with TIMESTAMP_LTZ and has been removed. |
| property-version | Connector parameter version. | Removed. Default values for common parameters have been optimized. |
| field_delimiter | Row separator used during data export. | Removed. The data reading method has been optimized. |
| jdbcBinlogSlotName | Slot name for the binary log source table in JDBC mode. | Removed. The data reading method has been optimized. |
| binlogMaxRetryTimes | Retry count when a binary log read error occurs. | Use retry-count. |
| cdcMode | Enabled CDC mode for reading binary log data. | Removed. CDC mode is now the default. For non-CDC mode, use source.binlog.change-log-mode. |
| upsertSource | Set the source table to upsert changelog mode. | Use source.binlog.change-log-mode. |
| bulkload | Enabled bulk load for writes. | Use sink.write-mode. |
| useRpcMode | Enabled RPC mode for the Hologres connector. | For JDBC connections, configure sink.deduplication.enabled to control deduplication behavior. |
| partitionrouter | Enabled writes to partitioned tables. | Removed. Writing to partitioned tables is supported by default. |
| ignoredelete | Ignored retraction messages. | Use sink.delete-strategy to configure the retraction message handling policy. |
| sdkMode | SDK mode. | Replaced by source.binlog.read-mode (for sources) and sink.write-mode (for sinks). |
| jdbcReadBatchQueueSize | Buffer queue size for dimension table point queries. | If point query performance is poor, configure connection.pool.size. |
| jdbcReadRetryCount | Retry count when a dimension table point query times out. | Consolidated into retry-count. |
| jdbcScanTransactionSessionTimeoutSeconds | Transaction timeout for scan operations. | Merged into source.scan.timeout-seconds. |

Renamed parameters

| VVR 8 parameter | VVR 11 parameter | Description |
| --- | --- | --- |
| jdbcRetryCount | retry-count | Retry count for writes and queries when a connection fails. |
| jdbcRetrySleepStepMs | retry-sleep-step-ms | Incremental wait time per retry. |
| jdbcConnectionMaxIdleMs | connection.max-idle-ms | Idle timeout for a JDBC connection. |
| jdbcMetaCacheTTL | meta-cache-ttl-ms | TTL for cached TableSchema information. |
| binlog | source.binlog | Specifies whether to consume binary log data. |
| sdkMode | source.binlog.read-mode | Read mode for binary log consumption. |
| binlogRetryIntervalMs | source.binlog.request-timeout-ms | Request timeout when reading binary log data. |
| binlogBatchReadSize | source.binlog.batch-size | Batch size for binary log reads. |
| binlogStartupMode | source.binlog.startup-mode | Startup mode for binary log consumption. |
| jdbcScanFetchSize | source.scan.fetch-size | Batch size for scan operations. |
| jdbcScanTimeoutSeconds | source.scan.timeout-seconds | Timeout for scan operations. |
| enable_filter_push_down | source.scan.filter-push-down.enabled | Enables filter pushdown during the full data read phase. |
| partition-binlog.mode | source.binlog.partition-binlog-mode | Binary log consumption mode for partitioned tables. |
| partition-binlog-lateness-timeout-minutes | source.binlog.partition-binlog-lateness-timeout-minutes | Maximum lateness timeout when consuming a partitioned table in DYNAMIC mode. |
| partition-values-to-read | source.binlog.partition-values-to-read | Partitions to consume in STATIC mode. Separate multiple values with commas (,). |
| sdkMode | sink.write-mode | Write mode. |
| mutatetype | sink.on-conflict-action | Primary key conflict handling policy. |
| createparttable | sink.create-missing-partition | Automatically creates a missing partition when writing to a partitioned table. |
| jdbcWriteBatchSize | sink.insert.batch-size | Maximum number of records to buffer before writing to Hologres. |
| jdbcWriteBatchByteSize | sink.insert.batch-byte-size | Maximum size in bytes to buffer before writing to Hologres. |
| jdbcWriteFlushInterval | sink.insert.flush-interval-ms | Maximum wait time before flushing buffered data to Hologres. |
| ignoreNullWhenUpdate | sink.ignore-null-when-update.enabled | Ignores null values in updated data when sink.on-conflict-action is INSERT_OR_UPDATE. |
| jdbcEnableDefaultForNotNullColumn | sink.default-for-not-null-column.enabled | Fills in a default value when null is written to a NOT NULL column with no default. |
| remove-u0000-in-text.enabled | sink.remove-u0000-in-text.enabled | Removes the \u0000 character from string fields during writes. |
| partial-insert.enabled | sink.partial-insert.enabled | Inserts only the fields declared in the INSERT statement. |
| deduplication.enabled | sink.deduplication.enabled | Removes duplicates within a batch before writing. |
| check-and-put.column | sink.insert.check-and-put.column | Enables conditional updates and specifies the field to check. |
| check-and-put.operator | sink.insert.check-and-put.operator | Comparison operator for conditional updates. |
| check-and-put.null-as | sink.insert.check-and-put.null-as | Treats a null value in the old data as the specified value during a conditional update. |
| aggressive.enabled | sink.aggressive-flush.enabled | Enables aggressive flush mode. |
| connectionSize | connection.pool.size | Size of the JDBC connection pool for a single Flink table task. |
| connectionPoolName | connection.pool.name | Connection pool name. Tables sharing the same name in the same TaskManager share the pool. |
| jdbcReadBatchSize | lookup.read.batch-size | Maximum number of records per batch for dimension table point queries. |
| jdbcReadTimeoutMs | lookup.read.timeout-ms | Timeout for dimension table point queries. |
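
To make the renames concrete, the following is a minimal sketch of a sink declaration that uses VVR 11 names, with the corresponding VVR 8 name noted beside each option. The table name, columns, and all values in angle brackets are hypothetical placeholders, and the option values shown are simply the documented defaults, not tuning advice.

```sql
-- Hypothetical sink table; names and bracketed values are placeholders.
CREATE TEMPORARY TABLE holo_sink (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',      -- prefer a project variable over a literal
  'password' = '<yourAccessKeySecret>',  -- prefer a project variable over a literal
  'endpoint' = '<yourEndpoint>',
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE',  -- VVR 8: mutatetype
  'sink.insert.batch-size' = '512',                -- VVR 8: jdbcWriteBatchSize
  'sink.insert.flush-interval-ms' = '10000',       -- VVR 8: jdbcWriteFlushInterval
  'retry-count' = '10',                            -- VVR 8: jdbcRetryCount
  'connection.pool.size' = '5'                     -- VVR 8: connectionSize
);
```

When migrating an existing job, also check the removed-parameters table: options such as cdcMode or partitionrouter have no direct VVR 11 equivalent and should be dropped rather than renamed.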

WITH parameters

General

The following parameters apply to source tables, sink tables, and dimension tables.

Connection

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | | Set to hologres. |
| dbname | String | Yes | | The database name. To connect to a specific compute group, append the compute group name as a suffix: 'dbname' = 'db_test@read_warehouse'. |
| tablename | String | Yes | | The table name. For non-public schemas, use the schema.tableName format. |
| username | String | Yes | | A custom account username in the BASIC$<user_name> format, or the AccessKey ID of an Alibaba Cloud account or RAM user. The account must have permissions to access the Hologres database. For more information, see Permission model and Manage users. To get an AccessKey ID, see Obtain an AccessKey pair. Important: Store AccessKey values in project variables rather than hardcoding them. See Project variables. |
| password | String | Yes | | The custom account password, or the AccessKey secret of an Alibaba Cloud account or RAM user. |
| endpoint | String | Yes | | The Hologres service endpoint. See Endpoints. |
| connection.pool.size | Integer | No | 5 | The size of the JDBC connection pool for a single Flink table task. Increase this value if job performance is poor. Pool size is proportional to data throughput. Applies to dimension tables and sink tables only. |
| connection.pool.name | String | No | 'default' | The connection pool name. Tables with the same connection pool name, endpoint, database, and other connection information in the same TaskManager can share the pool. When multiple tables share a pool, the largest connection.pool.size value among them applies. For jobs with many tables, assign different pool names to avoid connection contention. |
| connection.fixed.enabled | Boolean | No | None | Specifies whether to use lightweight connection mode. Starting from Hologres V2.1, real-time writes support lightweight connections that bypass the per-instance connection limit. The default value depends on the Hologres instance version. For dimension and sink tables, the connector automatically selects lightweight connection mode if the Hologres version is later than 3.0.28. Note: Lightweight connections do not support queries on JSONB and RoaringBitmap types for dimension tables. |
| connection.max-idle-ms | Long | No | 60000 | The idle timeout for a JDBC connection, in milliseconds. Connections idle longer than this value are released and re-created on next use. |
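
As an illustration of how the connection parameters combine, here is a minimal sketch of a sink declaration. The table name, columns, schema name, pool name, and pool size are hypothetical, and the `${secret_values.…}` references assume project variables named ak_id and ak_secret exist; check the Project variables topic for the exact reference syntax in your workspace.

```sql
-- Hypothetical sink table; identifiers and bracketed values are placeholders.
CREATE TEMPORARY TABLE holo_orders_sink (
  order_id BIGINT,
  amount DECIMAL(18, 2)
) WITH (
  'connector' = 'hologres',
  'dbname' = 'db_test@read_warehouse',        -- database name plus compute group suffix
  'tablename' = 'my_schema.orders',           -- schema.tableName for a non-public schema
  'username' = '${secret_values.ak_id}',      -- assumed project variable reference
  'password' = '${secret_values.ak_secret}',  -- assumed project variable reference
  'endpoint' = '<yourEndpoint>',
  'connection.pool.size' = '10',              -- raised from the default of 5
  'connection.pool.name' = 'orders_pool',     -- dedicated pool to avoid contention
  'connection.max-idle-ms' = '60000'          -- documented default, shown for clarity
);
```

Because tables that share a pool name and connection information share one pool, giving a heavily loaded table its own pool name keeps its connections separate from those of other tables in the same TaskManager.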

SSL

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connection.ssl.mode | String (disable, require, verify-ca, verify-full) | No | disable | The SSL encryption mode for data in transit. disable: No encryption. require: Encrypts the data link only. verify-ca: Encrypts the link and verifies the Hologres server using a CA certificate. verify-full: Same as verify-ca, plus checks that the CN or DNS in the certificate matches the configured Hologres endpoint. verify-ca and verify-full require Hologres V2.1 or later. See Encryption in transit. |
| connection.ssl.root-cert.location | String | No | | The path to the CA certificate. Required when connection.ssl.mode is verify-ca or verify-full. Upload the certificate using the File Management feature in the Realtime Compute console; the file is stored in /flink/usrlib. For example, set this to /flink/usrlib/certificate.crt. To download a CA certificate, see Download a CA certificate. |
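
The two SSL options are typically set together. The sketch below assumes a certificate has already been uploaded as certificate.crt through File Management; the table name, columns, and bracketed values are hypothetical placeholders.

```sql
-- Hypothetical table with full certificate and endpoint verification enabled.
CREATE TEMPORARY TABLE holo_secure_sink (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'connection.ssl.mode' = 'verify-full',  -- requires Hologres V2.1 or later
  'connection.ssl.root-cert.location' = '/flink/usrlib/certificate.crt'
);
```

With require, the certificate location can be omitted, because that mode encrypts the link without verifying the server.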

Retry

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| retry-count | Integer | No | 10 | The retry count for writes and queries when a connection fails. |
| retry-sleep-step-ms | Long | No | 5000 | The incremental wait time per retry, in milliseconds. For example, with the default value of 5000 ms, the first retry waits 5 seconds, the second waits 10 seconds, and so on. |
| meta-cache-ttl-ms | Long | No | 600000 | The TTL for cached TableSchema information, in milliseconds. |
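
The following sketch shows a shorter retry schedule than the default. The values are illustrative only, and the wait times in the comments assume the wait keeps growing by one step per retry, as the 5-second and 10-second example above suggests. Table name, columns, and bracketed values are hypothetical.

```sql
-- Hypothetical table with a shortened retry schedule.
CREATE TEMPORARY TABLE holo_retry_demo (
  id BIGINT,
  val STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'retry-count' = '3',
  -- Assumed linear schedule: retry 1 waits 2 s, retry 2 waits 4 s, retry 3 waits 6 s.
  'retry-sleep-step-ms' = '2000',
  'meta-cache-ttl-ms' = '600000'  -- documented default (10 minutes)
);
```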

Serverless computing

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| serverless-computing.enabled | Boolean | No | false | Specifies whether to use Hologres serverless resources for reads and writes instead of instance resources. Applies only to batch reads and batch imports, not to binary log consumption, dimension table point queries, or real-time writes. Enable this for large-scale full data imports or exports to avoid affecting other workloads on the instance. See Serverless Computing overview. Note: Batch reading applies when source.binlog is false, or when source.binlog.startup-mode is INITIAL. Batch import applies when sink.write-mode is COPY_BULK_LOAD or COPY_BULK_LOAD_ON_CONFLICT. |
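
As a sketch of a batch import that qualifies for serverless resources, the declaration below pairs serverless-computing.enabled with a bulk-load write mode. The table name, columns, and bracketed values are hypothetical placeholders.

```sql
-- Hypothetical sink for a large one-off import that runs on serverless resources.
CREATE TEMPORARY TABLE holo_bulk_sink (
  id BIGINT,
  payload STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'sink.write-mode' = 'COPY_BULK_LOAD',     -- batch import, so serverless applies
  'serverless-computing.enabled' = 'true'
);
```

With 'sink.write-mode' = 'INSERT' the write counts as a real-time write, so, per the description above, the serverless option would not apply.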

Source table parameters

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source.binlog | Boolean | No | true | Specifies whether to consume binary log data. true: Consumes binary logs. false: Performs batch reading only; the job stops when reading is complete. |
| source.binlog.read-mode | Enum (AUTO, HOLOHUB, JDBC) | No | AUTO | The binary log read mode. AUTO: Automatically selects the optimal mode based on the Hologres instance version: JDBC mode for V2.1.0 and later (with lightweight connections enabled by default for V2.1.27 and later), HoloHub mode for V2.0 and earlier. HOLOHUB: Uses HoloHub mode. JDBC: Uses JDBC mode. |
| source.binlog.change-log-mode | Enum (ALL, UPSERT, ALL_AS_APPEND_ONLY) | No | UPSERT | The changelog types produced by the CDC source table. ALL: All changelog types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER). UPSERT: Upsert types only (INSERT, DELETE, UPDATE_AFTER). ALL_AS_APPEND_ONLY: Treats all changelog types as INSERT. Note: If the downstream pipeline includes retraction operators (such as ROW_NUMBER OVER WINDOW for deduplication), set this to UPSERT so the source reads data in upsert mode. |
| source.binlog.startup-mode | Enum (INITIAL, EARLIEST_OFFSET, TIMESTAMP) | No | INITIAL | The binary log consumption startup mode. INITIAL: Reads all existing data first, then starts incremental binary log consumption. EARLIEST_OFFSET: Starts from the earliest binary log. TIMESTAMP: Starts from the time specified by startTime. Note: If startTime is set or a start time is selected in the startup interface, the connector forces TIMESTAMP mode regardless of this parameter. startTime takes precedence. |
| source.binlog.batch-size | Integer | No | 512 | The number of rows to read from binary logs per batch. |
| source.binlog.request-timeout-ms | Long | No | 300000 | The request timeout for binary log reads, in milliseconds. Timeouts may indicate downstream backpressure. |
| source.binlog.project-columns.enabled | Boolean | No | None | Specifies whether to read only the fields declared in the CREATE TEMPORARY TABLE statement, skipping undeclared fields. Useful for large tables where only a subset of columns is needed, because it reduces data transfer and improves read performance. Supported in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
| source.binlog.compression.enabled | Boolean | No | None | Specifies whether to enable LZ4 compression during binary log transit. Improves read performance and reduces bandwidth. Supported in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
| source.binlog.partition-binlog-mode | Enum (DISABLE, DYNAMIC, STATIC) | No | DISABLE | The binary log consumption mode for partitioned tables. DISABLE: Source table is not a partitioned table; throws an exception if the Hologres table is partitioned. DYNAMIC: Continuously consumes the latest partition in chronological order. Requires dynamic partitioning to be enabled. STATIC: Consumes a fixed set of partitions simultaneously; partitions cannot be added or removed during consumption. |
| source.binlog.partition-binlog-lateness-timeout-minutes | Integer | No | 60 | The maximum lateness timeout in DYNAMIC mode, in minutes. After a new time unit starts, the previous partition remains open for this duration to capture late-arriving data. For example, for a daily-partitioned table, partition 20240920 closes at 2024-09-21 01:00:00 (not 00:00:00) when this is set to 60. Must not exceed the partition's time unit (for daily partitions, maximum is 1440 minutes). |
| source.binlog.partition-values-to-read | String | No | | Partitions to consume in STATIC mode, as a comma-separated list of partition values (not full partition names). Regular expressions are not supported. If not set, all partitions of the parent table are consumed. |
| startTime | String | No | | The start offset for binary log consumption, in yyyy-MM-dd hh:mm:ss format. If not set and the job has no saved state, consumption starts from the earliest binary log. |
| source.scan.fetch-size | Integer | No | 512 | The batch size for batch reads. |
| source.scan.timeout-seconds | Integer | No | 60 | The timeout for batch reads, in seconds. |
| source.scan.filter-push-down.enabled | Boolean | No | false | Specifies whether to push down supported filter conditions to Hologres during batch reads. Effective when source.binlog is false, or when source.binlog is true and source.binlog.startup-mode is INITIAL (applies to the full data read phase only). Do not enable this together with source.binlog.filter-push-down.enabled. |
| source.binlog.filter-push-down.enabled | Boolean | No | false | Specifies whether to push down supported filter conditions to Hologres during binary log consumption. When source.binlog is true, filter pushdown applies to both the full and incremental phases (including when source.binlog.startup-mode is INITIAL). Supported in VVR 11.3 and later and Hologres V4.0 and later. Do not enable this together with source.scan.filter-push-down.enabled. |
| scan.prefer.physical-column.over.metadata-column | Boolean | No | false | Specifies whether to prioritize reading from a physical column when it shares a name with a metadata column. Supported in VVR 11.5 and later. Earlier versions always read from the metadata column. |
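
The two declarations below sketch common source setups under stated assumptions: the first reads existing data and then the binary log from a non-partitioned table, and the second consumes a daily-partitioned table in DYNAMIC mode. Table names, columns, and bracketed values are hypothetical, and the primary key is declared because Flink expects one on a source that emits an upsert changelog.

```sql
-- Hypothetical CDC source: full read first, then incremental binary log.
CREATE TEMPORARY TABLE holo_cdc_source (
  id BIGINT,
  region STRING,
  amount DECIMAL(18, 2),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog' = 'true',
  'source.binlog.startup-mode' = 'INITIAL',
  'source.binlog.change-log-mode' = 'UPSERT',
  -- Pushes supported filters down during the full read phase only.
  'source.scan.filter-push-down.enabled' = 'true'
);

-- Hypothetical source over a daily-partitioned parent table.
CREATE TEMPORARY TABLE holo_partitioned_source (
  id BIGINT,
  ds STRING,
  PRIMARY KEY (id, ds) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC',
  -- With 60, partition 20240920 stays open until 2024-09-21 01:00:00.
  'source.binlog.partition-binlog-lateness-timeout-minutes' = '60'
);
```

A query such as SELECT * FROM holo_cdc_source WHERE region = 'cn' would let the connector push the filter on region to Hologres during the full read, provided the condition is one of the supported filter types.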

Sink table parameters

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sink.write-mode | Enum (INSERT, COPY_STREAM, COPY_BULK_LOAD, COPY_BULK_LOAD_ON_CONFLICT) | No | INSERT | The write mode. INSERT: Uses JDBC with INSERT statements. COPY_STREAM: High-performance streaming writes using fixed copy, suitable for high-throughput, low-latency scenarios. Does not support deletes, writes to a parent partitioned table, or sink.ignore-null-when-update.enabled. COPY_BULK_LOAD: Batch writes using the COPY protocol, suitable for tables without a primary key. Primary key conflicts throw an exception. Uses fewer Hologres resources than COPY_STREAM. COPY_BULK_LOAD_ON_CONFLICT: Batch writes with primary key conflict handling. Flink reshuffles data by the Hologres DistributionKey, enabling concurrent shard-level writes. For best performance, set job concurrency to match the shard count of the sink table. Requires VVR 11.3 and later and Hologres V3.1 and later. In COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT modes, data becomes visible only after a successful checkpoint. |
| sink.on-conflict-action | Enum (INSERT_OR_IGNORE, INSERT_OR_REPLACE, INSERT_OR_UPDATE) | No | INSERT_OR_UPDATE | The primary key conflict handling policy. INSERT_OR_IGNORE: Keeps the first record and ignores all subsequent records with the same primary key. INSERT_OR_REPLACE: Replaces the existing row with the new data. INSERT_OR_UPDATE: Updates the specified columns of the existing row. For example, in a table with columns a (primary key), b, c, and d, writing only a and b updates column b while leaving c and d unchanged. |
| sink.create-missing-partition | Boolean | No | false | Specifies whether to automatically create a partition when writing to a partitioned table if the target partition does not exist. For DATE-type partition keys with dynamic partitioning enabled, the auto-created partition name format matches the dynamic partition naming convention. Verify that partition values are clean before enabling this option, because dirty data can create incorrect partitions and trigger a failover. Writing to a parent partitioned table requires sink.write-mode to be INSERT. |
| sink.delete-strategy | String (IGNORE_DELETE, NON_PK_FIELD_TO_NULL, DELETE_ROW_ON_PK, CHANGELOG_STANDARD) | No | CHANGELOG_STANDARD | The retraction message handling policy. IGNORE_DELETE: Ignores Update Before and Delete messages. Use for insert-only or update-only scenarios. NON_PK_FIELD_TO_NULL: Ignores Update Before messages; processes Delete messages by setting non-primary key fields to NULL. Use for partial updates where you want to null out non-key columns without affecting other columns. DELETE_ROW_ON_PK: Ignores Update Before messages; processes Delete messages by deleting the entire row by primary key. Use for partial updates where you want to delete the entire row. CHANGELOG_STANDARD: Follows standard Flink SQL changelog semantics: does not ignore deletes, and performs updates by deleting then inserting. Use when partial updates are not involved. Note: Enabling NON_PK_FIELD_TO_NULL may produce records where only the primary key is non-null. |
| sink.ignore-null-when-update.enabled | Boolean | No | false | When sink.on-conflict-action is INSERT_OR_UPDATE, specifies whether to ignore null values in incoming data during an update, leaving the existing column values unchanged. Applies only when sink.write-mode is INSERT. |
| sink.ignore-null-when-update-by-expr.enabled | Boolean | No | false | When sink.on-conflict-action is INSERT_OR_UPDATE, specifies whether to use an expression-based approach to ignore null values in updates. Provides better performance than sink.ignore-null-when-update.enabled. When true, null values are ignored regardless of whether sink.ignore-null-when-update.enabled is set. Applies only when sink.write-mode is INSERT. Requires Hologres V4.0 and later. |
| sink.default-for-not-null-column.enabled | Boolean | No | true | Specifies whether to fill in a type-specific default when null is written to a NOT NULL column that has no default value. When true, the defaults are: empty string ("") for String, 0 for Number, and 1970-01-01 00:00:00 for Date, timestamp, and timestamptz. When false, throws an exception instead. Applies only when sink.write-mode is INSERT and sink.on-conflict-action is not INSERT_OR_UPDATE. |
| sink.remove-u0000-in-text.enabled | Boolean | No | true | Specifies whether to remove \u0000 characters from string fields during writes. When false, the connector writes data as-is; encountering \u0000 throws ERROR: invalid byte sequence for encoding "UTF8": 0x00. |
| sink.partial-insert.enabled | Boolean | No | false | Specifies whether to push only the fields declared in the INSERT statement to the connector. When false, all fields defined in the sink DDL are written; fields not in the INSERT statement are set to null. Applies only when sink.on-conflict-action is INSERT_OR_UPDATE. |
| sink.deduplication.enabled | Boolean | No | true | Specifies whether to remove duplicate records within a batch before writing. When true, if a batch contains multiple records with the same primary key, only the last one is written. For example, if the table already contains (1, 'a') and the batch has DELETE (1, 'a') then INSERT (1, 'b'), the result is a direct update to (1, 'b'). When false, if a new record conflicts with one in the current batch, the batch is flushed first, then the new record is written. In extreme cases (all records share the same primary key), writes degrade to single-row writes. Applies only when sink.write-mode is INSERT. |
| sink.aggressive-flush.enabled | Boolean | No | false | Specifies whether to force a flush when a connection is idle, even if the batch has not reached its configured size. Reduces write latency during low-traffic periods. Applies only when sink.write-mode is INSERT or COPY_STREAM. |
| sink.insert.check-and-put.column | String | No | | Enables conditional updates and specifies the column to check. Set to an existing column name in the Hologres table. Requirements: sink.write-mode must be INSERT; the sink table must have a primary key; sink.on-conflict-action must be INSERT_OR_UPDATE or INSERT_OR_REPLACE; the sink table must be row-oriented or hybrid row-column (a reverse lookup is required). If many records share the same primary key, check-and-put operations degrade to single-row writes. |
| sink.insert.check-and-put.operator | String (GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL) | No | GREATER | The comparison operator for conditional updates. Compares the check column of the incoming record against the existing value in the table. The update proceeds if the condition is met. |
| sink.insert.check-and-put.null-as | String | No | | During a conditional update, treats a null value in the existing data as the value specified here. In PostgreSQL, a comparison with NULL evaluates to NULL rather than TRUE, so the condition is never met; set this parameter when the existing data may be NULL. Equivalent to applying the SQL COALESCE function to the existing value. |
| sink.insert.batch-size | Integer | No | 512 | In INSERT mode, the maximum number of records to buffer before writing. A write is triggered when any of sink.insert.batch-size, sink.insert.batch-byte-size, or sink.insert.flush-interval-ms is reached (logical OR). |
| sink.insert.batch-byte-size | Long | No | 2097152 (2 MB) | In INSERT mode, the maximum buffer size in bytes before writing. Evaluated together with sink.insert.batch-size and sink.insert.flush-interval-ms using a logical OR. |
| sink.insert.flush-interval-ms | Long | No | 10000 | In INSERT mode, the maximum wait time in milliseconds before flushing buffered data. Evaluated together with sink.insert.batch-size and sink.insert.batch-byte-size using a logical OR. |
| sink.copy.format | String (binary, text, binaryrow) | No | binary for COPY_STREAM; text for COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT | The transmission format in COPY mode. COPY_STREAM supports binary, text, and binaryrow (requires Hologres engine V4.1.0 and later). COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT support text only. Applies only when sink.write-mode is COPY_STREAM, COPY_BULK_LOAD, or COPY_BULK_LOAD_ON_CONFLICT. |
| sink.insert.conflict-update-set | String | No | | The Hologres expression applied when a primary key conflict occurs. Equivalent to INSERT INTO tbl VALUES(...) ON CONFLICT(pk) DO UPDATE SET <conflict-update-set>. For example, col1=old.col1+excluded.col1,col2=excluded.col2 sets col1 to the sum of the old and new values, and col2 to the new value. If not set, all incoming fields are updated to their new values. For stateful expressions (where the result depends on the old value, such as col=old.col+excluded.col), use a version column and set sink.insert.conflict-where to excluded.seq>old.seq to ensure correctness after failover recovery. Applies only when sink.write-mode is INSERT. |
| sink.insert.conflict-where | String | No | | The Hologres filter condition that must be met for an update to proceed on a primary key conflict. Equivalent to INSERT INTO tbl VALUES(...) ON CONFLICT(pk) DO UPDATE SET <conflict-update-set> WHERE <conflict-where>. For example, excluded.col1>old.col1 updates only when the incoming col1 is greater than the existing value. Cannot be set together with sink.insert.check-and-put.* parameters. Applies only when sink.write-mode is INSERT. |
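
The conflict expression options are easier to follow with a concrete table. The sketch below accumulates col1 and guards the update with a version column, following the pattern described for stateful expressions. The table name, the columns (including seq), and the bracketed values are hypothetical; adding seq=excluded.seq to the update set is an assumption made here so that the stored version advances with each accepted update.

```sql
-- Hypothetical sink that sums col1 on conflict and only accepts newer versions.
CREATE TEMPORARY TABLE holo_agg_sink (
  id BIGINT,
  col1 BIGINT,
  col2 STRING,
  seq BIGINT,                          -- version column used by the guard
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'sink.write-mode' = 'INSERT',        -- both conflict options require INSERT mode
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE',
  -- Roughly: ON CONFLICT(id) DO UPDATE SET <update-set> WHERE <conflict-where>
  'sink.insert.conflict-update-set' =
    'col1=old.col1+excluded.col1,col2=excluded.col2,seq=excluded.seq',
  'sink.insert.conflict-where' = 'excluded.seq>old.seq'
);
```

With this guard, a record replayed after failover carries a seq that is not greater than the stored one, so it is skipped instead of being added to col1 a second time. The sink.insert.check-and-put.* options are left unset because they cannot be combined with sink.insert.conflict-where.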

Dimension table parameters

| Parameter | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| lookup.read.batch-size | Integer | No | 256 | The maximum number of records per batch for point queries on the dimension table. |
| lookup.read.timeout-ms | Long | No | 0 (no timeout) | The timeout for dimension table point queries, in milliseconds. |
| lookup.read.column-table.enabled | Boolean | No | false | Specifies whether to allow using a column-oriented table as a dimension table. Column-oriented tables perform poorly for point queries; use a row-oriented or hybrid row-column table instead. When enabled with a column-oriented table, the connector logs a warning. |
| lookup.insert-if-not-exists | Boolean | No | false | Specifies whether to insert the current record into the dimension table if a point query finds no matching row. |
| cache | String (None, LRU) | No | None | The cache policy. Supported values: None, LRU. |
| cacheSize | Integer | No | 10000 | The LRU cache size in rows. Applies only when cache is LRU. |
| cacheTTLMs | Long | No | See description. | The cache TTL in milliseconds. When cache is LRU, this sets the cache entry timeout (no expiry by default). When cache is None, this parameter has no effect. If joining with rows inserted into the dimension table during job runtime, set a short TTL or set cacheEmpty to false to prevent stale null results from blocking subsequent joins. |
| cacheEmpty | Boolean | No | true | Specifies whether to cache empty join results. When true, a null result from a point query is cached. When false, null results are not cached. However, if the condition before AND in the join is met but the condition after AND is not, the empty result is still cached. Important: If you need to join with rows inserted into the dimension table during job runtime, set this to false or configure cacheTTLMs to a short interval. |
| async | Boolean | No | false | Specifies whether to return dimension table query results asynchronously. Asynchronous results are unordered. |
| lookup.filter-push-down.enabled | Boolean | No | false | Specifies whether to push dimension table filter conditions to the Hologres server. Currently applies only to equality and comparison operators (<, <=, >, >=) between columns and constants. Supported in VVR 11.4 and later. |
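
To show how the cache options fit into a lookup join, the sketch below declares a dimension table with an LRU cache and joins it from a stream. All table and column names are hypothetical, the bracketed values are placeholders, and the datagen source stands in for a real stream only so that the example is complete; the 60-second TTL is an illustrative choice for a dimension table that receives new rows while the job runs.

```sql
-- Hypothetical stream side; datagen is used only to keep the sketch complete.
CREATE TEMPORARY TABLE orders_src (
  order_id BIGINT,
  user_id BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical Hologres dimension table with an LRU cache.
CREATE TEMPORARY TABLE holo_users_dim (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username' = '<yourAccessKeyId>',
  'password' = '<yourAccessKeySecret>',
  'endpoint' = '<yourEndpoint>',
  'cache' = 'LRU',
  'cacheSize' = '10000',
  'cacheTTLMs' = '60000',    -- entries expire after 60 s instead of never
  'cacheEmpty' = 'false'     -- do not cache misses, so late-inserted rows can match
);

-- Lookup join: each order is enriched with the user name at processing time.
SELECT o.order_id, d.user_name
FROM orders_src AS o
LEFT JOIN holo_users_dim FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.user_id;
```

Leaving cache at None sends every lookup to Hologres, which trades extra point queries for always reading current data.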