Hologres provides a new version of the Flink connector plug-in that allows you to import data to Hologres in batches by using Flink. This improves import efficiency and reduces the workload during the import process.
Background information
In big data processing, Hologres is a powerful online analytical processing (OLAP) system that integrates with Flink to provide optimized real-time streaming data processing capabilities. For scenarios that do not require high data timeliness, such as batch loading of historical data, offline data processing, or log aggregation, we recommend that you use Flink to import data to Hologres in batches. Batch import writes a large amount of data to Hologres at a time, which improves import efficiency and resource utilization. You can import data in real time or in batches based on your business requirements and resource status. For more information about how to import data in real time, see Fully managed Flink.
Prerequisites
A Hologres instance is purchased. For more information about how to purchase a Hologres instance, see Purchase a Hologres instance.
A Flink cluster that runs Flink 1.15 or later is deployed. For more information, see the following topics:
Apache Flink: Deploy Flink
Realtime Compute for Apache Flink: Activate Realtime Compute for Apache Flink
Use Realtime Compute for Apache Flink to import data in batches
Create a Hologres result table in the HoloWeb console to receive data that is imported by using Realtime Compute for Apache Flink. For more information, see Connect to HoloWeb. In this example, the test_sink_customer table is created.
-- Create a Hologres result table.
CREATE TABLE test_sink_customer (
    c_custkey BIGINT,
    c_name TEXT,
    c_address TEXT,
    c_nationkey INT,
    c_phone TEXT,
    c_acctbal NUMERIC(15,2),
    c_mktsegment TEXT,
    c_comment TEXT,
    "date" DATE
)
WITH (
    distribution_key="c_custkey,date",
    orientation="column"
);
Note: The names and data types of fields in the Flink source table must be consistent with the names and data types of fields in the Hologres result table.
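Optionally, you can confirm that the table was created with the intended storage properties by querying the Hologres system view for table properties. The following query is a sketch that assumes the hologres.hg_table_properties system view is available in your Hologres version.
-- Check the storage properties of the result table (optional).
SELECT property_key, property_value
FROM hologres.hg_table_properties
WHERE table_name = 'test_sink_customer'
  AND property_key IN ('distribution_key', 'orientation', 'table_group');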
Log on to the Realtime Compute for Apache Flink console. In the left-side navigation pane, click Deployments. On the Deployments page, click Create Deployment. In the Create Deployment dialog box, configure the parameters and click Deploy. For more information about the parameter settings, see the "Create a JAR deployment" section in Create a deployment.
The following table describes the parameters.
Parameter
Description
Deployment Type
Select JAR.
Deployment Mode
Select Stream Mode or Batch Mode. In this example, Batch Mode is selected.
Engine Version
Select the version of the Flink engine that you want to use for the deployment. For more information about engine versions, see Engine version and Lifecycle policies. In this example, vvr-8.0.7-flink-1.17 is used.
JAR URI
Upload the package of the open source Flink connector hologres-connector-flink-repartition.
Note: The open source Flink connector can be used to import data to Hologres in batches. For more information about the open source code of the Flink connector, visit the official sample library of Hologres.
Entry Point Class
The entry point class of the program. The main class name of the Flink connector is com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.
Entry Point Main Arguments
Enter the path parameter of the repartition.sql file. For Realtime Compute for Apache Flink, additional dependency files are stored in /flink/usrlib/. In this example, --sqlFilePath="/flink/usrlib/repartition.sql" is entered.
Additional Dependencies
Upload the repartition.sql file. The repartition.sql file is an SQL script file of Flink that is used to define a source table, declare a result table, and configure Hologres connection information. The following sample code shows the content of the repartition.sql file in this example.
-- The data definition language (DDL) statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data.
CREATE TEMPORARY TABLE source_table (
     c_custkey BIGINT
    ,c_name STRING
    ,c_address STRING
    ,c_nationkey INTEGER
    ,c_phone STRING
    ,c_acctbal NUMERIC(15, 2)
    ,c_mktsegment STRING
    ,c_comment STRING
)
WITH (
     'connector' = 'datagen'
    ,'rows-per-second' = '10000'
    ,'number-of-rows' = '1000000'
);

-- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table.
SELECT *, cast('2024-04-21' as DATE) FROM source_table;

-- The DDL statement that is used to declare the result table and configure the Hologres connection information.
CREATE TABLE sink_table (
     c_custkey BIGINT
    ,c_name STRING
    ,c_address STRING
    ,c_nationkey INTEGER
    ,c_phone STRING
    ,c_acctbal NUMERIC(15, 2)
    ,c_mktsegment STRING
    ,c_comment STRING
    ,`date` DATE
)
WITH (
     'connector' = 'hologres'
    ,'dbname' = 'doc_****'
    ,'tablename' = 'test_sink_customer'
    ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
    ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
    ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
    ,'jdbccopywritemode' = 'true'
    ,'bulkload' = 'true'
    ,'target-shards.enabled' = 'true'
);
Note: For more information about the Hologres connection parameters in the repartition.sql file, see Write data from open source Apache Flink 1.11 or later to Hologres in real time.
Click the name of the desired deployment. On the Configuration tab, modify the deployment configuration in the Resources section and the value of the Parallelism parameter.
Note: We recommend that you set the Parallelism parameter to the number of shards in the Hologres result table.
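If you do not know the shard count of the result table, you can look it up before you set the Parallelism parameter. The following query is a sketch that joins the Hologres system views hologres.hg_table_properties and hologres.hg_table_group_properties, which are described in User guide of table groups and shard counts; verify that these views are available in your Hologres version.
-- Find the table group of the result table and return its shard count.
SELECT tg.property_value AS shard_count
FROM hologres.hg_table_properties t
JOIN hologres.hg_table_group_properties tg
  ON t.property_value = tg.tablegroup_name
WHERE t.table_name = 'test_sink_customer'
  AND t.property_key = 'table_group'
  AND tg.property_key = 'shard_count';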
Query data from the Hologres result table.
After the Flink deployment is submitted, you can query data from the Hologres result table. Sample statement:
SELECT * FROM test_sink_customer;
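Because the DataGen source in this example is configured with 'number-of-rows' = '1000000', you can also run a count query to verify that the data was fully written. The expected value applies only if the deployment completed without errors and all generated rows were written.
-- Verify the number of imported rows.
SELECT count(*) FROM test_sink_customer;
-- Expected result in this example: 1000000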
Use Apache Flink to import data to Hologres in batches
Create a Hologres result table in the HoloWeb console to receive data that is imported by using Apache Flink. For more information, see Connect to HoloWeb. In this example, the test_sink_customer table is created.
-- Create a Hologres result table.
CREATE TABLE test_sink_customer (
    c_custkey BIGINT,
    c_name TEXT,
    c_address TEXT,
    c_nationkey INT,
    c_phone TEXT,
    c_acctbal NUMERIC(15,2),
    c_mktsegment TEXT,
    c_comment TEXT,
    "date" DATE
)
WITH (
    distribution_key="c_custkey,date",
    orientation="column"
);
Note: You can specify the number of shards based on the data amount. For more information about shards, see User guide of table groups and shard counts.
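For example, if you want the result table to use a dedicated table group with an explicit shard count, you can create the table group before you create the table and reference it in the WITH clause. The following statements are a sketch: the hg_create_table_group function and the table_group property are described in User guide of table groups and shard counts, the table group name tg_batch_import is only an illustrative value, and the shard count of 16 is only an example. Check the exact syntax for your Hologres version.
-- Create a table group with 16 shards. Choose the shard count based on the data amount.
CALL hg_create_table_group ('tg_batch_import', 16);
-- When you create the result table, add the table_group property to the WITH clause, for example:
-- WITH (distribution_key="c_custkey,date", orientation="column", table_group="tg_batch_import");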
Create the repartition.sql file and upload the file to a directory in the Flink cluster. In this example, the file is uploaded to the /flink-1.15.4/src directory. The following sample code shows the content of the repartition.sql file.
Note: The repartition.sql file is an SQL script file of Flink that is used to define a source table, declare a result table, and configure Hologres connection information.
-- The DDL statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data.
CREATE TEMPORARY TABLE source_table (
     c_custkey BIGINT
    ,c_name STRING
    ,c_address STRING
    ,c_nationkey INTEGER
    ,c_phone STRING
    ,c_acctbal NUMERIC(15, 2)
    ,c_mktsegment STRING
    ,c_comment STRING
)
WITH (
     'connector' = 'datagen'
    ,'rows-per-second' = '10000'
    ,'number-of-rows' = '1000000'
);

-- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table.
SELECT *, cast('2024-04-21' as DATE) FROM source_table;

-- The DDL statement that is used to declare the result table and configure the Hologres connection information.
CREATE TABLE sink_table (
     c_custkey BIGINT
    ,c_name STRING
    ,c_address STRING
    ,c_nationkey INTEGER
    ,c_phone STRING
    ,c_acctbal NUMERIC(15, 2)
    ,c_mktsegment STRING
    ,c_comment STRING
    ,`date` DATE
)
WITH (
     'connector' = 'hologres'
    ,'dbname' = 'doc_****'
    ,'tablename' = 'test_sink_customer'
    ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
    ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
    ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
    ,'jdbccopywritemode' = 'true'
    ,'bulkload' = 'true'
    ,'target-shards.enabled' = 'true'
);
Configure the parameters. The following table describes the parameters.
Parameter
Required
Description
connector
Yes
The type of the result table. Set the parameter to hologres.
dbname
Yes
The name of the Hologres database.
tablename
Yes
The name of the Hologres table to which data is imported.
username
Yes
The AccessKey ID of the Alibaba Cloud account.
You can obtain the AccessKey ID from the AccessKey Pair page.
password
Yes
The AccessKey secret of the Alibaba Cloud account.
endpoint
Yes
The virtual private cloud (VPC) endpoint of your Hologres instance. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console.
Note: The endpoint must contain the port number and must be in the IP address:Port number format. If the Hologres instance and the Flink deployment reside in the same region, use the VPC endpoint of your Hologres instance. If the Hologres instance and the Flink deployment reside in different regions, use the public endpoint of your Hologres instance.
jdbccopywritemode
No
The method for writing data. Valid values:
false: The INSERT statement is used. This is the default value.
true: A copy mode is used. The batch copy mode and fixed copy mode are provided. By default, the fixed copy mode is used.
Note: In fixed copy mode, data is written in streaming mode rather than in batches. Compared with data writing by using the INSERT statement, this mode provides higher throughput and lower data latency and consumes less client memory. However, data that is updated in fixed copy mode cannot be retracted.
bulkload
No
Specifies whether to use the batch copy mode to write data. Valid values:
true: The batch copy mode is used. This parameter takes effect only if you set the jdbccopywritemode parameter to true. If you set this parameter to false, the fixed copy mode is used.
Note: Compared with the fixed copy mode, the batch copy mode provides higher efficiency, better resource utilization, and better performance. Select a data write mode based on your business requirements.
In most cases, when you write data to a table that has a primary key by using the batch copy mode, the table is locked. You can set the target-shards.enabled parameter to true to change the table-level lock to a shard-level lock. This way, multiple data import tasks on the same table can be performed at the same time. In tests, the batch copy mode reduces the load on the Hologres instance by about 66.7% compared with the fixed copy mode when data is written to a table that has a primary key.
If you want to write data to a table that has a primary key by using the batch copy mode, the result table must be empty. If the result table is not empty, primary key-based data deduplication negatively affects the data write performance.
false: The batch copy mode is not used. This is the default value.
target-shards.enabled
No
Specifies whether to enable batch data write at the shard level. Valid values:
true: Batch data write at the shard level is enabled. If the source data is partitioned by shard, table-level locks are changed to shard-level locks.
false: Batch data write at the shard level is disabled. This is the default value.
Note: For more information about the Hologres connection parameters in the repartition.sql file, see Write data from open source Apache Flink 1.11 or later to Hologres in real time.
In the Flink cluster, upload the hologres-connector-flink-repartition package of the open source Flink connector to a directory. In this example, the package is uploaded to the root directory.
Note: The open source Flink connector can be used to import data to Hologres in batches. For more information about the open source code of the Flink connector, visit the official sample library of Hologres.
Submit the Flink deployment. Sample code:
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"
Parameters in the preceding code:
-Dexecution.runtime-mode: the execution mode of the Flink deployment. For more information, see Execution Mode.
-p: the parallelism of the Flink deployment. We recommend that you set this parameter to the number of shards in the result table or a value that is divisible by the number of shards in the result table.
-c: the main class name of the program, followed by the path of the hologres-connector-flink-repartition JAR package.
--sqlFilePath: the path of the repartition.sql file.
Query data from the Hologres result table.
After the Flink deployment is submitted, you can query data from the Hologres result table. Sample statement:
SELECT * FROM test_sink_customer;