
Realtime Compute for Apache Flink: Hologres connector

Last Updated: Mar 14, 2024

This topic describes how to use the Hologres connector.

Background information

Hologres is an end-to-end real-time data warehouse service that allows you to write, update, and analyze large amounts of data in real time. It is compatible with the PostgreSQL protocol and supports standard SQL syntax. Hologres supports online analytical processing (OLAP) and ad hoc queries for petabytes of data, and provides high-concurrency, low-latency online data services. Hologres is seamlessly integrated with MaxCompute, Realtime Compute for Apache Flink, and DataWorks, and provides full-stack online and offline data warehouse solutions. The following table describes the capabilities supported by the Hologres connector.

Item

Description

Table type

Source table, dimension table, and result table

Running mode

Streaming mode and batch mode

Data format

N/A

Metric

API type

DataStream API and SQL API

Data update or deletion in a result table

Supported

Features

Features supported for a source table

Feature

Description

Real-time consumption of Hologres data

  • Reads data from a Hologres source table in which the binary logging feature is disabled.

  • Consumes the binary log data of Hologres in real time.

    • Consumes data in non-Change Data Capture (CDC) mode.

    • Consumes data in CDC mode.

    • Consumes full and incremental data by using a source table.

For more information, see Use Realtime Compute for Apache Flink or Blink to consume Hologres binary log data in real time.
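The following minimal sketch shows what a source table that consumes Hologres binary log data in CDC mode might look like. It uses only parameters that are described in the source table section of this topic. The table name, columns, and placeholder values are hypothetical, and the Hologres table is assumed to have binary logging enabled.

```sql
-- Hypothetical source table that consumes Hologres binary log data in CDC mode.
CREATE TEMPORARY TABLE holo_binlog_source (
  id BIGINT,
  name VARCHAR,
  amount BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',    -- consume binary log data
  'cdcMode' = 'true',   -- read binary log data as a changelog stream (CDC mode)
  'sdkMode' = 'jdbc'    -- HoloHub cannot be used in Hologres V2.1 and later
);
```

To consume the binary log data in non-CDC mode instead, omit the cdcMode parameter or set it to false, which is the default value.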

Features supported for a result table

Feature

Description

Streaming semantics

Writes changelogs to a Hologres result table.

Data merging into a wide table

Updates only data in specific fields instead of data in an entire row.

Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements

Synchronizes data from an entire database or a single table to Hologres tables in real time and synchronizes schema changes of each table to Hologres tables in real time.

Data update in specific columns

Pushes down the column names that are specified in the INSERT statement of Flink to the Hologres connector to update only the specified columns.

Note

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.7 or later supports this feature.
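As a sketch of data update in specific columns, assuming VVR 6.0.7 or later, the statements below write only two columns of a wide result table. All table and column names are hypothetical, user_age_source stands for any source table that you declare yourself, and 'mutatetype' = 'insertorupdate' is an assumption about a write mode that updates existing rows. For the available write modes, see Streaming semantics.

```sql
-- Hypothetical wide result table; this job writes only some of its columns.
CREATE TEMPORARY TABLE holo_wide_sink (
  id BIGINT,
  name VARCHAR,
  age BIGINT,
  city VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'mutatetype' = 'insertorupdate'  -- update existing rows instead of ignoring them
);

-- Only id and age are listed, so only these columns are pushed down and updated.
INSERT INTO holo_wide_sink (id, age)
SELECT id, age FROM user_age_source;  -- user_age_source is a hypothetical table
```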

Prerequisites

A Hologres table is created. For more information, see Create a Hologres table.

Limits

  • Limits on all types of Hologres tables

    • Only Realtime Compute for Apache Flink that uses VVR 2.0.0 or later supports the Hologres connector.

    • The Hologres connector cannot be used to access Hologres foreign tables. For more information about Hologres foreign tables, see Create a foreign table.

    • For the known defects, related issues, and release notes of the Hologres connector in each VVR version, see Hologres connector release note.

  • Limits only on Hologres source tables

    • By default, Realtime Compute for Apache Flink scans a Hologres source table once to read all of its data. Data consumption ends when the scan is complete, and data that is appended to the table afterward is not read. Realtime Compute for Apache Flink that uses VVR 3.0.0 or later can consume Hologres data in real time. For more information, see Consume Hologres data in real time. In Realtime Compute for Apache Flink that uses VVR 6.0.3 or later, filter pushdown is supported when the Hologres connector reads data from Hologres. For more information, see the description of the enable_filter_push_down parameter for source tables.

    • You cannot define watermarks for a Hologres source table in CDC mode. If you want to perform window aggregation on a Hologres source table, you can use a different approach to perform time-based aggregation. For more information, see MySQL CDC source tables and Hologres CDC source tables do not support window functions. How do I implement minute-level data aggregation on a MySQL CDC source table or Hologres CDC source table?

  • Limits only on Hologres result tables: None.

  • Limits only on Hologres dimension tables

    • When you join a Hologres dimension table with another table, we recommend that you create the dimension table with row-oriented storage and use its primary key as the JOIN condition, because column-oriented storage incurs large performance overheads for point queries. If you create a row-oriented dimension table, you must specify the primary key as the clustering key. For more information, see Overview.

    • If you cannot use the primary key as the JOIN condition and must perform point queries by using a non-primary key, we recommend that you use column-oriented storage for the Hologres table. In this case, a single lookup may match and return multiple rows, so we recommend that you specify the distribution key and the event time column (segment key) to optimize query performance. For more information, see Distribution key, Event time column (segment key), and Storage models of tables: row-oriented storage, column-oriented storage, and row-column hybrid storage.

    • In Realtime Compute for Apache Flink that uses a VVR version earlier than 4.0, point queries on a Hologres dimension table can be performed only by using its primary key. In VVR 4.0 or later, point queries can also be performed by using non-primary key columns if a JDBC-related mode is used for the dimension table.
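The recommendation to join on the primary key of a row-oriented dimension table can be illustrated with a standard Flink SQL lookup join. This is a minimal sketch: the datagen-based orders table and all table and column names are hypothetical, and the Hologres table behind holo_dim is assumed to use row-oriented storage with id as both the primary key and the clustering key.

```sql
-- Hypothetical order stream; PROCTIME() provides the processing time for the lookup.
CREATE TEMPORARY TABLE orders (
  order_id BIGINT,
  user_id BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- Hologres dimension table; id is the primary key that is used as the JOIN condition.
CREATE TEMPORARY TABLE holo_dim (
  id BIGINT,
  name VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>'
);

-- Each order triggers a point query on holo_dim by primary key.
SELECT o.order_id, d.name
FROM orders AS o
JOIN holo_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.user_id = d.id;
```

In a deployment, you would typically wrap the query in an INSERT INTO statement that writes to a result table.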

Precautions

  • If you set the sdkMode parameter to rpc for a table, take note of the following items when you upgrade the VVR version:

    In Hologres V2.0 and later, the RPC mode is no longer supported for dimension tables and result tables. You cannot set the sdkMode parameter to rpc for these tables; only Java Database Connectivity (JDBC)-related modes can be used. The JDBC-related modes are jdbc, jdbc_fixed, and jdbc_copy. In RPC mode, Flink does not deduplicate data that has the same primary key in the same batch, whereas JDBC-related modes do. If you want to retain complete data after you switch to a JDBC-related mode, set the jdbcWriteBatchSize parameter to 1, or upgrade the VVR version to 8.0.5 or later and set the deduplication.enabled parameter to false.

    If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X for a deployment in which data of Hologres V2.0 is read or written in remote procedure call (RPC) mode, you can configure the SDK mode based on the upgrade scenario.

    • If you upgrade the VVR version from 4.X to a 6.X minor version that ranges from 6.0.4 to 6.0.6, an error may be returned. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed for dimension tables and result tables.

    • If you upgrade the VVR version from 4.X to 6.0.7 or a later 6.X version, no action is required. Flink automatically changes the SDK mode from RPC to JDBC-related modes.

  • If you set the sdkMode parameter to holohub for a binary log source table, take note of the following items when you upgrade the VVR version:

    In Hologres V2.0 and later, you can set the sdkMode parameter to holohub for Hologres binary log source tables only in specific scenarios. In Hologres V2.1 and later, you cannot set the sdkMode parameter to holohub for Hologres binary log source tables; you can set it only to jdbc.

    If a binary log source table is consumed in your deployment and you do not set the sdkMode parameter to jdbc, the HoloHub mode is used for the binary log source table by default. If you want to upgrade the VVR version from 4.X to 6.X or from 4.X to 8.X, take note of the following points based on the Hologres version when you configure the SDK mode.

    • Hologres V2.0

    • Hologres V2.1

      • If you upgrade the VVR version from 4.X to a VVR version that ranges from 6.0.7 to 8.0.4, binary log data may not be consumed as expected. We recommend that you upgrade the VVR version to 8.0.5.

      • If you upgrade the VVR version from 4.X to 8.0.5 or a later 8.X version, no action is required. Flink automatically changes the SDK mode from HoloHub to JDBC.
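For a result table that previously used RPC mode, the Precautions in this section describe two ways to retain complete data after you switch to a JDBC-related mode. The following minimal sketch shows the option for VVR 8.0.5 or later; the table and column names are hypothetical, and the trailing comment shows the alternative for earlier VVR versions.

```sql
-- Hypothetical result table that keeps all rows that share a primary key in a batch.
CREATE TEMPORARY TABLE holo_sink_no_dedup (
  id BIGINT,
  payload VARCHAR,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>',
  'sdkMode' = 'jdbc',                 -- JDBC-related mode instead of rpc
  'deduplication.enabled' = 'false'   -- VVR 8.0.5 or later
  -- For earlier VVR versions, replace the line above with:
  -- 'jdbcWriteBatchSize' = '1'
);
```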

Syntax

CREATE TABLE hologres_table (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector' = 'hologres',
  'dbname' = '<yourDBName>',
  'tablename' = '<yourTableName>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint' = '<yourEndpoint>'
);
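The DDL statement above only declares the table. A minimal sketch of a job that uses hologres_table as a result table follows. The datagen source and its options are assumptions chosen to keep the example self-contained, and the Hologres table is assumed to already exist with matching columns.

```sql
-- Hypothetical test source that generates random rows.
CREATE TEMPORARY TABLE gen_source (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.name.length' = '8'
);

-- Write the generated rows to the Hologres table declared above.
INSERT INTO hologres_table
SELECT name, age, birthday FROM gen_source;
```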

Parameters in the WITH clause

Note

Only Realtime Compute for Apache Flink that uses VVR 4.0.11 or later supports all parameters whose names start with jdbc.

  • Common parameters

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the connector.

    STRING

    Yes

    No default value

    Set the value to hologres.

    dbname

    The name of the database.

    STRING

    Yes

    No default value

    Hologres V2.0 introduces virtual warehouse instances as a new type of elastic and high-availability instances. Computing resources are divided into multiple virtual warehouses that are more suitable for high-availability scenarios. Different virtual warehouses share the same endpoint. You can add a specific suffix to the value of the dbname parameter to specify the virtual warehouse to which you want to connect. For example, if you want to connect a dimension table to the virtual warehouse read_warehouse, you can configure 'dbname' = 'db_test@read_warehouse'.

    Note

    Virtual warehouses are supported only when JDBC-related modes are used for tables. For more information, see the sdkMode parameter in the Parameters in the WITH clause section of the source table, dimension table, and result table.

    tablename

    The name of the table.

    STRING

    Yes

    No default value

    If the public schema is not used, you must set tablename to a value in the schema.tableName format.

    username

    The username that is used to access the database. You must enter the AccessKey ID of your Alibaba Cloud account.

    STRING

    Yes

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    password

    The password that is used to access the database. You must enter the AccessKey secret of your Alibaba Cloud account.

    STRING

    Yes

    No default value

    For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

    Important

    To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    endpoint

    The endpoint of Hologres.

    STRING

    Yes

    No default value

    For more information, see Endpoints for connecting to Hologres.

    connection.ssl.mode

    Specifies whether to enable Secure Sockets Layer (SSL)-encrypted transmission and specifies the SSL-encrypted transmission mode that you want to use.

    STRING

    No

    disable

    Valid values:

    • disable: SSL-encrypted transmission is disabled. This is the default value.

    • require: SSL-encrypted transmission is enabled. The client encrypts the connections that are used to transmit data but does not verify the Hologres server.

    • verify-ca: SSL-encrypted transmission is enabled. The client uses SSL-encrypted transmission to encrypt the connections that transmit data and uses CA certificates to verify the Hologres server.

    • verify-full: SSL-encrypted transmission is enabled. The client encrypts the connections that transmit data, uses CA certificates to verify the Hologres server, and checks whether the common name (CN) or DNS name in the certificate matches the Hologres endpoint that is specified during connection setup.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • You can set the connection.ssl.mode parameter to require in Hologres V1.1 and later. You can set the connection.ssl.mode parameter to verify-ca or verify-full in Hologres V2.1 and later. For more information, see SSL-encrypted transmission.

    • If you set this parameter to verify-ca or verify-full, you must configure the connection.ssl.root-cert.location parameter.

    connection.ssl.root-cert.location

    The path of the certificate if a CA certificate is used.

    STRING

    No

    No default value

    If you set the connection.ssl.mode parameter to verify-ca or verify-full, you must configure this parameter. To upload the CA certificate file, click Upload Artifact on the Artifacts page in the console of fully managed Flink. The uploaded file is stored in the /flink/usrlib directory. For example, if the CA certificate file is named certificate.crt, set this parameter to '/flink/usrlib/certificate.crt' in the WITH clause.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • For more information about how to obtain a CA certificate, see Step 2: Download the CA certificate.

    jdbcRetryCount

    The maximum number of retries allowed to read and write data if a connection failure occurs.

    INTEGER

    No

    10

    N/A.

    jdbcRetrySleepInitMs

    The fixed portion of the waiting period for each retry.

    LONG

    No

    1000

    The actual waiting period for a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries up to the current retry × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds.

    jdbcRetrySleepStepMs

    The amount by which the waiting period increases with each retry.

    LONG

    No

    5000

    For the formula that is used to calculate the actual waiting period, see the jdbcRetrySleepInitMs parameter. With the default values, each retry waits 5000 milliseconds longer than the previous one. Unit: milliseconds.

    jdbcConnectionMaxIdleMs

    The maximum duration for which the JDBC connection is idle.

    LONG

    No

    60000

    If a JDBC connection stays idle for a period of time that exceeds the value of this parameter, the connection is closed and released. Unit: milliseconds.

    jdbcMetaCacheTTL

    The maximum time for storing the TableSchema information in the cache.

    LONG

    No

    60000

    Unit: milliseconds.

    jdbcMetaAutoRefreshFactor

    The factor for triggering automatic cache refresh. If the remaining time for storing data in the cache is less than the time for triggering an automatic refresh of the cache, Flink automatically refreshes the cache.

    INTEGER

    No

    4

    The remaining time for storing data in the cache is calculated by using the following formula: Remaining time for storing data in the cache = Maximum time that is allowed to store data in the cache - Duration for which data is cached. After the cache is automatically refreshed, the duration for which data is cached is recalculated from 0.

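    The time for triggering an automatic refresh of the cache is calculated by using the following formula: Time for triggering an automatic refresh of the cache = jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor. For example, with the default values, the refresh is triggered when the remaining time drops below 60000/4 = 15000 milliseconds, that is, after data has been cached for more than 45000 milliseconds.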
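    As an illustration of the common parameters, the following sketch declares a dimension table that connects to a virtual warehouse over SSL with certificate verification. It assumes VVR 8.0.5 or later, Hologres V2.1 or later, a JDBC-related mode, and a CA certificate that is uploaded as certificate.crt. The warehouse name read_warehouse comes from the dbname example; the schema, table, and column names are hypothetical.

    ```sql
    -- Hypothetical dimension table in a non-public schema, read through a virtual warehouse.
    CREATE TEMPORARY TABLE holo_dim_ssl (
      id BIGINT,
      name VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hologres',
      'dbname' = 'db_test@read_warehouse',  -- the @ suffix selects the virtual warehouse
      'tablename' = 'my_schema.my_dim',     -- schema.tableName format for a non-public schema
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'sdkMode' = 'jdbc',                   -- virtual warehouses require a JDBC-related mode
      'connection.ssl.mode' = 'verify-full',
      'connection.ssl.root-cert.location' = '/flink/usrlib/certificate.crt',
      'jdbcRetryCount' = '5'                -- example value; the default value is 10
    );
    ```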

  • Parameters only for source tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    field_delimiter

    The delimiter that is used between rows of the data that you want to export.

    STRING

    No

    "\u0002"

    N/A.

    binlog

    Specifies whether to consume binary log data.

    BOOLEAN

    No

    false

    Valid values:

    • true: Binary log data is consumed.

    • false: Binary log data is not consumed. This is the default value.

    sdkMode

    The SDK mode.

    STRING

    No

    holohub

    Valid values:

    • holohub: The binary log source table in HoloHub mode is used. This is the default value.

    • jdbc: The binary log source table in JDBC mode is used.

    For more information, see Statement for creating a Hologres source table in which the binary logging feature is enabled.

    Note
    • VVR 6.0.3 and earlier 6.X versions: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to holohub.

    • VVR 6.0.7 and later 6.X versions: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, Flink uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the HoloHub mode is no longer supported. If you set this parameter to holohub, Flink automatically changes the SDK mode from HoloHub to JDBC. If you set this parameter to jdbc, Flink uses the JDBC mode.

    • VVR 8.0.4 and later 8.X versions:

    jdbcBinlogSlotName

    The slot name of the binary log source table in JDBC mode. For more information about how to create a binary log source table in JDBC mode, see Binary log source table in JDBC mode.

    STRING

    No

    No default value

    This parameter takes effect only when the sdkMode parameter is set to jdbc. If you do not configure this parameter, the Hologres connector creates a slot by default. For more information, see JDBC mode used to consume binary log source tables.

    Note

    You do not need to configure this parameter when the version of the Hologres instance is V2.1 or later. The Hologres connector does not attempt to automatically create a slot.

    binlogMaxRetryTimes

    The number of retries after Realtime Compute for Apache Flink fails to read binary log data.

    INTEGER

    No

    60

    N/A.

    binlogRetryIntervalMs

    The interval between retries after Realtime Compute for Apache Flink fails to read binary log data.

    LONG

    No

    2000

    Unit: milliseconds.

    binlogBatchReadSize

    The number of rows of binary log data that are read at a time.

    INTEGER

    No

    100

    N/A.

    cdcMode

    Specifies whether to read binary log data in CDC mode.

    BOOLEAN

    No

    false

    Valid values:

    • true: Binary log data is read in CDC mode.

    • false: Binary log data is read in non-CDC mode. This is the default value.

    upsertSource

    Specifies whether the source table reads a changelog stream that contains UPSERT messages.

    BOOLEAN

    No

    false

    This parameter takes effect only in CDC mode. Valid values:

    • true: The source table reads a changelog stream that contains only UPSERT messages, including INSERT, DELETE, and UPDATE_AFTER.

    • false: The source table reads a changelog stream that contains full change messages, including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER. This is the default value.

    Note

    For example, if the ROW_NUMBER() function together with an OVER clause of Flink SQL is used to perform deduplication and retract operators exist in the result table, you must set upsertSource to true. In this case, the source table reads data from Hologres in the upsert fashion.

    binlogStartupMode

    The mode in which binary log data is consumed.

    STRING

    No

    earliestOffset

    Valid values:

    • initial: The system consumes full data and then reads binary log data to consume incremental data.

    • earliestOffset: The system starts data consumption from the earliest binary log data. This is the default value.

    • timestamp: The system consumes binary log data from the point in time that is specified by the startTime parameter.

    Note

    If the startTime parameter is configured or a start time is specified in the dialog box for starting a deployment, the timestamp mode is used regardless of the value of the binlogStartupMode parameter, because the start time takes precedence.

    Note

    This parameter is available only when Realtime Compute for Apache Flink uses VVR 4.0.13 or later and the Hologres version is Hologres V0.10 or later.

    startTime

    The start time when Hologres data is consumed.

    STRING

    No

    No default value

    The time is in the yyyy-MM-dd hh:mm:ss format. If this parameter is not specified and deployments are not resumed from the state, Realtime Compute for Apache Flink starts to consume the Hologres data from the earliest generated binary log.

    jdbcScanFetchSize

    The number of data records that can be processed at a time during the scan operation.

    INTEGER

    No

    256

    N/A.

    jdbcScanTimeoutSeconds

    The timeout period of the scan operation.

    INTEGER

    No

    60

    Unit: seconds.

    jdbcScanTransactionSessionTimeoutSeconds

    The timeout period for the transaction to which the scan operation belongs.

    INTEGER

    No

    600

    If this parameter is set to 0, the scan operation never times out. This parameter corresponds to the Grand Unified Configuration (GUC) parameter idle_in_transaction_session_timeout of Hologres. For more information, see GUC parameters.

    Note

    Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13 or later supports this parameter.

    enable_filter_push_down

    Specifies whether to perform filter pushdown during the full data reading phase.

    BOOLEAN

    No

    false

    Valid values:

    • false: Filter pushdown is not performed. This is the default value.

    • true: The supported filter conditions are pushed down to Hologres during the full data reading phase. The filter pushdown operation is performed in the following full data reading scenarios: full data reading from a Hologres source table in which the binary logging feature is disabled, and full data reading in a binary log source table when the Hologres connector consumes full and incremental data in a source table.

      Important

      In Realtime Compute for Apache Flink whose engine version is from vvr-6.0.3-flink-1.15 to vvr-6.0.5-flink-1.15, filter pushdown is automatically performed. However, if a deployment uses a Hologres dimension table and the DML statement that is used to write data contains filter conditions for non-primary key fields in the dimension table, the filter conditions of the dimension table are incorrectly pushed down. As a result, an error may occur when the dimension table is joined with another table. To resolve this issue, we recommend that you use Realtime Compute for Apache Flink whose engine version is vvr-6.0.6-flink-1.15 or later and add this parameter to the WITH clause in the DDL statement that is used to create the source table to enable filter pushdown.
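    The following sketch combines several source table parameters: it reads full data and then incremental binary log data (binlogStartupMode set to initial) in CDC mode through JDBC, and enables filter pushdown for the full data reading phase. It assumes vvr-6.0.6-flink-1.15 or later, a Hologres table with binary logging enabled, and hypothetical table and column names. Check the version notes of each parameter before you use it.

    ```sql
    -- Hypothetical source table that consumes full data and then incremental data.
    CREATE TEMPORARY TABLE holo_full_incremental (
      id BIGINT,
      status VARCHAR,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hologres',
      'dbname' = '<yourDBName>',
      'tablename' = '<yourTableName>',
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint' = '<yourEndpoint>',
      'binlog' = 'true',
      'cdcMode' = 'true',
      'sdkMode' = 'jdbc',
      'binlogStartupMode' = 'initial',    -- full data first, then binary log data
      'enable_filter_push_down' = 'true'  -- push supported filters down during the full phase
    );

    -- The filter on status may be pushed down to Hologres while full data is read.
    SELECT id, status FROM holo_full_incremental WHERE status = 'active';
    ```

    Do not configure startTime in this setup: as described for binlogStartupMode, a configured start time forces the timestamp mode.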

  • Parameters only for result tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    sdkMode

    The SDK mode.

    STRING

    No

    jdbc

    Valid values:

    • jdbc: Data is written by using a JDBC driver. This is the default value.

    • jdbc_copy: Data is written in jdbc_copy mode, which uses the copy mode of a JDBC driver.

      This mode is supported in Hologres V1.3 and later. Because data is written in streaming mode, data writing in this mode provides higher throughput and lower data latency, and consumes fewer client memory resources than data writing in JDBC mode. If the jdbc_copy mode is used, data cannot be deleted, data cannot be written to a parent partitioned table, and the ignoreNullWhenUpdate parameter is not supported.

    • rpc: Data is written in RPC mode. Setting this parameter to rpc has an effect similar to setting the useRpcMode parameter to true. Unlike data writing in JDBC mode, data writing in RPC mode does not occupy connections. Data of the JSONB and RoaringBitmap data types cannot be written to Hologres in this mode.

    • jdbc_fixed: Data is written by using a fixed JDBC driver. This feature is in public preview.

      Only Hologres V1.3 or later supports this feature. Unlike data writing in JDBC mode, data writing in jdbc_fixed mode does not occupy connections. Data of the JSONB and RoaringBitmap data types cannot be written to Hologres in this mode.

    Note
    • VVR 6.0.3 and earlier 6.X versions: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to jdbc.

    • VVR 6.0.7 to VVR 8.0.1: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, the Flink system uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, the Flink system automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, the Flink system uses the SDK mode that you configure.

      • If you use the RPC mode for a table, Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, you can use a JDBC-related mode for the table and set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.

    • VVR 8.0.3 and later 8.X versions: We recommend that you set this parameter to jdbc.

      For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the versions of Hologres instances. If the RPC mode is used for tables, Flink automatically changes the SDK mode from RPC to jdbc_fixed and you can set the jdbcWriteBatchSize parameter to 1 to avoid deduplication.

    • VVR 8.0.5 and later: We recommend that you set this parameter to jdbc.

      If you use the RPC mode for tables, Flink automatically changes the SDK mode from RPC to jdbc_fixed and you can set the deduplication.enabled parameter to false to avoid deduplication.

    bulkload

    Specifies whether to write data in bulkload mode.

    BOOLEAN

    No

    false

    This parameter takes effect only when the sdkMode parameter is set to jdbc_copy. You can write data in bulkload mode only to tables that do not have primary keys or to tables whose primary key values are guaranteed not to be duplicated. Compared with the jdbc_copy mode, the bulkload mode uses fewer Hologres resources.

    Note

    Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later and Hologres V2.1 or later support this parameter.

    useRpcMode

    Specifies whether to use RPC to connect to Hologres.

    BOOLEAN

    No

    false

    Valid values:

    • true: RPC is used to connect to Hologres.

      The effect of this setting is the same as the effect of setting the sdkMode parameter to rpc. If RPC is used, the number of SQL connections is reduced.

    • false: A JDBC driver is used to connect to Hologres. This is the default value.

      JDBC drivers require SQL connections, which increases the number of JDBC connections.

    Note
    • The RPC mode is no longer supported in Hologres V2.0. We recommend that you configure the sdkMode parameter to write data in JDBC or jdbc_fixed mode.

    • Realtime Compute for Apache Flink that uses VVR 6.0.7 to VVR 8.0.1 automatically changes the SDK mode from RPC to jdbc_fixed for a table when it detects that the version of Hologres is V2.0 or later.

    • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, Flink automatically changes the SDK mode from RPC to jdbc_fixed.

    • If you use the RPC mode for a table, Flink does not perform deduplication on the data that has the same primary key. If you want to retain complete data, we recommend that you use Realtime Compute for Apache Flink that uses VVR 8.0.5 or later, set the sdkMode parameter to jdbc, and set the deduplication.enabled parameter to false to avoid deduplication.

    mutatetype

    The data writing mode.

    STRING

    No

    insertorignore

    For more information, see Streaming semantics.

    partitionrouter

    Specifies whether to write data to a partitioned table.

    BOOLEAN

    No

    false

    N/A.

    createparttable

    Specifies whether to automatically create a partitioned table to which data is written based on partition values.

    BOOLEAN

    No

    false

    In RPC mode, if the partition values contain hyphens (-), partitioned tables cannot be automatically created.

    Note
    • In Hologres V1.3.22 or later, a field of the DATE type can be used as the partition key. For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, a partitioned table can be automatically created when you use a field of the DATE type as the partition key.

    • Make sure that partition values do not contain dirty data. If dirty data exists, a failover occurs because an invalid partitioned table is created. Proceed with caution when you use this parameter.

    • If you set the sdkMode parameter to jdbc_copy, data cannot be written to a parent partitioned table.

    ignoredelete

    Specifies whether to ignore retraction messages.

    BOOLEAN

    No

    true

    Note

    This parameter takes effect only when the streaming semantics is used.

    connectionSize

    The size of the JDBC connection pool that is created in a Flink deployment.

    INTEGER

    No

    3

    If the deployment provides poor performance, we recommend that you increase the size of the connection pool. A larger connection pool generally supports higher data throughput.

    jdbcWriteBatchSize

    The maximum number of rows of data that can be processed by a Hologres streaming sink operator at a time when a JDBC driver is used.

    INTEGER

    No

    256

    Note

    You can configure any of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you configure more than one of them, the system writes data to the Hologres result table as soon as any of the configured conditions is met.

    jdbcWriteBatchByteSize

    The maximum number of bytes of data that can be processed by a Hologres streaming sink operator at a time when a JDBC driver is used.

    LONG

    No

    2097152 (2 × 1024 × 1024 bytes = 2 MB)

    Note

    You can configure any of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you configure more than one of them, the system writes data to the Hologres result table as soon as any of the configured conditions is met.

    jdbcWriteFlushInterval

    The maximum period of time that a Hologres streaming sink operator waits before it writes buffered rows to Hologres in a batch when a JDBC driver is used.

    LONG

    No

    10000

    Unit: milliseconds.

    Note

    You can configure any of the following parameters: jdbcWriteBatchSize, jdbcWriteBatchByteSize, and jdbcWriteFlushInterval. If you configure more than one of them, the system writes data to the Hologres result table as soon as any of the configured conditions is met.

    ignoreNullWhenUpdate

    Specifies whether to ignore null values in the written data when the mutatetype='insertOrUpdate' setting is used.

    BOOLEAN

    No

    false

    Valid values:

    • false: The null value is written to a Hologres result table. This is the default value.

    • true: The null value that is written in the data is ignored.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 4.0 or later supports this parameter.

    • If you set the sdk_mode parameter to jdbc_copy, this parameter is not supported.

    connectionPoolName

    The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

    STRING

    No

    No default value

    Set the value to a string other than default. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.

    Note
    • VVR versions earlier than 4.0.12: This parameter is not supported.

    • VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its own connection pool.

    • VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If a deployment contains a large number of tables, the shared pool may not have enough connections, which degrades deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.

    • You can configure this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Result Table C, Result Table D, and Result Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B, the connection pool pool2 for Result Table C and Result Table D, and a dedicated connection pool pool3 for Result Table E, which processes a large amount of data.

    jdbcEnableDefaultForNotNullColumn

    Specifies whether the Hologres connector fills in a default value when a null value is written to a NOT NULL column that has no default value configured in the Hologres table.

    BOOLEAN

    No

    true

    Valid values:

    • true: The Hologres connector fills a default value. This is the default value. If you set this parameter to true, the system converts a null value into a default value based on the following rules:

      • If the column is of the STRING type, the null value is converted into an empty string.

      • If the column is of the NUMBER data type, the null value is converted into 0.

      • If the column is of the DATE, TIMESTAMP, or TIMESTAMPTZ data type, the null value is converted into 1970-01-01 00:00:00.

    • false: The Hologres connector does not fill a default value. If a null value is written to a non-null column for which the default value is not configured in the Hologres table, an error is returned.

    remove-u0000-in-text.enabled

    Specifies whether the Hologres connector removes the invalid character \u0000 from STRING data that is written to the result table.

    BOOLEAN

    No

    false

    Valid values:

    • false: The Hologres connector does not process the data. If the data that is written to the result table contains \u0000 characters, the error message "ERROR: invalid byte sequence for encoding "UTF8": 0x00" may be reported. This is the default value.

      In this case, remove the dirty data from the source table in advance or define the dirty data processing logic in the SQL statement.

    • true: The Hologres connector removes the invalid characters \u0000 from the data of the STRING type to prevent the error message from being reported.

    partial-insert.enabled

    Specifies whether to insert only the fields declared in the INSERT statement.

    BOOLEAN

    No

    false

    Valid values:

    • false: All fields that are declared in the DDL statement of the result table are updated regardless of which fields are declared in the INSERT statement. Fields that are not declared in the INSERT statement are updated to null values. This is the default value.

    • true: The fields that are declared in the INSERT statement are pushed down to the connector. Only the declared fields are updated in or inserted into the result table.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 6.0.7 or later supports this parameter.

    • This parameter takes effect only if the mutatetype parameter is set to InsertOrUpdate.

    deduplication.enabled

    Specifies whether to perform deduplication when data is written in batches in jdbc or jdbc_fixed mode.

    BOOLEAN

    No

    true

    Valid values:

    • true: If the data that is written in batches contains multiple records with the same primary key, deduplication is performed and only the last record is retained. This is the default value. The following examples use a table with two fields, in which the first field is the primary key.

      • If the INSERT (1,'a') and INSERT (1,'b') records are written to the result table in chronological order, only the record (1,'b') that arrives later is retained and written to the Hologres result table after deduplication is performed.

      • If the (1,'a') record already exists in the Hologres result table and the DELETE (1,'a') and INSERT (1,'b') records are written to the result table in chronological order, only the (1,'b') record, which arrives later, is retained and written to the Hologres result table. The row is updated directly instead of being deleted and then reinserted.

    • false: No deduplication is performed when data is written in batches. If a record to be written has the same primary key as a record that is already in the current batch, the current batch is written first, and then the new record is written.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0.5 or later supports this parameter.

    • In extreme cases, if deduplication is disabled and all records have the same primary key, each record is written separately instead of in batches, which may degrade performance.
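The batching parameters in the preceding table (jdbcWriteBatchSize, jdbcWriteBatchByteSize, jdbcWriteFlushInterval, and deduplication.enabled) all act on the sink's write buffer. The following Java sketch models that buffer under simplifying assumptions: records are strings keyed by an integer primary key, and a "flush" only records the batch in a list instead of writing to Hologres. The class WriteBufferSketch and its methods are hypothetical and are not part of the Hologres connector API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a sink write buffer that mirrors the documented behavior of
// jdbcWriteBatchSize, jdbcWriteBatchByteSize, jdbcWriteFlushInterval, and
// deduplication.enabled. It is not the Hologres connector's implementation.
final class WriteBufferSketch {
    private final int maxRows;          // role of jdbcWriteBatchSize
    private final long maxBytes;        // role of jdbcWriteBatchByteSize
    private final long flushIntervalMs; // role of jdbcWriteFlushInterval
    private final boolean dedup;        // role of deduplication.enabled

    // Pending records keyed by primary key, in arrival order.
    private final Map<Integer, String> batch = new LinkedHashMap<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long batchBytes = 0;
    private long lastFlushMs;

    WriteBufferSketch(int maxRows, long maxBytes, long flushIntervalMs, boolean dedup, long nowMs) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
        this.flushIntervalMs = flushIntervalMs;
        this.dedup = dedup;
        this.lastFlushMs = nowMs;
    }

    void write(int key, String record, long nowMs) {
        if (!dedup && batch.containsKey(key)) {
            flush(nowMs); // deduplication off: write the pending batch before the conflicting record
        }
        // Deduplication on: drop the earlier record with this key so only the latest remains.
        String previous = batch.remove(key);
        if (previous != null) {
            batchBytes -= sizeOf(previous);
        }
        batch.put(key, record);
        batchBytes += sizeOf(record);
        maybeFlush(nowMs);
    }

    // Also meant to be called by a timer so that a partially filled batch is written after the interval.
    void maybeFlush(long nowMs) {
        boolean rowsReached = batch.size() >= maxRows;
        boolean bytesReached = batchBytes >= maxBytes;
        boolean intervalReached = nowMs - lastFlushMs >= flushIntervalMs;
        if (rowsReached || bytesReached || intervalReached) {
            flush(nowMs); // any single condition is enough
        }
    }

    private void flush(long nowMs) {
        if (!batch.isEmpty()) {
            flushed.add(new ArrayList<>(batch.values())); // stands in for one batched write
            batch.clear();
            batchBytes = 0;
        }
        lastFlushMs = nowMs;
    }

    private static long sizeOf(String record) {
        return record.getBytes(StandardCharsets.UTF_8).length;
    }

    List<List<String>> flushedBatches() { return flushed; }

    int pendingRecords() { return batch.size(); }
}
```

In this model, writing INSERT (1,'a') and then INSERT (1,'b') produces one batch that contains only (1,'b') when deduplication is enabled, and two single-record batches when it is disabled, which is why a stream in which every record has the same key loses the benefit of batching.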

  • Parameters only for dimension tables

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    sdkMode

    The SDK mode.

    STRING

    No

    jdbc

    Valid values:

    • jdbc: Queries are performed in JDBC mode. Both point queries based on a primary key and queries based on a non-primary key are supported, but queries based on a non-primary key are slow. This is the default value.

    • rpc: Point queries are performed in RPC mode. This setting has a similar effect to setting the useRpcMode parameter to true. Only point queries based on a primary key are supported, so when you join a Hologres dimension table with another table, the ON clause must include all primary key fields of the dimension table. Unlike point queries in JDBC mode, point queries in RPC mode do not occupy connections. Data of the JSONB and RoaringBitmap types cannot be queried in this mode.

    • jdbc_fixed: Point queries are performed by using a fixed JDBC driver. This feature is in public preview and is supported only by Hologres 1.3 or later. Unlike point queries in JDBC mode, point queries in jdbc_fixed mode do not occupy connections. Data of the JSONB and RoaringBitmap types cannot be queried in this mode. Only point queries based on a primary key are supported, so when you join a Hologres dimension table with another table, the ON clause must include all primary key fields of the dimension table.

    Note
    • VVR 6.0.3 or an earlier 6.X version: This parameter is not supported.

    • VVR 6.0.4 to VVR 6.0.6: We recommend that you set this parameter to jdbc.

      For Realtime Compute for Apache Flink that uses VVR 6.0.6, if you set this parameter to jdbc, a null pointer error may occur when data of the STRING data type is queried in JDBC mode and the query results contain null values. To resolve this issue, we recommend that you use the RPC mode.

    • VVR 6.0.7 and VVR 8.0.1: We recommend that you set this parameter to jdbc.

      • If the Hologres instance is of a version earlier than Hologres V2.0, Flink uses the SDK mode that you configure.

      • In Hologres V2.0 or later, the RPC mode is no longer supported. If you set this parameter to rpc, Flink automatically changes the SDK mode from RPC to jdbc_fixed. If you set this parameter to a value other than rpc, Flink uses the SDK mode that you configure.

    • VVR 8.0.3 and a later 8.X version: We recommend that you set this parameter to jdbc.

      • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, the RPC mode is no longer supported for tables regardless of the versions of Hologres instances. If the RPC mode is used for tables, Flink automatically changes the SDK mode from RPC to jdbc_fixed.

    useRpcMode

    Specifies whether to use RPC to connect to Hologres.

    BOOLEAN

    No

    false

    Valid values:

    • true: RPC is used to connect to Hologres. This setting is equivalent to setting the sdkMode parameter to rpc. RPC reduces the number of SQL connections.

    • false: A JDBC driver is used to connect to Hologres. This is the default value.

      JDBC mode occupies SQL connections, which increases the number of JDBC connections.

    Note
    • The RPC mode is no longer supported in Hologres V2.0. We recommend that you set the sdkMode parameter to jdbc or jdbc_fixed.

    • Realtime Compute for Apache Flink that uses VVR 6.0.7 or VVR 8.0.1 automatically changes the SDK mode from RPC to jdbc_fixed for a table when it detects that the version of Hologres is V2.0 or later.

    • For Realtime Compute for Apache Flink that uses VVR 8.0.3 or later, Flink automatically changes the SDK mode from RPC to jdbc_fixed.

    connectionSize

    The size of the JDBC connection pool that is created in a Flink deployment.

    INTEGER

    No

    3

    If the deployment performs poorly, we recommend that you increase the size of the connection pool. A larger connection pool generally supports higher data throughput.

    connectionPoolName

    The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool.

    STRING

    No

    No default value

    Set the value to a string other than default. If the same connection pool is configured for multiple tables, the value of the connectionSize parameter for the tables must be the same.

    You can configure this parameter based on your business requirements. For example, a deployment uses the following Hologres tables: Dimension Table A, Dimension Table B, Result Table C, Result Table D, and Result Table E. You can configure the connection pool pool1 for Dimension Table A and Dimension Table B, the connection pool pool2 for Result Table C and Result Table D, and a dedicated connection pool pool3 for Result Table E, which processes a large amount of data.

    Note
    • VVR versions earlier than 4.0.12: This parameter is not supported.

    • VVR 4.0.12 to VVR 8.0.3: By default, tables do not share a connection pool. Each table uses its own connection pool.

    • VVR 8.0.4 or later: By default, tables that use the same endpoint in the same deployment share a connection pool. If a deployment contains a large number of tables, the shared pool may not have enough connections, which degrades deployment performance. In this case, we recommend that you set the connectionPoolName parameter to different values for different tables.

    jdbcReadBatchSize

    The maximum number of data records that can be processed at the same time for a point query in a Hologres dimension table.

    INTEGER

    No

    128

    N/A.

    jdbcReadBatchQueueSize

    The maximum number of queued requests allowed in a thread to perform a point query in a Hologres dimension table.

    INTEGER

    No

    256

    N/A.

    jdbcReadTimeoutMs

    The timeout period for performing a point query in a Hologres dimension table.

    LONG

    No

    0

    Default value: 0. The default value indicates that the point query in a Hologres dimension table never times out. Only Realtime Compute for Apache Flink whose engine version is vvr-4.0.15-flink-1.13, vvr-6.0.2-flink-1.15, or their later minor versions supports this parameter.

    jdbcReadRetryCount

    The number of retries when a point query performed in a Hologres dimension table times out.

    INTEGER

    No

    • VVR version earlier than 8.0.5: 1

    • VVR 8.0.5 or later: 10

    • Only Realtime Compute for Apache Flink that uses VVR 6.0.3 or later supports this parameter.

    • This parameter is different from the jdbcRetryCount parameter. The jdbcRetryCount parameter specifies the maximum number of retries allowed to read and write data if a connection failure occurs.

    jdbcScanFetchSize

    The number of data records that can be processed at the same time by calling the scan operation when you perform a one-to-many table join. In a one-to-many table join, no complete primary key is used.

    INTEGER

    No

    256

    N/A.

    jdbcScanTimeoutSeconds

    The maximum amount of time to wait for a scan operation to complete.

    INTEGER

    No

    60

    Unit: seconds.

    cache

    The cache policy.

    STRING

    No

    For more information, see the Remarks column.

    Hologres supports only None and LRU cache policies. For more information, see Background information.

    Note

    The default value of this parameter varies based on the VVR version:

    • For Realtime Compute for Apache Flink that uses VVR 4.X or later, the default value of this parameter is None.

    • For Realtime Compute for Apache Flink that uses a VVR version earlier than 4.X, the default value of this parameter is LRU.

    cacheSize

    The maximum number of rows that can be cached.

    INTEGER

    No

    10000

    If you set the cache parameter to LRU, you can configure the cacheSize parameter.

    cacheTTLMs

    The interval at which the system refreshes the cache.

    LONG

    No

    For more information, see the Remarks column.

    Unit: milliseconds. The default value of the cacheTTLMs parameter varies based on the value of the cache parameter:

    • If you set the cache parameter to LRU, the cacheTTLMs parameter specifies the cache timeout period. By default, cache entries do not expire.

    • If you set the cache parameter to None, the cacheTTLMs can be left empty. This indicates that cache entries do not expire.

    cacheEmpty

    Specifies whether to cache the results of JOIN queries that return no data.

    BOOLEAN

    No

    true

    • true: The JOIN queries whose return value is empty are cached. This is the default value.

    • false: The JOIN queries whose return value is empty are not cached.

    async

    Specifies whether to enable data synchronization in asynchronous mode.

    BOOLEAN

    No

    false

    • true: Data synchronization in asynchronous mode is enabled.

    • false: Data synchronization in asynchronous mode is disabled. This is the default value.

    Note

    By default, data is not sorted when data is synchronized in asynchronous mode.

    asyncResultOrder

    Specifies whether the returned data is sorted when data is synchronized in asynchronous mode.

    STRING

    No

    unordered

    • unordered: The returned data is not sorted when data is synchronized in asynchronous mode. This is the default value.

    • ordered: The returned data is sorted when data is synchronized in asynchronous mode.

    Note
    • Only Realtime Compute for Apache Flink that uses VVR 8.0 or later supports this parameter.
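The cache, cacheSize, cacheTTLMs, and cacheEmpty parameters in the preceding table describe an LRU lookup cache for dimension table joins. The following Java sketch shows how such a cache could behave. It is built on java.util.LinkedHashMap in access order; the class LruLookupCacheSketch and the loader function, which stands in for a Hologres point query, are hypothetical and do not reflect the connector's internal implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical LRU lookup cache that mirrors cache='LRU', cacheSize, cacheTTLMs, and cacheEmpty.
// The loader function stands in for a Hologres point query. Not the connector's implementation.
final class LruLookupCacheSketch<K, V> {
    private static final class CachedValue<V> {
        final Optional<V> value; // Optional.empty() records a lookup that found no row
        final long loadedAtMs;

        CachedValue(Optional<V> value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final long ttlMs;         // role of cacheTTLMs; <= 0 means entries never expire
    private final boolean cacheEmpty; // role of cacheEmpty
    private final Function<K, Optional<V>> loader;
    private final LinkedHashMap<K, CachedValue<V>> entries;
    int loads = 0; // number of loader calls, exposed for demonstration

    LruLookupCacheSketch(int cacheSize, long ttlMs, boolean cacheEmpty, Function<K, Optional<V>> loader) {
        this.ttlMs = ttlMs;
        this.cacheEmpty = cacheEmpty;
        this.loader = loader;
        // accessOrder=true keeps the least recently used entry first, so it is the one evicted.
        this.entries = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > cacheSize; // role of cacheSize
            }
        };
    }

    Optional<V> get(K key, long nowMs) {
        CachedValue<V> cached = entries.get(key);
        if (cached != null && (ttlMs <= 0 || nowMs - cached.loadedAtMs < ttlMs)) {
            return cached.value; // cache hit that has not expired
        }
        loads++;
        Optional<V> loaded = loader.apply(key);
        if (loaded.isPresent() || cacheEmpty) {
            entries.put(key, new CachedValue<>(loaded, nowMs));
        } else {
            entries.remove(key); // empty results are not cached when cacheEmpty is false
        }
        return loaded;
    }
}
```

Caching empty results, as cacheEmpty=true does, saves repeated point queries for keys that have no match in the dimension table, at the cost of not seeing rows that are inserted for those keys until the cached entry expires or is evicted.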

Data type mappings

For more information about the mappings between the data types in Flink and Hologres, see Data type mappings between Realtime Compute for Apache Flink and Hologres.

Sample code

Sample code for a source table

Statement for creating a Hologres source table in which the binary logging feature is disabled

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'field_delimiter'='|' -- This parameter is optional. 
);

CREATE TEMPORARY TABLE blackhole_sink(
  name varchar,
  age BIGINT,
  birthday BIGINT 
) WITH (
  'connector'='blackhole'
);

INSERT INTO blackhole_sink 
SELECT name, age, birthday
from hologres_source;

Statement for creating a Hologres source table in which the binary logging feature is enabled

Realtime Compute for Apache Flink can consume the binary log data of Hologres in real time. For more information, see Consume Hologres data in real time.

Sample code for a result table

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);

CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);

INSERT INTO hologres_sink SELECT * from datagen_source;

Sample code for a dimension table

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector' = 'hologres',
   ...
);

CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

Feature description

Streaming semantics

Stream processing, also known as streaming data or streaming event processing, refers to the continuous processing of an unbounded series of data or events. Because network or device failures may cause data loss, systems that process streaming data or events usually let you specify a reliability pattern, or processing semantics, to ensure data accuracy.

Semantics can be classified into the following types based on the configurations of the Hologres streaming sink that you use and the attributes of the Hologres table:

  • Exactly-once: The system processes data or events only once even if multiple failures occur.

  • At-least-once: If streaming data or events are lost, the system resends them from the source. As a result, the system may process the same data or events multiple times. If the first retry succeeds, no further retries are required.
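The following Java sketch illustrates why a primary key matters when records are replayed under at-least-once delivery. It assumes in-memory tables and a delivery log in which the record for key k2 is replayed after a failover: an append-only table keeps the duplicate row, whereas a keyed upsert overwrites the row it already wrote and ends in the same state as a single delivery. ReplaySemanticsSketch is a hypothetical name used only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model contrasting a table without a primary key (append-only)
// with a table that has a primary key (upsert) when delivered records contain replays.
final class ReplaySemanticsSketch {
    // No primary key: every delivered record becomes a new row, so replays create duplicates.
    static List<String[]> appendOnly(List<String[]> delivered) {
        return new ArrayList<>(delivered);
    }

    // Primary key in record[0]: a replayed record overwrites the row it already wrote.
    static Map<String, String> keyedUpsert(List<String[]> delivered) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : delivered) {
            table.put(record[0], record[1]);
        }
        return table;
    }
}
```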

When you use streaming semantics in a Hologres result table, take note of the following points:

  • If no primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the at-least-once semantics.

  • If primary keys are configured in the Hologres physical table, the Hologres streaming sink uses the exactly-once semantics based on the primary keys. If multiple records with the same primary key are written to the table, you must configure the mutatetype parameter to determine how the result table is updated. The mutatetype parameter has the following valid values:

    • insertorignore: retains the first record and discards the subsequent records. This is the default value.

    • insertorreplace: replaces the existing record in an entire row with the one that arrives later.

    • insertorupdate: updates specific columns of the existing data. For example, a table has fields a, b, c, and d, and field a is the primary key. If only fields a and b are written to Hologres and a duplicate primary key occurs, the system updates only field b. Data in fields c and d remains unchanged.

    Note
    • If the mutatetype parameter is set to insertorreplace or insertorupdate, the system updates data based on the primary key.

    • The number of columns in the result table that is defined in Flink can differ from the number of columns in the Hologres physical table. The missing columns are padded with null values, so make sure that they accept null values. Otherwise, an error is returned.

  • By default, the Hologres streaming sink node can import data to only one non-partitioned table. If the sink node imports data to the parent table of a partitioned table, data queries fail even if the data import is successful. To enable data to be automatically written to a partitioned table, you can set the partitionRouter parameter to true. Take note of the following items:

    • You must set tablename to the name of the parent table.

    • If the required child partitions do not exist, you must configure createparttable=true in the WITH clause so that they are created automatically. Otherwise, the data import fails.
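The three mutatetype behaviors described above can be summarized in a small Java sketch that resolves a conflict on one primary key. Rows are modeled as maps from column name to value, the incoming map contains only the columns declared in the Flink result table, and columns missing from a map are treated as null. MutateTypeSketch and its enum are hypothetical names, not connector classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of how each mutatetype value resolves a primary key conflict.
final class MutateTypeSketch {
    enum MutateType { INSERT_OR_IGNORE, INSERT_OR_REPLACE, INSERT_OR_UPDATE }

    static Map<String, Object> apply(Map<String, Object> existing, Map<String, Object> incoming, MutateType type) {
        if (existing == null) {
            return new HashMap<>(incoming); // no conflict: every mode inserts the record
        }
        switch (type) {
            case INSERT_OR_IGNORE:
                return new HashMap<>(existing); // keep the first record, discard the new one
            case INSERT_OR_REPLACE:
                return new HashMap<>(incoming); // whole row replaced; undeclared columns become null
            case INSERT_OR_UPDATE: {
                Map<String, Object> merged = new HashMap<>(existing);
                merged.putAll(incoming); // only the declared columns change
                return merged;
            }
            default:
                throw new IllegalStateException("unknown mutatetype " + type);
        }
    }
}
```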

Data merging into a wide table

If you want to write data from multiple streaming deployments to a Hologres wide table, you can merge the data into a wide table. The following section provides examples of the two methods that can be used to merge data into a wide table.

Note

Take note of the following limits:

  • The wide table must have a primary key.

  • The data of each stream must contain all fields in the primary key.

  • If the wide table is a column-oriented table and the requests per second (RPS) value is large, the CPU utilization increases. We recommend that you disable dictionary encoding for the fields in the table.

Method 1

For example, one Flink data stream contains fields a, b, and c, and the other Flink data stream contains fields a, d, and e. The Hologres wide table WIDE_TABLE contains fields a, b, c, d, and e, among which field a is the primary key. You can perform the following operations:

  1. Execute Flink SQL statements to create two Hologres result tables. One table is used to declare fields a, b, and c, and the other is used to declare fields a, d, and e. Map the two result tables to the Hologres wide table WIDE_TABLE.

  2. Parameter settings of the two Hologres result tables:

    • Set the mutatetype parameter to insertorupdate. This indicates that data is updated based on the primary key.

    • Set the ignoredelete parameter to true. This prevents retract messages from generating Delete requests.

  3. Insert data from the two Flink data streams into the two result tables.

    -- In this example, the data sources are source1 and source2.
    
    CREATE TEMPORARY TABLE hologres_sink_1 ( -- Declare only fields a, b, and c.
      a BIGINT, 
      b STRING,
      c STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype' = 'insertorupdate', -- Update data based on the primary key.
      'ignoredelete' = 'true'             -- Ignore Delete requests.
    );
    
    CREATE TEMPORARY TABLE hologres_sink_2 ( -- Declare only the fields a, d, and e.
      a BIGINT, 
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- Update data based on the primary key.
      'ignoredelete'='true'             -- Ignore Delete requests.
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink_1 select * from source1;
    INSERT INTO hologres_sink_2 select * from source2;
    END;

Method 2

Note

You can use Method 2 to merge data into a wide table only in Realtime Compute for Apache Flink that uses VVR 6.0.7 or later.

For example, one Flink data stream contains fields a, b, and c, and the other Flink data stream contains fields a, d, and e. The Hologres wide table WIDE_TABLE contains fields a, b, c, d, and e, among which field a is the primary key. You can perform the following operations:

  1. Execute Flink SQL statements to create a Hologres result table. Declare fields a, b, c, d, and e in the Flink SQL statements. Map the Hologres result table to the Hologres wide table WIDE_TABLE.

  2. Parameter settings of the Hologres result table:

    • Set the mutatetype parameter to insertorupdate. This indicates that data is updated based on the primary key.

    • Set the ignoredelete parameter to true. This prevents retract messages from generating Delete requests.

  3. Insert data from the two Flink data streams into the result table.

    -- In this example, the data sources are source1 and source2.
    CREATE TEMPORARY TABLE hologres_sink ( -- Declare fields a, b, c, d, and e.
      a BIGINT, 
      b STRING,
      c STRING,
      d STRING,
      e STRING,
      primary key(a) not enforced
    ) WITH (
      'connector'='hologres',           
      'dbname'='<yourDbname>',
      'tablename'='<yourWideTablename>', -- Specify the Hologres wide table. The table contains fields a, b, c, d, and e.
      'username' = '${secret_values.ak_id}',
      'password' = '${secret_values.ak_secret}',
      'endpoint'='<yourEndpoint>',
      'mutatetype'='insertorupdate',    -- Update data based on the primary key.
      'ignoredelete'='true',            -- Ignore Delete requests.
      'partial-insert.enabled'='true' -- Set the partial-insert.enabled parameter to true to update only the fields that are declared in the INSERT statement.
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hologres_sink(a,b,c) select * from source1; -- Declare that only fields a, b, and c are inserted into the wide table.
    INSERT INTO hologres_sink(a,d,e) select * from source2; -- Declare that only fields a, d, and e are inserted into the wide table.
    END;
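Method 2 depends on partial-insert.enabled. The following Java sketch, which assumes an in-memory wide-table row modeled as a map from column name to value, shows what happens to the row when source2 writes only fields a, d, and e with the option enabled and with it disabled. PartialInsertSketch is a hypothetical helper, not a connector class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of an insertOrUpdate write with and without partial-insert.enabled.
final class PartialInsertSketch {
    /**
     * @param existing      current row in the wide table
     * @param ddlColumns    all columns declared in the Flink DDL of the result table
     * @param insertValues  columns named in INSERT INTO t(...) and their values
     * @param partialInsert role of partial-insert.enabled
     */
    static Map<String, Object> upsert(Map<String, Object> existing, List<String> ddlColumns,
                                      Map<String, Object> insertValues, boolean partialInsert) {
        Map<String, Object> row = new HashMap<>(existing);
        for (String col : ddlColumns) {
            if (insertValues.containsKey(col)) {
                row.put(col, insertValues.get(col));
            } else if (!partialInsert) {
                row.put(col, null); // option off: DDL columns missing from the INSERT are set to null
            }
            // option on: columns missing from the INSERT keep their existing values
        }
        return row;
    }
}
```

With the option disabled, the write from source2 would overwrite fields b and c that source1 wrote earlier with null values, which is why Method 2 requires partial-insert.enabled=true.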

Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements

You can execute the CREATE TABLE AS or CREATE DATABASE AS statements to synchronize data from a single table or an entire database in real time. The changes to the schema of the source table can also be synchronized to the related Hologres table in real time during database or table synchronization. If the schema of the source table is changed, Flink synchronizes the changes to the table schema to the related Hologres table and then writes data to the Hologres table. Flink automatically completes this process. For more information, see CREATE TABLE AS statement and Ingest data into data warehouses in real time.

DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors. The Hologres DataStream connectors of different versions are stored in the Maven central repository for you to use. For Realtime Compute for Apache Flink that uses VVR 6.0.7, use the dependency of 1.15-vvr-6.0.7-1.

Build an implementation class to read data from a Hologres source table

VVR provides the implementation class HologresBulkreadInputFormat of RichInputFormat to read data from a Hologres source table. The following example shows how to build the implementation class HologresBulkreadInputFormat to read data from a Hologres source table.

VVR 4.0.15

// Define the schema of the source table. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();
// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
  jdbcOptions.getTable(), schema.getFieldNames());

// Build HologresBulkreadInputFormat to read data from the Hologres source table. 
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

VVR 6.0.7

// Set up the stream execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the schema of the source table. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");

// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);

HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
  .print();
env.execute();

Build an implementation class to read data from a Hologres binlog source table

VVR provides the implementation class HologresBinlogSource of Source to read data from a Hologres binlog source table. The following sample code shows how to build the implementation class HologresBinlogSource to read data from a Hologres binlog source table.

VVR 4.0.15

// Define the schema of the binlog source table. You can define fields in the schema for each column or specific columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);

// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);

// Build HologresBinlogSource to read data from the Hologres binlog source table. 
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
  schema,
  config,
  jdbcOptions,
  recordConverter,
  startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();

VVR 6.0.7

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the table schema of the source. You can define fields in the schema for all columns or only some columns of the Hologres table based on your business requirements.
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// Build JDBC options. 
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// Get or create the default slot for JDBC binlog consumption, and set its name in the configuration.
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
  && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Build the record converter to read data from the Hologres binlog source table. 
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
  jdbcOptions.getTable(),
  schema,
  new HologresConnectionParam(config),
  cdcMode,
  Collections.emptySet());

// Build HologresJDBCBinlogSource to read data from the Hologres binlog source table.
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
  new HologresConnectionParam(config),
  schema,
  config,
  jdbcOptions,
  startTimeMs,
  StartupMode.TIMESTAMP,
  recordConverter,
  "",
  -1);

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
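Both binlog samples pass startTimeMs = 0. The VVR 6.0.7 sample pairs this value with StartupMode.TIMESTAMP. As the variable name suggests, the start time is presumably an epoch timestamp in milliseconds. This topic does not specify how the connector treats 0, or a timestamp that falls outside the binlog retention period. The following JDK-only sketch shows two ways to compute such a value: a look-back window from the current time, and a fixed wall-clock time in a given time zone. The class BinlogStartTime and the Asia/Shanghai zone are illustrative assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper that computes a binlog start time in epoch milliseconds.
final class BinlogStartTime {

    // Start time that lies the given window before the supplied instant.
    static long lookBack(Duration window, Instant now) {
        return now.minus(window).toEpochMilli();
    }

    // Start time at a wall-clock moment, interpreted in the given time zone.
    static long at(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long oneHourAgo = lookBack(Duration.ofHours(1), Instant.now());
        long fixed = at(LocalDateTime.of(2024, 3, 14, 0, 0), ZoneId.of("Asia/Shanghai"));
        System.out.println(oneHourAgo);
        System.out.println(fixed); // prints 1710345600000
    }
}
```

Passing an explicit ZoneId avoids depending on the default time zone of the machine that submits the job. Such a value would then replace the literal 0 assigned to startTimeMs.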
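Note

The buildRecordConverter method that is called in the VVR 4.0.15 sample is not defined in the sample code above.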

Build an implementation class to write data to a Hologres result table

VVR provides HologresSinkFunction, an implementation of OutputFormatSinkFunction, for writing data to a Hologres result table. The following sample code shows how to build a HologresSinkFunction instance to write data to a Hologres result table.

VVR 4.0.15

// Initialize the schema of the result table to which data is written. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);

// Build a Hologres Writer to write data in the data structure of the RowData class. 
AbstractHologresWriter<RowData> hologresWriter =
  buildHologresWriter(schema, config, hologresConnectionParam);

// Build HologresSinkFunction to write data to a Hologres result table. 
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
// Derive the values of column a from the current time so that each run writes different values.
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements(
    (RowData) GenericRowData.of(2 + offset, StringData.fromString("2")),
    GenericRowData.of(3 + offset, StringData.fromString("3")))
  .returns(typeInfo)
  .addSink(sinkFunction);
env.execute();

VVR 6.0.7 and VVR 8.0.1

// Set up the Java DataStream API execution environment.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the table schema of the sink. You can define fields in the schema for each column or some columns of the Hologres table based on your business requirements. 
TableSchema schema = TableSchema.builder()
  .field("a", DataTypes.INT())
  .field("b", DataTypes.STRING())
  .build();

// Configure Hologres-related parameters. 
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");

HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// Build a Hologres Writer to write data in the data structure of the RowData class. 
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
  hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// Build HologresSinkFunction to write data to a Hologres result table. 
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
// Derive the values of column a from the current time so that each run writes different values.
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements(
    (RowData) GenericRowData.of(2 + offset, StringData.fromString("2")),
    GenericRowData.of(3 + offset, StringData.fromString("3")))
  .returns(typeInfo)
  .addSink(sinkFunction);

env.execute();
Note

The buildHologresWriter method that is called in the VVR 4.0.15 sample is not included in the VVR connector dependency. This method is provided in Sample code.
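Both sink samples compute offset as System.currentTimeMillis() % Integer.MAX_VALUE and then write 2 + offset and 3 + offset. Because offset can be as large as Integer.MAX_VALUE - 1, either sum can, in rare cases, wrap around to a negative int. The following JDK-only sketch reserves headroom for the largest increment and uses Math.addExact, which throws ArithmeticException instead of wrapping silently. The class TestKeys is hypothetical. The sketch assumes a non-negative clock value and 0 <= maxIncrement < Integer.MAX_VALUE.

```java
// Hypothetical helper: derives test column values from the clock without int overflow.
final class TestKeys {

    // Returns a base value such that base + maxIncrement never exceeds Integer.MAX_VALUE.
    // Assumes nowMillis >= 0 and 0 <= maxIncrement < Integer.MAX_VALUE.
    static int baseOffset(long nowMillis, int maxIncrement) {
        long bound = (long) Integer.MAX_VALUE - maxIncrement;
        return (int) (nowMillis % bound);
    }

    public static void main(String[] args) {
        int base = baseOffset(System.currentTimeMillis(), 3);
        // Math.addExact throws ArithmeticException on overflow rather than wrapping.
        int first = Math.addExact(2, base);
        int second = Math.addExact(3, base);
        System.out.println(first + " " + second);
    }
}
```

In the samples, the int offset = ... line could be replaced with int offset = TestKeys.baseOffset(System.currentTimeMillis(), 3). The rest of the code would stay unchanged.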

References