Realtime Compute for Apache Flink: Hologres connector options (VVR 8 or earlier)

Last Updated: Mar 27, 2026

This topic lists the connector options that you can set in the WITH clause when you use Realtime Compute for Apache Flink with Hologres in Ververica Runtime (VVR) 8.0.x or earlier.

Version quick reference

The following table summarizes options introduced in specific VVR minor versions. Options not listed here are available in all VVR 8.0.x versions.

| Option | Minimum version |
| --- | --- |
| connection.ssl.mode | VVR 8.0.5 |
| connection.ssl.root-cert.location | VVR 8.0.5 |
| bulkload (sink) | VVR 8.0.5 + Hologres V2.1 |
| deduplication.enabled (sink) | VVR 8.0.5 |
| jdbcReadRetryCount default changes to 10 | VVR 8.0.5 |
| type-mapping.timestamp-converting.legacy | VVR 8.0.6 |
| property-version | VVR 8.0.6 |
| sink.delete-strategy | VVR 8.0.8 |
| remove-u0000-in-text.enabled for jdbc_copy | VVR 8.0.8 |
| check-and-put.column | VVR 8.0.11 |
| check-and-put.operator | VVR 8.0.11 |
| check-and-put.null-as | VVR 8.0.11 |
| aggressive.enabled | VVR 8.0.11 |

Connector options in the WITH clause

General

The following options apply to source tables, sink tables, and dimension tables.

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector | String | Yes | | Set to hologres. |
| dbname | String | Yes | | The database name. To connect to a virtual warehouse, append @<warehouse_name> to the database name, for example, 'db_test@read_warehouse'. Virtual warehouses are supported only in JDBC-related modes. |
| tablename | String | Yes | | The table name. If the schema is not public, use schema.tableName. |
| username | String | Yes | | The AccessKey ID of your Alibaba Cloud account or RAM user, or a custom account username in the format BASIC$<user_name>. Store credentials as variables instead of hardcoding them. See Manage keys. |
| password | String | Yes | | The AccessKey secret of your Alibaba Cloud account or RAM user, or the password of your custom account. |
| endpoint | String | Yes | | The Hologres endpoint. See Endpoints. |
| connection.ssl.mode | String | No | disable | The SSL mode. Valid values: disable, require, verify-ca, verify-full. The verify-ca and verify-full modes require Hologres V2.1 or later and also require connection.ssl.root-cert.location. See Encryption in transit. Requires VVR 8.0.5 or later. |
| connection.ssl.root-cert.location | String | No | | The path of the CA certificate file. Required when connection.ssl.mode is verify-ca or verify-full. Upload the certificate using the artifacts feature; it is then stored in the /flink/usrlib directory. Example: '/flink/usrlib/certificate.crt'. Requires VVR 8.0.5 or later. |
| jdbcRetryCount | Integer | No | 10 | Maximum number of retries after a connection failure when reading or writing data. |
| jdbcRetrySleepInitMs | Long | No | 1000 | Fixed component of the wait time per retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| jdbcRetrySleepStepMs | Long | No | 5000 | Incremental component of the wait time per retry, in milliseconds. The actual wait time is jdbcRetrySleepInitMs + retry × jdbcRetrySleepStepMs. |
| jdbcConnectionMaxIdleMs | Long | No | 60000 | Maximum idle duration for a JDBC connection, in milliseconds. Idle connections exceeding this value are closed and released. |
| jdbcMetaCacheTTL | Long | No | 60000 | Maximum time to keep TableSchema information in the cache, in milliseconds. |
| jdbcMetaAutoRefreshFactor | Integer | No | 4 | Controls when a cache refresh is triggered. The auto-refresh threshold is jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor. When the remaining cache time falls below this threshold, the cache is refreshed automatically. |
| type-mapping.timestamp-converting.legacy | Boolean | No | See description | Time zone basis for conversions between Flink and Hologres time types. true uses the JVM time zone; false (recommended) uses the Flink time zone. Default is true when property-version=0, and false when property-version=1. See Time zones. Requires VVR 8.0.6 or later. |
| property-version | Integer | No | 0 | The connector option version. Set to 1 (recommended) for the latest defaults. The available options and their defaults may vary across major VVR versions. Requires VVR 8.0.6 or later. |
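
As a worked illustration of the two formulas in the table above, the following evaluates the retry wait time and the metadata cache refresh threshold with the default values. It assumes that the retry counter n starts at 1 for the first retry; the table does not state the starting index, so treat the absolute numbers as a sketch.

```latex
\begin{align*}
\text{wait}(n) &= \texttt{jdbcRetrySleepInitMs} + n \times \texttt{jdbcRetrySleepStepMs} \\
\text{wait}(1) &= 1000 + 1 \times 5000 = 6000\ \text{ms} \\
\text{wait}(3) &= 1000 + 3 \times 5000 = 16000\ \text{ms} \\
\text{refresh threshold} &= \frac{\texttt{jdbcMetaCacheTTL}}{\texttt{jdbcMetaAutoRefreshFactor}} = \frac{60000}{4} = 15000\ \text{ms}
\end{align*}
```

Under these defaults, a cached TableSchema entry would be refreshed automatically once less than 15 seconds of its 60-second lifetime remain.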

Source-specific

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| field_delimiter | String | No | "\u0002" | Delimiter between rows during data export. |
| binlog | Boolean | No | See description | Whether to consume binary log data. Default is false when property-version=0, and true when property-version=1. |
| sdkMode | String | No | holohub | The SDK mode for binary log consumption. holohub: HoloHub mode. jdbc: JDBC mode. jdbc_fixed: fixed JDBC mode, not subject to connection limits, but does not support databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| jdbcBinlogSlotName | String | No | | The slot name of the binary log source table in JDBC mode. Takes effect only when sdkMode=jdbc. If not specified, the connector creates a slot automatically. If you use Hologres V2.1 or later with VVR 8.0.5 or later, omit this option; the connector does not attempt to create a slot automatically. See Binary log consumption in JDBC mode. |
| binlogMaxRetryTimes | Integer | No | 60 | Number of retries after failing to read binary log data. |
| binlogRetryIntervalMs | Long | No | 2000 | Interval between retries after failing to read binary log data, in milliseconds. |
| binlogBatchReadSize | Integer | No | 100 | Number of rows read from the binary log per batch. |
| cdcMode | Boolean | No | See description | Whether to read binary log data in Change Data Capture (CDC) mode. Default is false when property-version=0, and true when property-version=1. |
| upsertSource | Boolean | No | false | Takes effect only in CDC mode. true: only upsert messages (INSERT, DELETE, UPDATE_AFTER). false: all message types (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER). Set to true if retraction operators exist in the sink table, for example, when ROW_NUMBER() is used with an OVER clause for deduplication. |
| binlogStartupMode | String | No | earliestOffset | The binary log consumption mode. initial: consume all existing data first, then binary logs. earliestOffset: start from the earliest binary log. timestamp: start from the time set by startTime. If startTime is configured or a start time is selected at job startup, this option is automatically set to timestamp. |
| startTime | String | No | | Start time for consuming Hologres data. Format: yyyy-MM-dd hh:mm:ss. If not set and the job is not resumed from a state, consumption starts from the earliest binary log. |
| jdbcScanFetchSize | Integer | No | 256 | Number of records buffered per batch during a scan operation. |
| jdbcScanTimeoutSeconds | Integer | No | 60 | Timeout for a scan operation, in seconds. |
| jdbcScanTransactionSessionTimeoutSeconds | Integer | No | 600 | Timeout for the transaction that contains the scan operation, in seconds. Corresponds to the Hologres GUC parameter idle_in_transaction_session_timeout. Set to 0 for no timeout. |
| enable_filter_push_down | Boolean | No | false | Whether to push down filter conditions to Hologres during the full data reading phase. Applies when reading from a source table with binary logging disabled, or during the full-data phase of binary log consumption. Requires VVR 6.0.7 or later. |
| partition-binlog.mode | Enum | No | DISABLE | Mode for consuming binary logs from a partitioned table. DISABLE: the table is not treated as a partitioned table; an exception is reported if it is one. DYNAMIC: continuously consumes the latest partition; requires dynamic partitioning to be enabled. STATIC: consumes fixed partitions simultaneously; partitions cannot be added or removed during consumption. |
| partition-binlog-lateness-timeout-minutes | Integer | No | 60 | Maximum latency allowed before a timeout in dynamic partition mode, in minutes. The connector continues monitoring the previous partition after switching to the latest one, to capture late-arriving data. Cannot exceed the partitioning unit time; for a day-partitioned table, the maximum is 1,440 minutes (24 × 60). |
| partition-values-to-read | String | No | | Partitions to consume in static partition mode. Specify partition values (not full partition names), separated by commas. If not set, all partitions are consumed. Regular expressions are not supported. |
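
To show how the general and source-specific options combine, the following is a minimal sketch of a source table that reads existing data first and then consumes binary logs in CDC mode. The table name, columns, angle-bracket placeholders, and the `${secret_values.*}` variable names are hypothetical, and `sdkMode=jdbc` is chosen only for illustration; check Important notes for the value recommended for your VVR version.

```sql
-- Hypothetical source table; names, columns, and placeholders are assumptions.
CREATE TEMPORARY TABLE hologres_binlog_source (
  id BIGINT,
  title STRING,
  updated_at TIMESTAMP(6)
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',      -- credential stored as a variable, not hardcoded
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',                -- consume binary log data
  'cdcMode' = 'true',               -- emit INSERT/DELETE/UPDATE_* change messages
  'sdkMode' = 'jdbc',               -- JDBC mode for binary log consumption
  'binlogStartupMode' = 'initial'   -- full data first, then binary logs
);
```

Setting `binlog` and `cdcMode` explicitly keeps the behavior the same whether `property-version` is 0 or 1, because the defaults of both options depend on it.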

Sink-specific

Write modes

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sdkMode | String | No | jdbc | The SDK mode for writing data. jdbc: uses a JDBC driver. jdbc_copy: high throughput, low latency; does not support deleting data, writing to partitioned tables, or ignoreNullWhenUpdate. rpc: RPC mode; does not support JSONB or RoaringBitmap data types. jdbc_fixed (public preview): fixed JDBC mode, no connection occupancy; does not support JSONB, RoaringBitmap, or databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| bulkload | Boolean | No | false | Whether to write in bulk load mode. Takes effect only when sdkMode=jdbc_copy. Intended for tables without a primary key; when bulk loading a table that has a primary key, the primary key values must be unique. Bulk load mode uses fewer Hologres resources than jdbc_copy mode. Requires VVR 8.0.5 or later and Hologres V2.1 or later. |
| mutatetype | String | No | See description | How duplicate records are handled. insertorignore: ignores duplicates. insertorreplace: replaces existing rows. insertorupdate: updates only the fields defined in the sink table DDL, leaving other fields unchanged. Default is insertorignore when property-version=0, and insertorupdate when property-version=1. |
| partitionrouter | Boolean | No | false | Whether to write data to a partitioned table. |
| createparttable | Boolean | No | false | Whether to automatically create partitions based on incoming partition values. In RPC mode, automatic creation fails if partition values contain hyphens (-). VVR 8.0.3 or later supports DATE-type fields as partition keys. Make sure partition values are clean: dirty data creates an invalid partition and causes a failover. Not supported when sdkMode=jdbc_copy. |

Buffering and flush

The sink operator buffers records and flushes them to Hologres when any of the following conditions is met:

  • The number of buffered records reaches jdbcWriteBatchSize.

  • The total size of buffered records reaches jdbcWriteBatchByteSize.

  • The time since the last flush reaches jdbcWriteFlushInterval.

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| jdbcWriteBatchSize | Integer | No | 256 | Maximum number of records to buffer before flushing, in rows. |
| jdbcWriteBatchByteSize | Long | No | 2097152 (2 MB) | Maximum total size of buffered records before flushing, in bytes. |
| jdbcWriteFlushInterval | Long | No | 10000 | Maximum time to wait before flushing buffered records, in milliseconds. |
| deduplication.enabled | Boolean | No | true | Whether to deduplicate records in the buffer before flushing. When enabled, only the latest record for each primary key is retained. Requires VVR 8.0.5 or later. |
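
The three flush conditions can be tuned together in the sink DDL. The sketch below doubles the row and byte thresholds and halves the flush interval relative to the defaults; the specific values, table name, columns, placeholders, and `${secret_values.*}` variable names are illustrative assumptions, not recommendations.

```sql
-- Hypothetical sink table; names, columns, and tuning values are assumptions.
CREATE TEMPORARY TABLE hologres_sink (
  id BIGINT,
  title STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',
  'mutatetype' = 'insertorupdate',
  'jdbcWriteBatchSize' = '512',            -- flush after 512 buffered rows (default 256)
  'jdbcWriteBatchByteSize' = '4194304',    -- or after 4 MB of buffered data (default 2 MB)
  'jdbcWriteFlushInterval' = '5000',       -- or 5 s after the last flush (default 10 s)
  'deduplication.enabled' = 'true'         -- keep only the latest record per primary key
);
```

Whichever condition is met first triggers the flush, so a shorter interval mainly affects low-traffic periods while the row and byte limits dominate under load.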

Null and data handling

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| ignoreNullWhenUpdate | Boolean | No | false | Whether to skip null values when writing with mutatetype=insertorupdate. false: null values are written to Hologres. true: null fields in the incoming record are ignored. Not supported when sdkMode=jdbc_copy. |
| jdbcEnableDefaultForNotNullColumn | Boolean | No | true | Whether to substitute a default value when a null is written to a non-null column that has no configured default. When true: STRING columns are left blank, NUMBER columns are set to 0, and DATE/TIMESTAMP/TIMESTAMPTZ columns are set to 1970-01-01 00:00:00. When false: an exception is reported. |
| remove-u0000-in-text.enabled | Boolean | No | false | Whether to strip \u0000 characters from STRING data before writing. When false, dirty data may cause ERROR: invalid byte sequence for encoding "UTF8": 0x00. When true, the connector removes \u0000 characters. Use with caution: strings like aaa\u0000bbb become aaabbb, which may cause data mismatches. Supported with sdkMode=jdbc in VVR 8.0.1 or later; supported with sdkMode=jdbc_copy or jdbc in VVR 8.0.8 or later. |
| partial-insert.enabled | Boolean | No | false | Whether to write only the fields declared in the INSERT statement. false: all DDL-defined fields are written; fields not in the INSERT statement are set to null. true: only the declared fields are written. Takes effect only when mutatetype=insertorupdate. |

Delete handling

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sink.delete-strategy | String | No | | Strategy for processing retraction messages. IGNORE_DELETE: ignores UPDATE_BEFORE and DELETE messages. NON_PK_FIELD_TO_NULL: ignores UPDATE_BEFORE messages; handles DELETE by setting non-primary key fields to null (for partial update scenarios). DELETE_ROW_ON_PK: ignores UPDATE_BEFORE messages; handles DELETE by deleting the entire row based on the primary key (for partial update scenarios). CHANGELOG_STANDARD: follows the Flink SQL changelog standard and treats UPDATE as DELETE + INSERT, ensuring data accuracy; use this for scenarios without partial update. Setting NON_PK_FIELD_TO_NULL may result in records with null non-primary key values. Requires VVR 8.0.8 or later. |
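
As an illustration of a partial update scenario, the sketch below declares a sink that updates only the columns it defines and clears them on DELETE instead of removing the row. The table name, columns, placeholders, and `${secret_values.*}` variable names are hypothetical; the option values come from the table above.

```sql
-- Hypothetical partial-update sink (requires VVR 8.0.8 or later for sink.delete-strategy).
CREATE TEMPORARY TABLE hologres_wide_table_sink (
  user_id BIGINT,
  last_order_amount DECIMAL(18, 2),   -- only this subset of the wide table's columns is declared
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate',                 -- update declared fields, leave others unchanged
  'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'  -- DELETE sets declared non-key fields to null
);
```

For a sink that owns every column of the target table and has no partial updates, `CHANGELOG_STANDARD` is the strategy the table above recommends.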

Conditional update

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| check-and-put.column | String | No | | Enables the conditional update feature. Set to the name of the field to check. Requires: a primary key on the sink table, mutatetype=insertorupdate or insertorreplace, and sdkMode=jdbc_fixed or jdbc. For reverse lookups, use a row-based or hybrid row-column table. In data with many duplicates, check-and-put degrades to single writes, reducing write throughput. Requires VVR 8.0.11 or later. |
| check-and-put.operator | String | No | GREATER | Comparison operator for the conditional update. The new record's check field is compared with the existing record's check field; the update proceeds if the comparison matches. Valid values: GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, IS_NOT_NULL. Requires VVR 8.0.11 or later. |
| check-and-put.null-as | String | No | | How to treat null in the existing record during a conditional update. In PostgreSQL, comparing any value with NULL yields NULL, which is treated as FALSE. Set this option to specify the substitute value, equivalent to the COALESCE function. Requires VVR 8.0.11 or later. |
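
A conditional update is easiest to read as a complete sink definition. The following sketch only overwrites a row when the incoming `version` is greater than the stored one, and treats a stored null as 0. The table name, the `version` column, the placeholders, the `${secret_values.*}` variable names, and the literal format of the `check-and-put.null-as` value are assumptions made for illustration.

```sql
-- Hypothetical conditional-update sink (requires VVR 8.0.11 or later).
CREATE TEMPORARY TABLE hologres_versioned_sink (
  id BIGINT,
  payload STRING,
  version BIGINT,                      -- hypothetical check field
  PRIMARY KEY (id) NOT ENFORCED        -- a primary key is required for check-and-put
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',                  -- check-and-put needs jdbc or jdbc_fixed
  'mutatetype' = 'insertorupdate',     -- or insertorreplace
  'check-and-put.column' = 'version',
  'check-and-put.operator' = 'GREATER',
  'check-and-put.null-as' = '0'        -- assumed literal; a stored null compares as 0
);
```

Without `check-and-put.null-as`, a row whose stored `version` is null would fail the GREATER comparison, so it would not be updated.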

Connection pool

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connectionSize | Integer | No | 3 | Size of the JDBC connection pool for the job. Increase this value if the job has poor throughput. |
| connectionPoolName | String | No | | Name of the connection pool. Tables with the same pool name in the same TaskManager share a connection pool. Set this to any string other than 'default', and use the same connectionSize for all tables sharing the pool. In VVR 8.0.3 and earlier, each table has its own pool by default. In VVR 8.0.4 and later, tables with the same endpoint in a job share a pool; if many tables share a pool, set different connectionPoolName values to avoid connection shortages. |
| aggressive.enabled | Boolean | No | false | Whether to force a commit during idle connection periods, even if no flush condition has been met. Reduces write latency under low traffic. Supported when sdkMode=jdbc_fixed, jdbc, or jdbc_copy. Requires VVR 8.0.11 or later. |

Deprecated options

The following options are deprecated. Use the replacement options instead.

| Option | Replacement | Notes |
| --- | --- | --- |
| useRpcMode | sdkMode=rpc | Setting useRpcMode=true is equivalent to sdkMode=rpc. Not available when property-version=1. |
| ignoredelete | sink.delete-strategy | Takes effect only when mutatetype=insertorupdate. For VVR 8.0.8 or later, use sink.delete-strategy instead. If both are set, only sink.delete-strategy applies. Default is true when property-version=0, and false when property-version=1. |

Dimension table-specific

SDK mode and connection

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| sdkMode | String | No | jdbc | The SDK mode for querying data. jdbc: supports both point queries (primary key) and non-primary-key queries; non-primary-key queries are slow. rpc: point queries on primary keys only; all primary key fields must be in the ON clause; does not support JSONB or RoaringBitmap. jdbc_fixed: point queries on primary keys only; all primary key fields must be in the ON clause; does not support JSONB, RoaringBitmap, or databases with data masking enabled. For recommended values by VVR version, see Important notes. |
| useRpcMode | Boolean | No | false | Whether to query using RPC. Setting true is equivalent to sdkMode=rpc. |
| connectionSize | Integer | No | 3 | Size of the JDBC connection pool. Increase this value if the job has poor throughput. |
| connectionPoolName | String | No | | Name of the connection pool. Same behavior as in sink tables; see the sink connectionPoolName option for details. |

Batch and scan

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| jdbcReadBatchSize | Integer | No | 128 | Maximum number of records per batch for point queries. |
| jdbcReadBatchQueueSize | Integer | No | 256 | Maximum number of queued requests per thread for point queries. |
| jdbcReadTimeoutMs | Long | No | 0 | Timeout for point queries, in milliseconds. 0 means no timeout. |
| jdbcReadRetryCount | Integer | No | 1 (before VVR 8.0.5), 10 (VVR 8.0.5+) | Number of retries when a point query times out. This is different from jdbcRetryCount, which applies to connection failures. |
| jdbcScanFetchSize | Integer | No | 256 | Number of records per batch during a scan operation in a one-to-many join (no complete primary key). |
| jdbcScanTimeoutSeconds | Integer | No | 60 | Timeout for a scan operation, in seconds. |

Cache

| Option | Data type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| cache | String | No | None | Cache policy. None: no caching. LRU: cache query results using a least-recently-used policy. |
| cacheSize | Integer | No | 10000 | Maximum number of rows to cache. Applies only when cache=LRU. |
| cacheTTLMs | Long | No | | Cache timeout, in milliseconds. When cache=LRU, specifies the TTL; by default, cache entries do not expire. When cache=None, this option has no effect. |
| cacheEmpty | Boolean | No | true | Whether to cache JOIN queries that return empty results. |
| async | Boolean | No | false | Whether to return query results asynchronously. When true, results are not sorted. |
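
The dimension table options are used in a lookup join. The sketch below caches up to 50,000 rows for 60 seconds and joins a generated stream against the dimension table on its full primary key. All table names, columns, placeholders, `${secret_values.*}` variable names, and cache values are hypothetical; the `datagen` source is used only to keep the example self-contained.

```sql
-- Hypothetical probe-side stream with a processing-time attribute.
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  user_id BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical Hologres dimension table with an LRU cache.
CREATE TEMPORARY TABLE user_dim (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',
  'cache' = 'LRU',
  'cacheSize' = '50000',     -- rows kept in the cache (default 10000)
  'cacheTTLMs' = '60000',    -- entries expire after 60 s; unset means no expiry
  'cacheEmpty' = 'true',     -- also cache lookups that return no rows
  'async' = 'true'           -- asynchronous lookups; results are not sorted
);

-- Lookup join: every primary key field of user_dim appears in the ON clause.
SELECT o.order_id, d.user_name
FROM orders AS o
JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.user_id;
```

Joining on the complete primary key keeps the query a point query, which is the only kind that the rpc and jdbc_fixed modes support.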

Time zones of Realtime Compute for Apache Flink and Hologres

Time types

| Service | Type | Description |
| --- | --- | --- |
| Flink | TIMESTAMP | Date and time without a time zone. Represented as a string, for example, 1970-01-01 00:00:04.001. |
| Flink | TIMESTAMP_LTZ | An absolute point in time. Stored as a LONG (milliseconds since the epoch) and an INT (nanoseconds within the millisecond). The epoch is 00:00:00 UTC on January 1, 1970. Interpreted and displayed based on the session time zone, so the same value appears as different local times in different time zones. For example, 2024-03-19T04:00:00Z appears as 2024-03-19T12:00:00 in Shanghai (UTC+8). |
| Hologres | TIMESTAMP | Date and time without a time zone, similar to Flink TIMESTAMP. The value does not change when the Hologres client time zone changes. |
| Hologres | TIMESTAMPTZ | Date and time with a time zone, similar to Flink TIMESTAMP_LTZ. Stored as UTC; converted to the client's time zone when queried. |

Time zone mappings

With type-mapping.timestamp-converting.legacy=false (VVR 8.0.6 or later), all type conversions between Flink and Hologres are supported without data deviation.

| Flink type | Hologres type | Behavior |
| --- | --- | --- |
| TIMESTAMP | TIMESTAMP | No time zone conversion. Recommended for reading from and writing to Hologres. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | |
| TIMESTAMP | TIMESTAMPTZ | Time zone conversion is performed. Set the Flink time zone using table.local-time-zone to ensure accuracy. For example, 'table.local-time-zone': 'Asia/Shanghai' sets the Flink time zone to UTC+8. Writing the TIMESTAMP value 2022-01-01 01:01:01.123456 then produces the TIMESTAMPTZ value 2022-01-01 01:01:01.123456+8. |
| TIMESTAMP_LTZ | TIMESTAMP | Time zone conversion is performed. |

With type-mapping.timestamp-converting.legacy=true (VVR 8.0.6 or later) or in VVR 8.0.5 or earlier, data deviation may occur for conversions other than TIMESTAMP-to-TIMESTAMP.

| Flink type | Hologres type | Behavior |
| --- | --- | --- |
| TIMESTAMP | TIMESTAMP | No time zone conversion. Recommended. |
| TIMESTAMP_LTZ | TIMESTAMPTZ | Data deviation may occur. TIMESTAMP_LTZ data is treated as a time without a time zone when written. For example, 2024-03-19T04:00:00Z (which represents 2024-03-19T12:00:00 in Shanghai, UTC+8) is written as 2024-03-19T04:00:00 and stored as 2024-03-19T04:00:00+08, producing an 8-hour deviation. |
| TIMESTAMP | TIMESTAMPTZ | Data deviation may occur if the Flink time zone differs from the JVM time zone. Conversions use the JVM time zone, not the Flink time zone. |
| TIMESTAMP_LTZ | TIMESTAMP | |
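
To avoid the deviations described above on VVR 8.0.6 or later, the non-legacy conversion can be selected explicitly in the DDL. The sketch below pairs each Flink time type with its like-for-like Hologres type. The table name, columns, placeholders, `${secret_values.*}` variable names, and the Hologres column types named in the comments are assumptions; the Flink time zone is assumed to be set separately through table.local-time-zone in the job configuration.

```sql
-- Hypothetical sink using the non-legacy time zone conversion (VVR 8.0.6 or later).
CREATE TEMPORARY TABLE hologres_events_sink (
  id BIGINT,
  event_local TIMESTAMP(6),         -- assumed to map to a Hologres TIMESTAMP column
  event_instant TIMESTAMP_LTZ(6),   -- assumed to map to a Hologres TIMESTAMPTZ column
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourSchema.yourTable>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'type-mapping.timestamp-converting.legacy' = 'false'  -- use the Flink time zone, not the JVM time zone
);
```

Setting `property-version` to 1 has the same effect on this option, since its default becomes false, but it also changes the defaults of other options such as `binlog`, `cdcMode`, and `mutatetype`.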