This topic describes the WITH clause parameters for the Hologres connector in Ververica Runtime (VVR) 11 and later, including general connection parameters and parameters specific to source tables, sink tables, and dimension tables.
Parameter changes from VVR 8
VVR 11 removes or renames several parameters from VVR 8 and earlier to improve system architecture and maintenance. The following tables list the changes.
WITH parameters
General
The following parameters apply to source tables, sink tables, and dimension tables.
Connection
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | - | Set to hologres. |
| dbname | String | Yes | - | The database name. To connect to a specific compute group, append the compute group name as a suffix: 'dbname' = 'db_test@read_warehouse'. |
| tablename | String | Yes | - | The table name. For non-public schemas, use the schema.tableName format. |
| username | String | Yes | - | A custom account username in the BASIC$<user_name> format, or the AccessKey ID of an Alibaba Cloud account or RAM user. The account must have permissions to access the Hologres database. For more information, see Permission model and Manage users. To get an AccessKey ID, see Obtain an AccessKey pair. Important: Store AccessKey values in project variables rather than hardcoding them. See Project variables. |
| password | String | Yes | - | The custom account password, or the AccessKey secret of an Alibaba Cloud account or RAM user. |
| endpoint | String | Yes | - | The Hologres service endpoint. See Endpoints. |
| connection.pool.size | Integer | No | 5 | The size of the JDBC connection pool for a single Flink table task. Throughput generally scales with the pool size, so increase this value if job performance is poor. Applies to dimension tables and sink tables only. |
| connection.pool.name | String | No | 'default' | The connection pool name. Tables in the same TaskManager that have the same connection pool name, endpoint, database, and other connection information can share the pool. When multiple tables share a pool, the largest connection.pool.size value among them applies. For jobs with many tables, assign different pool names to avoid connection contention. |
| connection.fixed.enabled | Boolean | No | None | Specifies whether to use lightweight connection mode. Starting from Hologres V2.1, real-time writes support lightweight connections that bypass the per-instance connection limit. The default value depends on the Hologres instance version. For dimension and sink tables, the connector automatically selects lightweight connection mode if the Hologres version is later than 3.0.28. Note: Lightweight connections do not support queries on |
| connection.max-idle-ms | Long | No | 60000 | The idle timeout for a JDBC connection, in milliseconds. Connections idle longer than this value are released and re-created on next use. |
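
The connection parameters above could be combined as in the following sketch of a sink table definition. The table name, column names, schema name, and pool name are hypothetical, and the values in angle brackets are placeholders that you must replace (ideally with project variables rather than literal credentials).

```sql
-- Minimal sketch: a Hologres sink table with its own connection pool.
-- hologres_orders_sink, my_schema.orders, and orders_pool are hypothetical names.
CREATE TEMPORARY TABLE hologres_orders_sink (
  order_id BIGINT,
  amount   DOUBLE
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = 'my_schema.orders',       -- schema.tableName for a non-public schema
  'username'  = '<yourAccessKeyId>',      -- placeholder; do not hardcode credentials
  'password'  = '<yourAccessKeySecret>',  -- placeholder; do not hardcode credentials
  'connection.pool.size'   = '10',          -- larger than the default of 5
  'connection.pool.name'   = 'orders_pool', -- not shared with tables that use 'default'
  'connection.max-idle-ms' = '60000'
);
```

Giving this table its own pool name keeps it from sharing connections with other tables in the same TaskManager, which may help when a job writes to many tables.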
SSL
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| connection.ssl.mode | String (disable, require, verify-ca, verify-full) | No | disable | The SSL encryption mode for data in transit. disable: No encryption. require: Encrypts the data link only. verify-ca: Encrypts the link and verifies the Hologres server using a CA certificate. verify-full: Same as verify-ca, plus checks that the CN or DNS in the certificate matches the configured Hologres endpoint. verify-ca and verify-full require Hologres V2.1 or later. See Encryption in transit. |
| connection.ssl.root-cert.location | String | No | - | The path to the CA certificate. Required when connection.ssl.mode is verify-ca or verify-full. Upload the certificate using the File Management feature in the Realtime Compute console; the file is stored in /flink/usrlib. For example, set this to /flink/usrlib/certificate.crt. To download a CA certificate, see Download a CA certificate. |
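
As an illustration of the SSL parameters, the following sketch enables verify-full with a certificate uploaded through File Management. The table and column names are hypothetical, the angle-bracket values are placeholders, and the certificate file name is the example path from the table above.

```sql
-- Minimal sketch: encrypted connection with server and host name verification.
CREATE TEMPORARY TABLE hologres_ssl_sink (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'connection.ssl.mode' = 'verify-full',
  -- Required for verify-ca and verify-full.
  'connection.ssl.root-cert.location' = '/flink/usrlib/certificate.crt'
);
```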
Retry
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| retry-count | Integer | No | 10 | The retry count for writes and queries when a connection fails. |
| retry-sleep-step-ms | Long | No | 5000 | The incremental wait time per retry, in milliseconds. For example, with the default value of 5000 ms, the first retry waits 5 seconds, the second waits 10 seconds, and so on. |
| meta-cache-ttl-ms | Long | No | 600000 | The TTL for cached TableSchema information, in milliseconds. |
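
The retry settings can be tightened for jobs that should fail fast. The sketch below assumes that the wait grows linearly with each attempt, as the 5 second and 10 second example in the table suggests; the table and column names are hypothetical and the angle-bracket values are placeholders.

```sql
-- Minimal sketch: shorter retry budget than the defaults.
-- Assuming linear growth, the waits are 2 s, 4 s, and 6 s (12 s in total)
-- before the connector gives up.
CREATE TEMPORARY TABLE hologres_retry_sink (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'retry-count'         = '3',
  'retry-sleep-step-ms' = '2000',
  'meta-cache-ttl-ms'   = '600000'  -- default value, shown for completeness
);
```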
Serverless computing
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| serverless-computing.enabled | Boolean | No | false | Specifies whether to use Hologres serverless resources for reads and writes instead of instance resources. Applies only to batch reads and batch imports, not to binary log consumption, dimension table point queries, or real-time writes. Enable this for large-scale full data imports or exports to avoid affecting other workloads on the instance. See Serverless Computing overview. Note: Batch reading applies when |
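
A batch export that offloads work to serverless resources might look like the following sketch. It assumes that setting source.binlog to false (described in the source table parameters) is what puts the source into batch reading; the table and column names are hypothetical and the angle-bracket values are placeholders.

```sql
-- Minimal sketch: one-off full read of a table using serverless resources.
CREATE TEMPORARY TABLE hologres_batch_source (
  id      BIGINT,
  payload STRING
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'source.binlog' = 'false',                -- batch read only; the job stops when done
  'serverless-computing.enabled' = 'true'   -- does not apply to binary log consumption
);
```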
Source table parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| source.binlog | Boolean | No | true | Specifies whether to consume binary log data. true: Consumes binary logs. false: Performs batch reading only; the job stops when reading is complete. |
| source.binlog.read-mode | Enum (AUTO, HOLOHUB, JDBC) | No | AUTO | The binary log read mode. AUTO: Automatically selects the optimal mode based on the Hologres instance version: JDBC mode for V2.1.0 and later (with lightweight connections enabled by default for V2.1.27 and later), HoloHub mode for V2.0 and earlier. HOLOHUB: Uses HoloHub mode. JDBC: Uses JDBC mode. |
| source.binlog.change-log-mode | Enum (ALL, UPSERT, ALL_AS_APPEND_ONLY) | No | UPSERT | The changelog types produced by the CDC source table. ALL: All changelog types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER). UPSERT: Upsert types only (INSERT, DELETE, UPDATE_AFTER). ALL_AS_APPEND_ONLY: Treats all changelog types as INSERT. Note: If the downstream pipeline includes retraction operators (such as |
| source.binlog.startup-mode | Enum (INITIAL, EARLIEST_OFFSET, TIMESTAMP) | No | INITIAL | The binary log consumption startup mode. INITIAL: Reads all existing data first, then starts incremental binary log consumption. EARLIEST_OFFSET: Starts from the earliest binary log. TIMESTAMP: Starts from the time specified by startTime. Note: If |
| source.binlog.batch-size | Integer | No | 512 | The number of rows to read from binary logs per batch. |
| source.binlog.request-timeout-ms | Long | No | 300000 | The request timeout for binary log reads, in milliseconds. Timeouts may indicate downstream backpressure. |
| source.binlog.project-columns.enabled | Boolean | No | None | Specifies whether to read only the fields declared in the CREATE TEMPORARY TABLE statement, skipping undeclared fields. Useful for large tables where only a subset of columns is needed, because it reduces data transfer and improves read performance. Supported in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
| source.binlog.compression.enabled | Boolean | No | None | Specifies whether to enable LZ4 compression during binary log transit. Improves read performance and reduces bandwidth. Supported in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
| source.binlog.partition-binlog-mode | Enum (DISABLE, DYNAMIC, STATIC) | No | DISABLE | The binary log consumption mode for partitioned tables. DISABLE: The source table is not a partitioned table; an exception is thrown if the Hologres table is partitioned. DYNAMIC: Continuously consumes the latest partition in chronological order. Requires dynamic partitioning to be enabled. STATIC: Consumes a fixed set of partitions simultaneously; partitions cannot be added or removed during consumption. |
| source.binlog.partition-binlog-lateness-timeout-minutes | Integer | No | 60 | The maximum lateness timeout in DYNAMIC mode, in minutes. After a new time unit starts, the previous partition remains open for this duration to capture late-arriving data. For example, for a daily-partitioned table, partition 20240920 closes at 2024-09-21 01:00:00 (not 00:00:00) when this is set to 60. Must not exceed the partition's time unit (for daily partitions, the maximum is 1440 minutes). |
| source.binlog.partition-values-to-read | String | No | - | Partitions to consume in STATIC mode, as a comma-separated list of partition values (not full partition names). Regular expressions are not supported. If not set, all partitions of the parent table are consumed. |
| startTime | String | No | - | The start offset for binary log consumption, in yyyy-MM-dd hh:mm:ss format. If not set and the job has no saved state, consumption starts from the earliest binary log. |
| source.scan.fetch-size | Integer | No | 512 | The batch size for batch reads. |
| source.scan.timeout-seconds | Integer | No | 60 | The timeout for batch reads, in seconds. |
| source.scan.filter-push-down.enabled | Boolean | No | false | Specifies whether to push down supported filter conditions to Hologres during batch reads. Effective when source.binlog is false, or when source.binlog is true and source.binlog.startup-mode is INITIAL (applies to the full data read phase only). Do not enable this together with source.binlog.filter-push-down.enabled. |
| source.binlog.filter-push-down.enabled | Boolean | No | false | Specifies whether to push down supported filter conditions to Hologres during binary log consumption. When source.binlog is true, filter pushdown applies to both the full and incremental phases (including when source.binlog.startup-mode is INITIAL). Supported in VVR 11.3 and later and Hologres V4.0 and later. Do not enable this together with source.scan.filter-push-down.enabled. |
| scan.prefer.physical-column.over.metadata-column | Boolean | No | false | Specifies whether to prioritize reading from a physical column when it shares a name with a metadata column. Supported in VVR 11.5 and later. Earlier versions always read from the metadata column. |
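
To show how the startup parameters interact, the following sketch defines a CDC source that starts consuming binary logs from a fixed point in time and emits all changelog types. The table and column names are hypothetical, the angle-bracket values are placeholders, and the timestamp is only an example in the yyyy-MM-dd hh:mm:ss format described above.

```sql
-- Minimal sketch: binary log source starting from a timestamp.
CREATE TEMPORARY TABLE hologres_binlog_source (
  id          BIGINT,
  status      STRING,
  update_time TIMESTAMP(3)
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'source.binlog' = 'true',
  'source.binlog.change-log-mode' = 'ALL',       -- INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER
  'source.binlog.startup-mode'    = 'TIMESTAMP', -- start position comes from startTime
  'startTime' = '2024-09-20 00:00:00',
  'source.binlog.batch-size' = '512'
);
```

With INITIAL instead of TIMESTAMP, the same table would first read all existing rows and then switch to incremental consumption, in which case startTime would not be needed.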
Sink table parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| sink.write-mode | Enum (INSERT, COPY_STREAM, COPY_BULK_LOAD, COPY_BULK_LOAD_ON_CONFLICT) | No | INSERT | The write mode. INSERT: Uses JDBC with INSERT statements. COPY_STREAM: High-performance streaming writes using fixed copy, suitable for high-throughput, low-latency scenarios. Does not support deletes, writes to a parent partitioned table, or sink.ignore-null-when-update.enabled. COPY_BULK_LOAD: Batch writes using the COPY protocol, suitable for tables without a primary key. Primary key conflicts throw an exception. Uses fewer Hologres resources than COPY_STREAM. COPY_BULK_LOAD_ON_CONFLICT: Batch writes with primary key conflict handling. Flink reshuffles data by the Hologres DistributionKey, enabling concurrent shard-level writes. For best performance, set job concurrency to match the shard count of the sink table. Requires VVR 11.3 and later and Hologres V3.1 and later. In COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT modes, data becomes visible only after a successful checkpoint. |
| sink.on-conflict-action | Enum (INSERT_OR_IGNORE, INSERT_OR_REPLACE, INSERT_OR_UPDATE) | No | INSERT_OR_UPDATE | The primary key conflict handling policy. INSERT_OR_IGNORE: Keeps the first record and ignores all subsequent records with the same primary key. INSERT_OR_REPLACE: Replaces the existing row with the new data. INSERT_OR_UPDATE: Updates the specified columns of the existing row. For example, in a table with columns a (primary key), b, c, and d, writing only a and b updates column b while leaving c and d unchanged. |
| sink.create-missing-partition | Boolean | No | false | Specifies whether to automatically create a partition when writing to a partitioned table if the target partition does not exist. For DATE-type partition keys with dynamic partitioning enabled, the auto-created partition name format matches the dynamic partition naming convention. Verify that partition values are clean before enabling this parameter, because dirty data can create incorrect partitions and trigger a failover. Writing to a parent partitioned table requires sink.write-mode to be INSERT. |
| sink.delete-strategy | String (IGNORE_DELETE, NON_PK_FIELD_TO_NULL, DELETE_ROW_ON_PK, CHANGELOG_STANDARD) | No | CHANGELOG_STANDARD | The retraction message handling policy. IGNORE_DELETE: Ignores Update Before and Delete messages. Use for insert-only or update-only scenarios. NON_PK_FIELD_TO_NULL: Ignores Update Before messages; processes Delete messages by setting non-primary key fields to NULL. Use for partial updates where you want to null out non-key columns without affecting other columns. DELETE_ROW_ON_PK: Ignores Update Before messages; processes Delete messages by deleting the entire row by primary key. Use for partial updates where you want to delete the entire row. CHANGELOG_STANDARD: Follows standard Flink SQL changelog semantics: does not ignore deletes, and performs updates by deleting then inserting. Use when partial updates are not involved. Note: Enabling |
| sink.ignore-null-when-update.enabled | Boolean | No | false | When sink.on-conflict-action is INSERT_OR_UPDATE, specifies whether to ignore null values in incoming data during an update, leaving the existing column values unchanged. Applies only when sink.write-mode is INSERT. |
| sink.ignore-null-when-update-by-expr.enabled | Boolean | No | false | When sink.on-conflict-action is INSERT_OR_UPDATE, specifies whether to use an expression-based approach to ignore null values in updates. Provides better performance than sink.ignore-null-when-update.enabled. When true, null values are ignored regardless of whether sink.ignore-null-when-update.enabled is set. Applies only when sink.write-mode is INSERT. Requires Hologres V4.0 and later. |
| sink.default-for-not-null-column.enabled | Boolean | No | true | Specifies whether to fill in a type-specific default when null is written to a NOT NULL column that has no default value. When true, the defaults are: empty string ("") for String, 0 for Number, and 1970-01-01 00:00:00 for Date, timestamp, and timestamptz. When false, an exception is thrown instead. Applies only when sink.write-mode is INSERT and sink.on-conflict-action is not INSERT_OR_UPDATE. |
| sink.remove-u0000-in-text.enabled | Boolean | No | true | Specifies whether to remove \u0000 characters from string fields during writes. When false, the connector writes data as-is; encountering \u0000 throws ERROR: invalid byte sequence for encoding "UTF8": 0x00. |
| sink.partial-insert.enabled | Boolean | No | false | Specifies whether to push only the fields declared in the INSERT statement to the connector. When false, all fields defined in the sink DDL are written; fields not in the INSERT statement are set to null. Applies only when sink.on-conflict-action is INSERT_OR_UPDATE. |
| sink.deduplication.enabled | Boolean | No | true | Specifies whether to remove duplicate records within a batch before writing. When true, if a batch contains multiple records with the same primary key, only the last one is written. For example, if the table already contains (1, 'a') and the batch has DELETE (1, 'a') then INSERT (1, 'b'), the result is a direct update to (1, 'b'). When false, if a new record conflicts with one in the current batch, the batch is flushed first, then the new record is written. In extreme cases (all records share the same primary key), writes degrade to single-row writes. Applies only when sink.write-mode is INSERT. |
| sink.aggressive-flush.enabled | Boolean | No | false | Specifies whether to force a flush when a connection is idle, even if the batch has not reached its configured size. Reduces write latency during low-traffic periods. Applies only when sink.write-mode is INSERT or COPY_STREAM. |
| sink.insert.check-and-put.column | String | No | - | Enables conditional updates and specifies the column to check. Set to an existing column name in the Hologres table. Requirements: sink.write-mode must be INSERT; the sink table must have a primary key; sink.on-conflict-action must be INSERT_OR_UPDATE or INSERT_OR_REPLACE; the sink table must be row-oriented or hybrid row-column (a reverse lookup is required). If many records share the same primary key, check-and-put operations degrade to single-row writes. |
| sink.insert.check-and-put.operator | String (GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL) | No | GREATER | The comparison operator for conditional updates. Compares the check column of the incoming record against the existing value in the table. The update proceeds if the condition is met. |
| sink.insert.check-and-put.null-as | String | No | - | During a conditional update, treats a null value in the existing data as the value specified here. In PostgreSQL, a comparison with NULL never evaluates to TRUE, so the condition is not met when the existing value is NULL; set this parameter when the existing data may be NULL. The effect is similar to wrapping the existing value in the SQL COALESCE function. |
| sink.insert.batch-size | Integer | No | 512 | In INSERT mode, the maximum number of records to buffer before writing. A write is triggered when any of sink.insert.batch-size, sink.insert.batch-byte-size, or sink.insert.flush-interval-ms is reached (logical OR). |
| sink.insert.batch-byte-size | Long | No | 2097152 (2 MB) | In INSERT mode, the maximum buffer size in bytes before writing. Evaluated together with sink.insert.batch-size and sink.insert.flush-interval-ms using a logical OR. |
| sink.insert.flush-interval-ms | Long | No | 10000 | In INSERT mode, the maximum wait time in milliseconds before flushing buffered data. Evaluated together with sink.insert.batch-size and sink.insert.batch-byte-size using a logical OR. |
| sink.copy.format | String (binary, text, binaryrow) | No | binary for COPY_STREAM; text for COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT | The transmission format in COPY mode. COPY_STREAM supports binary, text, and binaryrow (requires Hologres engine V4.1.0 and later). COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT support text only. Applies only when sink.write-mode is COPY_STREAM, COPY_BULK_LOAD, or COPY_BULK_LOAD_ON_CONFLICT. |
| sink.insert.conflict-update-set | String | No | - | The Hologres expression applied when a primary key conflict occurs. Equivalent to INSERT INTO tbl VALUES(...) ON CONFLICT(pk) DO UPDATE SET <conflict-update-set>. For example, col1=old.col1+excluded.col1,col2=excluded.col2 sets col1 to the sum of the old and new values, and col2 to the new value. If not set, all incoming fields are updated to their new values. For stateful expressions (where the result depends on the old value, such as col=old.col+excluded.col), use a version column and set sink.insert.conflict-where to excluded.seq>old.seq to ensure correctness after failover recovery. Applies only when sink.write-mode is INSERT. |
| sink.insert.conflict-where | String | No | - | The Hologres filter condition that must be met for an update to proceed on a primary key conflict. Equivalent to INSERT INTO tbl VALUES(...) ON CONFLICT(pk) DO UPDATE SET <conflict-update-set> WHERE <conflict-where>. For example, excluded.col1>old.col1 updates only when the incoming col1 is greater than the existing value. Cannot be set together with sink.insert.check-and-put.* parameters. Applies only when sink.write-mode is INSERT. |
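
The conflict expression parameters are easier to follow with a concrete table. The sketch below accumulates a counter on primary key conflict and uses a version column to guard against replays after failover, following the excluded.seq>old.seq pattern described above. The table name and the columns id, cnt, and seq are hypothetical, the angle-bracket values are placeholders, and including seq=excluded.seq in the update set is an assumption about how the version column would be advanced.

```sql
-- Minimal sketch: accumulate cnt on conflict, guarded by a version column.
CREATE TEMPORARY TABLE hologres_counter_sink (
  id  BIGINT,
  cnt BIGINT,
  seq BIGINT,  -- hypothetical version column that increases with each record
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'sink.write-mode'         = 'INSERT',
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE',
  -- Add the incoming count to the stored count and record the new version.
  'sink.insert.conflict-update-set' = 'cnt=old.cnt+excluded.cnt,seq=excluded.seq',
  -- Skip records whose version is not newer than the stored one.
  'sink.insert.conflict-where'      = 'excluded.seq>old.seq',
  -- A flush happens when any one of these three limits is reached.
  'sink.insert.batch-size'        = '512',
  'sink.insert.batch-byte-size'   = '2097152',
  'sink.insert.flush-interval-ms' = '10000'
);
```

Because sink.insert.conflict-where cannot be combined with the sink.insert.check-and-put.* parameters, a job would choose one of the two mechanisms for conditional updates.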
Dimension table parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| lookup.read.batch-size | Integer | No | 256 | The maximum number of records per batch for point queries on the dimension table. |
| lookup.read.timeout-ms | Long | No | 0 (no timeout) | The timeout for dimension table point queries, in milliseconds. |
| lookup.read.column-table.enabled | Boolean | No | false | Specifies whether to allow using a column-oriented table as a dimension table. Column-oriented tables perform poorly for point queries; use a row-oriented or hybrid row-column table instead. When enabled with a column-oriented table, the connector logs a warning. |
| lookup.insert-if-not-exists | Boolean | No | false | Specifies whether to insert the current record into the dimension table if a point query finds no matching row. |
| cache | String (None, LRU) | No | None | The cache policy. Supported values: None, LRU. |
| cacheSize | Integer | No | 10000 | The LRU cache size in rows. Applies only when cache is LRU. |
| cacheTTLMs | Long | No | See description. | The cache TTL in milliseconds. When cache is LRU, this sets the cache entry timeout (no expiry by default). When cache is None, this parameter has no effect. If joining with rows inserted into the dimension table during job runtime, set a short TTL or set cacheEmpty to false to prevent stale null results from blocking subsequent joins. |
| cacheEmpty | Boolean | No | true | Specifies whether to cache empty join results. When true, a null result from a point query is cached. When false, null results are not cached. However, if the condition before AND in the join is met but the condition after AND is not, the empty result is still cached. Important: If you need to join with rows inserted into the dimension table during job runtime, set this to |
| async | Boolean | No | false | Specifies whether to return dimension table query results asynchronously. Asynchronous results are unordered. |
| lookup.filter-push-down.enabled | Boolean | No | false | Specifies whether to push dimension table filter conditions to the Hologres server. Currently applies only to equality and comparison operators (<, <=, >, >=) between columns and constants. Supported in VVR 11.4 and later. |
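
The cache parameters could be used in a lookup join as in the sketch below. The stream table uses the Flink datagen connector only to keep the example self-contained; all table and column names are hypothetical, the angle-bracket values are placeholders, and the TTL of 60000 ms is an arbitrary illustrative choice.

```sql
-- Hypothetical stream of orders with a processing-time attribute.
CREATE TEMPORARY TABLE orders_stream (
  order_id BIGINT,
  user_id  BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Minimal sketch: Hologres dimension table with an LRU cache.
CREATE TEMPORARY TABLE users_dim (
  user_id   BIGINT,
  user_name STRING
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',
  'dbname'    = '<yourDatabase>',
  'tablename' = '<yourTable>',
  'username'  = '<yourAccessKeyId>',
  'password'  = '<yourAccessKeySecret>',
  'cache'      = 'LRU',
  'cacheSize'  = '10000',
  'cacheTTLMs' = '60000',  -- entries expire after 60 s instead of never
  'cacheEmpty' = 'false',  -- avoid caching misses for users inserted later
  'async'      = 'true'    -- results may arrive out of order
);

-- Lookup join: enrich each order with the user name at processing time.
SELECT o.order_id, o.user_id, d.user_name
FROM orders_stream AS o
LEFT JOIN users_dim FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.user_id;
```

A short TTL combined with cacheEmpty set to false trades some cache hit rate for fresher results when dimension rows are inserted while the job runs.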