This topic describes Hologres connector options for Ververica Runtime (VVR) 11 or later.
Removed and renamed connector options
In VVR 11 and later, some Hologres connector options that were available in VVR 8 and earlier have been renamed or removed to simplify the system architecture and make maintenance easier. See the following tables for details.
Connector options in the WITH clause
General
Option | Description | Data type | Required? | Default value | Remarks |
connector | The type of the connector. | String | Yes | No default value | Set this option to |
dbname | The database name. | String | Yes | No default value | You can add a specific suffix to the value of the |
tablename | The table name. | String | Yes | No default value | If the schema is not public, set |
username | | String | Yes | No default value | Important To enhance security, use variables instead of hardcoding your AccessKey pair. |
password | | String | Yes | No default value | |
endpoint | The endpoint of Hologres. | String | Yes | No default value | For more information, see Endpoints for connecting to Hologres. |
connection.pool.size | The size of the JDBC connection pool that is created by a single Flink table in a job. | Integer | No | 5 | If your job has poor performance, you can set this option to a higher value to increase data throughput. This option takes effect only for dimension and sink tables. |
connection.pool.name | The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool. | String | No | | If multiple tables are configured with the same connection pool, the largest Specify this option as needed. Consider a job that involves the following tables: dimension tables Note |
connection.fixed.enabled | Specifies whether to use the lightweight connection mode. | Boolean | No | No default value | Hologres has an upper limit on the number of connections. Starting from Hologres 2.1, real-time data writing supports the use of lightweight connections that are not limited by the maximum number of connections. Note |
connection.max-idle-ms | The maximum duration for which the JDBC connection is idle. | Long | No | 60000 | If the idle time exceeds this option value, the connection is released. A new connection is automatically created the next time it is used. Unit: milliseconds. |
connection.ssl.mode | Specifies whether to enable SSL-encrypted transmission and specifies the SSL-encrypted transmission mode to use. | String | No | | Note |
connection.ssl.root-cert.location | The path of the certificate if a CA certificate is used. | String | No | No default value | If you set Note For information about how to obtain a CA certificate, see Step 2: Download the CA certificate. |
retry-count | The maximum number of retries allowed to write and query data if a connection failure occurs. | Integer | No | 10 | |
retry-sleep-step-ms | The incremental waiting time for each retry. | Long | No | 5000 | Unit: milliseconds. For example, when the default value is 5000 (or 5 seconds), the first retry waits for 5 seconds, the second retry waits for 10 seconds, and so on. |
meta-cache-ttl-ms | The maximum expiration time for the TableSchema information stored in the cache. | Long | No | 600000 | Unit: milliseconds. |
serverless-computing.enabled | Specifies whether to use serverless resources. | Boolean | No | false | When this option is set to Note Note To prevent bulk reads/writes from affecting other queries on this instance, enable this option. For details, see Overview of Serverless Computing in the Hologres documentation. |
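The retry-sleep-step-ms row above describes a wait that grows by one step per retry (5 seconds, then 10 seconds, and so on). The following Python sketch illustrates that schedule. It assumes the wait before retry n is n × retry-sleep-step-ms for every retry up to retry-count; only the first two values come from the example in the table, and the helper name retry_wait_schedule_ms is hypothetical, not part of the connector.

```python
def retry_wait_schedule_ms(retry_count=10, sleep_step_ms=5000):
    """Return the assumed wait (in ms) before each retry.

    Assumption: the wait grows linearly, so retry n waits n * sleep_step_ms.
    The defaults mirror the documented defaults of retry-count and
    retry-sleep-step-ms.
    """
    if retry_count < 0 or sleep_step_ms < 0:
        raise ValueError("retry_count and sleep_step_ms must be non-negative")
    return [sleep_step_ms * attempt for attempt in range(1, retry_count + 1)]


schedule = retry_wait_schedule_ms()
# The first two waits match the example in the table: 5 s, then 10 s.
print(schedule[:2])
# Under this assumption, the total wait if every retry is used up:
print(sum(schedule), "ms")
```

Under this linear assumption, the total time spent waiting grows quadratically with retry-count, which is worth keeping in mind before raising either option.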
Source-specific
Option | Description | Data type | Required? | Default value | Remarks |
source.binlog | Specifies whether to consume binary log data. | Boolean | No | | Valid values: |
source.binlog.read-mode | The mode for reading binary logs. | ENUM | No | | Note The automatic selection logic of |
source.binlog.change-log-mode | The changelog event types supported by the CDC source table. | ENUM | No | | Note If downstream operations include retractions, such as using ROW_NUMBER() with OVER for deduplication, you must set |
source.binlog.startup-mode | The mode in which binary log data is consumed. | ENUM | No | | Note The startTime option has a higher priority. If you configure the startTime option or select a start time at job startup, the source.binlog.startup-mode option is forcibly set to |
source.binlog.batch-size | The number of binlog rows read in a batch at a time. | Integer | No | 512 | |
source.binlog.request-timeout-ms | The timeout period for reading binary logs. | Long | No | 300000 | Unit: milliseconds. Note Timeout is possibly caused by backpressure due to downstream operators processing data too slowly. |
source.binlog.project-columns.enabled | Specifies whether to read only user-specified columns when reading binlog data. | Boolean | No | No default value | When this option is enabled, the connector only reads columns declared in your Note This setting requires VVR 11.3+ and Hologres V3.2+. It is automatically enabled when these version requirements are met. |
source.binlog.compression.enabled | Specifies whether to enable data compression during data transmission. | Boolean | No | No default value | With this option enabled, the server returns LZ4-compressed byte streams, enhancing read performance and saving bandwidth. Note This setting requires VVR 11.3+ and Hologres V3.2+. It is automatically enabled when these version requirements are met. |
source.binlog.partition-binlog-mode | The mode in which binary logs of a partitioned table are consumed. | ENUM | No | | |
source.binlog.partition-binlog-lateness-timeout-minutes | The maximum latency allowed before a timeout is triggered when data in a partitioned table is dynamically consumed. | Boolean | No | | |
source.binlog.partition-values-to-read | The partitions to be consumed when data in a partitioned table is consumed in static mode. | String | No | No default value | |
startTime | The start offset time. | String | No | No default value | The format is yyyy-MM-dd hh:mm:ss. If this option is not configured and jobs are not resumed from a state, Realtime Compute for Apache Flink starts to consume Hologres data from the earliest binary log. |
source.scan.fetch-size | The maximum size of records that can be buffered before they are processed in a single batch. | Integer | No | | |
source.scan.timeout-seconds | The timeout period for batch reading. | Integer | No | | Unit: seconds. |
source.scan.filter-push-down.enabled | Controls whether filter conditions are pushed down to Hologres during the batch reading of historical data. | Boolean | No | | Note |
source.binlog.filter-push-down.enabled | Controls whether filter conditions are pushed down to Hologres during binlog consumption. | Boolean | No | | Note |
Sink-specific
Option | Description | Data type | Required? | Default value | Remarks |
sink.write-mode | The write mode. | ENUM | No | | Note |
sink.on-conflict-action | The policy to handle primary key conflicts. | ENUM | No | | |
sink.create-missing-partition | Specifies whether to automatically create a non-existent partition based on partition key values. | Boolean | No | | |
sink.delete-strategy | Specifies the strategy to process retraction messages. | String | No | | Valid values: Note Setting this option to |
sink.ignore-null-when-update.enabled | Specifies whether to ignore null values during update when | Boolean | No | | Note This option is supported only when |
sink.ignore-null-when-update-by-expr.enabled | When | Boolean | No | | This offers better performance than Note |
sink.default-for-not-null-column.enabled | Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table. | Boolean | No | | Note This option is supported only when |
sink.remove-u0000-in-text.enabled | Specifies whether to allow the Hologres connector to remove the invalid characters \u0000 from STRING data written to the sink table. | Boolean | No | | |
sink.partial-insert.enabled | Specifies whether to insert only the fields declared in the INSERT statement. | Boolean | No | | Note |
sink.deduplication.enabled | Specifies whether to perform deduplication during data buffering. | Boolean | No | | Note |
sink.aggressive-flush.enabled | Specifies whether to enable the aggressive commit mode. | Boolean | No | | If you set this option to Note This option is supported only when |
sink.insert.check-and-put.column | Enables conditional updates and specifies the name of the field to check. | String | No | No default value | You must set this option to a field name in the Hologres table. Important |
sink.insert.check-and-put.operator | The comparison operator for the conditional update operation. | String | No | | The connector compares the check field in the new data record with the check field in the old data record in the table. The conditional update is performed only if the comparison satisfies this operator. Valid values: GREATER, GREATER_OR_EQUAL, EQUAL, NOT_EQUAL, LESS, LESS_OR_EQUAL, IS_NULL, and IS_NOT_NULL. |
sink.insert.check-and-put.null-as | The value to use in place of the check field in the old data record when that field is null during a conditional update. | String | No | No default value | In PostgreSQL, comparing any value with NULL never evaluates to TRUE. Therefore, if the existing data in the table is NULL, set this option so that the conditional update can be evaluated. This option works like the COALESCE function in Flink SQL. |
sink.insert.batch-size | In INSERT write mode, the maximum number of records buffered by the sink operator before they are processed all at once. | Integer | No | | When any of the conditions specified by the |
sink.insert.batch-byte-size | In INSERT write mode, the maximum number of bytes of data buffered by the sink operator before it is processed all at once. | Long | No | | |
sink.insert.flush-interval-ms | In INSERT write mode, the maximum data batching time of the sink operator. | Long | No | | |
sink.copy.format | Data format for COPY ( | String | No | | The The Note This option is supported only when |
sink.insert.conflict-update-set | The Hologres expression for updates on primary key conflicts. | String | No | No default value | This option is equivalent to For example, if this option is set to Note This option applies when |
sink.insert.conflict-where | The condition to trigger an update on a primary key conflict. | String | No | No default value | This is equivalent to For example, if this option is set to Note |
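The three sink.insert.check-and-put.* options above work together: a check field, a comparison operator, and a stand-in value for a null old record. The following Python sketch models that decision for a single record. It is an illustration of the semantics described in the table, not the connector's implementation. The function name should_update is hypothetical, and the treatment of IS_NULL and IS_NOT_NULL as tests on the old value is an assumption that the table does not spell out.

```python
import operator
from typing import Any, Optional

# Binary operators listed as valid values of sink.insert.check-and-put.operator.
_BINARY_OPERATORS = {
    "GREATER": operator.gt,
    "GREATER_OR_EQUAL": operator.ge,
    "EQUAL": operator.eq,
    "NOT_EQUAL": operator.ne,
    "LESS": operator.lt,
    "LESS_OR_EQUAL": operator.le,
}


def should_update(new_value: Any, old_value: Any, op: str,
                  null_as: Optional[Any] = None) -> bool:
    """Decide whether a conditional update would be applied (sketch).

    new_value / old_value are the check-field values of the incoming
    record and of the record already in the table.
    """
    # Assumption: the unary operators inspect the old value only.
    if op == "IS_NULL":
        return old_value is None
    if op == "IS_NOT_NULL":
        return old_value is not None
    if op not in _BINARY_OPERATORS:
        raise ValueError(f"unsupported operator: {op}")
    # null-as behaves like COALESCE(old_value, null_as).
    if old_value is None:
        old_value = null_as
    # A comparison that still involves NULL never evaluates to true.
    if new_value is None or old_value is None:
        return False
    return _BINARY_OPERATORS[op](new_value, old_value)


# Without null-as, a NULL old value blocks the update; with it, the
# comparison can succeed.
print(should_update(5, None, "GREATER"))
print(should_update(5, None, "GREATER", null_as=0))
```

The sketch makes the purpose of null-as visible: without a stand-in value, a row whose check field is NULL could never be overwritten by a binary comparison.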
Dimension table-specific
Option | Description | Data type | Required? | Default value | Remarks |
lookup.read.batch-size | The maximum number of records that can be buffered before they are processed in a single batch during the point lookup on a Hologres dimension table. | Integer | No | | |
lookup.read.timeout-ms | The timeout period for performing a point query on a Hologres dimension table. | Long | No | | The default value |
lookup.read.column-table.enabled | Specifies whether to use a column-oriented table as a dimension table. | Boolean | No | | Column-oriented tables perform poorly as dimension tables. If a column-oriented table is used as a dimension table, a warning will be logged. |
lookup.insert-if-not-exists | Specifies whether to insert data that does not exist. | Boolean | No | | During a point query, if the current record is not found in the dimension table, the current record will be inserted. |
cache | The cache policy. | String | No | | Valid values: |
cacheSize | The maximum number of rows of data records that can be cached. | Integer | No | | This option can be configured when |
cacheTTLMs | The interval at which the system refreshes the cache. | Long | No | See the Remarks column. | Unit: milliseconds. The default value of cacheTTLMs depends on the |
cacheEmpty | Specifies whether to cache the JOIN queries whose return results are empty. | Boolean | No | | Important For dynamic association with new dimension table records during job execution, disable this option or set |
async | Specifies whether to return data asynchronously. | Boolean | No | | Note Data is out-of-order when returned asynchronously. |
lookup.filter-push-down.enabled | Specifies whether to push dimension table filters down to Hologres. | Boolean | No | | Note This option requires VVR 11.4+. |
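The cacheSize, cacheTTLMs, and cacheEmpty options above interact in a way that is easier to see in code. The following Python sketch is a simplified model of a bounded lookup cache, not the connector's implementation. It makes several assumptions that the table does not state: entries are evicted in least-recently-used order, cacheTTLMs is modeled as a per-entry expiration time, and the class name LookupCache and its parameters are hypothetical.

```python
import time
from collections import OrderedDict


class LookupCache:
    """Bounded cache in front of a dimension-table lookup (sketch)."""

    def __init__(self, lookup, cache_size, ttl_ms, cache_empty, clock_ms=None):
        self._lookup = lookup            # key -> list of rows (may be empty)
        self._size = cache_size          # models cacheSize
        self._ttl_ms = ttl_ms            # models cacheTTLMs
        self._cache_empty = cache_empty  # models cacheEmpty
        self._clock_ms = clock_ms or (lambda: int(time.monotonic() * 1000))
        self._entries = OrderedDict()    # key -> (expires_at_ms, rows)

    def get(self, key):
        now = self._clock_ms()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            self._entries.move_to_end(key)  # mark as recently used
            return entry[1]
        rows = self._lookup(key)            # miss or expired: query the table
        if rows or self._cache_empty:
            self._entries[key] = (now + self._ttl_ms, rows)
            self._entries.move_to_end(key)
            while len(self._entries) > self._size:
                self._entries.popitem(last=False)  # evict least recently used
        else:
            self._entries.pop(key, None)    # do not keep empty results
        return rows


# Usage: a row added after an empty lookup stays invisible until the
# cached empty result expires, unless cache_empty is False.
table = {}
now = [0]
cache = LookupCache(lambda k: table.get(k, []), cache_size=100,
                    ttl_ms=1000, cache_empty=True, clock_ms=lambda: now[0])
cache.get("user-1")            # not found, empty result is cached
table["user-1"] = [("user-1", "gold")]
stale = cache.get("user-1")    # still served from the cached empty result
now[0] = 1000
fresh = cache.get("user-1")    # entry expired, row is now visible
```

This is the behavior the Important note on cacheEmpty warns about: with empty results cached, records added to the dimension table during job execution are not joined until the cached entry expires.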