This topic describes the connector options that you can set in the WITH clause when you use Realtime Compute for Apache Flink with Hologres in Ververica Runtime (VVR) 8.0.x or earlier.
Version quick reference
The following table summarizes options introduced in specific VVR minor versions. Options not listed here are available in all VVR 8.0.x versions.
| Option | Minimum version |
|---|---|
| connection.ssl.mode | VVR 8.0.5 |
| connection.ssl.root-cert.location | VVR 8.0.5 |
| bulkload (sink) | VVR 8.0.5 + Hologres V2.1 |
| deduplication.enabled (sink) | VVR 8.0.5 |
| jdbcReadRetryCount default changes to 10 | VVR 8.0.5 |
| type-mapping.timestamp-converting.legacy | VVR 8.0.6 |
| property-version | VVR 8.0.6 |
| sink.delete-strategy | VVR 8.0.8 |
| remove-u0000-in-text.enabled for jdbc_copy | VVR 8.0.8 |
| check-and-put.column | VVR 8.0.11 |
| check-and-put.operator | VVR 8.0.11 |
| check-and-put.null-as | VVR 8.0.11 |
| aggressive.enabled | VVR 8.0.11 |
Connector options in the WITH clause
General
The following options apply to source tables, sink tables, and dimension tables.
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | — | Set to hologres. |
| dbname | String | Yes | — | The database name. To connect to a virtual warehouse, append @<warehouse_name> to the database name, for example, 'db_test@read_warehouse'. Virtual warehouses are supported only in JDBC-related modes. |
| tablename | String | Yes | — | The table name. If the schema is not public, use schema.tableName. |
| username | String | Yes | — | The AccessKey ID of your Alibaba Cloud account or RAM user, or a custom account username in the format BASIC$<user_name>. Store credentials as variables instead of hardcoding them. See Manage keys. |
| password | String | Yes | — | The AccessKey secret of your Alibaba Cloud account or RAM user, or the password of your custom account. |
| endpoint | String | Yes | — | The Hologres endpoint. See Endpoints. |
| connection.ssl.mode | String | No | disable | The SSL mode. Valid values: disable, require, verify-ca, verify-full. The verify-ca and verify-full modes require Hologres V2.1 or later and also require connection.ssl.root-cert.location. See Encryption in transit. Requires VVR 8.0.5 or later. |
| connection.ssl.root-cert.location | String | No | — | The path of the CA certificate file. Required when connection.ssl.mode is verify-ca or verify-full. Upload the certificate using the artifacts feature; it is stored in the /flink/usrlib directory. Example: '/flink/usrlib/certificate.crt'. Requires VVR 8.0.5 or later. |
| jdbcRetryCount | Integer | No | 10 | Maximum number of retries after a connection failure when reading or writing data. |
| jdbcRetrySleepInitMs | Long | No | 1000 | Fixed component of the wait time per retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| jdbcRetrySleepStepMs | Long | No | 5000 | Incremental component of the wait time per retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| jdbcConnectionMaxIdleMs | Long | No | 60000 | Maximum idle duration for a JDBC connection, in milliseconds. Idle connections exceeding this value are closed and released. |
| jdbcMetaCacheTTL | Long | No | 60000 | Maximum time to keep TableSchema information in the cache, in milliseconds. |
| jdbcMetaAutoRefreshFactor | Integer | No | 4 | Controls when a cache refresh is triggered. The auto-refresh threshold is jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. When the remaining cache time falls below this threshold, the cache is refreshed automatically. |
| type-mapping.timestamp-converting.legacy | Boolean | No | See description | Time zone basis for conversions between Flink and Hologres time types. true uses the JVM time zone; false (recommended) uses the Flink time zone. Default is true when property-version=0, and false when property-version=1. See Time zones. Requires VVR 8.0.6 or later. |
| property-version | Integer | No | 0 | The connector option version. Set to 1 (recommended) for the latest defaults. The available options and their defaults may vary across major VVR versions. Requires VVR 8.0.6 or later. |
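As a minimal sketch of how the general options combine, the following Flink SQL DDL declares a Hologres table that connects to a virtual warehouse over SSL. The table name, columns, schema name, angle-bracket placeholders, and the secret variable names are illustrative assumptions; only the option keys and the example values for dbname and the certificate path come from the table above.

```sql
-- Hypothetical table and column names; replace the placeholders with your own values.
CREATE TEMPORARY TABLE hologres_orders (
  order_id   BIGINT,
  amount     DOUBLE,
  created_at TIMESTAMP
) WITH (
  'connector' = 'hologres',
  -- Database name with a virtual warehouse suffix (JDBC-related modes only).
  'dbname' = 'db_test@read_warehouse',
  -- Schema-qualified because the (assumed) schema is not public.
  'tablename' = 'sales.orders',
  -- Assumed variable names; credentials are referenced as variables, not hardcoded.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<hologres-endpoint>',
  'sdkMode' = 'jdbc',
  -- verify-ca needs Hologres V2.1 or later and a root certificate location.
  'connection.ssl.mode' = 'verify-ca',
  'connection.ssl.root-cert.location' = '/flink/usrlib/certificate.crt',
  -- Opt in to the newer defaults (VVR 8.0.6 or later).
  'property-version' = '1'
);
```

With the default retry settings, the wait before retry number n is 1000 + n × 5000 ms, so n = 2 gives 11000 ms (the table does not state whether counting starts at 0 or 1). With the default metadata cache settings, the auto-refresh threshold is 60000 / 4 = 15000 ms.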
Source-specific
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| field_delimiter | String | No | "\u0002" | Delimiter between rows during data export. |
| binlog | Boolean | No | See description | Whether to consume binary log data. Default is false when property-version=0, and true when property-version=1. |
| sdkMode | String | No | holohub | The SDK mode for binary log consumption. holohub: HoloHub mode. jdbc: JDBC mode. jdbc_fixed: fixed JDBC mode, not subject to connection limits, but does not support databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| jdbcBinlogSlotName | String | No | — | The slot name of the binary log source table in JDBC mode. Takes effect only when sdkMode=jdbc. If you use Hologres V2.1 or later with VVR 8.0.5 or later, omit this option; the connector does not create a slot automatically. In other versions, the connector creates a slot automatically if this option is not specified. See Binary log consumption in JDBC mode. |
| binlogMaxRetryTimes | Integer | No | 60 | Number of retries after failing to read binary log data. |
| binlogRetryIntervalMs | Long | No | 2000 | Interval between retries after failing to read binary log data, in milliseconds. |
| binlogBatchReadSize | Integer | No | 100 | Number of rows read from the binary log per batch. |
| cdcMode | Boolean | No | See description | Whether to read binary log data in Change Data Capture (CDC) mode. Default is false when property-version=0, and true when property-version=1. |
| upsertSource | Boolean | No | false | Takes effect only in CDC mode. true: only upsert messages (INSERT, DELETE, UPDATE_AFTER). false: all message types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER). Set to true if retraction operators exist in the sink table, for example, when ROW_NUMBER() is used with an OVER clause for deduplication. |
| binlogStartupMode | String | No | earliestOffset | The binary log consumption mode. initial: consume all existing data first, then binary logs. earliestOffset: start from the earliest binary log. timestamp: start from the time set by startTime. If startTime is configured or a start time is selected at job startup, this option is automatically set to timestamp. |
| startTime | String | No | — | Start time for consuming Hologres data. Format: yyyy-MM-dd hh:mm:ss. If not set and the job is not resumed from a state, consumption starts from the earliest binary log. |
| jdbcScanFetchSize | Integer | No | 256 | Number of records buffered per batch during a scan operation. |
| jdbcScanTimeoutSeconds | Integer | No | 60 | Timeout for a scan operation, in seconds. |
| jdbcScanTransactionSessionTimeoutSeconds | Integer | No | 600 | Timeout for the transaction that contains the scan operation, in seconds. Corresponds to the Hologres GUC parameter idle_in_transaction_session_timeout. Set to 0 for no timeout. |
| enable_filter_push_down | Boolean | No | false | Whether to push down filter conditions to Hologres during the full data reading phase. Applies when reading from a source table with binary logging disabled, or during the full-data phase of binary log consumption. Requires VVR 6.0.7 or later. |
| partition-binlog.mode | Enum | No | DISABLE | Mode for consuming binary logs from a partitioned table. DISABLE: the table is not treated as a partitioned table; an exception is reported if it is one. DYNAMIC: continuously consumes the latest partition; requires dynamic partitioning to be enabled. STATIC: consumes fixed partitions simultaneously; partitions cannot be added or removed during consumption. |
| partition-binlog-lateness-timeout-minutes | Integer | No | 60 | Maximum latency allowed before a timeout in dynamic partition mode, in minutes. The connector continues monitoring the previous partition after switching to the latest one, to capture late-arriving data. Cannot exceed the partitioning unit time. For a day-partitioned table, the maximum is 1,440 minutes (24 × 60). |
| partition-values-to-read | String | No | — | Partitions to consume in static partition mode. Specify partition values (not full partition names), separated by commas. If not set, all partitions are consumed. Regular expressions are not supported. |
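The binary log options above can be combined as in the following sketch of a CDC source that first reads existing data and then switches to binary logs. Table and column names, placeholders, and secret variable names are assumptions for illustration; the option values are taken from the valid values listed above.

```sql
-- Hypothetical source table; replace the placeholders with your own values.
CREATE TEMPORARY TABLE hologres_binlog_source (
  id         BIGINT,
  name       STRING,
  updated_at TIMESTAMP
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  -- Consume binary logs in CDC mode (set explicitly because the
  -- defaults depend on property-version).
  'binlog' = 'true',
  'cdcMode' = 'true',
  -- JDBC mode for binary log consumption.
  'sdkMode' = 'jdbc',
  -- Read all existing data first, then continue with binary logs.
  'binlogStartupMode' = 'initial',
  -- Emit all message types, including UPDATE_BEFORE.
  'upsertSource' = 'false'
);
```

To start from a point in time instead, a job could drop binlogStartupMode and set startTime (for example '2024-03-19 12:00:00'), which switches the mode to timestamp automatically as described above.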
Sink-specific
Write modes
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| sdkMode | String | No | jdbc | The SDK mode for writing data. jdbc: uses a JDBC driver. jdbc_copy: high throughput, low latency; does not support deleting data, writing to partitioned tables, or ignoreNullWhenUpdate. rpc: RPC mode; does not support JSONB or RoaringBitmap data types. jdbc_fixed (public preview): fixed JDBC mode, no connection occupancy; does not support JSONB, RoaringBitmap, or databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| bulkload | Boolean | No | false | Whether to write in bulk load mode. Takes effect only when sdkMode=jdbc_copy. Use this for tables without a primary key; when you bulk load a table that has a primary key, the primary key values must be unique. Bulk load mode uses fewer Hologres resources than jdbc_copy mode. Requires VVR 8.0.5 or later and Hologres V2.1 or later. |
| mutatetype | String | No | See description | How duplicate records are handled. insertorignore: ignores duplicates. insertorreplace: replaces existing rows. insertorupdate: updates only the fields defined in the sink table DDL, leaving other fields unchanged. Default is insertorignore when property-version=0, and insertorupdate when property-version=1. |
| partitionrouter | Boolean | No | false | Whether to write data to a partitioned table. |
| createparttable | Boolean | No | false | Whether to automatically create partitions based on incoming partition values. In RPC mode, automatic creation fails if partition values contain hyphens (-). VVR 8.0.3 or later supports DATE-type fields as partition keys. Make sure partition values are clean, because dirty data creates an invalid partition and causes a failover. Not supported when sdkMode=jdbc_copy. |
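For illustration, a sink that writes to a partitioned table and creates missing partitions might be declared as follows. This is a sketch under assumptions: the table, the columns, and the choice of ds as the partition key are hypothetical, and sdkMode is set to jdbc because jdbc_copy does not support partitioned tables or createparttable.

```sql
-- Hypothetical partitioned sink; ds is assumed to be the partition key in Hologres.
CREATE TEMPORARY TABLE hologres_partitioned_sink (
  id    BIGINT,
  name  STRING,
  ds    STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<parent_table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  'sdkMode' = 'jdbc',
  -- Update only the fields declared in this DDL when a primary key already exists.
  'mutatetype' = 'insertorupdate',
  -- Route each record to the partition that matches its partition value.
  'partitionrouter' = 'true',
  -- Create the partition if it does not exist yet; validate ds upstream,
  -- because a dirty value would create an invalid partition.
  'createparttable' = 'true'
);
```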
Buffering and flush
The sink operator buffers records and flushes them to Hologres when any of the following conditions is met:
- The number of buffered records reaches jdbcWriteBatchSize.
- The total size of buffered records reaches jdbcWriteBatchByteSize.
- The time since the last flush reaches jdbcWriteFlushInterval.
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| jdbcWriteBatchSize | Integer | No | 256 | Maximum number of records to buffer before flushing, in rows. |
| jdbcWriteBatchByteSize | Long | No | 2097152 (2 MB) | Maximum total size of buffered records before flushing, in bytes. |
| jdbcWriteFlushInterval | Long | No | 10000 | Maximum time to wait before flushing buffered records, in milliseconds. |
| deduplication.enabled | Boolean | No | true | Whether to deduplicate records in the buffer before flushing. When enabled, only the latest record for each primary key is retained. Requires VVR 8.0.5 or later. |
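The three flush thresholds can be tuned together. The sketch below uses non-default values chosen only for illustration; the table, columns, and placeholders are hypothetical.

```sql
-- Hypothetical sink with explicit flush thresholds.
CREATE TEMPORARY TABLE hologres_buffered_sink (
  id    BIGINT,
  value STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  -- Flush when 512 rows are buffered ...
  'jdbcWriteBatchSize' = '512',
  -- ... or when the buffer holds 4 MB (4194304 bytes) ...
  'jdbcWriteBatchByteSize' = '4194304',
  -- ... or when 5 seconds have passed since the last flush.
  'jdbcWriteFlushInterval' = '5000',
  -- Keep only the latest buffered record per primary key (VVR 8.0.5 or later).
  'deduplication.enabled' = 'true'
);
```

As a worked example under an assumed workload of 1,000 rows per second at roughly 200 bytes per row: the row threshold is reached after about 0.5 seconds, the byte threshold would need about 21,000 rows (roughly 21 seconds), and the interval is 5 seconds, so the row count triggers the flush first.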
Null and data handling
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| ignoreNullWhenUpdate | Boolean | No | false | Whether to skip null values when writing with mutatetype=insertorupdate. false: null values are written to Hologres. true: null fields in the incoming record are ignored. Not supported when sdkMode=jdbc_copy. |
| jdbcEnableDefaultForNotNullColumn | Boolean | No | true | Whether to substitute a default value when a null is written to a non-null column that has no configured default. When true: STRING columns are left blank, NUMBER columns are set to 0, and DATE/TIMESTAMP/TIMESTAMPTZ columns are set to 1970-01-01 00:00:00. When false: an exception is reported. |
| remove-u0000-in-text.enabled | Boolean | No | false | Whether to strip \u0000 characters from STRING data before writing. When false, dirty data may cause ERROR: invalid byte sequence for encoding "UTF8": 0x00. When true, the connector removes \u0000 characters. Use with caution: strings like aaa\u0000bbb become aaabbb, which may cause data mismatches. Supported with sdkMode=jdbc in VVR 8.0.1 or later; supported with sdkMode=jdbc_copy or jdbc in VVR 8.0.8 or later. |
| partial-insert.enabled | Boolean | No | false | Whether to write only the fields declared in the INSERT statement. false: all DDL-defined fields are written; fields not in the INSERT statement are set to null. true: only the declared fields are written. Takes effect only when mutatetype=insertorupdate. |
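To make the effect of partial-insert.enabled concrete, the following sketch writes only two of the three declared columns. All table and column names are hypothetical, and the inline VALUES row stands in for a real upstream query.

```sql
-- Hypothetical wide sink table with three declared columns.
CREATE TEMPORARY TABLE hologres_wide_sink (
  id    BIGINT,
  col_a STRING,
  col_b STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  -- partial-insert.enabled takes effect only with insertorupdate.
  'mutatetype' = 'insertorupdate',
  'partial-insert.enabled' = 'true'
);

-- Only id and col_a are declared here, so only those fields are written;
-- with partial-insert.enabled=false, col_b would be written as null instead.
INSERT INTO hologres_wide_sink (id, col_a)
VALUES (1, 'a-value');
```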
Delete handling
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| sink.delete-strategy | String | No | — | Strategy for processing retraction messages. IGNORE_DELETE: ignores UPDATE_BEFORE and DELETE messages. NON_PK_FIELD_TO_NULL: ignores UPDATE_BEFORE messages; handles DELETE by setting non-primary key fields to null (for partial update scenarios). DELETE_ROW_ON_PK: ignores UPDATE_BEFORE messages; handles DELETE by deleting the entire row based on the primary key (for partial update scenarios). CHANGELOG_STANDARD: follows the Flink SQL changelog standard and treats UPDATE as DELETE + INSERT, ensuring data accuracy; use this for scenarios without partial update. Setting NON_PK_FIELD_TO_NULL may result in records with null non-primary key values. Requires VVR 8.0.8 or later. |
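A sink for a job without partial updates could select the changelog-standard strategy as sketched below. The table and column names and the placeholders are hypothetical.

```sql
-- Hypothetical sink that applies retractions exactly as Flink emits them.
CREATE TEMPORARY TABLE hologres_changelog_sink (
  id     BIGINT,
  status STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  'mutatetype' = 'insertorupdate',
  -- Requires VVR 8.0.8 or later. For partial update jobs, consider
  -- IGNORE_DELETE, NON_PK_FIELD_TO_NULL, or DELETE_ROW_ON_PK instead.
  'sink.delete-strategy' = 'CHANGELOG_STANDARD'
);
```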
Conditional update
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| check-and-put.column | String | No | — | Enables the conditional update feature. Set to the name of the field to check. Requires: a primary key on the sink table, mutatetype=insertorupdate or insertorreplace, and sdkMode=jdbc_fixed or jdbc. Because each write requires a reverse lookup of the existing row, use a row-oriented or hybrid row-column table. In data with many duplicates, check-and-put degrades to single writes, reducing write throughput. Requires VVR 8.0.11 or later. |
| check-and-put.operator | String | No | GREATER | Comparison operator for the conditional update. The new record's check field is compared with the existing record's check field; the update proceeds if the comparison matches. Valid values: GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL. Requires VVR 8.0.11 or later. |
| check-and-put.null-as | String | No | — | How to treat null in the existing record during a conditional update. In PostgreSQL, comparing any value with NULL yields NULL, which is treated as false in a condition. Set this option to the value that substitutes for the existing null, equivalent to applying the COALESCE function. Requires VVR 8.0.11 or later. |
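The following sketch shows one way the three check-and-put options might be combined so that a row is overwritten only by a record with a higher version. The table, the version column, and the choice of '0' as the null substitute are assumptions for illustration; the exact literal format accepted by check-and-put.null-as is not specified above.

```sql
-- Hypothetical sink; the Hologres table is assumed to have id as its primary key.
CREATE TEMPORARY TABLE hologres_versioned_sink (
  id      BIGINT,
  payload STRING,
  version BIGINT
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  -- Conditional update requires jdbc or jdbc_fixed and one of these mutate types.
  'sdkMode' = 'jdbc',
  'mutatetype' = 'insertorupdate',
  -- Compare the incoming version with the stored version.
  'check-and-put.column' = 'version',
  'check-and-put.operator' = 'GREATER',
  -- Treat a stored null version as 0 so that the comparison can succeed.
  'check-and-put.null-as' = '0'
);
```

Read against the descriptions above, the update then proceeds when the new version is greater than COALESCE(existing version, 0).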
Connection pool
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| connectionSize | Integer | No | 3 | Size of the JDBC connection pool for the job. Increase this value if the job has poor throughput. |
| connectionPoolName | String | No | — | Name of the connection pool. Tables with the same pool name in the same TaskManager share a connection pool. Set this to any string other than 'default', and use the same connectionSize for all tables sharing the pool. In VVR 8.0.3 and earlier, each table has its own pool by default. In VVR 8.0.4 and later, tables with the same endpoint in a job share a pool. If many tables share a pool, set different connectionPoolName values to avoid connection shortages. |
| aggressive.enabled | Boolean | No | false | Whether to force a commit during idle connection periods, even if no flush condition has been met. Reduces write latency under low traffic. Supported when sdkMode=jdbc_fixed, jdbc, or jdbc_copy. Requires VVR 8.0.11 or later. |
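As an illustration of pool sharing, the two sinks below use the same pool name and the same connectionSize, so they would share one pool within a TaskManager. The pool name, table names, columns, and placeholders are hypothetical.

```sql
-- Two hypothetical sinks that share the pool named 'pool_orders'.
CREATE TEMPORARY TABLE hologres_sink_a (
  id BIGINT,
  a  STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table_a>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  'connectionPoolName' = 'pool_orders',  -- any string other than 'default'
  'connectionSize' = '5'                 -- same size for every table in the pool
);

CREATE TEMPORARY TABLE hologres_sink_b (
  id BIGINT,
  b  STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table_b>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<hologres-endpoint>',
  'connectionPoolName' = 'pool_orders',
  'connectionSize' = '5'
);
```

If a job has many tables and connections run short, giving groups of tables different pool names splits them across separate pools.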
Deprecated options
The following options are deprecated. Use the replacement options instead.
| Option | Replacement | Notes |
|---|---|---|
| useRpcMode | sdkMode=rpc | Setting useRpcMode=true is equivalent to sdkMode=rpc. Not available when property-version=1. |
| ignoredelete | sink.delete-strategy | Takes effect only when mutatetype=insertorupdate. For VVR 8.0.8 or later, use sink.delete-strategy instead. If both are set, only sink.delete-strategy applies. Default is true when property-version=0, and false when property-version=1. |
Dimension table-specific
SDK mode and connection
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| sdkMode | String | No | jdbc | The SDK mode for querying data. jdbc: supports both point queries (primary key) and non-primary-key queries; non-primary-key queries are slow. rpc: point queries on primary keys only; all primary key fields must be in the ON clause; does not support JSONB or RoaringBitmap. jdbc_fixed: point queries on primary keys only; all primary key fields must be in the ON clause; does not support JSONB, RoaringBitmap, or databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| useRpcMode | Boolean | No | false | Whether to query using RPC. Setting true is equivalent to sdkMode=rpc. |
| connectionSize | Integer | No | 3 | Size of the JDBC connection pool. Increase this value if the job has poor throughput. |
| connectionPoolName | String | No | — | Name of the connection pool. Same behavior as in sink tables; see the sink connectionPoolName option for details. |
Batch and scan
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| jdbcReadBatchSize | Integer | No | 128 | Maximum number of records per batch for point queries. |
| jdbcReadBatchQueueSize | Integer | No | 256 | Maximum number of queued requests per thread for point queries. |
| jdbcReadTimeoutMs | Long | No | 0 | Timeout for point queries, in milliseconds. 0 means no timeout. |
| jdbcReadRetryCount | Integer | No | 1 (before VVR 8.0.5), 10 (VVR 8.0.5+) | Number of retries when a point query times out. This is different from jdbcRetryCount, which applies to connection failures. |
| jdbcScanFetchSize | Integer | No | 256 | Number of records per batch during a scan operation in a one-to-many join (no complete primary key). |
| jdbcScanTimeoutSeconds | Integer | No | 60 | Timeout for a scan operation, in seconds. |
Cache
| Option | Data type | Required | Default | Description |
|---|---|---|---|---|
| cache | String | No | None | Cache policy. None: no caching. LRU: cache query results using a least-recently-used policy. |
| cacheSize | Integer | No | 10000 | Maximum number of rows to cache. Applies only when cache=LRU. |
| cacheTTLMs | Long | No | — | Cache timeout, in milliseconds. When cache=LRU, specifies the TTL; by default, cache entries do not expire. When cache=None, this option has no effect. |
| cacheEmpty | Boolean | No | true | Whether to cache JOIN queries that return empty results. |
| async | Boolean | No | false | Whether to return query results asynchronously. When true, results are not sorted. |
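The dimension table options can be sketched end to end with a lookup join. The stream table, the dimension table, and their columns are hypothetical; the datagen source only stands in for a real stream, and the cache values are illustrative.

```sql
-- Hypothetical stream with a processing-time attribute for the lookup join.
CREATE TEMPORARY TABLE orders_stream (
  order_id  BIGINT,
  user_id   BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical Hologres dimension table with an LRU cache.
CREATE TEMPORARY TABLE hologres_users_dim (
  user_id   BIGINT,
  user_name STRING
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  'sdkMode' = 'jdbc',
  -- Cache up to 10000 rows and expire entries after 60 seconds.
  'cache' = 'LRU',
  'cacheSize' = '10000',
  'cacheTTLMs' = '60000',
  -- Do not cache lookups that return no rows.
  'cacheEmpty' = 'false',
  -- Asynchronous lookups; results are not sorted.
  'async' = 'true'
);

-- Point query on user_id for each incoming order.
SELECT o.order_id, o.user_id, d.user_name
FROM orders_stream AS o
LEFT JOIN hologres_users_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.user_id;
```

Setting cacheTTLMs bounds how stale a cached row can be; without it, LRU entries do not expire and leave the cache only through eviction.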
Time zones of Realtime Compute for Apache Flink and Hologres
Time types
| Service | Type | Description |
|---|---|---|
| Flink | TIMESTAMP | Date and time without a time zone. Represented as a string, for example, 1970-01-01 00:00:04.001. |
| Flink | TIMESTAMP_LTZ | An absolute point in time. Stored as a LONG (milliseconds since epoch) and an INT (nanoseconds within the millisecond). The epoch is 00:00:00 UTC on January 1, 1970. Interpreted and displayed based on the session time zone, so the same value appears as different local times in different time zones. For example, 2024-03-19T04:00:00Z appears as 2024-03-19T12:00:00 in Shanghai (UTC+8). |
| Hologres | TIMESTAMP | Date and time without a time zone, similar to Flink TIMESTAMP. The value does not change when the Hologres client time zone changes. |
| Hologres | TIMESTAMPTZ | Date and time with a time zone, similar to Flink TIMESTAMP_LTZ. Stored as UTC; converted to the client's time zone when queried. |
Time zone mappings
With type-mapping.timestamp-converting.legacy=false (VVR 8.0.6 or later), all type conversions between Flink and Hologres are supported without data deviation.
| Flink type | Hologres type | Behavior |
|---|---|---|
| TIMESTAMP | TIMESTAMP | No time zone conversion. Recommended for reading from and writing to Hologres. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | — |
| TIMESTAMP | TIMESTAMPTZ | Time zone conversion is performed. Set the Flink time zone using table.local-time-zone to ensure accuracy. For example, 'table.local-time-zone': 'Asia/Shanghai' sets the Flink time zone to UTC+8. Writing the TIMESTAMP value 2022-01-01 01:01:01.123456 then produces the TIMESTAMPTZ value 2022-01-01 01:01:01.123456+08. |
| TIMESTAMP_LTZ | TIMESTAMP | Time zone conversion is performed. |
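The TIMESTAMP to TIMESTAMPTZ mapping above can be sketched as follows. The table and column names are hypothetical, the Hologres column is assumed to be of type TIMESTAMPTZ, and the SET statement is the open-source Flink SQL form; in a managed deployment the same key may need to be set in the job configuration instead.

```sql
-- Assumption: SET is accepted here; otherwise configure table.local-time-zone
-- for the job as 'Asia/Shanghai'.
SET 'table.local-time-zone' = 'Asia/Shanghai';

-- Hypothetical sink; event_time maps to a TIMESTAMPTZ column in Hologres.
CREATE TEMPORARY TABLE hologres_events_sink (
  id         BIGINT,
  event_time TIMESTAMP
) WITH (
  'connector' = 'hologres',
  'dbname' = '<database>',
  'tablename' = '<table>',
  'username' = '${secret_values.ak_id}',      -- assumed variable name
  'password' = '${secret_values.ak_secret}',  -- assumed variable name
  'endpoint' = '<hologres-endpoint>',
  -- Use the Flink time zone for conversions (VVR 8.0.6 or later).
  'type-mapping.timestamp-converting.legacy' = 'false'
);

-- Per the table above, this value is stored as 2022-01-01 01:01:01.123456+08.
INSERT INTO hologres_events_sink
VALUES (1, TIMESTAMP '2022-01-01 01:01:01.123456');
```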
With type-mapping.timestamp-converting.legacy=true (VVR 8.0.6 or later) or in VVR 8.0.5 or earlier, data deviation may occur for conversions other than TIMESTAMP-to-TIMESTAMP.
| Flink type | Hologres type | Behavior |
|---|---|---|
| TIMESTAMP | TIMESTAMP | No time zone conversion. Recommended. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | Data deviation may occur. TIMESTAMP_LTZ data is treated as a time without a time zone when written. For example, 2024-03-19T04:00:00Z (which represents 2024-03-19T12:00:00 in Shanghai, UTC+8) is written as 2024-03-19T04:00:00 and stored as 2024-03-19T04:00:00+08, producing an 8-hour deviation. |
| TIMESTAMP | TIMESTAMPTZ | Data deviation may occur if the Flink time zone differs from the JVM time zone. Conversions use the JVM time zone, not the Flink time zone. |
| TIMESTAMP_LTZ | TIMESTAMP | — |
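The 8-hour deviation in the TIMESTAMP_LTZ to TIMESTAMPTZ row of the legacy table can be checked with a short calculation that reuses the instant from that example. The symbols t_true and t_stored are introduced here only for illustration.

```latex
\begin{aligned}
t_{\text{true}}   &= \text{2024-03-19T04:00:00Z} = \text{2024-03-19T12:00:00+08:00} \\
t_{\text{stored}} &= \text{2024-03-19T04:00:00+08:00} = \text{2024-03-18T20:00:00Z} \\
t_{\text{true}} - t_{\text{stored}} &= \text{04:00 on March 19 (UTC)} - \text{20:00 on March 18 (UTC)} = 8\ \text{hours}
\end{aligned}
```

The stored value is therefore 8 hours earlier than the instant that Flink held, which matches the UTC+8 offset that was applied to a value already expressed in UTC.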