Hologres: Write data from Apache Flink 1.11 or later to Hologres in real time

Last Updated: Jul 25, 2024

This topic describes how to write data from Apache Flink 1.11 or later to Hologres in real time.

Prerequisites

  • A Hologres instance is created, and a development tool is used to connect to the instance. For more information, see Connect to HoloWeb.

  • An Apache Flink cluster is created. In this example, a cluster of Apache Flink 1.15 is created. You can download a binary file from the Apache Flink official website and create an Apache Flink cluster that works in standalone mode. For more information, see Local installation of Flink.

Background information

Starting from Apache Flink 1.11, the code of the Hologres connector is open-sourced. The release packages of the Hologres connectors that correspond to different Apache Flink versions are published in the Maven repository. The following pom.xml file provides an example of the configurations.

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

The following table describes the version mappings between Apache Flink and the Hologres connectors. We recommend that you use Apache Flink 1.15 or later so that more features are available.

| Apache Flink version | Hologres connector version |
| --- | --- |
| Apache Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
| Apache Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
| Apache Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
| Apache Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
| Apache Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
| Apache Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |

Sample Flink SQL statements used to write data to Hologres

You can execute Flink SQL statements that use the following syntax to write data to Hologres.

        // Define the Hologres result table with the required connector options.
        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        // Register the source table. The implementation is omitted in this snippet.
        createScanTable(tEnv);

        // Write the source data to the Hologres result table.
        tEnv.executeSql("insert into sink select * from source");

The following sample applications demonstrate different ways to write data to Hologres; a minimal end-to-end SQL sketch follows the list.

  • FlinkSQLToHoloExample: an application used to write data to Hologres based on the Flink SQL interface.

  • FlinkDSAndSQLToHoloExample: an application used to transform a data stream into a table and write it to Hologres. The Flink DataStream interface is used for transformation and the Flink SQL interface is used for data writes.

  • FlinkDataStreamToHoloExample: an application used to write data streams to Hologres based on the Flink DataStream interface.

  • FlinkRoaringBitmapAggJob: an application used to count the number of unique visitors (UVs) after deduplication in real time and write the statistics to Hologres. Flink, roaring bitmaps, and Hologres dimension tables are used for the application.

  • FlinkToHoloRePartitionExample: an application used to repartition data according to Hologres shards during real-time writes based on the Flink DataStream interface. This helps significantly decrease the number of small files in Hologres, improve write performance, and reduce system loads. This application is suitable for scenarios in which you want to import data into multiple empty tables that have primary keys at the same time, and it achieves an effect similar to that of the INSERT OVERWRITE operation.
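
For reference, the following is a minimal end-to-end sketch of the same write path as it might be run in the Flink SQL client. The datagen source and all connection values (your_database, your_access_key_id, and so on) are illustrative placeholders, not values from this topic.

-- Generate sample rows with the built-in datagen connector.
CREATE TABLE source (
  user_id BIGINT,
  user_name STRING,
  price DECIMAL(38,2),
  sale_timestamp TIMESTAMP
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Hologres result table. Replace the placeholder values with your own.
CREATE TABLE sink (
  user_id BIGINT,
  user_name STRING,
  price DECIMAL(38,2),
  sale_timestamp TIMESTAMP
) WITH (
  'connector' = 'hologres',
  'dbname' = 'your_database',
  'tablename' = 'your_table',
  'username' = 'your_access_key_id',
  'password' = 'your_access_key_secret',
  'endpoint' = 'your_vpc_endpoint:80'  -- IP address:Port number format
);

INSERT INTO sink SELECT * FROM source;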

Parameters of a Hologres connector

You can use a Hologres connector to write data from Apache Flink to Hologres. The following table describes the parameters used by the Hologres connector.

| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The type of the result table. Set the value to hologres. |
| dbname | Yes | The name of the Hologres database. |
| tablename | Yes | The name of the Hologres table to which you want to write data. |
| username | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID on the AccessKey Pair page. |
| password | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret on the AccessKey Pair page. |
| endpoint | Yes | The virtual private cloud (VPC) endpoint of your Hologres instance. You can view the endpoint on the instance details page in the Hologres console. Note: The endpoint must include a port number and must be in the IP address:Port number format. If the Hologres instance and the Apache Flink cluster reside in the same region, use the VPC endpoint; if they reside in different regions, use the public endpoint of the Hologres instance. |

  • The following table describes connection parameters.

    | Parameter | Required | Description |
    | --- | --- | --- |
    | connectionSize | No | The number of Java Database Connectivity (JDBC) connections in a single connection pool created by a Flink Hologres task. Default value: 3. The value is proportional to the throughput. |
    | connectionPoolName | No | The name of the connection pool. Within the same TaskManager, tables that are configured with the same pool name share the connection pool. No default value: each table uses its own connection pool by default. If you specify a pool name, the connectionSize values of all tables that share the pool must be the same. |
    | fixedConnectionMode | No | Specifies whether to use the fixed connection mode, in which data write and point query operations do not occupy connections. This feature is in beta release and requires connector 1.2.0 or later and Hologres V1.3 or later. Default value: false. |
    | jdbcRetryCount | No | The maximum number of retries allowed for data writes and queries if a connection failure occurs. Default value: 10. |
    | jdbcRetrySleepInitMs | No | Used together with jdbcRetrySleepStepMs to calculate the retry interval: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds. Default value: 1000. |
    | jdbcRetrySleepStepMs | No | Used together with jdbcRetrySleepInitMs to calculate the retry interval described above. Unit: milliseconds. Default value: 5000. |
    | jdbcConnectionMaxIdleMs | No | The idle timeout period of connections used for data writes and point queries. When the timeout period expires, the connection is released. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaCacheTTL | No | The time-to-live (TTL) of table schema information in the cache. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaAutoRefreshFactor | No | The factor that determines when the cache is automatically refreshed. If the remaining TTL of the cached table schema information is less than the value of jdbcMetaCacheTTL divided by the value of this parameter, the cache is refreshed. Default value: -1, which indicates that the cache is not automatically refreshed. |
    | connection.ssl.mode | No | Specifies whether to enable SSL-encrypted transmission. Valid values: disable (default): SSL-encrypted transmission is disabled. require: the client encrypts only the connections that are used to transmit data. verify-ca: the client also uses CA certificates to verify the Hologres server. verify-full: the client additionally checks whether the CN or Domain Name System (DNS) name in the CA certificates matches the Hologres endpoint that is specified during connection setup. |
    | connection.ssl.root-cert.location | No | The path of the CA certificate. Make sure that the CA certificate is uploaded to the Apache Flink cluster. This parameter is required if connection.ssl.mode is set to verify-ca or verify-full. |
    | jdbcDirectConnect | No | Specifies whether to enable the direct connection mode. Valid values: false (default) and true. The write throughput of Apache Flink is limited by the bandwidth of the VPC endpoint. If this parameter is set to true, the system checks whether Apache Flink can directly connect to the Hologres frontend (FE) nodes in the current environment; if it can, direct connection is used. |
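
    As an illustration of connection pool sharing, the following hedged sketch defines two sink tables that declare the same connectionPoolName and an identical connectionSize, so they share one JDBC connection pool within a TaskManager. All table names and credentials are placeholders.

    -- Both tables declare 'connectionPoolName' = 'pool1' and the same
    -- 'connectionSize', which is required for them to share the pool.
    CREATE TABLE sink_a (
      id BIGINT
    ) WITH (
      'connector' = 'hologres',
      'dbname' = 'your_database',
      'tablename' = 'table_a',
      'username' = 'your_access_key_id',
      'password' = 'your_access_key_secret',
      'endpoint' = 'your_vpc_endpoint:80',
      'connectionPoolName' = 'pool1',
      'connectionSize' = '5'
    );

    CREATE TABLE sink_b (
      id BIGINT
    ) WITH (
      'connector' = 'hologres',
      'dbname' = 'your_database',
      'tablename' = 'table_b',
      'username' = 'your_access_key_id',
      'password' = 'your_access_key_secret',
      'endpoint' = 'your_vpc_endpoint:80',
      'connectionPoolName' = 'pool1',
      'connectionSize' = '5'
    );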

  • The following table describes data write parameters.

    | Parameter | Required | Description |
    | --- | --- | --- |
    | mutatetype | No | The mode in which data is written. For more information, see the "Streaming semantics" section in Create a Hologres result table. Default value: insertorignore. |
    | ignoredelete | No | Specifies whether to ignore retract messages. In most cases, retract messages are generated by GROUP BY operations in Flink and produce delete requests when they reach the Hologres connector. Default value: true. This parameter takes effect only when the streaming semantics is used. |
    | createparttable | No | Specifies whether to automatically create partitions based on partition key values when you write data to a partitioned table. Valid values: false (default): partitions are not automatically created. true: partitions are automatically created. Exercise caution when you use this parameter, and make sure that the partition key values do not contain dirty data. Otherwise, invalid partitions are created. |
    | ignoreNullWhenUpdate | No | Specifies whether to ignore null values in the written data when mutatetype='insertOrUpdate' is specified. Default value: false. |
    | jdbcWriteBatchSize | No | The maximum number of data entries that can be collected in a batch for a Hologres streaming sink node. Default value: 256. |
    | jdbcWriteBatchByteSize | No | The maximum size of data that can be collected in a batch in a single thread for a Hologres streaming sink node. Unit: bytes. Default value: 2097152 (2 × 1024 × 1024), which is 2 MB. |
    | jdbcWriteBatchTotalByteSize | No | The maximum size of data that can be collected in a batch across all threads for a Hologres streaming sink node. Unit: bytes. Default value: 20971520 (20 × 1024 × 1024), which is 20 MB. |
    | jdbcWriteFlushInterval | No | The maximum period of time to wait before the data entries collected in a batch for a Hologres streaming sink node are flushed. Unit: milliseconds. Default value: 10000, which is 10 seconds. |
    | jdbcUseLegacyPutHandler | No | The syntax used by the generated SQL statements. Valid values: true: the insert into xxx(c0,c1,...) values (?,?,..),.. on conflict syntax is used. false (default): the insert into xxx(c0,c1,...) select unnest(?),unnest(?),.. on conflict syntax is used. |
    | jdbcEnableDefaultForNotNullColumn | No | Specifies whether to convert a null value to a default value when the null value is written to a column that has the NOT NULL constraint and no default value. If the destination column is of the STRING type, the null value is converted to an empty string (""). If the destination column is of a NUMBER type, the null value is converted to 0. If the destination column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted to 1970-01-01 00:00:00. Default value: true. |
    | remove-u0000-in-text.enabled | No | Specifies whether to automatically replace u0000 characters, which are invalid in UTF-8, in TEXT-type values. Valid values: true: the u0000 characters are replaced. false (default): the u0000 characters are not replaced. |
    | deduplication.enabled | No | Specifies whether to deduplicate data records that are written in the same batch and have the same primary key value. Valid values: true (default): deduplication is performed, and only the last data record with a given primary key value is retained. false: deduplication is not performed. When a duplicate primary key value is encountered, the currently batched data is written first, and then the new data record is written. Note: If deduplication is disabled, batch writing may degrade in extreme cases. For example, if all data records to be written have the same primary key value, the records cannot be written in one batch, which negatively affects write performance. |
    | aggressive.enabled | No | Specifies whether to enable the aggressive commit mode. Valid values: true: enabled. In aggressive commit mode, the system commits data as soon as it detects an idle connection, regardless of whether the number of data records in the batch has reached the expected number. This reduces data latency when the traffic is low. false (default): disabled. |
    | jdbcCopyWriteMode | No | Specifies whether to write data in fixed copy mode. In fixed copy mode, data is written in streaming mode rather than in batches, which provides higher throughput, lower data latency, and lower client memory consumption than writes that use the INSERT statement. However, data written in fixed copy mode cannot be retracted. Default value: false. Only Hologres V1.3.1 and later support this parameter. |
    | jdbcCopyWriteFormat | No | Specifies the protocol that is used in copy mode. Valid values: binary (default): the binary protocol is used, which delivers faster transmission. text: the text mode is used. Only Hologres V1.3.1 and later support this parameter. |
    | bulkLoad | No | Specifies whether to write data in batch copy mode. Valid values: true: enabled. This value takes effect only when jdbcCopyWriteMode is set to true. Compared with the fixed copy mode, the batch copy mode provides better resource utilization. By default, the batch copy mode is supported only for tables that have no primary key; writes to tables with primary keys may acquire table-level locks. false (default): disabled. Only Hologres V1.4.0 and later support this parameter. |
    | target-shards.enabled | No | Specifies whether to enable batch data writes at the shard level. Valid values: false (default): disabled. true: enabled. By default, table-level locks are acquired when you use the batch copy mode to write data to tables that have primary keys. If the source data is grouped by shard, you can enable this parameter to replace table-level locks with shard-level locks, which improves write performance. If the source data is not grouped by shard, set this parameter to false to prevent data loss. Only Hologres V1.4.1 and later support this parameter. |
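
    As an illustration only, the following sketch combines several of the preceding write parameters in one result table definition. The values are placeholders chosen for the example, not tuning recommendations.

    -- Upsert writes to a partitioned table: partitions are created
    -- automatically, batches hold up to 512 rows, and pending rows are
    -- flushed at least once per second.
    CREATE TABLE sink (
      user_id BIGINT,
      user_name STRING,
      ds STRING
    ) WITH (
      'connector' = 'hologres',
      'dbname' = 'your_database',
      'tablename' = 'your_partitioned_table',
      'username' = 'your_access_key_id',
      'password' = 'your_access_key_secret',
      'endpoint' = 'your_vpc_endpoint:80',
      'mutatetype' = 'insertorupdate',
      'createparttable' = 'true',
      'jdbcWriteBatchSize' = '512',
      'jdbcWriteFlushInterval' = '1000'
    );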

  • The following table describes point query parameters, which apply when a Hologres table is used as a dimension table.

    | Parameter | Required | Description |
    | --- | --- | --- |
    | jdbcReadBatchSize | No | The maximum number of point query requests that can be collected in a batch in a thread for dimension table queries. Default value: 128. |
    | jdbcReadBatchQueueSize | No | The maximum number of point query requests that can be queued in a thread for dimension table queries. Default value: 256. |
    | async | No | Specifies whether to query data in asynchronous mode. Default value: false. In asynchronous mode, multiple requests and responses are processed concurrently, so consecutive requests do not block each other and query throughput improves. However, requests are not processed in a strict order in asynchronous mode. |
    | cache | No | The cache policy. Valid values: None (default): no data is cached. LRU: part of the data in the dimension table is cached. For each data record received from the source table, the system first searches the cache; if the record is not found, the system queries the physical dimension table. |
    | cachesize | No | The maximum number of data records that can be cached. This parameter takes effect only when cache is set to LRU. Default value: 10000. |
    | cachettlms | No | The interval at which the cache is refreshed. Unit: milliseconds. This parameter takes effect only when cache is set to LRU. By default, cache entries do not expire. |
    | cacheempty | No | Specifies whether to cache JOIN queries whose results are empty. Valid values: true (default): JOIN queries whose results are empty are cached. false: they are not cached. |
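
    The following hedged sketch shows how the point query parameters might be combined when a Hologres table serves as a dimension table in a lookup join. The table definitions and join key are illustrative placeholders.

    -- Source stream with a processing-time attribute for the lookup join.
    CREATE TABLE orders (
      user_id BIGINT,
      order_id BIGINT,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );

    -- Hologres dimension table: asynchronous lookups with an LRU cache
    -- that holds up to 10000 rows and refreshes entries every 60 seconds.
    CREATE TABLE user_dim (
      user_id BIGINT,
      user_name STRING
    ) WITH (
      'connector' = 'hologres',
      'dbname' = 'your_database',
      'tablename' = 'your_dim_table',
      'username' = 'your_access_key_id',
      'password' = 'your_access_key_secret',
      'endpoint' = 'your_vpc_endpoint:80',
      'async' = 'true',
      'cache' = 'LRU',
      'cachesize' = '10000',
      'cachettlms' = '60000'
    );

    SELECT o.order_id, o.user_id, d.user_name
    FROM orders AS o
    JOIN user_dim FOR SYSTEM_TIME AS OF o.proctime AS d
    ON o.user_id = d.user_id;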

  • Data type mappings

    For information about the data type mappings between fully managed Flink and Hologres, see the "Data type mappings between Realtime Compute for Apache Flink or Blink and Hologres" section in Data types.