Hologres: Write data from open source Apache Flink 1.11 or later to Hologres in real time

Last Updated: Apr 27, 2023

This topic describes how to write data from open source Apache Flink 1.11 or later to Hologres in real time.

Prerequisites

  • A Hologres instance is created, and a development tool is used to connect to the instance. For more information, see HoloWeb quick start.

  • A Flink cluster is created. This example uses an Apache Flink 1.13 cluster. You can download a binary release from the official Apache Flink website and deploy a cluster in Standalone mode. For more information, see Local installation.

Background information

The source code of the Hologres connector is open source, and the connector supports open source Apache Flink 1.11 and later. The JAR package of Hologres Connector 1.0.1 is published in the Maven Central Repository. You can add the following dependency to your pom.xml file. For more information, visit the alibabacloud-hologres-connectors page on GitHub.

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.13</artifactId>
    <version>1.0.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Sample Flink SQL statements used to write data to Hologres

You can execute Flink SQL statements that use the following syntax to write data to Hologres. For more information about the sample statements, visit the official sample library of Hologres.

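        // Register the Hologres result table. String.format fills in the connection values.
        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        // createScanTable is a helper in the sample project; it is expected to
        // register the table named "source" that the INSERT statement reads from.
        createScanTable(tEnv);

        // Submit the job that writes every row of the source table to the sink.
        tEnv.executeSql("insert into sink select * from source");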
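The sample above assembles the DDL with String.format, which produces an invalid statement if a value such as the password contains a single quotation mark. The following is a minimal sketch of a builder that doubles single quotation marks before a value is placed in the WITH clause. The class HologresSinkDdl and its methods are hypothetical helpers written for this illustration and are not part of the Hologres connector. The option keys are the required parameters that are described in this topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of the connector): renders a Flink SQL DDL for a Hologres sink. */
final class HologresSinkDdl {

    /** Wraps a value in single quotes and doubles any single quote inside it. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Collects the required connector options in a stable order. */
    static Map<String, String> requiredOptions(
            String dbname, String tablename, String username, String password, String endpoint) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hologres");
        options.put("dbname", dbname);
        options.put("tablename", tablename);
        options.put("username", username);
        options.put("password", password);
        options.put("endpoint", endpoint);
        return options;
    }

    /** Builds "create table <name>(<columns>) with (<options>)". */
    static String build(String flinkTableName, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  " + quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(",\n"));
        return "create table " + flinkTableName + "(\n" + columns + "\n) with (\n" + with + "\n)";
    }
}
```

The returned string can be passed to tEnv.executeSql in the same way as createHologresTable in the sample. Optional parameters can be added to the map before build is called.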

For the complete code of the following sample applications, see hologres-connector-flink-examples.

  • FlinkSQLToHoloExample: an application used to write data to Hologres based on the Flink SQL API.

  • FlinkDSAndSQLToHoloExample: an application used to transform a data stream into a table and write it to Hologres. The Flink DataStream API is used for the transformation and the Flink SQL API is used for data writes.

  • FlinkDataStreamToHoloExample: an application used to write data streams to Hologres based on the Flink DataStream API.

  • FlinkRoaringBitmapAggJob: an application used to count unique visitors (UVs) in real time and write the statistics to Hologres. The application uses Flink APIs and roaring bitmaps.

Parameters of a Hologres connector

You can use a Hologres connector to write data from Flink to Hologres. The following table describes the parameters used by the Hologres connector.

| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The type of the result table. Set the value to hologres. |
| dbname | Yes | The name of the Hologres database. |
| tablename | Yes | The name of the Hologres table to which to write data. |
| username | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page. |
| password | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page. |
| endpoint | Yes | The Virtual Private Cloud (VPC) endpoint of your Hologres instance. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console. |

Note

The endpoint contains a port number and is in the format of IP address:Port number. If the Hologres instance and the activated Flink service reside in the same region, use the VPC endpoint of your Hologres instance. If they reside in different regions, use the public endpoint of the Hologres instance.
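The endpoint format in the preceding note can be checked before a job is submitted. The following is a minimal sketch that splits a value into an address and a port number and rejects values that do not match the address:port format. The class EndpointCheck is a hypothetical helper for illustration and is not part of the Hologres connector. The 1 to 65535 port range is the general TCP port range, not a Hologres-specific rule.

```java
/** Hypothetical pre-flight check (not part of the connector) for the endpoint parameter. */
final class EndpointCheck {

    /** Returns {address, port} or throws IllegalArgumentException if the value is not address:port. */
    static String[] split(String endpoint) {
        int idx = endpoint.lastIndexOf(':');
        // A colon must exist, and text must appear on both sides of it.
        if (idx <= 0 || idx == endpoint.length() - 1) {
            throw new IllegalArgumentException("expected <address>:<port>, got: " + endpoint);
        }
        String address = endpoint.substring(0, idx);
        String portText = endpoint.substring(idx + 1);
        int port;
        try {
            port = Integer.parseInt(portText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port is not a number: " + portText, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return new String[] {address, Integer.toString(port)};
    }
}
```

For example, EndpointCheck.split("192.0.2.10:80") returns the address 192.0.2.10 and the port 80. Both values are placeholders.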

  • JDBC connection parameters

    | Parameter | Required | Description |
    | --- | --- | --- |
    | connectionSize | No | The number of Java Database Connectivity (JDBC) connections in a single connection pool created by a Flink Hologres task. Default value: 3. The throughput is proportional to the value. |
    | connectionPoolName | No | The name of the connection pool. In the same TaskManager, tables for which the same connection pool is configured can share the connection pool. No default value. Each table uses its own connection pool by default. If you specify a connection pool name, the connectionSize parameter values of all tables that use the pool must be the same. |
    | fixedConnectionMode | No | Specifies whether to enable the fixed connection mode, in which data write and point query operations do not consume connections. This feature is in beta release and requires connector version 1.2.0 or later and Hologres version 1.3 or later. Default value: false. |
    | jdbcRetryCount | No | The maximum number of retries allowed to read and write data if a connection failure occurs. Default value: 10. |
    | jdbcRetrySleepInitMs | No | The fixed wait time of each retry. The wait time of a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds. Default value: 1000. |
    | jdbcRetrySleepStepMs | No | The wait time that is added for each additional retry. The wait time of a retry is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds. Default value: 5000. |
    | jdbcConnectionMaxIdleMs | No | The idle timeout period that applies to the connections used to read and write data. If a connection does not send or receive data by the time the idle timeout period ends, Holo Client automatically releases the connection. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaCacheTTL | No | The time-to-live (TTL) period of the table schema information in the cache. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaAutoRefreshFactor | No | The factor that determines when to automatically refresh the cache. If the remaining TTL period of the table schema information in the cache is less than the result of dividing the value of the jdbcMetaCacheTTL parameter by the value of the jdbcMetaAutoRefreshFactor parameter, Holo Client automatically refreshes the cache. Default value: -1. The default value indicates that the cache is not automatically refreshed. |
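The two formulas in the JDBC connection parameters can be worked through with concrete numbers. The following is a minimal sketch that evaluates the retry formula (initial value + number of retries × step value) and the schema cache refresh condition (remaining TTL period less than TTL period divided by the refresh factor). The class JdbcTimingSketch is hypothetical and is not the Holo Client implementation. Treating every non-positive factor as disabled is an assumption based on the default value of -1. Whether the number of retries starts from 0 or 1 is not stated in this topic.

```java
/** Hypothetical arithmetic sketch; not the Holo Client implementation. */
final class JdbcTimingSketch {

    /** Retry formula from the table: initMs + retryNumber * stepMs. */
    static long retryWaitMs(long initMs, long stepMs, int retryNumber) {
        return initMs + (long) retryNumber * stepMs;
    }

    /**
     * Refresh condition from the table: remaining TTL < ttlMs / factor.
     * Assumption: any factor that is not positive (such as the default -1) disables auto refresh.
     */
    static boolean shouldRefresh(long remainingTtlMs, long ttlMs, int factor) {
        if (factor <= 0) {
            return false;
        }
        return remainingTtlMs < ttlMs / factor;
    }
}
```

With the default values of 1000 and 5000, retryWaitMs(1000, 5000, 1) evaluates to 6000 milliseconds and retryWaitMs(1000, 5000, 10) evaluates to 51000 milliseconds. With a TTL period of 60000 milliseconds and a factor of 4, the cache is refreshed when less than 15000 milliseconds of the TTL period remain.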

  • Data write parameters

    | Parameter | Required | Description |
    | --- | --- | --- |
    | mutatetype | No | The mode in which to write data. For more information, see Streaming semantics. Default value: insertorignore. |
    | ignoredelete | No | Specifies whether to ignore retract messages. In most cases, retract messages are generated by the GROUP BY operation in Flink. When the retract messages are transferred to Hologres connectors, Delete requests are generated. Default value: true. This parameter takes effect only when streaming semantics are used. |
    | createparttable | No | Specifies whether to automatically create a partitioned table based on partition key values when Flink writes data to a partitioned table. Partitioned tables can be automatically created only in Flink Serverless V2.1.X and later. Valid values: false (default): does not automatically create a partitioned table. true: automatically creates a partitioned table. Exercise caution when you use this parameter. Make sure that partition key values do not contain dirty data. Otherwise, an invalid partitioned table is created. |
    | ignoreNullWhenUpdate | No | Specifies whether to ignore null values in the written data when the mutatetype parameter is set to insertOrUpdate. Default value: false. |
    | jdbcWriteBatchSize | No | The maximum number of data entries that can be collected in a batch for a Hologres streaming sink node. Default value: 256. |
    | jdbcWriteBatchByteSize | No | The maximum size of data that can be collected in a batch for a Hologres streaming sink node in a thread. Unit: byte. Default value: 2097152 (2 × 1024 × 1024). The default value indicates 2 MB. |
    | jdbcWriteBatchTotalByteSize | No | The maximum size of data that can be collected in a batch for a Hologres streaming sink node in all threads. Unit: byte. Default value: 20971520 (20 × 1024 × 1024). The default value indicates 20 MB. |
    | jdbcWriteFlushInterval | No | The maximum period of time to wait before the data entries collected in a batch for a Hologres streaming sink node are flushed to Hologres. Unit: milliseconds. Default value: 10000. The default value indicates 10 seconds. |
    | jdbcEnableDefaultForNotNullColumn | No | Specifies whether to convert a null value to a default value when the null value is written to a column that has the NOT NULL constraint and no default value specified. If the destination column is of the STRING type, the null value is converted to an empty string (""). If the destination column is of the NUMBER type, the null value is converted to 0. If the destination column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted to 1970-01-01 00:00:00. Default value: true. |
    | jdbcCopyWriteMode | No | Specifies whether to write data in fixed copy mode. Fixed copy is a new feature that is supported in Hologres 1.3. In fixed copy mode, data is written in streaming mode and is not written in batches. Therefore, data writing in fixed copy mode achieves higher throughput and lower data latency, and consumes less client memory than data writing by using the INSERT statement. However, data written in fixed copy mode cannot be retracted. Default value: false. |
    | jdbcCopyWriteFormat | No | The protocol that is used to transmit the underlying data. Valid values: binary (default): the binary protocol is used, which delivers faster transmission speed. text: the text mode is used. |
    | jdbcCopyWriteDirectConnect | No | Specifies whether to enable direct connection in copy mode. The amount of data that can be written in copy mode is determined by the throughput of the VPC endpoint. The system checks whether the environment can directly connect to the Hologres FE node when data is written in copy mode. Direct connection is used by default if the environment can directly connect to the Hologres FE node. If you set this parameter to false, direct connection in copy mode is not used. Default value: true. |
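The batching limits and the null value conversion in the data write parameters can be illustrated with a short sketch. The first method assumes that a batch is flushed as soon as any one of the limits defined by jdbcWriteBatchSize, jdbcWriteBatchByteSize, or jdbcWriteFlushInterval is reached. This topic lists the limits but does not spell out how they interact, so that rule is an assumption. The second method returns the substitute values that are listed for jdbcEnableDefaultForNotNullColumn. The class WriteSketch is hypothetical and is not the connector's implementation.

```java
/** Hypothetical sketch of the data write parameters; not the connector's implementation. */
final class WriteSketch {

    /** Assumption: a batch is flushed once any one of the three limits is reached. */
    static boolean shouldFlush(int rows, long bytes, long msSinceLastFlush,
                               int batchSize, long batchByteSize, long flushIntervalMs) {
        return rows >= batchSize
                || bytes >= batchByteSize
                || msSinceLastFlush >= flushIntervalMs;
    }

    /** Substitute for a null value in a NOT NULL column without a default, as listed in the table. */
    static Object defaultForNull(String columnType) {
        switch (columnType) {
            case "STRING":
                return "";
            case "NUMBER":
                return 0;
            case "DATE":
            case "TIMESTAMP":
            case "TIMESTAMPTZ":
                return "1970-01-01 00:00:00";
            default:
                // The table lists no substitute for other types.
                throw new IllegalArgumentException("no default listed for type: " + columnType);
        }
    }
}
```

With the default values of 256 entries, 2097152 bytes, and 10000 milliseconds, shouldFlush returns true for a batch that holds 256 entries even if the batch is small in bytes and was started recently.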

  • Point query parameters

    | Parameter | Required | Description |
    | --- | --- | --- |
    | jdbcReadBatchSize | No | The maximum number of requests allowed in a batch in a thread to perform a point query in a dimension table. Default value: 128. |
    | jdbcReadBatchQueueSize | No | The maximum number of queued requests allowed in a thread to perform a point query in a dimension table. Default value: 256. |
    | async | No | Specifies whether to process point query requests in asynchronous mode. Default value: false. In asynchronous mode, multiple requests and responses are concurrently processed. Therefore, consecutive requests do not block each other, and the query throughput is improved. However, requests are not processed in an absolute order in asynchronous mode. |
    | cache | No | The cache policy. Default value: None. Valid values: None and LRU. None indicates that no data is cached. LRU indicates that partial data in the dimension table is cached. The system searches for a data record in the cache each time it receives the data record from the source table. If the system does not find the record in the cache, the system searches for the data record in the physical dimension table. |
    | cachesize | No | The maximum number of data records that can be cached. Default value: 10000. This parameter takes effect only if you set the cache parameter to LRU. |
    | cachettlms | No | The interval at which the cache is updated. Unit: milliseconds. This parameter takes effect only if you set the cache parameter to LRU. By default, cache entries do not expire. |
    | cacheempty | No | Specifies whether to cache the data of JOIN queries whose return results are empty. Valid values: true (default): the system caches the data of JOIN queries whose return results are empty. false: the system does not cache the data of JOIN queries whose return results are empty. |
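The way the cache, cachesize, cachettlms, and cacheempty parameters interact can be sketched with a small LRU cache. The following is a minimal illustration under stated assumptions: the class LruLookupCache, its constructor arguments, and the loader function that stands in for a point query on the physical dimension table are all hypothetical, and this is not the connector's implementation. The sketch assumes that a TTL value of 0 or less means that entries do not expire.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical LRU lookup cache; not the connector's implementation. */
final class LruLookupCache<K, V> {

    /** A cached lookup result and the time at which it was loaded. */
    private static final class Cached<T> {
        final Optional<T> value;
        final long loadedAtMs;

        Cached(Optional<T> value, long loadedAtMs) {
            this.value = value;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final long ttlMs;          // plays the role of cachettlms; <= 0 means no expiry (assumption)
    private final boolean cacheEmpty;  // plays the role of cacheempty
    private final LongSupplier clock;  // injected so that expiry can be tested
    private final Map<K, Cached<V>> entries;

    LruLookupCache(final int cacheSize, long ttlMs, boolean cacheEmpty, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.cacheEmpty = cacheEmpty;
        this.clock = clock;
        // accessOrder = true keeps the least recently used entry at the head of the map.
        this.entries = new LinkedHashMap<K, Cached<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Cached<V>> eldest) {
                return size() > cacheSize; // plays the role of cachesize
            }
        };
    }

    /** Returns the cached record, or calls the loader (the dimension table) on a miss. */
    Optional<V> get(K key, Function<K, Optional<V>> loader) {
        long now = clock.getAsLong();
        Cached<V> hit = entries.get(key);
        if (hit != null && (ttlMs <= 0 || now - hit.loadedAtMs < ttlMs)) {
            return hit.value;
        }
        Optional<V> loaded = loader.apply(key);
        if (loaded.isPresent() || cacheEmpty) {
            entries.put(key, new Cached<>(loaded, now));
        } else {
            entries.remove(key); // drop an expired entry that now has no result
        }
        return loaded;
    }

    int size() {
        return entries.size();
    }
}
```

A cache created with new LruLookupCache<>(10000, 0L, true, System::currentTimeMillis) mirrors the default values in the table: at most 10000 records, no expiry, and empty results cached.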

  • Data type mappings

    For information about the data type mappings between Flink Serverless and Hologres, see Data type mappings between Realtime Compute for Apache Flink and Hologres.