Realtime Compute for Apache Flink: Hologres connector

Last Updated: Sep 27, 2024

This topic describes how to use the Hologres connector.

Background information

Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. Hologres is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries for petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities supported by the Hologres connector.

Table type: Source table, dimension table, and result table

Running mode: Streaming mode and batch mode

Data format: N/A

Metric:

  • Metrics for source tables

    • numRecordsIn

    • numRecordsInPerSecond

  • Metrics for result tables

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    Note

    For more information about the metrics, see Metrics.

API type: DataStream API and SQL API

Data update or deletion in a result table: Supported

Features

Features supported for a source table

Feature

Description

Real-time consumption of Hologres data

  • Reads data from a Hologres source table in which the binary logging feature is disabled.

  • Consumes the binary log data of Hologres in real time.

    • Consumes data in non-Change Data Capture (CDC) mode.

    • Consumes data in CDC mode.

    • Consumes full and incremental data by using a source table.

For more information, see Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time.
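The following sample code is a minimal sketch of a source table that consumes full and incremental data in CDC mode. It uses only parameters that are described in the "Parameters in the WITH clause" section of this topic. The table names hologres_binlog_source and print_sink and the column definitions are hypothetical, and the sketch assumes that binary logging is already enabled for the Hologres table and that the VVR version supports the binlogStartupMode parameter.

```sql
-- Hypothetical source table: reads existing data first, then binary log data.
CREATE TEMPORARY TABLE hologres_binlog_source (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',               -- consume binary log data in JDBC mode
  'binlog' = 'true',                -- consume binary log data
  'cdcMode' = 'true',               -- read binary log data in CDC mode
  'binlogStartupMode' = 'initial'   -- full data first, then incremental data
);

-- Built-in print connector of Apache Flink, used here only to inspect the changelog.
CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT name, age, birthday FROM hologres_binlog_source;
```

To consume binary log data in non-CDC mode instead, set the cdcMode parameter to false in this sketch.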

Features supported for a result table

Feature

Description

Streaming semantics

Writes changelogs to a Hologres result table.

Data merging into a wide table

Updates only data in specific fields instead of data in an entire row.

Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements

Synchronizes data from an entire database or a single table to Hologres tables in real time and synchronizes schema changes of each table to Hologres tables in real time.

Data update in specific columns

Note

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports this feature.

Pushes down the column names that are specified in the INSERT statement of Realtime Compute for Apache Flink to the Hologres connector to update only the specified columns.
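The following sample code is a minimal sketch of a data update in specific columns. The INSERT statement lists only the name and age columns, so only these column names are pushed down to the Hologres connector. The table names user_updates and hologres_sink are hypothetical, and the datagen connector of Apache Flink is used only to produce sample input. How rows that have an existing primary key are handled depends on the data writing mode (the mutatetype parameter), which is not configured in this sketch.

```sql
-- Hypothetical input table that generates random sample rows.
CREATE TEMPORARY TABLE user_updates (
  name VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Hypothetical Hologres result table with three columns.
CREATE TEMPORARY TABLE hologres_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

-- Only name and age are written. The birthday column is not part of the statement.
INSERT INTO hologres_sink (name, age)
SELECT name, age FROM user_updates;
```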

Prerequisites

A Hologres table is created. For more information, see Manage internal tables.

Limits

  • Limits on all types of Hologres tables

    • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Hologres connector.

    • The Hologres connector cannot be used to access Hologres foreign tables. For more information about Hologres foreign tables, see Create a foreign table mapped to MaxCompute.

    • For more information about the known defects and release notes of the Hologres connector, see the "Hologres connector release note" section of the Overview topic.

  • Limits only on Hologres source tables

    • By default, Realtime Compute for Apache Flink reads a Hologres source table by performing a single full table scan. Data consumption is complete when the scan ends, and data that is appended to the Hologres source table afterward is not read. Realtime Compute for Apache Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time. Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports filter pushdown when the Hologres connector reads data from Hologres. For more information, see the description of the enable_filter_push_down parameter that is used only for source tables.

    • Realtime Compute for Apache Flink that uses a VVR version earlier than 8.0 does not support watermarks for a Hologres source table in CDC mode. If you want to perform window aggregation on a Hologres source table, you can use a different approach to perform time-based aggregation. For more information, see the "MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?" section of the FAQ about CDC topic.

  • Limits only on Hologres result tables: None.

  • Limits only on Hologres dimension tables

    • When you join a Hologres dimension table with another table, we recommend that you use the primary key of the dimension table as the JOIN condition and create the dimension table with row-oriented storage. Column-oriented storage incurs large performance overheads for point queries. When you use row-oriented storage to create a Hologres dimension table, you must specify the primary key as the clustering key. For more information, see CREATE TABLE.

    • If you cannot use the primary key as the JOIN condition and you perform point queries by using a non-primary key, we recommend that you use column-oriented storage when you create the Hologres table. A point query that uses a non-primary key can return multiple rows for a single lookup. In this case, we recommend that you specify the distribution key and event time column (segment key) to optimize query performance. For more information, see Distribution key, Event time column (segment key), and Storage models of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage.

    • In Realtime Compute for Apache Flink that uses a VVR version earlier than 4.0, you can perform point queries only by using the primary key of a Hologres dimension table when you join the dimension table with another table. In Realtime Compute for Apache Flink that uses VVR 4.0 or later, you can also perform point queries by using a non-primary key if a Java Database Connectivity (JDBC)-related mode is used for the Hologres dimension table.
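The following sample code is a minimal sketch of a join that uses the primary key of a Hologres dimension table as the JOIN condition. The table names events, hologres_dim, and print_sink and the columns are hypothetical. The sketch assumes that the Hologres table uses row-oriented storage with name as the primary key, and it uses the standard lookup join syntax of Flink SQL (FOR SYSTEM_TIME AS OF on a processing-time attribute).

```sql
-- Hypothetical stream with a processing-time attribute for the lookup join.
CREATE TEMPORARY TABLE events (
  name VARCHAR,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Hypothetical Hologres dimension table. name is the primary key.
CREATE TEMPORARY TABLE hologres_dim (
  name VARCHAR,
  age BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'print'
);

-- Point query on the primary key of the dimension table for each input row.
INSERT INTO print_sink
SELECT e.name, d.age
FROM events AS e
JOIN hologres_dim FOR SYSTEM_TIME AS OF e.proctime AS d
  ON e.name = d.name;
```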

Precautions

  • If you set the sdkMode parameter to rpc for a table, take note of the following items when you upgrade the VVR version:

    The SDK modes that can be configured for dimension tables and result tables are changed in Hologres V2.0 and later. You cannot set the sdkMode parameter to rpc for dimension tables and result tables. Only the JDBC-related modes, which include the jdbc, jdbc_fixed, and jdbc_copy modes, can be used. In RPC mode, Realtime Compute for Apache Flink does not deduplicate data that has the same primary key in the same batch, whereas the JDBC-related modes deduplicate such data by default. If you want to retain complete data after you switch to a JDBC-related mode, set the jdbcWriteBatchSize parameter to 1 to avoid deduplication. Alternatively, upgrade the VVR version to 8.0.5 or later and set the deduplication.enabled parameter to false.

    If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X for a deployment in which data of Hologres V2.0 is read or written in remote procedure call (RPC) mode, you can configure the SDK mode based on the upgrade scenario.

    • If you upgrade the VVR version from 4.X to a 6.X minor version that ranges from 6.0.4 to 6.0.6, an error may be returned. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed for dimension tables and result tables.

    • If you upgrade the VVR version from 4.X to 6.0.7 or later, no action is required. Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to JDBC-related modes.

  • If you set the sdkMode parameter to holohub for a binary log source table, take note of the following items when you upgrade the VVR version:

    In Hologres V2.0 and later, you can set the sdkMode parameter to holohub for Hologres binlog source tables only in specific scenarios. In Hologres V2.1 and later, you cannot set the sdkMode parameter to holohub for Hologres binlog source tables. You can set the sdkMode parameter only to jdbc.

    If a binary log source table is consumed in your deployment and you do not set the sdkMode parameter to jdbc, the HoloHub mode is used for the binary log source table by default. If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X, take note of the following points based on the Hologres version when you configure the SDK mode.

    • Hologres V2.0

    • Hologres V2.1

      • If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.4, binary log data may not be consumed as expected. We recommend that you upgrade the VVR version to 8.0.5.

      • If you upgrade the VVR version from 4.X to 8.0.5 or later, no operation is required. Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.
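The following sample code is a minimal sketch of how the SDK mode can be specified explicitly before an upgrade instead of relying on automatic conversion. The table names and columns are hypothetical. The result table uses the jdbc mode and sets the jdbcWriteBatchSize parameter to 1 to avoid deduplication, as described in the first precaution. The binary log source table sets the sdkMode parameter to jdbc so that the HoloHub mode is not used by default.

```sql
-- Hypothetical result table that previously used 'sdkMode' = 'rpc'.
CREATE TEMPORARY TABLE hologres_sink (
  name VARCHAR,
  age BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',            -- JDBC-related mode instead of rpc
  'jdbcWriteBatchSize' = '1'     -- avoids deduplication within a batch
  -- In VVR 8.0.5 or later, 'deduplication.enabled' = 'false' can be used instead.
);

-- Hypothetical binary log source table that previously used the default HoloHub mode.
CREATE TEMPORARY TABLE hologres_binlog_source (
  name VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'sdkMode' = 'jdbc'             -- explicit JDBC mode instead of holohub
);
```

Setting the jdbcWriteBatchSize parameter to 1 sends every record in its own batch, so this setting is likely to reduce write throughput.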

Syntax

CREATE TABLE hologres_table (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkmode' = 'jdbc'
);

Parameters in the WITH clause

Note

Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later supports all parameters whose names start with jdbc.

  • Common parameters

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    connector

    The type of the table.

    String

    Yes

    No default value

    Set the value to hologres.

    dbname

    The database name.

    String

    Yes

    No default value

    Hologres V2.0 introduces virtual warehouse instances as a new type of elastic and high-availability instances. Computing resources are divided into multiple virtual warehouses to implement high-availability deployments. Different virtual warehouses share the same endpoint. You can add a specific suffix to the value of the dbname parameter to specify the virtual warehouse to which you want to connect. For example, if you want to connect a dimension table to the virtual warehouse read_warehouse, you can specify 'dbname' = 'db_test@read_warehouse'.

    Note

    Virtual warehouses are supported only when JDBC-related modes are used for tables. For more information, see the sdkMode parameter of the source table, dimension table, and result table in the "Parameters in the WITH clause" section of this topic.

    tablename

    The table name.

    String

    Yes

    No default value

    If the public schema is not used, you must set tablename to a value in the schema.tableName format.

    username

    The username that is used to access the database. Enter the AccessKey ID of your Alibaba Cloud account.

    String

    Yes

    No default value

    For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic.

    Important

    To protect your AccessKey pair, we recommend that you configure the AccessKey ID by using the key management method. For more information, see Manage variables and keys.

    password

    The password that is used to access the database. Enter the AccessKey secret of your Alibaba Cloud account.

    String

    Yes

    No default value

    For more information, see the "How do I view information about the AccessKey ID and AccessKey secret of the account?" section of the Reference topic.

    Important

    To protect your AccessKey pair, we recommend that you configure the AccessKey secret by using the key management method. For more information, see Manage variables and keys.

    endpoint

    The endpoint of Hologres.

    String

    Yes

    No default value

    For more information, see Endpoints for connecting to Hologres.

    connection.ssl.mode

    Specifies whether to enable SSL-encrypted transmission and specifies the SSL-encrypted transmission mode that you want to use.

    String

    No

    disable

    Valid values:

    • disable: SSL-encrypted transmission is disabled. This is the default value.

    • require: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt only the connections that are used to transmit data.

    • verify-ca: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt the connections that transmit data and uses certificate authority (CA) certificates to verify the Hologres server.

    • verify-full: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt the connections that transmit data, uses CA certificates to verify the Hologres server, and checks whether the common name (CN) or Domain Name System (DNS) specified in the CA certificates is consistent with the Hologres endpoint that is specified during connection setup.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • You can set the connection.ssl.mode parameter to require in Hologres V1.1 and later. You can set the connection.ssl.mode parameter to verify-ca or verify-full in Hologres V2.1 and later. For more information, see Transmission encryption.

    • If you set this parameter to verify-ca or verify-full, you must specify the connection.ssl.root-cert.location parameter.

    connection.ssl.root-cert.location

    The path of the certificate if a CA certificate is used.

    String

    No

    No default value

    If you set the connection.ssl.mode parameter to verify-ca or verify-full, you must specify this parameter. You can click Upload Artifact on the Artifacts page in the development console of Realtime Compute for Apache Flink to upload the CA certificate file. The uploaded file is stored in the /flink/usrlib directory. For example, if the name of the CA certificate file that you want to use is certificate.crt, you must set this parameter to '/flink/usrlib/certificate.crt' in the WITH clause. This way, you can use the certificate after you upload the certificate file.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • For more information about how to obtain a CA certificate, see the "Step 2: Download the CA certificate" section of the Transmission encryption topic.

    jdbcRetryCount

    The maximum number of retries allowed to read and write data if a connection failure occurs.

    Integer

    No

    10

    N/A.

    jdbcRetrySleepInitMs

    The fixed waiting period for each retry.

    Long

    No

    1000

    The actual waiting period for a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds.

    jdbcRetrySleepStepMs

    The accumulated waiting period for each retry.

    Long

    No

    5000

    The actual waiting period for a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds.

    jdbcConnectionMaxIdleMs

    The maximum duration for which the JDBC connection is idle.

    Long

    No

    60000

    If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released. Unit: milliseconds.

    jdbcMetaCacheTTL

    The maximum time for storing the TableSchema information in the cache.

    Long

    No

    60000

    Unit: milliseconds.

    jdbcMetaAutoRefreshFactor

    The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, Realtime Compute for Apache Flink automatically refreshes the cache.

    Integer

    No

    4

    The remaining time for storing data in the cache is calculated by using the following formula: Remaining time for storing data in the cache = Maximum time that is allowed to store data in the cache - Duration for which data is cached. After the cache is automatically refreshed, the duration for which data is cached is recalculated from 0.

    The time for triggering automatic cache refresh is calculated by using the following formula: Time for triggering an automatic refresh of the cache = jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor.

    type-mapping.timestamp-converting.legacy

    Specifies whether to perform time type conversions between Realtime Compute for Apache Flink and Hologres.

    Boolean

    No

    true

    Valid values:

    • true: does not perform time type conversions between Realtime Compute for Apache Flink and Hologres. Time zone conversion is performed based on the time zone of Java Virtual Machine (JVM) in the runtime environment.

    • false: recommended. Performs time type conversions between Realtime Compute for Apache Flink and Hologres. Time zone conversion is performed based on the time zone of Realtime Compute for Apache Flink.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    • For more information about the time zones of Realtime Compute for Apache Flink and Hologres, see the Time zones of Realtime Compute for Apache Flink and Hologres section of this topic.

    • If you set the property-version parameter to 0, the default value of this parameter is true. If you set the property-version parameter to 1, the default value of this parameter is false.

    property-version

    The parameter version of the connector.

    Integer

    No

    0

    Valid values: 0 and 1. Default value: 0.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.6 or later supports this parameter.

    • The set of available parameters and the default values of the parameters may be different in different parameter versions. If the set of available parameters and the default values of the parameters are different, the differences are described in the description of the parameters.

    • We recommend that you set this parameter to 1.
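The following sample code is a minimal sketch that combines several of the common parameters: a virtual warehouse suffix in the dbname parameter, a table in a non-public schema, SSL-encrypted transmission with certificate verification, and the metadata cache settings. The table name hologres_dim_secure and the schema and table name my_schema.my_table are hypothetical. The values db_test@read_warehouse and /flink/usrlib/certificate.crt are taken from the examples in the parameter descriptions. The sketch assumes a VVR version and a Hologres version that support all of these parameters.

```sql
CREATE TEMPORARY TABLE hologres_dim_secure (
  name VARCHAR,
  age BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  -- Parameter version 1 changes some defaults, for example
  -- type-mapping.timestamp-converting.legacy defaults to false.
  'property-version' = '1',
  -- Database db_test, accessed through the virtual warehouse read_warehouse.
  'dbname' = 'db_test@read_warehouse',
  -- schema.tableName format because the public schema is not used.
  'tablename' = 'my_schema.my_table',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  -- Virtual warehouses require a JDBC-related mode.
  'sdkMode' = 'jdbc',
  -- Encrypt connections and verify the server against the CA certificate.
  'connection.ssl.mode' = 'verify-full',
  'connection.ssl.root-cert.location' = '/flink/usrlib/certificate.crt',
  -- TableSchema cache: with these default values, an automatic refresh is
  -- triggered when the remaining cache time is less than 60000 / 4 = 15000 ms.
  'jdbcMetaCacheTTL' = '60000',
  'jdbcMetaAutoRefreshFactor' = '4'
);
```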

  • Parameters only for source tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    field_delimiter

    The delimiter that is used between rows of the data that you want to export.

    String

    No

    "\u0002"

    N/A

    binlog

    Specifies whether to consume binary log data.

    Boolean

    No

    false

    Valid values:

    • true: Binary log data is consumed.

    • false: Binary log data is not consumed. This is the default value.

    Note

    In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.

    • If you set the property-version parameter to 0, the default value of this parameter is false.

    • If you set the property-version parameter to 1, the default value of this parameter is true.

    sdkMode

    The SDK mode.

    String

    No

    holohub

    Valid values:

    • holohub: Binary log data is consumed in HoloHub mode. This is the default value.

    • jdbc: Binary log data is consumed in JDBC mode.

    • jdbc_fixed: Binary log data is consumed in fixed JDBC mode. Compared with data consumption in JDBC mode, data consumption in fixed JDBC mode does not occupy connections and is not subject to limits on the number of connections. This mode does not allow you to consume the binary log data of databases for which the data masking feature is enabled. For more information, see Mask data.

    For more information, see the Create a Hologres source table for which the binary logging feature is enabled section of this topic.

    Note
    • VVR 6.0.3 and earlier: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to holohub.

    • VVR 6.0.7 and later: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the HoloHub mode is no longer supported. If you set this parameter to holohub, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC. If you set this parameter to jdbc, Realtime Compute for Apache Flink uses the JDBC mode.

    • VVR 8.0.4 to VVR 8.0.5:

    • VVR 8.0.6 and later:

      • For Hologres V2.1.27 or later, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to fixed JDBC.

      • For Hologres V2.1.0 to Hologres V2.1.26, Realtime Compute for Apache Flink automatically changes the SDK mode from HoloHub to JDBC.

      • In Hologres V2.0, Realtime Compute for Apache Flink first attempts to use the JDBC mode. If an exception, such as insufficient permission, exists, Realtime Compute for Apache Flink automatically uses the HoloHub mode.

    jdbcBinlogSlotName

    The slot name of the binary log source table in JDBC mode. For more information about how to create a binary log source table in JDBC mode, see the "JDBC mode used to consume binary log source tables" section of the Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time topic.

    String

    No

    No default value

    This parameter takes effect only when the sdkMode parameter is set to jdbc. If you do not specify this parameter, the Hologres connector creates a slot by default. For more information, see the "JDBC mode used to consume binary log source tables" section of the Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time topic.

    Note

    You do not need to specify this parameter when the version of the Hologres instance is 2.1 or later. The Hologres connector does not attempt to automatically create a slot.

    binlogMaxRetryTimes

    The number of retries after Realtime Compute for Apache Flink fails to read binary log data.

    Integer

    No

    60

    N/A

    binlogRetryIntervalMs

    The interval between retries after Realtime Compute for Apache Flink fails to read binary log data.

    Long

    No

    2000

    Unit: milliseconds.

    binlogBatchReadSize

    The number of rows of binary log data that are read at a time.

    Integer

    No

    100

    N/A

    cdcMode

    Specifies whether to read binary log data in CDC mode.

    Boolean

    No

    false

    Valid values:

    • true: Binary log data is read in CDC mode.

    • false: Binary log data is read in non-CDC mode. This is the default value.

    Note

    In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.

    • If you set the property-version parameter to 0, the default value of this parameter is false.

    • If you set the property-version parameter to 1, the default value of this parameter is true.

    upsertSource

    Specifies whether the source table reads a changelog stream that contains UPSERT messages.

    Boolean

    No

    false

    This parameter takes effect only in CDC mode. Valid values:

    • true: The source table reads a changelog stream that contains only UPSERT messages, including INSERT, DELETE, and UPDATE_AFTER.

    • false: The source table reads a changelog stream that contains full change messages, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. This is the default value.

    Note

    For example, if the ROW_NUMBER() function together with an OVER clause of Flink SQL is used to perform deduplication and retract operators exist in the result table, you must set the upsertSource parameter to true. In this case, the source table reads data from Hologres in the upsert fashion.

    binlogStartupMode

    The mode in which binary log data is consumed.

    String

    No

    earliestOffset

    Valid values:

    • initial: The system consumes full data and then reads binary log data to consume incremental data.

    • earliestOffset: The system starts data consumption from the earliest binary log data. This is the default value.

    • timestamp: The system consumes binary log data from the point in time that is specified by the startTime parameter.

    Note

    If the startTime parameter or the start time is specified in the dialog box for starting a deployment, the timestamp mode is forcefully used for the binlogStartupMode parameter. The other two values of this parameter cannot be used. In this case, the startTime parameter has a higher priority.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter.

    • In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.

      • If you set the property-version parameter to 0, the default value of this parameter is earliestOffset.

      • If you set the property-version parameter to 1, the default value of this parameter is initial.

    startTime

    The start time when Hologres data is consumed.

    String

    No

    No default value

    Format: yyyy-MM-dd hh:mm:ss. If this parameter is not specified and deployments are not resumed from the state, Realtime Compute for Apache Flink starts to consume the Hologres data from the earliest generated binary log.

    jdbcScanFetchSize

    The number of data records that can be processed at a time during the scan operation.

    Integer

    No

    256

    N/A

    jdbcScanTimeoutSeconds

    The timeout period of the scan operation.

    Integer

    No

    60

    Unit: seconds.

    jdbcScanTransactionSessionTimeoutSeconds

    The timeout period for the transaction to which the scan operation belongs.

    Integer

    No

    600

    A value of 0 indicates that the scan operation never times out. This parameter corresponds to the Grand Unified Configuration (GUC) parameter idle_in_transaction_session_timeout of Hologres. For more information, see GUC parameters.

    Note

    Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13 or later supports this parameter.

    enable_filter_push_down

    Specifies whether to perform filter pushdown during the full data reading phase.

    Boolean

    No

    false

    Valid values:

    • false: Filter pushdown is not performed. This is the default value.

    • true: The supported filter conditions are pushed down to Hologres during the full data reading phase. The filter pushdown operation is performed in the following full data reading scenarios: full data reading from a Hologres source table in which the binary logging feature is disabled, and full data reading in a binary log source table when the Hologres connector consumes full and incremental data in a source table.

      Important

      In Realtime Compute for Apache Flink whose engine version is from vvr-6.0.3-flink-1.15 to vvr-6.0.5-flink-1.15, filter pushdown is automatically performed. However, if a deployment uses a Hologres dimension table and the DML statement that is used to write data contains filter conditions for non-primary key fields in the dimension table, the filter conditions of the dimension table are incorrectly pushed down. As a result, an error may occur when the dimension table is joined with another table. To resolve this issue, we recommend that you use Realtime Compute for Apache Flink whose engine version is vvr-6.0.6-flink-1.15 or later and add this parameter to the WITH clause in the DDL statement that is used to create the source table to enable filter pushdown.
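The following sample code is a minimal sketch of a full read from a Hologres source table with filter pushdown enabled. The table names hologres_source and print_sink and the columns are hypothetical. The sketch assumes that binary logging is not consumed and that a simple comparison such as age > 18 is among the supported filter conditions. If it is not, the filter is evaluated by Realtime Compute for Apache Flink instead.

```sql
-- Hypothetical source table that is read by a single full table scan.
CREATE TEMPORARY TABLE hologres_source (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'false',                  -- full read, no binary log consumption
  'enable_filter_push_down' = 'true',  -- push supported filters down to Hologres
  'jdbcScanFetchSize' = '256',         -- records processed at a time during the scan
  'jdbcScanTimeoutSeconds' = '60'      -- timeout period of the scan operation
);

CREATE TEMPORARY TABLE print_sink (
  name VARCHAR,
  age BIGINT
) WITH (
  'connector' = 'print'
);

-- The WHERE condition is the candidate for filter pushdown.
INSERT INTO print_sink
SELECT name, age
FROM hologres_source
WHERE age > 18;
```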

  • Parameters only for result tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    sdkMode

    The SDK mode.

    String

    No

    jdbc

    Valid values:

    • jdbc: Data is written by using a JDBC driver. This is the default value.

    • jdbc_copy: Data is written in jdbc_copy mode, which refers to the copy mode of a JDBC driver.

      This mode is supported in Hologres V1.3 and later. Because data is written in streaming mode, data writing in jdbc_copy mode provides higher throughput and lower data latency, and consumes fewer client memory resources than data writing in JDBC mode. If the jdbc_copy mode is used, data cannot be deleted, data cannot be written to a parent partitioned table, and the ignoreNullWhenUpdate parameter is not supported.

    • rpc: Data is written in RPC mode. The effect when you set this parameter to rpc is similar to the effect when you set the useRpcMode parameter to true. Compared with data writing in JDBC mode, data writing in RPC mode does not occupy connections. Data of the Jsonb and RoaringBitmap data types cannot be written to Hologres.

    • jdbc_fixed: In this mode, data is written by using a fixed JDBC driver. This feature is in public preview.

      Only Hologres V1.3 or later supports this feature. Compared with data writing in JDBC mode, data writing in jdbc_fixed mode does not occupy connections. Data of the Jsonb and RoaringBitmap data types cannot be written to Hologres. This mode does not allow you to write data to databases for which the data masking feature is enabled. For more information, see Mask data.

    Note
    • VVR 6.0.3 or earlier: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to jdbc.

    • VVR 6.0.7 to VVR 8.0.1: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, Realtime Compute for Apache Flink uses the SDK mode that you configure.

      • If you use the RPC mode for a table, Realtime Compute for Apache Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, you can use a JDBC-related mode for the table and set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.

    • VVR 8.0.3 and later: We recommend that you set this parameter to jdbc.

      For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the versions of Hologres instances. If the RPC mode is used for tables, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed and you can set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.

    • VVR 8.0.5 and later: We recommend that you set this parameter to jdbc.

      If you use the RPC mode for tables, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed and you can set the deduplication.enabled parameter to false to avoid deduplication.

    bulkload

    Specifies whether to write data in bulkload mode.

    Boolean

    No

    false

    This parameter takes effect only when the sdkMode parameter is set to jdbc_copy. You can write data in bulkload mode to tables that do not have primary keys or tables whose primary keys have the same value. Compared with the jdbc_copy mode, fewer Hologres resources are used when you write data in bulkload mode.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later and Hologres V2.1 or later support this parameter.

    useRpcMode

    Specifies whether to connect to Hologres by using RPC.

    Boolean

    No

    false

    Valid values:

    • true: RPC is used to connect to Hologres.

      The effect of this setting is the same as the effect when you set the sdkMode parameter to rpc. If RPC is used, the number of SQL connections is reduced.

    • false: A JDBC driver is used to connect to Hologres. This is the default value.

      JDBC drivers require SQL connections. This increases the number of JDBC connections.

    Note
    • The RPC mode is no longer supported in Hologres V2.0. We recommend that you specify the sdkMode parameter to write data in JDBC or jdbc_fixed mode.

    • For Realtime Compute for Apache Flink that uses VVR 6.0.7 or VVR 8.0.1 and Hologres V2.0 or later, the SDK mode is automatically changed from RPC to jdbc_fixed.

    • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the system automatically changes the SDK mode from RPC to jdbc_fixed.

    • If you use the RPC mode for a table, Realtime Compute for Apache Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, we recommend that you use Realtime Compute for Apache Flink that uses VVR 8.0.5 or later, set the sdkMode parameter to jdbc, and set the deduplication.enabled parameter to false to avoid deduplication.

    • If you set the property-version parameter to 1, this parameter is not supported.

    mutatetype

    The data writing mode.

    String

    No

    insertorignore

    For more information, see the Streaming semantics section of this topic.

    Note

    In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.

    • If you set the property-version parameter to 0, the default value of this parameter is insertorignore.

    • If you set the property-version parameter to 1, the default value of this parameter is insertorupdate.

    partitionrouter

    Specifies whether to write data to a partitioned table.

    Boolean

    No

    false

    N/A

    createparttable

    Specifies whether to automatically create a partitioned table to which data is written based on partition values.

    Boolean

    No

    false

    In RPC mode, if the partition values contain hyphens (-), partitioned tables cannot be automatically created.

    Note
    • In Hologres V1.3.22 or later, a field of the DATE type can be used as the partition key. For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, a partitioned table can be automatically created when you use a field of the DATE type as the partition key.

    • Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partitioned table is created. Proceed with caution when you use this parameter.

    • If you set the sdkMode parameter to jdbc_copy, data cannot be written to a parent partitioned table.

    ignoredelete

    Specifies whether to ignore retraction messages.

    Boolean

    No

    true

    Note

    This parameter takes effect only when the streaming semantics is used.

    For Realtime Compute for Apache Flink that uses VVR 8.0.8 or later, we recommend that you specify sink.delete-strategy.

    In Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, the default value of this parameter varies based on the parameter version of the connector.

    • If you set the property-version parameter to 0, the default value of this parameter is true.

    • If you set the property-version parameter to 1, the default value of this parameter is false.

    sink.delete-strategy

    The operation that is performed when a retraction message is received.

    String

    No

    No default value

    Valid values:

    • IGNORE_DELETE: ignores UPDATE BEFORE and DELETE messages. This operation applies to scenarios in which data is inserted or updated and no data deletion is required.

    • NON_PK_FIELD_TO_NULL: ignores UPDATE BEFORE messages and updates the non-primary key values in DELETE messages to NULL. This operation applies to scenarios in which you want to delete data without affecting other columns during a partial update operation.

    • DELETE_ROW_ON_PK: ignores UPDATE BEFORE messages and deletes the row that matches the primary key value when a DELETE message is received. This operation applies to scenarios in which you want to delete a row during a partial update operation. The deletion affects other columns.

    • CHANGELOG_STANDARD: The Flink framework runs according to the working principles of Flink SQL changelogs. Delete operations are not ignored. If an update operation is performed, the original data is deleted and then new data is inserted. This ensures data accuracy. This operation applies to scenarios in which partial updates are not involved.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.8 or later supports this parameter.

    • Setting this parameter to NON_PK_FIELD_TO_NULL may result in only the primary key value being recorded and other column values being null.

    connectionSize

    The size of the JDBC connection pool that is created in a Realtime Compute for Apache Flink deployment.

    Integer

    No

    3

    If the deployment provides poor performance, we recommend that you increase the size of the connection pool. The size of the JDBC connection pool is proportional to data throughput.

    jdbcWriteBatchSize

    The maximum number of rows of data that can be processed by a Hologres streaming sink node at a time when a JDBC driver is used.

    Integer

    No

    256

    Note

    The jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval parameters are combined with OR logic. You can specify one or more of them. If you specify more than one, the system writes data to the Hologres result table as soon as any one of the conditions is met.

    jdbcWriteBatchByteSize

    The maximum number of bytes of data that can be processed by a Hologres streaming sink node at a time when a JDBC driver is used.

    Long

    No

    2097152 (2 × 1024 × 1024 bytes = 2 MB)

    Note

    The jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval parameters are combined with OR logic. You can specify one or more of them. If you specify more than one, the system writes data to the Hologres result table as soon as any one of the conditions is met.

    jdbcWriteFlushInterval

    The maximum period of time that a Hologres streaming sink node waits before it writes the buffered rows to Hologres in one batch when a JDBC driver is used.

    Long

    No

    10000

    Unit: milliseconds.

    Note

    The jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval parameters are combined with OR logic. You can specify one or more of them. If you specify more than one, the system writes data to the Hologres result table as soon as any one of the conditions is met.

    ignoreNullWhenUpdate

    Specifies whether to ignore the null value that is written in the data if mutatetype='insertOrUpdate' is specified.

    Boolean

    No

    false

    Valid values:

    • false: The null value is written to a Hologres result table. This is the default value.

    • true: The null value that is written in the data is ignored.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0 or later supports this parameter.

    • If you set the sdkMode parameter to jdbc_copy, this parameter is not supported.

    connectionPoolName

    The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

    String

    No

    No default value

    Set the value to a string other than default. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.

    Note
    • VVR version earlier than 4.0.12: This parameter is not supported.

    • VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its own connection pool.

    • VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If a deployment contains a large number of tables, the connections in the shared connection pool may be insufficient, which affects the deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.

    • You can specify this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Result Table C, Result Table D, and Result Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B and the connection pool pool2 for Result Table C and Result Table D. You can configure the connection pool pool3 for Result Table E, in which a large amount of data is processed.

    jdbcEnableDefaultForNotNullColumn

    Specifies whether to allow the Hologres connector to fill a default value if a null value is written to a non-null column for which the default value is not configured in the Hologres table.

    Boolean

    No

    true

    Valid values:

    • true: The Hologres connector fills a default value. This is the default value. If you set this parameter to true, the system converts a null value into a default value based on the following rules:

      • If the column is of the STRING type, the null value is converted into an empty string.

      • If the column is of the NUMBER data type, the null value is converted into 0.

      • If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ data type, the null value is converted into 1970-01-01 00:00:00.

    • false: The Hologres connector does not fill a default value. If a null value is written to a non-null column for which the default value is not configured in the Hologres table, an error is returned.

    remove-u0000-in-text.enabled

    Specifies whether the Hologres connector removes the invalid character \u0000 from data of the STRING type that is written to the result table.

    Boolean

    No

    false

    Valid values:

    • false: The Hologres connector does not process data. If the data that is written to the result table contains dirty data, the error message "ERROR: invalid byte sequence for encoding "UTF8": 0x00" may be reported. This is the default value.

      In this case, you need to remove the dirty data from the source table in advance or define the dirty data processing logic in the SQL statement.

    • true: The Hologres connector removes the invalid characters \u0000 from the data of the STRING type to prevent the error message from being reported.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports this parameter.

    partial-insert.enabled

    Specifies whether to insert only the fields declared in the INSERT statement.

    Boolean

    No

    false

    Valid values:

    • false: All fields that are declared in the DDL statement of the result table are updated regardless of which fields are declared in the INSERT statement. Fields that are not declared in the INSERT statement are updated to null values. This is the default value.

    • true: The fields that are declared in the INSERT statement are pushed down to the connector. In this case, only the declared fields are inserted into or updated in the result table.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • This parameter takes effect only if the mutatetype parameter is set to InsertOrUpdate.

    deduplication.enabled

    Specifies whether to perform deduplication when data is written in batches in jdbc or jdbc_fixed mode.

    Boolean

    No

    true

    Valid values:

    • true: If the data that is written in batches contains data that has the same primary key, deduplication is performed. Only the last data record is retained. This is the default value. In this example, two fields are used and the first field is the primary key.

      • If the INSERT (1,'a') and INSERT (1,'b') records are written to the result table in chronological order, only the record (1,'b') that arrives later is retained and written to the Hologres result table after deduplication is performed.

      • The (1,'a') record already exists in the Hologres result table. In this case, the DELETE (1,'a') and INSERT (1,'b') records are written to the result table in chronological order. Only the (1,'b') record that arrives later is retained and written to the Hologres result table. In this case, data is directly updated rather than deleted and then inserted into the table.

    • false: No deduplication is performed when data is written in batches. If a newly arrived record has the same primary key as a record in the batch that is being accumulated, the accumulated batch is written first, and then the newly arrived record is written.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • In extreme cases, if no deduplication is performed when data is written in batches and all data has the same primary key, data is not written in batches. This may affect the performance.
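
    The following statement is a minimal sketch of how several of the preceding write parameters can be combined in the WITH clause of a result table. The table name hologres_sink_tuned, the columns, and all numeric values are illustrative assumptions, not recommended settings. Check each parameter against the version notes above before you use it. For example, deduplication.enabled requires VVR 8.0.5 or later.

    CREATE TEMPORARY TABLE hologres_sink_tuned (  -- Hypothetical table name.
      id BIGINT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDbname>',
      'tablename' = '<yourTablename>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'sdkMode' = 'jdbc',                    -- The value recommended in the sdkMode notes.
      'connectionSize' = '5',                -- Illustrative value. The default value is 3.
      'jdbcWriteBatchSize' = '512',          -- Flush after 512 rows,
      'jdbcWriteBatchByteSize' = '4194304',  -- or after 4 MB,
      'jdbcWriteFlushInterval' = '5000',     -- or after 5 seconds, whichever condition is met first.
      'deduplication.enabled' = 'false'      -- Write every record in a batch, including records that share a primary key.
    );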

  • Parameters only for dimension tables

    Parameter

    Description

    Type

    Required

    Default value

    Remarks

    sdkMode

    The SDK mode.

    String

    No

    jdbc

    Valid values:

    • jdbc: Queries are performed in JDBC mode. Point queries based on a primary key and queries based on a non-primary key are supported. However, queries based on a non-primary key are slow and degrade the query performance. This is the default value.

    • rpc: Point queries are performed in RPC mode. If you set this parameter to rpc, the effect is similar to the effect when you set the useRpcMode parameter to true. Only point queries based on a primary key are supported. When you join a Hologres dimension table with another table, you must specify all the fields in the primary key of the dimension table in the ON clause. Compared with point queries that are performed in JDBC mode, point queries that are performed in RPC mode do not occupy connections. In this mode, data of the JSONB and RoaringBitmap data types cannot be queried.

    • jdbc_fixed: In this mode, point queries are performed by using a fixed JDBC driver. This feature is in public preview. Only Hologres 1.3 or later supports this feature. Compared with point queries that are performed in JDBC mode, point queries that are performed in jdbc_fixed mode do not occupy connections. In this mode, data of the JSONB and RoaringBitmap data types cannot be queried. Only point queries based on a primary key are supported. When you join a Hologres dimension table with another table, you must specify all the fields in the primary key of the dimension table in the ON clause. This mode does not allow you to query databases for which the data masking feature is enabled. For more information, see Mask data.

    Note
    • VVR 6.0.3 or earlier: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to jdbc.

      For Realtime Compute for Apache Flink that uses VVR 6.0.6, if you set this parameter to jdbc, a null pointer error may occur when data of the STRING data type is queried in JDBC mode and the query results contain null values. To resolve this issue, we recommend that you use the RPC mode.

    • VVR 6.0.7 and VVR 8.0.1: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, Realtime Compute for Apache Flink uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, Realtime Compute for Apache Flink uses the SDK mode that you configure.

    • VVR 8.0.3 and later: We recommend that you set this parameter to jdbc.

      • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the Hologres instance versions. If the RPC mode is used for tables, Realtime Compute for Apache Flink automatically changes the SDK mode from RPC to jdbc_fixed.

    useRpcMode

    Specifies whether to connect to Hologres by using RPC.

    Boolean

    No

    false

    Valid values:

    • true: RPC is used to connect to Hologres. The effect of this setting is equivalent to the effect when you set the sdkMode parameter to rpc. If RPC is used, the number of SQL connections is reduced.

    • false: A JDBC driver is used to connect to Hologres. This is the default value.

      JDBC drivers require SQL connections. This increases the number of JDBC connections.

    Note
    • The RPC mode is no longer supported in Hologres V2.0. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed.

    • For Realtime Compute for Apache Flink that uses VVR 6.0.7 or VVR 8.0.1 and Hologres V2.0 or later, the SDK mode is automatically changed from RPC to jdbc_fixed.

    • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the SDK mode is automatically changed from RPC to jdbc_fixed.

    connectionSize

    The size of the JDBC connection pool that is created in a Realtime Compute for Apache Flink deployment.

    Integer

    No

    3

    If the deployment provides poor performance, we recommend that you increase the size of the connection pool. The size of the JDBC connection pool is proportional to data throughput.

    connectionPoolName

    The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

    String

    No

    No default value

    Set the value to a string other than default. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.

    You can specify this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Result Table C, Result Table D, and Result Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B and the connection pool pool2 for Result Table C and Result Table D. You can configure the connection pool pool3 for Result Table E, in which a large amount of data is processed.

    Note
    • VVR version earlier than 4.0.12: This parameter is not supported.

    • VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its own connection pool.

    • VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If a deployment contains a large number of tables, the connections in the shared connection pool may be insufficient, which affects the deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.

    jdbcReadBatchSize

    The maximum number of data records that can be processed at the same time for a point query in a Hologres dimension table.

    Integer

    No

    128

    N/A

    jdbcReadBatchQueueSize

    The maximum number of queued requests allowed in a thread to perform a point query in a Hologres dimension table.

    Integer

    No

    256

    N/A

    jdbcReadTimeoutMs

    The timeout period for performing a point query in a Hologres dimension table.

    Long

    No

    0

    Default value: 0, which indicates that the point query in a Hologres dimension table never times out. Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13, vvr-6.0.2-flink-1.15, or their later minor versions supports this parameter.

    jdbcReadRetryCount

    The number of retries when a point query performed in a Hologres dimension table times out.

    Integer

    No

    • VVR version earlier than 8.0.5: 1

    • VVR 8.0.5 or later: 10

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports this parameter.

    • This parameter is different from the jdbcRetryCount parameter. The jdbcRetryCount parameter specifies the maximum number of retries allowed to read and write data if a connection failure occurs.

    jdbcScanFetchSize

    The number of data records that can be processed at the same time by calling the scan operation when you perform a one-to-many table join. In a one-to-many table join, no complete primary key is used.

    Integer

    No

    256

    N/A

    jdbcScanTimeoutSeconds

    The maximum period of time to wait for a scan operation to complete.

    Integer

    No

    60

    Unit: seconds.

    cache

    The cache policy.

    String

    No

    For more information, see the Remarks column.

    Hologres supports only None and LRU cache policies. For more information, see the "Background information" section of the JOIN statements for dimension tables topic.

    Note

    The default value of this parameter varies based on the VVR version:

    • For Realtime Compute for Apache Flink that uses VVR 4.X or later, the default value of this parameter is None.

    • For Realtime Compute for Apache Flink that uses a VVR version earlier than 4.X, the default value of this parameter is LRU.

    cacheSize

    The maximum number of rows that can be cached.

    Integer

    No

    10000

    If you set the cache parameter to LRU, you can specify the cacheSize parameter. Unit: row.

    cacheTTLMs

    The interval at which the system refreshes the cache.

    Long

    No

    For more information, see the Remarks column.

    Unit: milliseconds. The default value of the cacheTTLMs parameter varies based on the value of the cache parameter:

    • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.

    • If you set the cache parameter to None, the cacheTTLMs parameter can be left empty. This indicates that cache entries do not expire.

    cacheEmpty

    Specifies whether to cache the JOIN queries whose return value is empty.

    Boolean

    No

    true

    • true: The JOIN queries whose return value is empty are cached. This is the default value.

    • false: The JOIN queries whose return value is empty are not cached.

    async

    Specifies whether to enable data synchronization in asynchronous mode.

    Boolean

    No

    false

    • true: Data synchronization in asynchronous mode is enabled.

    • false: Data synchronization in asynchronous mode is disabled. This is the default value.

    Note

    The order of data is not preserved when data is synchronized in asynchronous mode.
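
    The following statement is a minimal sketch of a dimension table that combines the preceding cache and query parameters. The table name hologres_dim_cached, the columns, and all numeric values are illustrative assumptions. Adjust them to the size and update frequency of your dimension data.

    CREATE TEMPORARY TABLE hologres_dim_cached (  -- Hypothetical table name.
      a INT,
      b VARCHAR
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDbname>',
      'tablename' = '<yourTablename>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'sdkMode' = 'jdbc',             -- The default value. Supports primary key and non-primary key queries.
      'cache' = 'LRU',                -- Enable the LRU cache policy.
      'cacheSize' = '50000',          -- Illustrative value. The default value is 10000 rows.
      'cacheTTLMs' = '60000',         -- Illustrative value. Cache entries expire after 60 seconds.
      'cacheEmpty' = 'false',         -- Do not cache JOIN queries whose return value is empty.
      'jdbcReadBatchSize' = '128',    -- The default value, shown for reference.
      'async' = 'true'                -- Asynchronous mode.
    );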

Data type mappings

For more information about the mappings between the data types in Realtime Compute for Apache Flink and Hologres, see the "Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres" section of the Data types topic.

Examples

Create source tables

Create a Hologres source table for which the binary logging feature is disabled

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|', -- This parameter is optional.
  'sdkmode' = 'jdbc'
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT name, age, birthday
from hologres_source;

Create a Hologres source table for which the binary logging feature is enabled

Realtime Compute for Apache Flink can consume the binary log data of Hologres in real time. For more information, see Use the Hologres connector of Realtime Compute for Apache Flink to consume data of Hologres in real time.

Create a result table

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * from datagen_source;
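
The connectionPoolName parameter that is described in the parameter tables can be used to let several result tables share one connection pool. The following statements are a minimal sketch under that description. The table names hologres_sink_c and hologres_sink_d, the placeholders <yourTablenameC> and <yourTablenameD>, and the pool name pool2 are hypothetical. Both tables use the same connectionSize value because tables that share a pool must be configured with the same size.

CREATE TEMPORARY TABLE hologres_sink_c (  -- Hypothetical table name.
  name varchar,
  age BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablenameC>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'connectionPoolName'='pool2',  -- Any string other than default.
  'connectionSize'='3'           -- Must match the other tables that use pool2.
);

CREATE TEMPORARY TABLE hologres_sink_d (  -- Hypothetical table name.
  name varchar,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablenameD>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'connectionPoolName'='pool2',  -- Shares the pool with hologres_sink_c.
  'connectionSize'='3'
);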

Create a dimension table

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Feature description

Streaming semantics

Stream processing, which is also known as streaming data processing or streaming event processing, refers to the continuous processing of an unbounded series of data or events. Because network or device failures may cause data loss, most systems that process streaming data or events allow you to specify a reliability pattern or processing semantics to ensure data accuracy.

Semantics can be classified into the following types based on the configurations of the Hologres streaming sink that you use and the attributes of the Hologres table:

  • Exactly-once: The system processes data or events only once even if multiple failures occur.

  • At-least-once: If the streaming data or events that you want to process are lost, the system transfers the data again from the transmission source. The system may process streaming data or events multiple times. If the first retry is successful, no subsequent retries are required.

When you use streaming semantics in a Hologres result table, take note of the following points:

  • If no primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.

  • If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, you must configure the mutatetype parameter to determine how the result table is updated. The mutatetype parameter has the following valid values:

    • insertorignore: retains the first record and discards the subsequent records. This is the default value.

    • insertorreplace: replaces the existing record in an entire row with the one that arrives later.

    • insertorupdate: updates specific columns of the existing data. For example, a table has fields a, b, c, and d. The a field is the primary key. Only data in the a and b fields is written to Hologres. If duplicate primary keys exist, the system updates only data in the b field. Data in the c and d fields remains unchanged.

    Note
    • If the mutatetype parameter is set to insertorreplace or insertorupdate, the system updates data based on the primary key.

    • The number of columns in the result table that is defined by Flink can be different from the number of columns in the Hologres physical table. Make sure that the null value can be used to pad the missing columns. If the null value cannot be used to pad the missing columns, an error is returned.

  • By default, the Hologres streaming sink node can import data to only one non-partitioned table. If the sink node imports data to the parent table of a partitioned table, data queries fail even if the data import is successful. To enable data to be automatically written to a partitioned table, you can set the partitionRouter parameter to true. Take note of the following items:

    • You must set tablename to the name of the parent table.

    • If no partitioned table is created, you must configure createparttable=true in the WITH clause to enable the automatic creation of a partitioned table. If you do not perform the preceding configuration, data import fails.
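
The preceding points can be sketched as a single result table definition. The following statement is an illustration only. The table name hologres_partitioned_sink, the placeholder <yourParentTablename>, and the columns are assumptions, and the ds column is assumed to be the partition key of the Hologres parent table.

CREATE TEMPORARY TABLE hologres_partitioned_sink (  -- Hypothetical table name.
  id BIGINT,
  ds STRING,      -- Assumed to be the partition key of the Hologres parent table.
  amount BIGINT,
  PRIMARY KEY (id, ds) NOT ENFORCED  -- Assumed to match the primary key of the Hologres physical table.
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDbname>',
  'tablename' = '<yourParentTablename>',  -- The name of the parent table.
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorreplace',  -- A later record replaces the entire existing row that has the same primary key.
  'partitionrouter' = 'true',        -- Route each record to the matching child partitioned table.
  'createparttable' = 'true'         -- Create a child partitioned table if it does not exist.
);

Because createparttable creates partitioned tables from the partition values in the data, validate the values of the partition column before you enable it.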

Data merging into a wide table

If you want to write data from multiple streaming deployments to a Hologres wide table, you can merge the data into a wide table. The following section provides examples of the two methods that can be used to merge data into a wide table.

Note

Take note of the following limits:

  • The wide table must have a primary key.

  • The data of each stream must contain all fields in the primary key.

  • If the wide table is a column-oriented table and the requests per second (RPS) value is large, the CPU utilization increases. We recommend that you disable dictionary encoding for the fields in the table.

Method 1 (recommended)

Note

You can use this method to merge data into a wide table only in Realtime Compute for Apache Flink that uses VVR 6.0.7 or later.

For example, one Realtime Compute for Apache Flink data stream contains fields a, b, and c, and the other Flink data stream contains fields a, d, and e. The Hologres wide table WIDE_TABLE contains fields a, b, c, d, and e, among which field a is the primary key. To merge data into a wide table, perform the following operations:

  1. Execute Flink SQL statements to create a Hologres result table. Declare fields a, b, c, d, and e in the Flink SQL statements. Map the Hologres result table to the Hologres wide table WIDE_TABLE.

  2. Parameter settings of the Hologres result table:

    • Set the mutatetype parameter to insertorupdate. This indicates that data is updated based on the primary key.

    • Set the ignoredelete parameter to true. This prevents retract messages from generating Delete requests. For Realtime Compute for Apache Flink that uses VVR 8.0.8 or later, you can use sink.delete-strategy to configure strategies for partial updates.

  3. Insert data from the two data streams into the result table.

    -- In this example, the data sources are source1 and source2.
    CREATE TEMPORARY TABLE hologres_sink ( -- Declare fields a, b, c, d, and e.
      a BIGINT, 
      b STRING,
      c STRING,
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- Update data based on the primary key.
      'ignoredelete'='true',            -- Ignore Delete requests.
      'partial-insert.enabled'='true' -- Set the partial-insert.enabled parameter to true to update only the fields that are declared in the INSERT statement.
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare that only fields a, b, and c are inserted into the wide table.
    INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare that only fields a, d, and e are inserted into the wide table.
    END;
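
The effect of the three parameters in Method 1 can be pictured with a small in-memory model. The following Java sketch is only an illustration under stated assumptions: the class WideTableMergeSketch and its methods are hypothetical names that do not exist in the connector, and the map stands in for the Hologres wide table. It shows that two partial writes with the same primary key end up in one row, and that a retract message leaves the row untouched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WideTableMergeSketch {
    // Hypothetical in-memory stand-in for the wide table: primary key a -> column values.
    private final Map<Long, Map<String, String>> rows = new LinkedHashMap<>();

    // Models mutatetype=insertorupdate together with partial-insert.enabled=true:
    // only the columns declared in the INSERT statement are written, other columns are kept.
    public void upsert(long a, Map<String, String> declaredColumns) {
        rows.computeIfAbsent(a, key -> new LinkedHashMap<>()).putAll(declaredColumns);
    }

    // Models ignoredelete=true: a retract message does not generate a Delete request.
    public void retract(long a) {
        // Intentionally a no-op.
    }

    public Map<String, String> get(long a) {
        return rows.get(a);
    }

    public static void main(String[] args) {
        WideTableMergeSketch wideTable = new WideTableMergeSketch();
        wideTable.upsert(1L, Map.of("b", "b1", "c", "c1")); // record from source1
        wideTable.upsert(1L, Map.of("d", "d1", "e", "e1")); // record from source2
        wideTable.retract(1L);                              // ignored
        // The row for a=1 now holds values for b, c, d, and e.
        System.out.println(wideTable.get(1L));
    }
}
```

The model ignores ordering, concurrency, and batching. It only mirrors the merge semantics that the parameter descriptions above state.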

Method 2

Note

You can use this method to merge data into a wide table only in Realtime Compute for Apache Flink that uses VVR 6.0.6 or earlier.

For example, one Realtime Compute for Apache Flink data stream contains fields a, b, and c, and the other Flink data stream contains fields a, d, and e. The Hologres wide table WIDE_TABLE contains fields a, b, c, d, and e, among which field a is the primary key. To merge data into a wide table, perform the following operations:

  1. Execute Flink SQL statements to create two Hologres result tables. One table is used to declare fields a, b, and c, and the other is used to declare fields a, d, and e. Map the two result tables to the Hologres wide table WIDE_TABLE.

  2. Configure the following parameters for the two Hologres result tables:

    • Set the mutatetype parameter to insertorupdate. This indicates that data is updated based on the primary key.

    • Set the ignoredelete parameter to true. This prevents retract messages from generating Delete requests.

  3. Insert data from the two data streams into the two result tables.

    -- In this example, the data sources are source1 and source2.
    
    CREATE TEMPORARY TABLE hologres_sink_1 ( -- Declare only fields a, b, and c.
      a BIGINT, 
      b STRING,
      c STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype' = 'insertorupdate', -- Update data based on the primary key.
      'ignoredelete' = 'true'             -- Ignore Delete requests.
    );
    
    CREATE TEMPORARY TABLE hologres_sink_2 ( -- Declare only the fields a, d, and e.
      a BIGINT, 
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- Update data based on the primary key.
      'ignoredelete'='true'             -- Ignore Delete requests.
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink_1 select * from source1;
    INSERT INTO hologres_sink_2 select * from source2;
    END;

Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements

You can execute the CREATE TABLE AS or CREATE DATABASE AS statements to synchronize data from a single table or an entire database in real time. The changes to the schema of the source table can also be synchronized to the related Hologres table in real time during database or table synchronization. If the schema of the source table is changed, Flink synchronizes the changes to the table schema to the related Hologres table and then writes data to the Hologres table. Flink automatically completes this process. For more information, see CREATE TABLE AS statement and Ingest data into data warehouses in real time.

DataStream API

Important

To read or write data by calling the DataStream API, you must use the corresponding DataStream connector to access Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors. The Hologres DataStream connectors of different versions are stored in the Maven central repository. For Realtime Compute for Apache Flink that uses VVR 6.0.7, use the dependency version 1.15-vvr-6.0.7-1. For Realtime Compute for Apache Flink that uses VVR 8.0.7, download and use the dependency file ververica-connector-hologres-1.17-vvr-8.0.7.jar. To debug a deployment in an on-premises environment, you must use the related uber JAR package. For more information, see Run or debug a Flink deployment that includes a connector in an on-premises environment. The uber JAR package that corresponds to VVR 8.0.7 is ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar.

Build an implementation class to read data from a Hologres source table

VVR provides the implementation class HologresBulkreadInputFormat of RichInputFormat to read data from a Hologres source table. The following examples show how to build the implementation class HologresBulkreadInputFormat to read data from a Hologres source table.

VVR 4.0.15

// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();
// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
  jdbcOptions.getTable(), schema.getFieldNames());

// Build HologresBulkreadInputFormat to read data from the Hologres source table. 
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

VVR 6.0.7 and VVR 8.0.7

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");

// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions,  schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

Build an implementation class to read data from a Hologres binlog source table

VVR provides the implementation class HologresBinlogSource of Source to read data from a Hologres source table for which binary logging is enabled. The following examples show how to build the implementation class HologresBinlogSource to read data from a Hologres source table for which binary logging is enabled.

VVR 4.0.15

// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);

// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled. 
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
  schema,
  config,
  jdbcOptions,
  recordConverter,
  startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 6.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Create a slot and configure the default name of the slot.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
  && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the record converter to read data from the Hologres source table for which binary logging is enabled. 
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
  jdbcOptions.getTable(),
  schema,
  new HologresConnectionParam(config),
  cdcMode,
  Collections.emptySet());

// Build HologresJDBCBinlogSource to read data from the Hologres source table for which binary logging is enabled. 
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.TIMESTAMP,
  recordConverter,
  "",
  -1);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 8.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the source table from which you want to read data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Build HologresBinlogSource to read data from the Hologres source table for which binary logging is enabled. 
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.INITIAL,
  "",
  "",
  -1,
  Collections.emptySet());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

Build an implementation class to write data to a Hologres result table

VVR provides the implementation class HologresSinkFunction of OutputFormatSinkFunction to write data to a Hologres result table. The following examples show how to build the implementation class HologresSinkFunction to write data to a Hologres result table.

VVR 4.0.15

// Initialize the schema of the result table to which you want to write data. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);

// Build a Hologres Writer to write data in the data structure of the RowData class. 
AbstractHologresWriter<RowData> hologresWriter =
  buildHologresWriter(schema, config, hologresConnectionParam);

// Build HologresSinkFunction to write data to a Hologres result table. 
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);
env.execute();

VVR 6.0.7 and VVR 8.0.7

// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initialize the schema of the result table to which you want to write data. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");

HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class. 
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
  hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// Build HologresSinkFunction to write data to a Hologres result table. 
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
  .addSink(sinkFunction);

env.execute();

Note

The buildHologresWriter method that is used in the VVR 4.0.15 example is not included in the dependency of the VVR connector. This method is provided in ververica-connector-demo.

Time zones of Realtime Compute for Apache Flink and Hologres

Time types

Service

Time type

Description

Realtime Compute for Apache Flink

Flink TIMESTAMP

The date and time without the time zone. Data of the TIMESTAMP type is a timestamp that represents the year, month, day, hour, minute, second, and fractional second. Data of the TIMESTAMP type can be a string, such as 1970-01-01 00:00:04.001.

Flink TIMESTAMP_LTZ

An absolute point in time on the timeline. Internally, a LONG value stores the number of milliseconds that have elapsed since the epoch time, and an INT value stores the number of nanoseconds within the millisecond. The epoch time refers to 00:00:00 UTC on January 1, 1970 in Java. Data of the TIMESTAMP_LTZ type is interpreted for calculations and visualization based on the time zone that is configured in the current session. The TIMESTAMP_LTZ type can be used for calculations across time zones because a value of this type represents the same absolute point in time in every time zone.

A value of the TIMESTAMP_LTZ type may indicate different local values of the TIMESTAMP type in different time zones. For example, if a value of the TIMESTAMP_LTZ type is 2024-03-19T04:00:00Z, the local timestamp in the time zone of Shanghai (UTC+8) is expressed as 2024-03-19T12:00:00 and the local timestamp in the time zone of Greenwich (UTC+0) is expressed as 2024-03-19T04:00:00.

Hologres

TIMESTAMP

The date and time without the time zone, which is similar to the TIMESTAMP type of Realtime Compute for Apache Flink. Data of the TIMESTAMP type in Hologres does not change even if the time zone of the Hologres client changes. For example, data of the TIMESTAMP type can be expressed as 2022-01-01 01:01:01.123456.

TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ)

The date and time with the time zone, which is similar to the TIMESTAMP_LTZ type of Realtime Compute for Apache Flink. When Hologres stores TIMESTAMPTZ data, Hologres converts the data to UTC. When you query data, Hologres converts the stored UTC values to the time zone of the client based on the time zone parameters of the client.

For example, if the timestamp of the time zone of Beijing (UTC+8) is 2022-02-01 10:33:20.125+08 and the timestamp is stored as the TIMESTAMPTZ type in Hologres, the timestamp is expressed as 2022-02-01 10:33:20.125+08.
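
The relationship between an absolute point in time and its local, zone-less representation can be checked with the standard java.time API. The following sketch is only an illustration: the class name TimestampLtzSketch and the helper toLocal are hypothetical and are not part of Realtime Compute for Apache Flink or Hologres. It reproduces the 2024-03-19T04:00:00Z example from the table above.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimestampLtzSketch {
    // Renders an absolute point in time as a zone-less local timestamp
    // for the given session time zone.
    public static LocalDateTime toLocal(Instant pointInTime, String sessionTimeZone) {
        return LocalDateTime.ofInstant(pointInTime, ZoneId.of(sessionTimeZone));
    }

    public static void main(String[] args) {
        // The same absolute point in time, comparable to a TIMESTAMP_LTZ value.
        Instant pointInTime = Instant.parse("2024-03-19T04:00:00Z");

        // Local representation in Shanghai (UTC+8).
        System.out.println(toLocal(pointInTime, "Asia/Shanghai")); // prints 2024-03-19T12:00
        // Local representation in UTC+0.
        System.out.println(toLocal(pointInTime, "UTC"));           // prints 2024-03-19T04:00
    }
}
```

Both printed values describe the same instant. Only the session time zone used for rendering differs.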

Time type mappings

  • If you set the type-mapping.timestamp-converting.legacy parameter to false in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, you can perform conversions of all time types between Realtime Compute for Apache Flink and Hologres.

    Realtime Compute for Apache Flink

    Hologres

    Description

    TIMESTAMP

    TIMESTAMP

    Time type conversions are performed without time zone conversions. We recommend that you use this type of time type conversion to read data from or write data to Hologres.

    TIMESTAMP_LTZ

    TIMESTAMPTZ

    TIMESTAMP

    TIMESTAMPTZ

    Time type conversions are performed with time zone conversions. To ensure data accuracy during conversion, you must set the table.local-time-zone parameter to the time zone of Realtime Compute for Apache Flink. For more information about how to configure the parameter, see How do I configure parameters for deployment running?

    For example, you specify 'table.local-time-zone': 'Asia/Shanghai' to set the time zone of Realtime Compute for Apache Flink to the time zone of Shanghai (UTC+8). After you write the data 2022-01-01 01:01:01.123456 of the TIMESTAMP type from Realtime Compute for Apache Flink to Hologres, the data is converted to 2022-01-01 01:01:01.123456+08 of the TIMESTAMPTZ type.

    TIMESTAMP_LTZ

    TIMESTAMP

  • If you set the type-mapping.timestamp-converting.legacy parameter to true in Realtime Compute for Apache Flink that uses VVR 8.0.6 or later, or if you use Realtime Compute for Apache Flink that uses VVR 8.0.5 or earlier, data deviation may occur during all time type conversions except conversions between the TIMESTAMP types.

    Realtime Compute for Apache Flink

    Hologres

    Description

    TIMESTAMP

    TIMESTAMP

    Time type conversions are performed without time zone conversions. We recommend that you use this type of time type conversion to read data from or write data to Hologres.

    TIMESTAMP_LTZ

    TIMESTAMPTZ

    Data of the TIMESTAMP_LTZ and TIMESTAMPTZ types is expressed as the time without the time zone when Realtime Compute for Apache Flink reads data from or writes data to Hologres. This may cause data deviation.

    For example, if data of the TIMESTAMP_LTZ type in Realtime Compute for Apache Flink is 2024-03-19T04:00:00Z, the time without the time zone in Shanghai (UTC+8) is 2024-03-19T12:00:00. However, when data is written to Hologres, 2024-03-19T04:00:00 is used as the time without the time zone and is converted to 2024-03-19T04:00:00+08 of the TIMESTAMPTZ type in Hologres. This causes an 8-hour data deviation.

    TIMESTAMP

    TIMESTAMPTZ

    Time zone conversions are performed based on the time zone of the JVM in the runtime environment instead of the time zone of Realtime Compute for Apache Flink. This is different from the time zone conversions in Realtime Compute for Apache Flink. If the time zone of Realtime Compute for Apache Flink is different from the time zone of the JVM, data deviation may occur. We recommend that you read data from and write data to Hologres based on the time zone of Realtime Compute for Apache Flink.

    TIMESTAMP_LTZ

    TIMESTAMP
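
The two examples in the preceding mappings can be reproduced with the standard java.time API. The following sketch is an illustration only and is not connector code: the class TimestampMappingSketch and both helper methods are hypothetical names. The first helper models a TIMESTAMP to TIMESTAMPTZ conversion that uses the configured time zone. The second helper models the legacy deviation described above, in which the UTC digits of a TIMESTAMP_LTZ value are treated as a time without the time zone and then tagged with the local offset.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampMappingSketch {
    // TIMESTAMP -> TIMESTAMPTZ: interpret the zone-less value in the configured time zone.
    public static OffsetDateTime timestampToTimestamptz(LocalDateTime timestamp, ZoneId zone) {
        return timestamp.atZone(zone).toOffsetDateTime();
    }

    // Model of the legacy deviation: the UTC digits of the instant are reused
    // as if they were local time in the given zone.
    public static OffsetDateTime legacyLtzToTimestamptz(Instant ltz, ZoneId zone) {
        LocalDateTime utcDigits = LocalDateTime.ofInstant(ltz, ZoneOffset.UTC);
        return utcDigits.atZone(zone).toOffsetDateTime();
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // same value as table.local-time-zone in the example

        OffsetDateTime written = timestampToTimestamptz(
            LocalDateTime.parse("2022-01-01T01:01:01.123456"), shanghai);
        System.out.println(written); // prints 2022-01-01T01:01:01.123456+08:00

        Instant ltz = Instant.parse("2024-03-19T04:00:00Z");
        OffsetDateTime deviated = legacyLtzToTimestamptz(ltz, shanghai);
        System.out.println(deviated); // prints 2024-03-19T04:00+08:00
        // The stored point in time is 8 hours earlier than the original one.
        System.out.println(Duration.between(deviated.toInstant(), ltz)); // prints PT8H
    }
}
```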

References