This topic describes how to write data from open source Apache Flink 1.11 to Hologres in real time.
The source code of the Hologres connector is open source and supports Apache Flink 1.11 and later. For more information, visit the official sample library of Hologres.
Sample Flink SQL statements used to write data to Hologres
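    // database, tableName, userName, password, and endPoint are String variables defined elsewhere.
    // tEnv is the TableEnvironment of the job.
    String createHologresTable = String.format(
        "create table sink("
            + "  user_id bigint,"
            + "  user_name string,"
            + "  price decimal(38,2),"
            + "  sale_timestamp timestamp"
            + ") with ("
            + "  'connector'='hologres',"
            + "  'dbname' = '%s',"
            + "  'tablename' = '%s',"
            + "  'username' = '%s',"
            + "  'password' = '%s',"
            + "  'endpoint' = '%s'"
            + ")",
        database, tableName, userName, password, endPoint);
    // Register the Hologres result table.
    tEnv.executeSql(createHologresTable);
    // createScanTable is defined elsewhere in the sample; it is expected to register the table named source.
    createScanTable(tEnv);
    // Write every row of the source table to Hologres.
    tEnv.executeSql("insert into sink select * from source");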
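The sample above uses the variables database, tableName, userName, password, and endPoint without showing where they come from. The following is a minimal sketch of one way to supply them without hard-coding the AccessKey pair in the job. The class name HologresConnectionSettings and the HOLO_* variable names are hypothetical and are not part of the Hologres connector.

```java
import java.util.Map;

/**
 * Hypothetical holder for the five values that the sample passes to String.format.
 * The HOLO_* variable names are illustrative assumptions, not connector settings.
 */
final class HologresConnectionSettings {
    final String database;
    final String tableName;
    final String userName;
    final String password;
    final String endPoint;

    private HologresConnectionSettings(String database, String tableName,
                                       String userName, String password, String endPoint) {
        this.database = database;
        this.tableName = tableName;
        this.userName = userName;
        this.password = password;
        this.endPoint = endPoint;
    }

    /** Reads all five values from the given map, for example System.getenv(). */
    static HologresConnectionSettings fromEnvironment(Map<String, String> env) {
        return new HologresConnectionSettings(
                require(env, "HOLO_DATABASE"),
                require(env, "HOLO_TABLE"),
                require(env, "HOLO_ACCESS_KEY_ID"),
                require(env, "HOLO_ACCESS_KEY_SECRET"),
                require(env, "HOLO_ENDPOINT"));
    }

    /** Fails fast and names the missing variable instead of passing a blank value on. */
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + key);
        }
        return value.trim();
    }
}
```

With this sketch, a job could call HologresConnectionSettings.fromEnvironment(System.getenv()) and pass the resulting fields to String.format in place of the bare variables.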
Parameters of a Hologres connector
|connector||Yes||The type of the result table. Set the value to hologres.|
|dbname||Yes||The name of the Hologres database to be connected to.|
|tablename||Yes||The name of the Hologres table that is used to receive data.|
|username||Yes||The AccessKey ID of your Alibaba Cloud account. You can obtain the AccessKey ID from the Security Management page.|
|password||Yes||The AccessKey secret of your Alibaba Cloud account. You can obtain the AccessKey secret from the Security Management page.|
|endpoint||Yes||The VPC endpoint of the Hologres instance to be connected to. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console.
Note: The endpoint must contain a port number and must be in the format of
|mutatetype||No||The mode in which the data is to be written. Default value:
|ignoredelete||No||Specifies whether to skip retract messages. In most cases, retract messages are generated by the GROUP BY operation in Flink. When retract messages reach the Hologres connector, Delete requests are generated. Default value:
Note: This parameter takes effect only when streaming semantics are used.
|partitionrouter||No||Specifies whether to write the data to a partitioned table. Default value:
|createparttable||No||Specifies whether to automatically create a partitioned table based on partition key values if you specify that Flink writes data to a partitioned table. Valid values:
Note: Only Flink 1.11 and later can automatically create a partitioned table. Before you enable this feature, make sure that the partition key values contain no dirty data. Otherwise, an error is returned when the partitioned table is created.
|connectionSize||No||The size of the JDBC connection pool created by a single Flink Hologres task. Default value: 3. Throughput increases in proportion to this value.|
|jdbcWriteBatchSize||No||The maximum number of data entries collected in a batch in JDBC mode for a Hologres streaming sink node. Default value: 256.|
|jdbcWriteFlushInterval||No||The maximum period of time to wait for a FLUSH operation to complete. The FLUSH operation is performed for the data entries collected in a batch in JDBC mode for a Hologres streaming sink node. Default value: 10000. Unit: ms. The default value indicates 10 seconds.|
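The optional parameters in the table are set in the same WITH clause as the required ones. The following is a minimal sketch, in plain Java with no Flink dependency, that renders such a statement from a map of options. The class HologresSinkDdl and its methods are hypothetical helpers. The sketch sets only connectionSize, jdbcWriteBatchSize, and jdbcWriteFlushInterval, using the default values listed above, and it assumes that every option value is written as a quoted SQL string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that renders a Flink SQL CREATE TABLE statement for a Hologres result table. */
final class HologresSinkDdl {

    private HologresSinkDdl() {}

    /** Returns the required options plus three JDBC tuning options at their documented defaults. */
    static Map<String, String> defaultOptions(String dbName, String tableName,
                                              String userName, String password, String endpoint) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order in the output
        options.put("connector", "hologres");
        options.put("dbname", dbName);
        options.put("tablename", tableName);
        options.put("username", userName);
        options.put("password", password);
        options.put("endpoint", endpoint);
        options.put("connectionSize", "3");             // JDBC connections per task
        options.put("jdbcWriteBatchSize", "256");       // maximum entries per batch
        options.put("jdbcWriteFlushInterval", "10000"); // milliseconds
        return options;
    }

    /** Renders the statement; columnList is the comma-separated column definitions. */
    static String render(String sinkName, String columnList, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + escape(e.getValue()) + "'")
                .collect(Collectors.joining(",\n"));
        return "create table " + sinkName + " (\n" + columnList + "\n) with (\n" + with + "\n)";
    }

    /** Doubles single quotes so that a value cannot terminate its SQL string literal early. */
    private static String escape(String value) {
        return value.replace("'", "''");
    }
}
```

To change a setting, overwrite its entry in the map before rendering, for example options.put("jdbcWriteBatchSize", "512"), and pass the rendered string to tEnv.executeSql as in the sample statements.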