This topic describes how to write data from open source Apache Flink 1.11 to Hologres in real time.

Background information

The source code of the Hologres connector for open source Apache Flink 1.11 and later is publicly available. For more information, visit the official sample library of Hologres.

Sample Flink SQL statements used to write data to Hologres

The following Java sample uses Flink SQL statements to create a Hologres result table and write data to it. For the complete sample, visit the official sample library of Hologres.
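        // Define the Hologres result table; the connection settings fill the %s placeholders.
        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        // Register the result table in the table environment.
        tEnv.executeSql(createHologresTable);

        // Helper from the sample; it is expected to register the table named source.
        createScanTable(tEnv);

        // Submit the job that writes every row of source to the Hologres table.
        tEnv.executeSql("insert into sink select * from source");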
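
The sample substitutes the connection settings into the statement with String.format, so a value that contains a single quote, such as a password, would end the SQL string literal early. The following is a minimal sketch of one way to guard against that, assuming that single quotes inside Flink SQL string literals are escaped by doubling them. The class HologresSinkDdl and its methods are hypothetical names used for illustration; they are not part of Flink or the Hologres connector.

```java
// Hypothetical helper for illustration; not part of Flink or the Hologres connector.
final class HologresSinkDdl {

    // Wraps a value in single quotes and doubles any embedded single quote,
    // so the value cannot terminate the SQL string literal early.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds the same CREATE TABLE statement as the sample, with every
    // connection setting passed through quote().
    static String build(String database, String tableName,
                        String userName, String password, String endPoint) {
        return "create table sink("
                + "  user_id bigint,"
                + "  user_name string,"
                + "  price decimal(38,2),"
                + "  sale_timestamp timestamp"
                + ") with ("
                + "  'connector'='hologres',"
                + "  'dbname' = " + quote(database) + ","
                + "  'tablename' = " + quote(tableName) + ","
                + "  'username' = " + quote(userName) + ","
                + "  'password' = " + quote(password) + ","
                + "  'endpoint' = " + quote(endPoint)
                + ")";
    }

    public static void main(String[] args) {
        // Placeholder values only; real credentials would come from configuration.
        System.out.println(
                build("demo_db", "orders", "my_access_id", "se'cret", "127.0.0.1:80"));
    }
}
```

The returned string can be passed to tEnv.executeSql in place of the createHologresTable string in the sample.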

Parameters of a Hologres connector

You can use a Hologres connector to write data from Flink to Hologres. The following table describes the parameters of the Hologres connector.
Parameter Required Description
connector Yes The type of the result table. Set the value to hologres.
dbname Yes The name of the Hologres database to be connected to.
tablename Yes The name of the Hologres table that is used to receive data.
username Yes The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.
password Yes The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.
endpoint Yes The endpoint of the Hologres instance to be connected to. You can view the endpoint on the Configurations tab of the instance details page in the Hologres console.
Note The endpoint must contain a port number and must be in the format IP address:Port number. If the Hologres instance and the Flink service reside in the same region, use the VPC endpoint of the Hologres instance. If they reside in different regions, use the public endpoint of the Hologres instance.
mutatetype No The mode in which data is written to the Hologres table. Valid values: insertorignore, insertorreplace, and insertorupdate. Default value: insertorignore.
ignoredelete No Specifies whether to ignore retract messages. In most cases, retract messages are generated by GROUP BY operations in Flink. When a retract message reaches the Hologres connector, a Delete request is generated. Default value: false.
Note This parameter takes effect only when streaming semantics are used.
partitionrouter No Specifies whether to write the data to a partitioned table. Default value: false.
createparttable No Specifies whether to automatically create a partitioned table based on partition key values if you specify that Flink writes data to a partitioned table. Valid values:
  • false: does not automatically create a partitioned table. This is the default value.
  • true: automatically creates a partitioned table.
Note Only Flink 1.11 and later can automatically create a partitioned table. Before you enable this feature, make sure that the partition key values contain no dirty data. Otherwise, an error is returned when the partitioned table is created.
connectionSize No The size of the JDBC connection pool created by a single Flink Hologres task. Default value: 3. A larger value provides proportionally higher throughput.
jdbcWriteBatchSize No The maximum number of data entries collected in a batch in JDBC mode for a Hologres streaming sink node. Default value: 256.
jdbcWriteFlushInterval No The maximum period of time that a Hologres streaming sink node waits before it flushes the data entries collected in a batch in JDBC mode. Default value: 10000 (10 seconds). Unit: ms.
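
The parameters in the table map directly to the key-value pairs of the WITH clause in the CREATE TABLE statement. The following is a minimal sketch of how the required parameters and a few optional ones could be assembled, with a check that the endpoint contains a port number. The class HologresSinkOptions, its methods, and the placeholder values are hypothetical and are used only for illustration; the endpoint pattern is an assumption about what a host and port pair looks like, not a rule taken from the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of Flink or the Hologres connector.
final class HologresSinkOptions {
    // LinkedHashMap keeps insertion order, so the rendered clause is stable.
    private final Map<String, String> options = new LinkedHashMap<>();

    HologresSinkOptions(String dbname, String tablename,
                        String username, String password, String endpoint) {
        // The endpoint must carry a port number, for example 127.0.0.1:80.
        if (endpoint == null || !endpoint.matches("[^:\\s]+:\\d{1,5}")) {
            throw new IllegalArgumentException(
                    "endpoint must be in the format host:port, got: " + endpoint);
        }
        options.put("connector", "hologres");
        options.put("dbname", dbname);
        options.put("tablename", tablename);
        options.put("username", username);
        options.put("password", password);
        options.put("endpoint", endpoint);
    }

    // Adds or overrides an optional parameter such as ignoredelete.
    HologresSinkOptions set(String key, String value) {
        options.put(key, value);
        return this;
    }

    // Renders the options as the WITH clause of a Flink SQL CREATE TABLE statement.
    String toWithClause() {
        StringJoiner clause = new StringJoiner(",\n", "with (\n", "\n)");
        for (Map.Entry<String, String> option : options.entrySet()) {
            // Double single quotes so a value cannot end the SQL literal early.
            String value = option.getValue().replace("'", "''");
            clause.add("  '" + option.getKey() + "' = '" + value + "'");
        }
        return clause.toString();
    }

    public static void main(String[] args) {
        // Placeholder values only; ignoredelete and jdbcWriteBatchSize override defaults.
        String withClause = new HologresSinkOptions(
                "demo_db", "orders", "my_access_id", "my_access_secret", "127.0.0.1:80")
                .set("ignoredelete", "true")
                .set("jdbcWriteBatchSize", "512")
                .toWithClause();
        System.out.println(withClause);
    }
}
```

Optional parameters that are not set keep the default values listed in the table, so only the overrides need to appear in the statement.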