This topic describes how to write data from open source Apache Flink 1.11 or later to Hologres in real time.

Prerequisites

  • A Hologres instance is created, and a development tool is used to connect to the instance. For more information, see HoloWeb quick start.
  • A Flink cluster is created. This example uses an Apache Flink 1.13 cluster. You can download the binary package from the official Apache Flink website and deploy a cluster in Standalone mode. For more information, see Local installation.

Background information

The Hologres connector for Apache Flink 1.11 and later is open source. The JAR package of Hologres Connector 1.0.1 is published in the Maven Central Repository. You can add the following dependency to your pom.xml file. For more information, visit the alibabacloud-hologres-connectors page on GitHub.
<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.13</artifactId>
    <version>1.0.1</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

Sample Flink SQL statements used to write data to Hologres

The following Java snippet shows how to execute Flink SQL statements that write data to Hologres. For the complete sample code, visit the official sample library of Hologres.
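// Register the Hologres result table. The five %s placeholders are filled with the
// connection settings, in the order of the arguments passed to String.format.
String createHologresTable =
        String.format(
                "create table sink("
                        + "  user_id bigint,"
                        + "  user_name string,"
                        + "  price decimal(38,2),"
                        + "  sale_timestamp timestamp"
                        + ") with ("
                        + "  'connector'='hologres',"
                        + "  'dbname' = '%s',"
                        + "  'tablename' = '%s',"
                        + "  'username' = '%s',"
                        + "  'password' = '%s',"
                        + "  'endpoint' = '%s'"
                        + ")",
                database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);

// Register the source table named "source" (helper method that is not shown in this snippet).
createScanTable(tEnv);

// Write all rows of the source table to the Hologres result table.
tEnv.executeSql("insert into sink select * from source");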
For the complete sample code of the following applications, see hologres-connector-flink-examples.
  • FlinkSQLToHoloExample: an application used to write data to Hologres based on the Flink SQL API.
  • FlinkDSAndSQLToHoloExample: an application used to transform a data stream into a table and write it to Hologres. The Flink DataStream API is used for the transformation and the Flink SQL API is used for data writes.
  • FlinkDataStreamToHoloExample: an application used to write data streams to Hologres based on the Flink DataStream API.
  • FlinkRoaringBitmapAggJob: an application used to count unique visitors (UVs) in real time and write the statistics to Hologres. Flink APIs and roaring bitmaps are used for the application.
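
The sample statement above hard-codes the five required parameters in a String.format template. As a minimal sketch, the same WITH clause can be assembled from a map so that optional connector parameters (such as mutatetype or jdbcWriteBatchSize, described later in this topic) can be appended without editing the template. The class name HologresSinkDdl, its methods, and all placeholder values are assumptions made for illustration; they are not part of the Hologres connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; not part of the Hologres connector.
final class HologresSinkDdl {

    // Wraps a value in single quotes and doubles embedded single quotes.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Renders: create table <name>(<columns>) with ('key' = 'value', ...)
    static String build(String flinkTable, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(", ", " with (", ")");
        for (Map.Entry<String, String> option : options.entrySet()) {
            with.add(quote(option.getKey()) + " = " + quote(option.getValue()));
        }
        return "create table " + flinkTable + "(" + columns + ")" + with;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hologres");
        options.put("dbname", "my_db");                 // placeholder value
        options.put("tablename", "my_table");           // placeholder value
        options.put("username", "<AccessKey ID>");      // placeholder value
        options.put("password", "<AccessKey secret>");  // placeholder value
        options.put("endpoint", "<host>:<port>");       // placeholder value
        options.put("mutatetype", "insertorignore");    // optional parameter
        options.put("jdbcWriteBatchSize", "256");       // optional parameter

        String columns = "user_id bigint, user_name string, "
                + "price decimal(38,2), sale_timestamp timestamp";
        System.out.println(build("sink", columns, options));
    }
}
```

The resulting string can be passed to tEnv.executeSql in the same way as createHologresTable in the sample above.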

Parameters of a Hologres connector

You can use a Hologres connector to write data from Flink to Hologres. The following table describes the parameters of the Hologres connector.
| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The type of the result table. Set the value to hologres. |
| dbname | Yes | The name of the Hologres database. |
| tablename | Yes | The name of the Hologres table to which to write data. |
| username | Yes | The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page. |
| password | Yes | The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page. |
| endpoint | Yes | The Virtual Private Cloud (VPC) endpoint of your Hologres instance. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console. |

Note: The endpoint must contain a port number and must be in the format of IP address:Port number. If the Hologres instance and the activated Flink service reside in the same region, you must use the VPC endpoint of your Hologres instance. If they reside in different regions, you can use the public endpoint of the Hologres instance.
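
Because the endpoint must include a port number, it can help to check the value before it is placed in the WITH clause. The following is a minimal sketch of such a check. The class EndpointCheck and its method are hypothetical and are not part of the connector, and the port range 1 to 65535 is an assumption based on valid TCP port numbers.

```java
// Hypothetical pre-flight check for illustration only; not part of the Hologres connector.
final class EndpointCheck {

    // Returns true if the endpoint has the form <host>:<port>,
    // where <host> is non-empty and <port> is a number from 1 to 65535.
    static boolean hasHostAndPort(String endpoint) {
        if (endpoint == null) {
            return false;
        }
        int colon = endpoint.lastIndexOf(':');
        // The colon must exist and must not be the first or the last character.
        if (colon <= 0 || colon == endpoint.length() - 1) {
            return false;
        }
        String port = endpoint.substring(colon + 1);
        if (!port.chars().allMatch(Character::isDigit)) {
            return false;
        }
        try {
            int value = Integer.parseInt(port);
            return value >= 1 && value <= 65535;
        } catch (NumberFormatException e) {
            // The digits do not fit into an int.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasHostAndPort("10.0.0.1:80")); // port is present
        System.out.println(hasHostAndPort("10.0.0.1"));    // port is missing
    }
}
```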
  • JDBC connection parameters
    | Parameter | Required | Description |
    | --- | --- | --- |
    | connectionSize | No | The number of Java Database Connectivity (JDBC) connections in a single connection pool created by a Flink Hologres task. The value is proportional to the throughput. Default value: 3. |
    | jdbcRetryCount | No | The maximum number of retries allowed for reading or writing data if a connection failure occurs. Default value: 10. |
    | jdbcRetrySleepInitMs | No | The initial wait time for retries. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds. Default value: 1000. |
    | jdbcRetrySleepStepMs | No | The wait time added for each retry. The amount of time consumed by the retries for a request is calculated by using the following formula: Value of the jdbcRetrySleepInitMs parameter + Number of retries × Value of the jdbcRetrySleepStepMs parameter. Unit: milliseconds. Default value: 5000. |
    | jdbcConnectionMaxIdleMs | No | The idle timeout period that applies to the connections used to read and write data. If a connection does not send or receive data by the time the idle timeout period ends, Holo Client automatically releases the connection. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaCacheTTL | No | The time-to-live (TTL) period of the table schema information in the cache. Unit: milliseconds. Default value: 60000. |
    | jdbcMetaAutoRefreshFactor | No | The factor that determines when to automatically refresh the cache. If the remaining TTL period of the table schema information in the cache is less than the result of dividing the value of the jdbcMetaCacheTTL parameter by the value of the jdbcMetaAutoRefreshFactor parameter, Holo Client automatically refreshes the cache. Default value: -1, which indicates that the cache is not automatically refreshed. |

  • Data write parameters
    | Parameter | Required | Description |
    | --- | --- | --- |
    | mutatetype | No | The mode in which to write data. For more information, see Streaming semantics. Default value: insertorignore. |
    | ignoredelete | No | Specifies whether to ignore retract messages. In most cases, retract messages are generated by the GROUP BY operation in Flink. When the retract messages are transferred to Hologres connectors, Delete requests are generated. Default value: false. This parameter takes effect only when streaming semantics are used. |
    | partitionrouter | No | Specifies whether to write data to a partitioned table. Default value: false. |
    | createparttable | No | Specifies whether to automatically create a partitioned table based on partition key values if you specify that Flink writes data to a partitioned table. Partitioned tables can be automatically created only in Flink Serverless V2.1.X and later. Valid values: false (default): does not automatically create a partitioned table. true: automatically creates a partitioned table. Exercise caution when you use this parameter. Make sure that partition key values do not contain dirty data. Otherwise, an invalid partitioned table is created. |
    | jdbcWriteBatchSize | No | The maximum number of data entries collected in a batch for a Hologres streaming sink node. Default value: 256. |
    | jdbcWriteBatchByteSize | No | The maximum size of data collected in a batch for a Hologres streaming sink node in a thread. Unit: bytes. Default value: 2097152 (2 × 1024 × 1024), which is 2 MB. |
    | jdbcWriteBatchTotalByteSize | No | The maximum size of data collected in a batch for a Hologres streaming sink node in all threads. Unit: bytes. Default value: 20971520 (20 × 1024 × 1024), which is 20 MB. |
    | jdbcWriteFlushInterval | No | The maximum period of time to wait before the data entries collected in a batch for a Hologres streaming sink node are flushed to Hologres. Unit: milliseconds. Default value: 10000, which is 10 seconds. |
    | jdbcReWriteBatchedDeletes | No | Specifies whether to combine multiple DELETE requests into a single SQL statement to improve performance. Default value: true. |
    | jdbcRewriteSqlMaxBatchSize | No | The maximum number of INSERT or DELETE requests allowed in an SQL statement. For example, in an SQL statement of a data write operation, the result of dividing the value of the jdbcWriteBatchSize parameter by the value of the jdbcRewriteSqlMaxBatchSize parameter specifies the maximum number of the INSERT requests allowed. Default value: 1024. |
    | jdbcEnableDefaultForNotNullColumn | No | Specifies whether to convert a null value to a default value when the null value is written to a column that has the NOT NULL constraint and has no default value specified. If the destination column is of the STRING type, the null value is converted to an empty string (""). If the destination column is of the NUMBER type, the null value is converted to 0. If the destination column is of the DATE, TIMESTAMP, or TIMESTAMPTZ type, the null value is converted to 1970-01-01 00:00:00. Default value: true. |

  • Point query parameters
    | Parameter | Required | Description |
    | --- | --- | --- |
    | jdbcReadBatchSize | No | The maximum number of requests allowed in a batch in a thread to perform a point query in a dimension table. Default value: 128. |
    | jdbcReadBatchQueueSize | No | The maximum number of queued requests allowed in a thread to perform a point query in a dimension table. Default value: 256. |

  • Data type mappings

    For information about the data type mappings between Flink Serverless and Hologres, see Data type mappings between Realtime Compute for Apache Flink and Hologres.
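
The JDBC connection parameters above include two formulas: one for the time consumed by retries and one for the threshold at which the table schema cache is refreshed. The following sketch only evaluates these formulas with the documented default values so that the arithmetic is easy to verify. The class JdbcParamMath and its methods are hypothetical and do not reproduce the internal logic of Holo Client. Treating any factor that is less than or equal to 0 as "no automatic refresh" is an assumption; this topic states this behavior only for -1.

```java
// Hypothetical calculator for illustration only; not part of Holo Client or the connector.
final class JdbcParamMath {

    // Formula from the parameter table:
    // jdbcRetrySleepInitMs + number of retries × jdbcRetrySleepStepMs (milliseconds).
    static long retrySleepMs(long initMs, long stepMs, int retries) {
        return initMs + (long) retries * stepMs;
    }

    // Remaining TTL below which the cached schema is refreshed:
    // jdbcMetaCacheTTL / jdbcMetaAutoRefreshFactor (milliseconds).
    // Returns -1 if automatic refresh is disabled (assumed for any factor <= 0).
    static long autoRefreshThresholdMs(long ttlMs, int factor) {
        return factor <= 0 ? -1 : ttlMs / factor;
    }

    public static void main(String[] args) {
        // Defaults from the table: init = 1000 ms, step = 5000 ms.
        for (int retries = 0; retries <= 3; retries++) {
            System.out.println(retries + " retries -> " + retrySleepMs(1000, 5000, retries) + " ms");
        }
        // Default TTL of 60000 ms with an example factor of 4.
        System.out.println("refresh threshold: " + autoRefreshThresholdMs(60000, 4) + " ms");
        // Default factor of -1: the cache is not automatically refreshed.
        System.out.println("refresh threshold: " + autoRefreshThresholdMs(60000, -1));
    }
}
```

With the default values, three retries evaluate to 1000 + 3 × 5000 = 16000 milliseconds, and a factor of 4 gives a refresh threshold of 60000 / 4 = 15000 milliseconds.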