Realtime Compute for Apache Flink: Connector options (VVR 11 or later)

Last Updated: Dec 11, 2025

This topic describes Hologres connector options for Ververica Runtime (VVR) 11 or later.

Removed and renamed connector options

In VVR 11 and later, some Hologres connector options that are available in VVR 8 and earlier were renamed or removed to simplify the system architecture and ease maintenance. The following tables list the changes.

Removed options

Option

Description

Alternative options or removal remarks

jdbcRetrySleepInitMs

The fixed waiting period for each retry.

Use retry-sleep-step-ms instead, which applies an incremental waiting period.

jdbcMetaAutoRefreshFactor

The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, the system automatically refreshes the cache.

Use the cache TTL option as an alternative: meta-cache-ttl-ms.

type-mapping.timestamp-converting.legacy

Specifies whether to perform time type conversions between Realtime Compute for Apache Flink and Hologres.

This option was introduced for compatibility with historical behavior to support the TIMESTAMP_LTZ type. It is no longer needed in VVR 11 or later.

property-version

The connector option version.

With optimized default values for common options, this option was removed.

field_delimiter

The delimiter used between rows when data is being exported.

With optimized data reading, this option was removed.

jdbcBinlogSlotName

The slot name of the binary log source table in JDBC mode.

With optimized data reading, this option was removed.

binlogMaxRetryTimes

The number of retries after Realtime Compute for Apache Flink fails to read binary log data.

retry-count

cdcMode

Specifies whether to read binary log data in CDC mode.

With CDC mode used by default, this option was removed.

In non-CDC mode, use source.binlog.change-log-mode.

upsertSource

Specifies whether the source table reads a changelog stream that contains UPSERT messages.

source.binlog.change-log-mode

bulkload

Specifies whether to write data in bulkload mode.

sink.write-mode

useRpcMode

Specifies whether to use the Hologres connector in RPC mode.

We recommend using JDBC mode and configuring the sink.deduplication.enabled option to specify whether to deduplicate data.

partitionrouter

Specifies whether to write data to a partitioned table.

With writing to partitioned tables supported by default, this option was removed.

ignoredelete

Specifies whether to ignore retraction messages.

sink.delete-strategy

sdkMode

The SDK mode.

This option was replaced by table-specific options. Configure source.binlog.read-mode or sink.write-mode, depending on the table type.

jdbcReadBatchQueueSize

The maximum number of queued requests allowed in a thread to perform a point query in a Hologres dimension table.

If the point query performance is not ideal, configure the connection.pool.size option.

jdbcReadRetryCount

The number of retries when a point query performed in a Hologres dimension table times out.

Use retry-count, a general retry connector option, instead.

jdbcScanTransactionSessionTimeoutSeconds

The timeout period for the transaction to which the scan operation belongs.

Use source.scan.timeout-seconds, a general scan timeout option, instead.

Renamed options

VVR 8

VVR 11

Option description

jdbcRetryCount

retry-count

The maximum number of retries allowed to read and write data if a connection failure occurs.

jdbcRetrySleepStepMs

retry-sleep-step-ms

The incremental waiting period for each retry.

jdbcConnectionMaxIdleMs

connection.max-idle-ms

The maximum duration for which the JDBC connection can remain idle.

jdbcMetaCacheTTL

meta-cache-ttl-ms

The maximum time for storing the TableSchema information in the cache.

binlog

source.binlog

Specifies whether to consume binary log data.

sdkMode

source.binlog.read-mode

The read mode.

binlogRetryIntervalMs

source.binlog.request-timeout-ms

The interval between retries after Realtime Compute for Apache Flink fails to read the binary log data.

binlogBatchReadSize

source.binlog.batch-size

The number of binary log rows read in a single batch.

binlogStartupMode

source.binlog.startup-mode

The binary log consumption mode.

jdbcScanFetchSize

source.scan.fetch-size

The number of records that can be buffered during the scan operation.

jdbcScanTimeoutSeconds

source.scan.timeout-seconds

The timeout period of the scan operation.

enable_filter_push_down

source.scan.filter-push-down.enabled

Specifies whether to perform filter pushdown during the full data reading phase.

partition-binlog.mode

source.binlog.partition-binlog-mode

The mode in which binary logs in a partitioned table are consumed.

partition-binlog-lateness-timeout-minutes

source.binlog.partition-binlog-lateness-timeout-minutes

The maximum latency allowed before a timeout is triggered when data in a partitioned table is dynamically consumed.

partition-values-to-read

source.binlog.partition-values-to-read

The partitions to be consumed when data in a partitioned table is consumed in static mode.

sdkMode

sink.write-mode

The write mode.

mutatetype

sink.on-conflict-action

The strategy to handle primary key conflicts.

createparttable

sink.create-missing-partition

Specifies whether to automatically create non-existent partitions based on partition values.

jdbcWriteBatchSize

sink.insert.batch-size

The maximum number of data records that can be buffered and processed in a single batch by the sink operator.

jdbcWriteBatchByteSize

sink.insert.batch-byte-size

The maximum number of bytes of data that can be buffered and processed in a single batch by the sink operator.

jdbcWriteFlushInterval

sink.insert.flush-interval-ms

The maximum waiting time before the sink operator writes buffered data to Hologres in a single batch.

ignoreNullWhenUpdate

sink.ignore-null-when-update.enabled

Specifies whether to ignore null values in the data that is written when mutatetype='insertOrUpdate' is specified.

jdbcEnableDefaultForNotNullColumn

sink.default-for-not-null-column.enabled

Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table.

remove-u0000-in-text.enabled

sink.remove-u0000-in-text.enabled

Specifies whether to allow the Hologres connector to remove the invalid character \u0000 from STRING data written to the sink table.

partial-insert.enabled

sink.partial-insert.enabled

Specifies whether to insert only the fields declared in the INSERT statement.

deduplication.enabled

sink.deduplication.enabled

Specifies whether to perform deduplication during data buffering.

check-and-put.column

sink.insert.check-and-put.column

Specifies whether to enable the conditional update feature and configure the names of the fields that you want to check.

check-and-put.operator

sink.insert.check-and-put.operator

The comparison operator for the conditional update operation.

check-and-put.null-as

sink.insert.check-and-put.null-as

The value that a null check field in the existing record is treated as during a conditional update.

aggressive.enabled

sink.aggressive-flush.enabled

Specifies whether to enable the aggressive commit mode.

connectionSize

connection.pool.size

The size of the JDBC connection pool created in a job.

connectionPoolName

connection.pool.name

The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

jdbcReadBatchSize

lookup.read.batch-size

The maximum number of records that can be buffered and processed in a single batch for a point lookup on a Hologres dimension table.

jdbcReadTimeoutMs

lookup.read.timeout-ms

The timeout period for performing a point query in a Hologres dimension table.
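
The one-to-one renames in the table above can be applied mechanically when migrating a VVR 8 option set. The following Python sketch is illustrative only: the helper name migrate_options is hypothetical, the mapping covers only a subset of the rows above, and sdkMode is left out on purpose because it maps to different options for source and sink tables. Option values are passed through unchanged, so any value whose accepted form differs in VVR 11 still needs manual review.

```python
# Subset of the VVR 8 -> VVR 11 renames listed in the table above.
RENAMED = {
    "jdbcRetryCount": "retry-count",
    "jdbcRetrySleepStepMs": "retry-sleep-step-ms",
    "jdbcConnectionMaxIdleMs": "connection.max-idle-ms",
    "jdbcMetaCacheTTL": "meta-cache-ttl-ms",
    "binlog": "source.binlog",
    "binlogBatchReadSize": "source.binlog.batch-size",
    "binlogStartupMode": "source.binlog.startup-mode",
    "mutatetype": "sink.on-conflict-action",
    "createparttable": "sink.create-missing-partition",
    "jdbcWriteBatchSize": "sink.insert.batch-size",
    "connectionSize": "connection.pool.size",
    "connectionPoolName": "connection.pool.name",
}


def migrate_options(options):
    """Rename keys that have a one-to-one replacement in RENAMED.

    Returns (migrated, unchanged): the new option dict and the list of keys
    that were passed through as-is and should be reviewed by hand.
    Values are NOT translated.
    """
    migrated, unchanged = {}, []
    for key, value in options.items():
        if key in RENAMED:
            migrated[RENAMED[key]] = value
        else:
            migrated[key] = value
            unchanged.append(key)
    return migrated, unchanged
```

Keys reported in the second return value are either options that kept their name (such as tablename) or options that were removed, so they are worth checking against the removed-options table.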

Connector options in the WITH clause

General

Option

Description

Data type

Required?

Default value

Remarks

connector

The type of the connector.

String

Yes

No default value

Set this option to hologres.

dbname

The database name.

String

Yes

No default value

You can add a specific suffix to the value of the dbname option to specify the virtual warehouse to which you want to connect. For example, if you want to connect a dimension table to the virtual warehouse read_warehouse, specify 'dbname' = 'db_test@read_warehouse'.

tablename

The table name.

String

Yes

No default value

If the schema is not public, set tablename to schema.tableName.

username

  • The username of a custom account in the format of BASIC$<user_name>.

  • The AccessKey ID of your Alibaba Cloud account or a RAM user within the Alibaba Cloud account.

String

Yes

No default value

Important

To enhance security, use variables instead of hardcoding your AccessKey pair.

password

  • The password of the custom account.

  • The AccessKey secret of your Alibaba Cloud account or RAM user.

String

Yes

No default value

endpoint

The endpoint of Hologres.

String

Yes

No default value

For more information, see Endpoints for connecting to Hologres.

connection.pool.size

The size of the JDBC connection pool that is created by a single Flink table in a job.

Integer

No

5

If your job has poor performance, you can set this option to a higher value to increase data throughput. This option takes effect only for dimension and sink tables.

connection.pool.name

The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

String

No

'default'

If multiple tables are configured with the same connection pool, the largest connection.pool.size option value among them takes precedence.

Specify this option as needed. Consider a job that involves the following tables: dimension tables A and B and sink tables C, D, and E. You can configure the connection pool pool1 for A and B, the connection pool pool2 for C and D, and the connection pool pool3 for E where a massive amount of data is processed.

Note
  • For tables to share a connection pool, specify identical connection.pool.name, endpoint, dbname, and other connection configurations for them.

  • If a job contains many tables, insufficient connections can affect job performance. In this case, we recommend that you set different connection.pool.name values for different tables.
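
As a rough model of the sharing rule described above, the following Python sketch groups tables by connection pool and takes the largest connection.pool.size in each group. It is not connector code: the function name effective_pool_sizes is hypothetical, and "other connection configurations" is simplified here to endpoint and dbname only.

```python
from collections import defaultdict

DEFAULT_POOL_NAME = "default"  # documented default of connection.pool.name
DEFAULT_POOL_SIZE = 5          # documented default of connection.pool.size


def effective_pool_sizes(tables):
    """tables: list of dicts, each holding one table's WITH options.

    Returns {(pool name, endpoint, dbname): effective pool size}, where the
    largest connection.pool.size among the tables sharing a pool wins.
    """
    sizes = defaultdict(int)
    for opts in tables:
        key = (
            opts.get("connection.pool.name", DEFAULT_POOL_NAME),
            opts["endpoint"],
            opts["dbname"],
        )
        size = int(opts.get("connection.pool.size", DEFAULT_POOL_SIZE))
        sizes[key] = max(sizes[key], size)
    return dict(sizes)
```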

connection.fixed.enabled

Specifies whether to use the lightweight connection mode.

Boolean

No

No default value

Hologres has an upper limit on the number of connections. Starting from Hologres 2.1, real-time data writing supports the use of lightweight connections that are not limited by the maximum number of connections.

Note
  • The default value of this option is affected by the version of the Hologres instance. For dimension and sink tables, the connector automatically selects the lightweight connection mode when the Hologres version is higher than 3.0.28.

  • Lightweight connections for dimension tables do not support querying JSONB and RoaringBitmap types.

connection.max-idle-ms

The maximum duration for which the JDBC connection is idle.

Long

No

60000

If the idle time exceeds this option value, the connection is released. A new connection is automatically created the next time it is used. Unit: milliseconds.

connection.ssl.mode

Specifies whether to enable SSL-encrypted transmission and specifies the SSL-encrypted transmission mode to use.

String

No

disable

  • disable: SSL-encrypted transmission is disabled.

  • require: SSL is enabled to encrypt the data link.

  • verify-ca: SSL is enabled to encrypt the data link, and a CA certificate is used to verify the authenticity of the Hologres server.

  • verify-full: SSL is enabled to encrypt the data link. The client uses a CA certificate to verify the authenticity of the Hologres server and checks that the common name (CN) or Domain Name System (DNS) name in the CA certificate matches the configured Hologres endpoint.

Note
  • Hologres V2.1 or later supports the verify-ca and verify-full modes. For more information, see Transmission encryption.

  • If you set this option to verify-ca or verify-full, you must configure connection.ssl.root-cert.location.

connection.ssl.root-cert.location

The path of the certificate if a CA certificate is used.

String

No

No default value

If you set connection.ssl.mode to verify-ca or verify-full, this option becomes required. You can use the artifacts feature in the Realtime Compute for Apache Flink console to upload the certificate, which will be stored in the /flink/usrlib directory. For example, if the CA certificate file name is certificate.crt, set this option to '/flink/usrlib/certificate.crt'.

Note

For information about how to obtain a CA certificate, see Step 2: Download the CA certificate.

retry-count

The maximum number of retries allowed to write and query data if a connection failure occurs.

Integer

No

10

retry-sleep-step-ms

The incremental waiting time for each retry.

Long

No

5000

Unit: milliseconds. For example, with the default value of 5000 (5 seconds), the first retry waits for 5 seconds, the second retry waits for 10 seconds, and so on.
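
The waiting schedule can be worked out with a few lines of Python. This is a sketch that assumes the wait keeps growing linearly with the retry number, as the "and so on" above suggests; the function name retry_waits_ms is hypothetical.

```python
def retry_waits_ms(retry_count=10, retry_sleep_step_ms=5000):
    """Wait before the n-th retry, assumed to be n * retry_sleep_step_ms (n starts at 1).

    Defaults mirror the documented defaults of retry-count and retry-sleep-step-ms.
    """
    return [n * retry_sleep_step_ms for n in range(1, retry_count + 1)]
```

Under this assumption, the default settings add up to 275 seconds of total waiting across all 10 retries, which is useful to keep in mind when sizing job-level timeouts.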

meta-cache-ttl-ms

The maximum time that TableSchema information is kept in the cache.

Long

No

600000

Unit: milliseconds.

serverless-computing.enabled

Specifies whether to use serverless resources.

Boolean

No

false

When this option is set to true, Hologres uses serverless resources for data reads and writes. Currently, only batch reads/imports support this setting. For binlog consumption, dimension table lookup joins, and real-time writes, this setting does not take effect. For details, see Overview of Serverless Computing in the Hologres documentation.

Note
  • To enable batch reads, set source.binlog to false or source.binlog.startup-mode to INITIAL (only the historical data read phase uses batch reads).

  • To enable bulk imports, set sink.write-mode to COPY_BULK_LOAD or COPY_BULK_LOAD_ON_CONFLICT.

Note

To prevent bulk reads and writes from affecting other queries on the instance, enable this option.

Source-specific

Option

Description

Data type

Required?

Default value

Remarks

source.binlog

Specifies whether to consume binary log data.

Boolean

No

true

Valid values:

  • true: Binary logs are consumed.

  • false: Binary logs are not consumed, and the job stops after the historical data read is complete.

source.binlog.read-mode

The binary log read mode.

ENUM

No

AUTO

  • AUTO: Automatically selects an optimal mode based on the instance version.

  • HOLOHUB: Uses HoloHub mode to consume binary logs.

  • JDBC: Uses JDBC mode to consume binary logs.

Note

The automatic selection logic of AUTO mode is as follows:

  • For Hologres instances of version 2.1.27 and later, the JDBC mode is selected and lightweight connections are enabled by default (which means the connection.fixed.enabled option is automatically set to true).

  • For Hologres instances of versions 2.1.0 to 2.1.26, the JDBC mode is selected.

  • For Hologres instances of version 2.0 and earlier, the HoloHub mode is selected.
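
The selection logic above can be sketched as a small Python function. This is only a model of the three documented rules, not the connector's implementation; the function name auto_read_mode is hypothetical, and the second return value only reflects whether the rule itself says lightweight connections are enabled by default.

```python
def auto_read_mode(hologres_version):
    """hologres_version: dotted version string, for example '2.1.27'.

    Returns (read mode, lightweight connections enabled by this rule).
    """
    parts = tuple(int(p) for p in hologres_version.split("."))
    parts = (parts + (0, 0, 0))[:3]  # pad '3.0' to (3, 0, 0)
    if parts >= (2, 1, 27):
        return ("JDBC", True)
    if parts >= (2, 1, 0):
        return ("JDBC", False)
    return ("HOLOHUB", False)
```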

source.binlog.change-log-mode

The changelog event types supported by the CDC source table.

ENUM

No

UPSERT

  • ALL: All changelog event types are supported, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER.

  • UPSERT: Only upsert event types, including INSERT, DELETE, and UPDATE_AFTER, are supported.

  • ALL_AS_APPEND_ONLY: All changelog event types are treated as INSERT.

Note

If downstream operations include retractions, such as using ROW_NUMBER() with OVER for deduplication, set this option to UPSERT (the replacement for the removed upsertSource option). This allows the source table to read data from Hologres in an upsert manner.

source.binlog.startup-mode

The mode in which binary log data is consumed.

ENUM

No

INITIAL

  • INITIAL: Consumes the snapshot and then binary logs.

  • EARLIEST_OFFSET: Reads from the earliest binary log.

  • TIMESTAMP: Reads binary logs from the specified startTime.

Note

The startTime option has a higher priority. If you configure the startTime option or select a start time at job startup, source.binlog.startup-mode is forcibly set to TIMESTAMP.

source.binlog.batch-size

The number of binary log rows read in a single batch.

Integer

No

512

source.binlog.request-timeout-ms

The timeout period for reading binary logs.

Long

No

300000

Unit: milliseconds.

Note

A timeout may be caused by backpressure from downstream operators that process data too slowly.

source.binlog.project-columns.enabled

Specifies whether to read only user-specified columns when reading binlog data.

Boolean

No

No default value

When this option is enabled, the connector only reads columns declared in your CREATE TEMPORARY TABLE statement. This significantly improves read performance and saves bandwidth, especially for source tables with many columns where only a subset is required.

Note

This setting requires VVR 11.3+ and Hologres V3.2+. It is automatically enabled when these version requirements are met.

source.binlog.compression.enabled

Specifies whether to enable data compression during data transmission.

Boolean

No

No default value

With this option enabled, the server returns LZ4-compressed byte streams, enhancing read performance and saving bandwidth.

Note

This setting requires VVR 11.3+ and Hologres V3.2+. It is automatically enabled when these version requirements are met.

source.binlog.partition-binlog-mode

The mode in which binary logs of a partitioned table are consumed.

ENUM

No

DISABLE

  • DISABLE: The source table is not a partitioned table. If the specified Hologres table is a partitioned table, an exception is reported.

  • DYNAMIC: The latest partition of the partitioned table is continuously consumed. The dynamic partitioning feature must be enabled for the partitioned table. In dynamic mode, partitions are consumed chronologically, from oldest to latest. Once the connector is consuming the partition immediately before the latest one, it starts consuming the latest partition when the new unit time arrives.

  • STATIC: Fixed partitions of the partitioned table are consumed. Multiple partitions can be consumed at the same time. Partitions cannot be added or removed during the consumption process. By default, all partitions of the partitioned table are consumed.

source.binlog.partition-binlog-lateness-timeout-minutes

The maximum latency allowed before a timeout is triggered when data in a partitioned table is dynamically consumed.

Integer

No

60

  • Unit: minutes.

  • In DYNAMIC mode, the Hologres connector starts consuming data from the latest partition when a new unit time arrives. However, it does not immediately stop consuming the previous partition. Instead, it continues to monitor the previous partition to ensure that all late data is consumed.

    For example, in a day-partitioned table with a 60-minute maximum latency, data consumption for the 20240920 partition will extend until 01:00:00 on September 21, 2024, instead of concluding at 00:00:00.

  • The option's value cannot exceed the partitioning unit time.

    For a table partitioned by day, the maximum value of this option is 24 × 60 = 1440 minutes. In dynamic mode, typically, the connector consumes one partition at a time. However, it can consume two partitions in parallel during the defined max latency period.
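
The day-partition example above can be reproduced with a short calculation. The following Python sketch assumes partition values in yyyyMMdd form, as in the 20240920 example; the function name day_partition_consume_until is hypothetical.

```python
from datetime import datetime, timedelta


def day_partition_consume_until(partition_value, lateness_minutes=60):
    """For a day-partitioned table, return the moment the connector stops
    watching partition `partition_value` (yyyyMMdd) for late data."""
    if not 0 <= lateness_minutes <= 24 * 60:
        # The option value cannot exceed the partitioning unit time (1440 minutes for a day).
        raise ValueError("lateness must be between 0 and 1440 minutes for a day partition")
    day_start = datetime.strptime(partition_value, "%Y%m%d")
    next_day_start = day_start + timedelta(days=1)
    return next_day_start + timedelta(minutes=lateness_minutes)
```

Between the start of the next day and the returned moment, the model has two partitions being consumed in parallel, which matches the description above.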

source.binlog.partition-values-to-read

The partitions to be consumed when data in a partitioned table is consumed in static mode.

String

No

No default value

  • If you do not configure this option, the Hologres connector consumes all partitions of the specified table in static mode. If you configure this option, the connector consumes only the specified partitions.

  • Provide partition values for this option rather than complete partition names. Separate multiple partition values with commas (,). Regular expressions are not supported.

startTime

The start offset time.

String

No

No default value

The format is yyyy-MM-dd hh:mm:ss. If this option is not configured and jobs are not resumed from a state, Realtime Compute for Apache Flink starts to consume Hologres data from the earliest binary log.

source.scan.fetch-size

The maximum number of records that can be buffered before they are processed in a single batch.

Integer

No

512

source.scan.timeout-seconds

The timeout period for batch reading.

Integer

No

60

Unit: seconds.

source.scan.filter-push-down.enabled

Controls whether filter conditions are pushed down to Hologres during the batch reading of historical data.

Boolean

No

false

  • false: No filter pushdown will be performed.

  • true: Supported filter conditions will be pushed down to Hologres during the batch historical data read.

Note
  • Do not set this option and source.binlog.filter-push-down.enabled simultaneously.

  • This option takes effect under any of the following conditions:

    • When source.binlog is set to false (which enables batch read).

    • When source.binlog is set to true and source.binlog.startup-mode is set to INITIAL (effective during historical data reading).

source.binlog.filter-push-down.enabled

Controls whether filter conditions are pushed down to Hologres during binlog consumption.

Boolean

No

false

  • false: No filter pushdown will be performed.

  • true: Supported filter conditions will be pushed down to Hologres during binlog consumption.

Note
  • Setting this option requires VVR 11.3+ and Hologres V4.0+. Do not set this option and source.scan.filter-push-down.enabled simultaneously.

  • This option takes effect when source.binlog is set to true. For example, if source.binlog.startup-mode is INITIAL, filter pushdown occurs during both historical and incremental data reading.

Sink-specific

Option

Description

Data type

Required?

Default value

Remarks

sink.write-mode

The write mode.

ENUM

No

INSERT

  • INSERT: Uses JDBC to write data in an insert manner.

  • COPY_STREAM: Uses FIXED COPY, a streaming import method, for data writing.

    This mode delivers high performance for streaming writes and suits high-throughput, low-latency scenarios. It does not support deleting data, writing to a partitioned table, or the sink.ignore-null-when-update.enabled option.

  • COPY_BULK_LOAD: Uses COPY for bulk data load. Compared to COPY_STREAM, this mode uses fewer Hologres resources.

  • COPY_BULK_LOAD_ON_CONFLICT: Uses COPY for bulk data load. Useful for handling primary key conflicts.

Note
  • COPY_BULK_LOAD_ON_CONFLICT requires VVR 11.3+ and Hologres V3.1+. Mechanism: Flink reshuffles data according to the sink table's distribution key, ensuring data for the same shard is handled by a single Flink task. This reduces bulk import locking from table-level to shard-level, facilitating concurrent writes to different shards and improving overall throughput. If you use COPY_BULK_LOAD_ON_CONFLICT, align job parallelism with the sink table's shard count for optimal performance.

  • COPY_BULK_LOAD and COPY_BULK_LOAD_ON_CONFLICT: Written data becomes visible only after a checkpoint completes. These modes are best for bulk historical data imports or when immediate data availability is not critical.

sink.on-conflict-action

The policy to handle primary key conflicts.

ENUM

No

INSERT_OR_UPDATE

  • INSERT_OR_IGNORE: Ignores subsequent duplicate records.

  • INSERT_OR_REPLACE: Replaces existing rows with duplicate records.

  • INSERT_OR_UPDATE: Updates specified fields for duplicate records, preserving values in other fields.

    Consider a table with four fields: a (primary key), b, c, and d. Only fields a and b are defined in the sink table. When an incoming record's primary key value is duplicate, only field b is updated while fields c and d remain unchanged.
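
The three policies can be compared with a small in-memory model of the four-field example above. This Python sketch is not connector code: the function name apply_on_conflict is hypothetical, and the INSERT_OR_REPLACE branch assumes that columns not supplied by the incoming record become null, which the description above does not state explicitly.

```python
def apply_on_conflict(existing_row, incoming, action):
    """existing_row: the full row already stored under this primary key.
    incoming: only the columns declared in the Flink sink table.
    Returns the row as stored after the write.
    """
    if action == "INSERT_OR_IGNORE":
        # The incoming duplicate is dropped.
        return dict(existing_row)
    if action == "INSERT_OR_REPLACE":
        # Assumption: the whole row is replaced and unsupplied columns become None.
        return {col: incoming.get(col) for col in existing_row}
    if action == "INSERT_OR_UPDATE":
        # Only the supplied columns change; the others keep their old values.
        return {**existing_row, **incoming}
    raise ValueError(f"unknown action: {action}")
```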

sink.create-missing-partition

Specifies whether to automatically create a non-existent partition based on partition key values.

Boolean

No

false

  • When you use a field of the DATE type as the partition key and have dynamic partitioning enabled, the name format of the automatically created partition table is consistent with that of dynamic partitions by default.

  • Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partition table is created. Proceed with caution when you use this option.

  • When sink.write-mode is not set to INSERT, writing data to the parent table is not supported.

sink.delete-strategy

Specifies the strategy to process retraction messages.

String

No

CHANGELOG_STANDARD

Valid values:

  • IGNORE_DELETE: Ignores UPDATE_BEFORE and DELETE messages. This is ideal for scenarios focused on inserting and updating data and requiring no deletions.

  • DELETE_ROW_ON_PK: Ignores UPDATE_BEFORE messages and processes DELETE messages by deleting entire rows based on their primary key values. This strategy enables you to delete entire rows during partial update.

  • CHANGELOG_STANDARD: Adheres to the Flink SQL changelog standard: Does not ignore DELETE messages and treats an UPDATE message as a DELETE followed by an INSERT, ensuring data accuracy. This strategy applies to scenarios without partial update.

Note

Setting this option to NON_PK_FIELD_TO_NULL may result in records with non-primary key values set to null.

sink.ignore-null-when-update.enabled

Specifies whether to ignore null values during update when sink.on-conflict-action='INSERT_OR_UPDATE' is specified.

Boolean

No

false

  • false: Null values are written to the Hologres sink table.

  • true: Null values in the data that is written are ignored.

Note

This option is supported only when sink.write-mode is set to INSERT.

sink.ignore-null-when-update-by-expr.enabled

When sink.on-conflict-action='INSERT_OR_UPDATE', this option specifies whether to use an expression to ignore null values in the update stream.

Boolean

No

false

This offers better performance than sink.ignore-null-when-update.enabled. Valid values:

  • false:

    • If sink.ignore-null-when-update.enabled is enabled, ignores null values in the update stream.

    • If sink.ignore-null-when-update.enabled is disabled, writes null values to Hologres.

  • true: Always ignores null values in the update stream.

Note
  • This option is supported only when sink.write-mode is set to INSERT.

  • It requires Hologres V4.0+.

sink.default-for-not-null-column.enabled

Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table.

Boolean

No

true

  • true: The connector fills in a default value based on the following rules:

    • If the column is of the STRING type, the null value is converted into an empty string ("").

    • If the column is of the NUMBER type, the null value is converted into 0.

    • If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted into 1970-01-01 00:00:00.

  • false: The connector does not fill in a default value. If a null value is written to a non-null column, an exception is reported.

Note

This option is supported only when sink.write-mode is set to INSERT and sink.on-conflict-action is not set to INSERT_OR_UPDATE.

sink.remove-u0000-in-text.enabled

Specifies whether to allow the Hologres connector to remove the invalid character \u0000 from STRING data written to the sink table.

Boolean

No

true

  • false: The connector does not remove the invalid characters, but may report an error, such as ERROR: invalid byte sequence for encoding "UTF8": 0x00, when identifying dirty data.

    To resolve this error, manually remove the dirty data from the source table or define how to process dirty data in the job code.

  • true: The connector removes the invalid characters \u0000.

sink.partial-insert.enabled

Specifies whether to insert only the fields declared in the INSERT statement.

Boolean

No

false

  • false: All fields defined in the sink table's DDL statement are updated. Values of fields not declared in the INSERT statement are updated to null.

  • true: Fields defined in the INSERT statement are pushed down to the connector. This way, only the declared fields are updated or inserted.

Note
  • This option takes effect only when the sink.on-conflict-action option is set to INSERT_OR_UPDATE.

sink.deduplication.enabled

Specifies whether to perform deduplication during data buffering.

Boolean

No

true

  • true: Retains the latest record among those with identical primary key values. Assume the first column serves as the primary key:

    • If INSERT (1,'a') and INSERT (1,'b') arrive in sequence, only (1,'b') is retained and written to Hologres.

    • If the record (1,'a') already exists in the Hologres sink table and DELETE (1,'a') and INSERT (1,'b') arrive in sequence, only (1,'b') is retained and written to Hologres. This is equivalent to a direct update operation, instead of a deletion followed by an insertion.

  • false: Deduplication is not performed during data buffering. If a new record arrives with a primary key that duplicates one already in the current batch, the older record is written before the new one.

Note
  • This option is supported only when sink.write-mode is set to INSERT.

  • In extreme cases where all data has the same primary key, disabling deduplication causes records to be written one by one rather than in batches, which degrades job performance.
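
The behavior with deduplication enabled can be modeled as "last event per primary key wins" within one buffered batch. The Python sketch below reproduces the two examples above; it is a simplified model, and the function name deduplicate_batch and the (operation, row) tuple layout are hypothetical.

```python
def deduplicate_batch(events, pk_index=0):
    """events: list of (op, row) tuples in arrival order, op is 'INSERT' or 'DELETE'.

    Keeps only the last event for each primary key value.
    """
    latest = {}
    for op, row in events:
        latest[row[pk_index]] = (op, row)  # a later event overwrites an earlier one
    return list(latest.values())
```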

sink.aggressive-flush.enabled

Specifies whether to enable the aggressive commit mode.

Boolean

No

false

If you set this option to true, data will be forced to commit during connection idle periods, even if the configured condition is not met. This reduces data write latency when traffic is low.

Note

This option is supported only when sink.write-mode is set to INSERT or COPY_STREAM.

sink.insert.check-and-put.column

Specifies whether to enable conditional update and configure the names of the fields that you want to check.

String

No

No default value

You must set this option to a field name in the Hologres table.

Important
  • This option is supported only when the sink.write-mode option is set to INSERT.

  • The sink table must have a primary key, and the sink.on-conflict-action option must be set to INSERT_OR_UPDATE or INSERT_OR_REPLACE.

  • We recommend that you create a row-based sink table or a hybrid row-column sink table when reverse lookup is required.

  • In data with many duplicates, check-and-put operations will degenerate into single writes, which will reduce the write performance.

sink.insert.check-and-put.operator

The comparison operator for the conditional update operation.

String

No

GREATER

This option compares the check field in the new record with the check field in the existing record in the table. The conditional update is performed only if the comparison specified by this option holds.

Valid values: GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, and IS_NOT_NULL.

sink.insert.check-and-put.null-as

The value that a null check field in the existing record is treated as during a conditional update.

String

No

No default value

In PostgreSQL, comparing any value with NULL yields NULL, which is treated as false. Therefore, when the existing value in the table is NULL, you must set this option for the conditional update to take effect. This option is equivalent to the COALESCE function in Flink SQL.
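
The interaction between the comparison operator and the null substitute can be sketched in Python as follows. This is a model of the semantics described above, not the connector's implementation: the function name should_update is hypothetical, and IS_NULL and IS_NOT_NULL are omitted because the text above does not say which record they apply to.

```python
import operator

# The six binary operators from the list of valid values.
_BINARY_OPS = {
    "GREATER": operator.gt,
    "GREATER_OR_EQUAL": operator.ge,
    "EQUAL": operator.eq,
    "NOT_EQUAL": operator.ne,
    "LESS": operator.lt,
    "LESS_OR_EQUAL": operator.le,
}


def should_update(new_value, old_value, op="GREATER", null_as=None):
    """Return True if the new record should overwrite the existing one.

    old_value is replaced by null_as when it is None, like COALESCE(old, null_as).
    """
    if old_value is None:
        old_value = null_as
    if new_value is None or old_value is None:
        return False  # a SQL comparison with NULL is never true
    return _BINARY_OPS[op](new_value, old_value)
```

With the default operator GREATER and no null substitute, a record whose existing check field is null is never updated, which is why the null substitute matters for columns that can be null.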

sink.insert.batch-size

In INSERT write mode, the maximum number of records buffered by the sink operator before they are processed all at once.

Integer

No

512

When any of the conditions specified by the sink.insert.batch-size, sink.insert.batch-byte-size, and sink.insert.flush-interval-ms options is met, buffered data is processed all at once.

sink.insert.batch-byte-size

In INSERT write mode, the maximum number of bytes of data buffered by the sink operator before it is processed all at once.

Long

No

2097152 (2 × 1024 × 1024 bytes = 2 MB)

sink.insert.flush-interval-ms

In INSERT write mode, the maximum time, in milliseconds, that the sink operator buffers data before it is processed all at once.

Long

No

10000

sink.copy.format

Data format for COPY (COPY_STREAM, COPY_BULK_LOAD, and COPY_BULK_LOAD_ON_CONFLICT) write modes.

String

No

  • COPY_STREAM: binary

  • COPY_BULK_LOAD or COPY_BULK_LOAD_ON_CONFLICT: text

The COPY_STREAM write mode supports:

  • binary

  • text

  • binaryrow (Hologres V4.1.0+)

The COPY_BULK_LOAD or COPY_BULK_LOAD_ON_CONFLICT write mode supports text.

Note

This option is supported only when sink.write-mode is set to COPY_STREAM, COPY_BULK_LOAD, or COPY_BULK_LOAD_ON_CONFLICT.

sink.insert.conflict-update-set

The Hologres expression for updates on primary key conflicts.

String

No

No default value

This option corresponds to the <conflict-update-set> part of the Hologres SQL statement insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set>. You can enter a Hologres expression or function.

For example, if this option is set to col1=old.col1+excluded.col1,col2=excluded.col2, it means that on a primary key conflict, the value of col1 is updated to the sum of the old and new values, and col2 is updated to the new value.

  • If this option is not specified, all columns are updated to their new values by default.

  • If the update expression is stateful, for example, col=old.col+excluded.col where the result depends on the old value, ensure that there is a column that can serve as a row version number and set sink.insert.conflict-where to excluded.seq>old.seq to ensure data correctness after job failover.

Note

This option applies when sink.write-mode is set to INSERT.

sink.insert.conflict-where

The condition to trigger an update on a primary key conflict.

String

No

No default value

This option corresponds to the <conflict-where> part of the Hologres SQL statement insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set> where <conflict-where>. You can enter a Hologres expression or function.

For example, if this option is set to excluded.col1>old.col1, it means that on a primary key conflict, the update is triggered only if the new value of col1 is greater than the old value.

Note
  • This option applies when sink.write-mode is set to INSERT.

  • To avoid errors, do not configure this option together with any sink.insert.check-and-put* option.
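
As a rough illustration of how the conflict and batching options in this table fit together, the following Flink SQL sketch declares a sink that accumulates col1 on primary key conflicts and applies an update only when the incoming row carries a newer version. The table name, the columns (id, col1, col2, seq), the connection options (endpoint, dbname, tablename, username, password), and all placeholder values are assumptions made for this example and are not described in this section. Adding seq=excluded.seq to the update expression is also an assumption, made so that the version column advances with each applied update.

```sql
-- Hypothetical sink table: names, columns, and connection values are placeholders.
CREATE TEMPORARY TABLE holo_sink (
  id   BIGINT,
  col1 BIGINT,
  col2 STRING,
  seq  BIGINT,  -- assumed row version column referenced by conflict-where
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',   -- assumed connection options
  'dbname'    = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  -- The conflict options apply only in INSERT write mode.
  'sink.write-mode' = 'INSERT',
  -- On conflict: add the new col1 to the old one, overwrite col2, advance seq.
  'sink.insert.conflict-update-set' = 'col1=old.col1+excluded.col1,col2=excluded.col2,seq=excluded.seq',
  -- Apply the update only for newer rows, so replays after failover are skipped.
  'sink.insert.conflict-where' = 'excluded.seq>old.seq',
  -- Buffered data is processed when any one of these three thresholds is reached.
  'sink.insert.batch-size' = '512',
  'sink.insert.batch-byte-size' = '2097152',
  'sink.insert.flush-interval-ms' = '10000'
);
```

The three batching values shown are the documented defaults and are written out only to make the flush conditions visible. No sink.insert.check-and-put* option is set, because those options should not be combined with sink.insert.conflict-where.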

Dimension table-specific

Option

Description

Data type

Required?

Default value

Remarks

lookup.read.batch-size

The maximum number of records that can be buffered before they are processed in a single batch during the point lookup on a Hologres dimension table.

Integer

No

256

lookup.read.timeout-ms

The timeout period for performing a point query on a Hologres dimension table.

Long

No

0

The default value 0 means that point queries never time out.

lookup.read.column-table.enabled

Specifies whether to use a column-oriented table as a dimension table.

Boolean

No

false

Column-oriented tables perform poorly as dimension tables. If a column-oriented table is used as a dimension table, a warning will be logged.

lookup.insert-if-not-exists

Specifies whether to insert a record into the dimension table when the lookup does not find it.

Boolean

No

false

During a point query, if the current record is not found in the dimension table, the current record will be inserted.

cache

The cache policy.

String

No

None

Valid values:

  • None

  • LRU

cacheSize

The maximum number of rows of data records that can be cached.

Integer

No

10000

This option can be configured when cache is set to LRU. Unit: rows.

cacheTTLMs

The cache timeout period, after which cache entries expire.

Long

No

See the Remarks column.

Unit: milliseconds. The default value of cacheTTLMs depends on the cache settings:

  • If cache is set to LRU, cacheTTLMs specifies the cache timeout period. By default, cache entries do not expire.

  • If cache is set to None, you do not need to configure cacheTTLMs.

cacheEmpty

Specifies whether to cache empty results returned by JOIN queries.

Boolean

No

true

  • true: Empty results returned by JOIN queries are cached.

  • false: Empty results returned by JOIN queries are not cached.

    If the condition before AND in a JOIN statement is met but the condition after AND is not, the JOIN query returns an empty result, and that empty result is also cached. The following sample code provides an example:

    LEFT JOIN latest_emergency FOR SYSTEM_TIME AS OF PROCTIME() AS t2
      ON t1.alarm_id = t2.alarm_id -- If dynamic alerting is configured, add the alert id during matching.
      AND CASE
        WHEN alarm_type = 2 THEN t1.dynamic_id = t2.dynamic_alarm_id
        ELSE true
      END

Important

If the job must join against records that are added to the dimension table while the job is running, set this option to false or set cacheTTLMs to a small value. Otherwise, cached empty results can cause lookup joins to miss the new records.

async

Specifies whether to return data asynchronously.

Boolean

No

false

  • true

  • false

Note

Data that is returned asynchronously is out of order.

lookup.filter-push-down.enabled

Specifies whether to push dimension table filters down to Hologres.

Boolean

No

false

Note

This option requires VVR 11.4 or later.
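
To show how the dimension table options in this table might be combined, the following Flink SQL sketch declares a Hologres dimension table with an LRU cache and uses it in a lookup join. The table names (holo_dim, alarm_source), the columns, the connection options (endpoint, dbname, tablename, username, password), and the 60000 ms TTL are assumptions made for this example. The stream table alarm_source is assumed to be defined elsewhere.

```sql
-- Hypothetical dimension table: names, columns, and connection values are placeholders.
CREATE TEMPORARY TABLE holo_dim (
  alarm_id   BIGINT,
  alarm_name STRING,
  PRIMARY KEY (alarm_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'endpoint'  = '<yourEndpoint>',   -- assumed connection options
  'dbname'    = '<yourDbname>',
  'tablename' = '<yourTablename>',
  'username'  = '<yourUsername>',
  'password'  = '<yourPassword>',
  'lookup.read.batch-size' = '256', -- documented default, shown for visibility
  'cache'      = 'LRU',             -- enable the LRU cache policy
  'cacheSize'  = '10000',           -- maximum number of cached rows
  'cacheTTLMs' = '60000',           -- assumed value: entries expire after 60 seconds
  'cacheEmpty' = 'false'            -- do not cache empty lookup results
);

-- Lookup join against the dimension table; alarm_source is an assumed stream table.
SELECT t1.alarm_id, t2.alarm_name
FROM alarm_source AS t1
LEFT JOIN holo_dim FOR SYSTEM_TIME AS OF PROCTIME() AS t2
  ON t1.alarm_id = t2.alarm_id;
```

Setting cacheEmpty to false together with a finite cacheTTLMs follows the recommendation above for jobs that must pick up dimension records added while the job is running, at the cost of more point queries against Hologres.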