This topic describes the WITH parameters for the Hologres connector in Ververica Runtime (VVR) 11 and later.
Version parameter removal
To optimize the system architecture and improve maintenance efficiency, some legacy parameters in Ververica Runtime (VVR) versions 8 and earlier have been adjusted or removed. The following lists detail the removed legacy parameters and their corresponding replacements.
WITH parameters
General
Parameter | Description | Data type | Required | Default value | Remarks |
connector | Table type. | String | Yes | None | Set this parameter to |
dbname | The database name. | String | Yes | None | You can connect to a specific compute group by adding a suffix to the `dbname` parameter. For example, to connect a dimension table to a specific compute group named `read_warehouse`, specify the connection as |
tablename | The table name. | String | Yes | None | If the schema is not Public, specify the table name in the |
username |
| String | Yes | None |
Important To prevent your AccessKey information from being leaked, use variables to specify the AccessKey values. For more information, see Project variables. |
password |
| String | Yes | None | |
endpoint | The endpoint of the Hologres service. | String | Yes | None | For more information, see Endpoints. |
connection.pool.size | The size of the JDBC connection pool created for a single Flink table in a task. | Integer | No | 5 | If the job performance is poor, increase the pool size. The connection pool size is proportional to data throughput. This parameter is effective only for dimension tables and sink tables. |
connection.pool.name | The name of the connection pool. Tables with the same connection pool name in the same TaskManager can share the connection pool. | String | No |
| The default value is You can configure this parameter as needed. For example, if a job has two dimension tables A and B and three sink tables C, D, and E, you can use pool1 for tables A and B, pool2 for tables C and D, and a separate pool3 for table E if it handles high traffic. Note
|
connection.fixed.enabled | Specifies whether to use the lightweight connection mode. | Boolean | No | None | Hologres has an upper limit on the number of connections. Starting from Hologres 2.1, real-time data writing supports the use of lightweight connections that are not limited by the maximum number of connections. Note
|
connection.max-idle-ms | The idle timeout for a JDBC connection. | Long | No | 60000 | If the idle time exceeds this value, the connection is released. A new connection is automatically created when it is next used. The unit is milliseconds. |
connection.ssl.mode | Specifies whether to enable SSL encryption for data in transit and which mode to use. | String | No | disable |
Note
|
connection.ssl.root-cert.location | The path to the certificate when the encryption mode requires a certificate. | String | No | None | If you set connection.ssl.mode to verify-ca or verify-full, you must also specify the path to the CA certificate. You can use the File Management feature in the Realtime Compute console to upload the certificate, which is then stored in the /flink/usrlib directory. For example, if the CA certificate file is named certificate.crt, set this parameter to Note For information about how to obtain a CA certificate, see Download a CA certificate. |
retry-count | The number of retries for writes and queries when a connection fails. | Integer | No | 10 | None. |
retry-sleep-step-ms | The incremental wait time for each retry. | Long | No | 5000 | The unit is milliseconds. For example, if the value is 5000 (5 seconds), the first retry waits for 5 seconds, the second for 10 seconds, and so on. |
meta-cache-ttl-ms | The TTL for the cached TableSchema information. | Long | No | 600000 | The unit is milliseconds. |
serverless-computing.enabled | Specifies whether to use serverless resources. | Boolean | No | false | If set to true, Hologres serverless resources are used for reads and writes instead of the resources of your Hologres instance. This parameter is supported only for batch reads and batch imports. It is not effective for binary logging consumption, dimension table point queries, or real-time writes. For more information, see Serverless Computing overview. Note
Note If you perform large-scale full data imports or exports and want to avoid affecting other queries on your Hologres instance, enable this parameter. |
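The general parameters above can be combined in a single table declaration. The following is a minimal sketch, not a definitive configuration. It assumes the connector identifier is `hologres` (the Remarks cell for `connector` is truncated on this page), that the table is in the default schema, and that project variables are referenced with the `${secret_values.<name>}` syntax. The table name, columns, database name, variable names, and endpoint placeholder are all hypothetical.

```sql
-- Sketch only: names, columns, and the endpoint placeholder are hypothetical.
CREATE TEMPORARY TABLE holo_orders (
  order_id BIGINT,
  amount   DECIMAL(18, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',                   -- assumed value; not shown in the table above
  'dbname'    = 'my_db',                      -- hypothetical database name
  'tablename' = 'orders',                     -- hypothetical table in the default schema
  'username'  = '${secret_values.ak_id}',     -- variable name is an assumption
  'password'  = '${secret_values.ak_secret}', -- variable name is an assumption
  'endpoint'  = '<your-hologres-endpoint>',   -- placeholder; see Endpoints
  -- Optional parameters, shown here with their documented defaults:
  'connection.pool.size'   = '5',
  'connection.max-idle-ms' = '60000',
  'retry-count'            = '10',
  'retry-sleep-step-ms'    = '5000',          -- first retry waits 5 s, second 10 s, and so on
  'meta-cache-ttl-ms'      = '600000'
);
```

Because the optional values shown are the defaults, omitting them should give the same behavior. They are listed only to show where each parameter goes.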
Source table-specific parameters
Parameter | Description | Data type | Required | Default value | Remarks |
source.binlog | Specifies whether to consume binary logging data. | Boolean | No | true |
|
source.binlog.read-mode | The read mode. | ENUM | No | AUTO |
Note The automatic selection logic for AUTO mode is as follows:
|
source.binlog.change-log-mode | The changelog types supported by the CDC source table. | ENUM | No | UPSERT |
Note If the downstream pipeline includes retraction operators, such as using ROW_NUMBER OVER WINDOW to remove duplicates, you must set upsertSource to true. The source table then reads data from Hologres in an upsert manner. |
source.binlog.startup-mode | The consumption mode for binary logging data. | ENUM | No | INITIAL |
Note If you set the startTime parameter or select a start time in the startup interface, the binlogStartupMode is forced to timestamp mode. Other consumption modes do not take effect. The startTime parameter has a higher priority. |
source.binlog.batch-size | The number of rows read from binary logs in each batch. | Integer | No | 512 | None. |
source.binlog.request-timeout-ms | The timeout period for reading binary logging data. | Long | No | 300000 | The unit is milliseconds. Note A timeout may be caused by backpressure if downstream operators process source table data too slowly. |
source.binlog.project-columns.enabled | Specifies whether to read only the fields specified in the user table when reading binary logging data. | Boolean | No | None | The specified fields are those declared in the Note This parameter is supported only in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
source.binlog.compression.enabled | Specifies whether to enable data compression during transit when reading binary logging data. | Boolean | No | None | When consuming binary logs, the server returns a byte stream compressed with the LZ4 algorithm. This can improve read performance and save bandwidth. Note This parameter is supported only in VVR 11.3 and later and Hologres V3.2 and later. You do not usually need to configure this parameter. The connector enables it by default if the version requirements are met. |
source.binlog.partition-binlog-mode | The consumption mode for binary logging of partitioned tables. | ENUM | No | DISABLE |
|
source.binlog.partition-binlog-lateness-timeout-minutes | The maximum lateness timeout, in minutes, when consuming a partitioned table in DYNAMIC mode. | Integer | No | 60 |
For example, if a table is partitioned by day and the partition is 20240920, and the maximum data lateness is 1 hour, consumption for this partition closes at 2024-09-21 01:00:00, not at 2024-09-21 00:00:00.
If the table is partitioned by day, the maximum value is 24 × 60 = 1440 (minutes). In DYNAMIC mode, only one partition is consumed most of the time. During the lateness period, two partitions may be consumed at the same time. |
source.binlog.partition-values-to-read | In STATIC mode, specifies the partitions to consume. Separate partition values with commas (,). | String | No | None |
|
startTime | The start offset time. | String | No | None | The format is yyyy-MM-dd hh:mm:ss. If this parameter is not set and the job does not recover from a state, consumption of Hologres data starts from the earliest binary log. |
source.scan.fetch-size | The batch size for batch reading. | Integer | No | 512 | None. |
source.scan.timeout-seconds | The timeout period for batch reading. | Integer | No | 60 | The unit is seconds. |
source.scan.filter-push-down.enabled | Specifies whether to push down filters during batch reading. | Boolean | No | false |
Note
|
source.binlog.filter-push-down.enabled | Specifies whether to push down filters during binary log consumption. | Boolean | No | false |
Note
|
scan.prefer.physical-column.over.metadata-column | Specifies whether to prioritize reading data from a physical column when it has the same name as a metadata column. | Boolean | No | false | This parameter is supported only in VVR 11.5 and later. Earlier versions always prioritize reading from the metadata column. |
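As an illustration of the source table parameters above, the following sketch declares a source that consumes binary logs from a table partitioned by day. It is a hedged example, not a definitive configuration. The connector identifier `hologres`, the table name, the columns, the variable names, and the endpoint placeholder are assumptions. Only parameter names and values that appear in the table above are used.

```sql
-- Sketch only: assumes the Hologres table is partitioned by day and has binary logging enabled.
CREATE TEMPORARY TABLE holo_binlog_source (
  id   BIGINT,
  name STRING,
  ds   STRING                                 -- hypothetical partition column
) WITH (
  'connector' = 'hologres',                   -- assumed value
  'dbname'    = 'my_db',                      -- hypothetical
  'tablename' = 'events',                     -- hypothetical
  'username'  = '${secret_values.ak_id}',     -- variable name is an assumption
  'password'  = '${secret_values.ak_secret}', -- variable name is an assumption
  'endpoint'  = '<your-hologres-endpoint>',   -- placeholder
  -- Binary log consumption, shown with the documented defaults:
  'source.binlog'                    = 'true',
  'source.binlog.read-mode'          = 'AUTO',
  'source.binlog.change-log-mode'    = 'UPSERT',
  'source.binlog.startup-mode'       = 'INITIAL',
  'source.binlog.batch-size'         = '512',
  'source.binlog.request-timeout-ms' = '300000',
  -- Partitioned-table consumption: keep reading a closed partition for up to
  -- 60 minutes after the next partition begins.
  'source.binlog.partition-binlog-mode' = 'DYNAMIC',
  'source.binlog.partition-binlog-lateness-timeout-minutes' = '60'
);
```

With a lateness timeout of 60 minutes, consumption of partition 20240920 would close at 2024-09-21 01:00:00, matching the example in the Remarks above.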
Sink table-specific parameters
Parameter | Description | Data type | Required | Default value | Remarks |
sink.write-mode | The write mode. | ENUM | No | INSERT |
Note
|
sink.on-conflict-action | The policy for handling primary key conflicts. | ENUM | No | INSERT_OR_UPDATE |
|
sink.create-missing-partition | Specifies whether to automatically create a partition based on the partition value if the partition does not exist when writing to a partitioned table. | Boolean | No | false |
|
sink.delete-strategy | The policy for handling retraction messages. | String | No | CHANGELOG_STANDARD |
Note Enabling the NON_PK_FIELD_TO_NULL option may result in records that contain only the primary key, with all other columns being null. |
sink.ignore-null-when-update.enabled | When sink.on-conflict-action='INSERT_OR_UPDATE', specifies whether to ignore null values in the data being written for an update. | Boolean | No | false |
Note This parameter is supported only when |
sink.ignore-null-when-update-by-expr.enabled | When sink.on-conflict-action='INSERT_OR_UPDATE', specifies whether to use an expression to ignore null values in the data being written for an update. | Boolean | No | false | Provides better performance than sink.ignore-null-when-update.enabled.
Note
|
sink.default-for-not-null-column.enabled | If a null value is written to a NOT NULL column that has no default value in a Hologres table, specifies whether to allow the connector to fill in a default value. | Boolean | No | true |
Note This parameter is supported only when |
sink.remove-u0000-in-text.enabled | If a string type contains the invalid character \u0000 during a write, specifies whether to allow the connector to remove it. | Boolean | No | true |
|
sink.partial-insert.enabled | Specifies whether to insert only the fields defined in the INSERT statement. | Boolean | No | false |
Note
|
sink.deduplication.enabled | Specifies whether to remove duplicates during batch writing. | Boolean | No | true |
Note
|
sink.aggressive-flush.enabled | Specifies whether to enable aggressive commit mode. | Boolean | No | false | If set to true, the connection is forced to commit when idle, even if the batch has not reached the expected size. This can effectively reduce data write latency when traffic is low. Note This parameter is supported only when |
sink.insert.check-and-put.column | Enables conditional updates and specifies the field name to check. | String | No | None | The parameter value must be set to an existing field name in the Hologres table. Important
|
sink.insert.check-and-put.operator | The comparison operator for the conditional update operation. | String | No | GREATER | Compares the check field of the new record with the old value in the table. The update is performed if the condition of the comparison operator is met. Supported values are GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, and IS_NOT_NULL. |
sink.insert.check-and-put.null-as | During a conditional update, if the old value is null, it is treated as the value configured by this parameter. | String | No | None | In PostgreSQL, a comparison with NULL using an ordinary comparison operator evaluates to NULL rather than TRUE, so the update condition is never met. Therefore, when the existing value in the table may be NULL, set this parameter to supply a substitute value for the comparison. This is equivalent to applying the COALESCE function in SQL. |
sink.insert.batch-size | In INSERT mode, the maximum number of records to buffer in the Hologres sink before writing. | Integer | No | 512 | The |
sink.insert.batch-byte-size | In INSERT mode, the maximum size in bytes of records to buffer in the Hologres sink before writing. | Long | No | 2 × 1024 × 1024 bytes, which is 2 MB | |
sink.insert.flush-interval-ms | In INSERT mode, the maximum wait time before buffered data is written from the Hologres sink to Hologres. | Long | No | 10000 | |
sink.copy.format | The transmission format used in COPY mode. | String | No |
|
Note This parameter is supported only when |
sink.insert.conflict-update-set | The Hologres expression for updates on primary key conflicts. | String | No | None | This is equivalent to the `insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set>` statement. You can specify a Hologres expression or function. For example, if this parameter is set to col1=old.col1+excluded.col1,col2=excluded.col2, it means that on a primary key conflict, the value of col1 is updated to the sum of the old and new values, and col2 is updated to the new value.
Note This parameter is supported only when |
sink.insert.conflict-where | The Hologres filter condition that triggers an update on a primary key conflict. | String | No | None | This is equivalent to `insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set> where <conflict-where>`. You can specify a Hologres expression or function. For example, if this parameter is set to excluded.col1>old.col1, it means that on a primary key conflict, the update is triggered only if the new value of col1 is greater than the old value. Note
|
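The conflict-handling parameters above can be sketched as follows, reusing the `col1`/`col2` expressions from the Remarks. This is an illustrative sketch under stated assumptions. The connector identifier `hologres`, the table name, the column types, the variable names, and the endpoint placeholder are hypothetical. Version or write-mode restrictions from the truncated notes on this page may also apply.

```sql
-- Sketch only: on a primary key conflict, add the new col1 to the old col1 and
-- overwrite col2, but only when the incoming col1 is greater than the stored col1.
CREATE TEMPORARY TABLE holo_sink (
  pk   BIGINT,
  col1 BIGINT,
  col2 STRING,
  PRIMARY KEY (pk) NOT ENFORCED
) WITH (
  'connector' = 'hologres',                   -- assumed value
  'dbname'    = 'my_db',                      -- hypothetical
  'tablename' = 'tbl',                        -- hypothetical
  'username'  = '${secret_values.ak_id}',     -- variable name is an assumption
  'password'  = '${secret_values.ak_secret}', -- variable name is an assumption
  'endpoint'  = '<your-hologres-endpoint>',   -- placeholder
  'sink.write-mode'         = 'INSERT',
  'sink.on-conflict-action' = 'INSERT_OR_UPDATE',
  'sink.insert.conflict-update-set' = 'col1=old.col1+excluded.col1,col2=excluded.col2',
  'sink.insert.conflict-where'      = 'excluded.col1>old.col1',
  -- Buffering thresholds, shown with the documented defaults:
  'sink.insert.batch-size'        = '512',
  'sink.insert.batch-byte-size'   = '2097152', -- 2 x 1024 x 1024 bytes
  'sink.insert.flush-interval-ms' = '10000'
);
```

Per the Remarks, this corresponds to `insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set> where <conflict-where>` on the Hologres side.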
Dimension table-specific parameters
Parameter | Description | Data type | Required | Default value | Remarks |
lookup.read.batch-size | The maximum number of records to buffer for batch processing during a point query on a Hologres dimension table. | Integer | No | 256 | None. |
lookup.read.timeout-ms | The timeout period for a dimension table point query. | Long | No | The default value is 0, which means no timeout. | None. |
lookup.read.column-table.enabled | Specifies whether to use a column-oriented table as a dimension table. | Boolean | No | false | Using a column-oriented table as a dimension table results in poor performance. Use a row-oriented table or a hybrid row-column table instead. If this parameter is enabled and a column-oriented table is used, a warning is logged. |
lookup.insert-if-not-exists | Specifies whether to insert data that does not exist. | Boolean | No | false | If a point query finds that the current data does not exist in the dimension table, the current data is inserted. |
cache | The cache policy. | String | No | None | Hologres supports only the None and LRU cache policies. |
cacheSize | The cache size. | Integer | No | 10000 | After you select the LRU cache policy, you can set the cache size. The unit is rows. |
cacheTTLMs | The cache refresh interval. | Long | No | See Remarks. | The unit is milliseconds. The default value of cacheTTLMs depends on the cache configuration:
|
cacheEmpty | Specifies whether to cache data for which the join result is empty. | Boolean | No | true |
Important Decide whether to enable this switch based on your business scenario. If you want to join with newly inserted records in the dimension table during job runtime, disable this option or set |
async | Specifies whether to return data asynchronously. | Boolean | No | false |
Note Asynchronously returned data is unordered. |
lookup.filter-push-down.enabled | Specifies whether to push down dimension table filter conditions to the Hologres server. | Boolean | No | false | Currently, a filter is pushed down only when it compares a column with a constant using an equality or comparison operator (such as <, <=, >, >=). Note This parameter can be configured only in VVR 11.4 and later. |
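The dimension table parameters above can be sketched with a lookup join. This is a minimal, hedged example. The connector identifier `hologres`, all table and column names, the variable names, and the endpoint placeholder are hypothetical. The `cacheTTLMs` value is an arbitrary illustration, not a recommendation. The driving stream uses the open source `datagen` connector only to keep the example self-contained.

```sql
-- Hypothetical driving stream with a processing-time attribute.
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  user_id  BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- Sketch of a Hologres dimension table with an LRU cache.
CREATE TEMPORARY TABLE holo_dim (
  user_id   BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',                   -- assumed value
  'dbname'    = 'my_db',                      -- hypothetical
  'tablename' = 'users',                      -- hypothetical
  'username'  = '${secret_values.ak_id}',     -- variable name is an assumption
  'password'  = '${secret_values.ak_secret}', -- variable name is an assumption
  'endpoint'  = '<your-hologres-endpoint>',   -- placeholder
  'lookup.read.batch-size' = '256',           -- documented default
  'cache'      = 'LRU',
  'cacheSize'  = '10000',                     -- rows; documented default
  'cacheTTLMs' = '60000',                     -- illustrative value only
  'cacheEmpty' = 'false'                      -- do not cache misses, so newly inserted rows can be joined
);

-- Lookup join against the dimension table.
SELECT o.order_id, d.user_name
FROM orders AS o
JOIN holo_dim FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.user_id = d.user_id;
```

Setting `cacheEmpty` to `false` follows the Important note above for jobs that need to join with records inserted into the dimension table while the job is running.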