The new Flink connector for Hologres enables you to batch import data into Hologres. Use batch import when data timeliness is not critical, such as when loading historical data, processing offline data, or aggregating logs. Batch import writes large volumes of data to Hologres in a single operation, which improves throughput and resource utilization.
Write modes
Choose a write mode based on your workload before configuring the connector.
| Write mode | Parameters | Throughput | Notes |
|---|---|---|---|
| INSERT | jdbccopywritemode=false (default) | Standard | Standard INSERT behavior; supports rollback |
| Fixed Copy (streaming copy) | jdbccopywritemode=true | Higher | Lower latency; lower client memory usage; does not support rollback |
| Batch Copy | jdbccopywritemode=true, bulkload=true | Higher than Fixed Copy | Best Hologres resource utilization; if the destination table has a primary key, it must be empty |
Recommendation: Use Batch Copy with target-shards.enabled=true for the best performance. If the destination table has a primary key, make sure it is empty before writing; otherwise, deduplication during the write degrades performance.
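The write modes table maps each mode to a set of sink options, and that mapping can be captured in a small shell helper. This is a minimal sketch: the function name write_mode_options is hypothetical, and it only prints option lines in the leading-comma style of the repartition.sql examples in this article, for you to paste into the sink table's WITH clause.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the sink WITH-clause options for a write mode,
# following the write modes table. Lines use the leading-comma style of the
# repartition.sql examples, so paste them after an existing option line.
write_mode_options() {
  case "$1" in
    insert)
      # INSERT is the default; setting it explicitly is optional.
      echo ",'jdbccopywritemode' = 'false'"
      ;;
    fixed-copy)
      echo ",'jdbccopywritemode' = 'true'"
      ;;
    batch-copy)
      # Batch Copy needs COPY mode plus bulkload. target-shards.enabled is
      # the setting this article recommends alongside it.
      echo ",'jdbccopywritemode' = 'true'"
      echo ",'bulkload' = 'true'"
      echo ",'target-shards.enabled' = 'true'"
      ;;
    *)
      echo "unknown write mode: $1 (expected insert, fixed-copy, or batch-copy)" >&2
      return 1
      ;;
  esac
}

# Example: print the recommended Batch Copy options.
write_mode_options batch-copy
```

Printing options rather than editing repartition.sql in place keeps the helper side-effect free; you stay in control of where the lines go.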
For real-time import, see Realtime Compute for Apache Flink.
Prerequisites
Before you begin, ensure that you have:
- A Hologres instance. For more information, see Purchase a Hologres instance.
- A Flink cluster of version 1.15 or later:
  - Open source Flink: Deploy Flink
  - Realtime Compute for Apache Flink: Activate Realtime Compute for Apache Flink
Batch import using Realtime Compute for Apache Flink
In HoloWeb, create a Hologres sink table. For more information, see Connect to HoloWeb and execute queries. This example uses the test_sink_customer table.

```sql
-- Create a Hologres sink table.
CREATE TABLE test_sink_customer (
  c_custkey BIGINT,
  c_name TEXT,
  c_address TEXT,
  c_nationkey INT,
  c_phone TEXT,
  c_acctbal NUMERIC(15,2),
  c_mktsegment TEXT,
  c_comment TEXT,
  "date" DATE
) WITH (
  distribution_key="c_custkey,date",
  orientation="column"
);
```

Note: The field names and types of the Flink source table must match those of the Hologres sink table.
Log in to the Realtime Compute for Apache Flink console. On the Jobs page, click Deploy Job. Configure the following parameters, then click Deploy. For a full parameter reference, see Deploy a JAR job.
| Parameter | Description |
|---|---|
| Deployment job type | Select JAR. |
| Deployment mode | Streaming and batch modes are supported. This example uses batch mode. |
| Engine Version | For supported versions, see Engine versions and Lifecycle policies. This example uses vvr-8.0.7-flink-1.17. |
| JAR URI | Download hologres-connector-flink-repartition.jar and upload it. The source code is available in the official Hologres GitHub repository. |
| Entry point class | com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample |
| Entry point main arguments | --sqlFilePath="/flink/usrlib/repartition.sql". Realtime Compute for Apache Flink places attached dependency files under /flink/usrlib/ at runtime. |
| Additional dependency files | Upload the repartition.sql file. This SQL script defines the data source, declares the sink table, and contains the Hologres connection configuration. See the example below. |

Example repartition.sql:
```sql
-- sourceDDL: use Flink DataGen as the source.
CREATE TEMPORARY TABLE source_table (
  c_custkey BIGINT
  ,c_name STRING
  ,c_address STRING
  ,c_nationkey INTEGER
  ,c_phone STRING
  ,c_acctbal NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment STRING
) WITH (
  'connector' = 'datagen'
  ,'rows-per-second' = '10000'
  ,'number-of-rows' = '1000000'
);

-- sourceDql: the query result must match the sink table schema (field count and types).
SELECT *, cast('2024-04-21' as DATE) FROM source_table;

-- sinkDDL: sink table declaration and Hologres connection configuration.
CREATE TABLE sink_table (
  c_custkey BIGINT
  ,c_name STRING
  ,c_address STRING
  ,c_nationkey INTEGER
  ,c_phone STRING
  ,c_acctbal NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment STRING
  ,`date` DATE
) WITH (
  'connector' = 'hologres'
  ,'dbname' = 'doc_****'
  ,'tablename' = 'test_sink_customer'
  ,'username' = 'yourAccessKeyId'
  ,'password' = 'yourAccessKeySecret'
  ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
  ,'jdbccopywritemode' = 'true'
  ,'bulkload' = 'true'
  ,'target-shards.enabled'='true'
);
```

Note: For the full list of Hologres connection parameters in repartition.sql, see Hologres Flink Connector parameters.

Click the job name to open the Deployment Details panel. In the Resource Configurations section, set Concurrency to the same value as the shard count of the Hologres sink table.
After the job finishes, query the sink table to verify the imported data.

```sql
SELECT * FROM test_sink_customer;
```
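To check the result from a terminal instead of HoloWeb, you can count the rows with psql, because Hologres is PostgreSQL-compatible. This is a minimal sketch, assuming psql is installed and can reach the instance; the HOLO_* environment variables and both function names are hypothetical placeholders. With the example DataGen source ('number-of-rows' = '1000000') and a sink table that was empty before the run and has no primary key, you would expect 1,000,000 rows.

```shell
#!/usr/bin/env bash
# Hypothetical helpers. HOLO_* variables are placeholders you set yourself:
# HOLO_ACCESS_KEY_ID, HOLO_ACCESS_KEY_SECRET, HOLO_HOST, HOLO_PORT, HOLO_DB.
count_sink_rows() {
  # -A: unaligned output, -t: tuples only, so psql prints just the number.
  PGUSER="$HOLO_ACCESS_KEY_ID" PGPASSWORD="$HOLO_ACCESS_KEY_SECRET" \
    psql -h "$HOLO_HOST" -p "$HOLO_PORT" -d "$HOLO_DB" -At \
    -c "SELECT count(*) FROM test_sink_customer;"
}

# Succeeds only when the actual row count equals the expected one.
check_row_count() {
  local expected="$1" actual="$2"
  if [ "$actual" -eq "$expected" ]; then
    echo "OK: $actual rows"
  else
    echo "MISMATCH: expected $expected, got $actual" >&2
    return 1
  fi
}

# Usage (requires network access to the Hologres instance):
# check_row_count 1000000 "$(count_sink_rows)"
```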
Batch import using open source Flink
In HoloWeb, create a Hologres sink table. For more information, see Connect to HoloWeb and execute queries. This example uses the test_sink_customer table.

```sql
-- Create a Hologres sink table.
CREATE TABLE test_sink_customer (
  c_custkey BIGINT,
  c_name TEXT,
  c_address TEXT,
  c_nationkey INT,
  c_phone TEXT,
  c_acctbal NUMERIC(15,2),
  c_mktsegment TEXT,
  c_comment TEXT,
  "date" DATE
) WITH (
  distribution_key="c_custkey,date",
  orientation="column"
);
```

Note: Set the shard count based on the data volume. For more information, see Manage table groups and shard counts.
Create a repartition.sql file and upload it to your Flink cluster. This example uses /flink-1.15.4/src/repartition.sql as the path. repartition.sql is a Flink SQL script that defines the data source, declares the sink table, and contains the Hologres connection configuration.

```sql
-- sourceDDL: use Flink DataGen as the source.
CREATE TEMPORARY TABLE source_table (
  c_custkey BIGINT
  ,c_name STRING
  ,c_address STRING
  ,c_nationkey INTEGER
  ,c_phone STRING
  ,c_acctbal NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment STRING
) WITH (
  'connector' = 'datagen'
  ,'rows-per-second' = '10000'
  ,'number-of-rows' = '1000000'
);

-- sourceDql: the query result must match the sink table schema (field count and types).
SELECT *, cast('2024-04-21' as DATE) FROM source_table;

-- sinkDDL: sink table declaration and Hologres connection configuration.
CREATE TABLE sink_table (
  c_custkey BIGINT
  ,c_name STRING
  ,c_address STRING
  ,c_nationkey INTEGER
  ,c_phone STRING
  ,c_acctbal NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment STRING
  ,`date` DATE
) WITH (
  'connector' = 'hologres'
  ,'dbname' = 'doc_****'
  ,'tablename' = 'test_sink_customer'
  ,'username' = 'yourAccessKeyId'
  ,'password' = 'yourAccessKeySecret'
  ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
  ,'jdbccopywritemode' = 'true'
  ,'bulkload' = 'true'
  ,'target-shards.enabled'='true'
);
```

The key parameters are described in the following table.
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | None | Fixed to hologres. |
| dbname | Yes | None | Name of the Hologres database. |
| tablename | Yes | None | Name of the Hologres sink table. |
| username | Yes | None | Your Alibaba Cloud AccessKey ID. Get one from the AccessKey Management page. |
| password | Yes | None | The AccessKey secret corresponding to your AccessKey ID. |
| endpoint | Yes | None | The endpoint of the Hologres instance in the host:port format. Find it in the Configurations section on the instance page in the Hologres console. Use the VPC endpoint if your Flink cluster is in the same region; otherwise, use the public endpoint. |
| jdbccopywritemode | No | false | Write mode. false: INSERT mode. true: COPY mode (Fixed Copy by default). Fixed Copy provides higher throughput, lower latency, and lower client memory usage than INSERT, but does not support rollback. |
| bulkload | No | false | true: enables Batch Copy mode. Requires jdbccopywritemode=true. Batch Copy is more efficient than Fixed Copy and makes better use of Hologres resources. When you write to a table with a primary key, table locks may occur. Set target-shards.enabled=true to reduce lock granularity to the shard level and allow concurrent batch imports, which reduces load by approximately 66.7% compared with Fixed Copy. If the destination table has a primary key, it must be empty before writing; otherwise, deduplication during the write degrades performance. |
| target-shards.enabled | No | false | true: enables target shard batch writing. When source data is repartitioned by shard, lock granularity is reduced to the shard level, allowing multiple batch import tasks to run concurrently. |

Note: For the full list of Hologres connection parameters, see Hologres Flink Connector parameters.
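Before submitting, you can catch two common configuration mistakes from the parameter table: a missing required option, and bulkload=true without jdbccopywritemode=true. This is a minimal sketch; check_sink_options is a hypothetical name, and it greps the whole file rather than parsing Flink SQL, so an option that appears in the source DDL (such as 'connector') also counts.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check for repartition.sql. It greps for option
# names only; it does not parse Flink SQL.
check_sink_options() {
  local file="$1" failed=0 key
  # Required options from the parameter table.
  for key in connector dbname tablename username password endpoint; do
    if ! grep -q "'$key'[[:space:]]*=" "$file"; then
      echo "missing required option: $key" >&2
      failed=1
    fi
  done
  # bulkload=true (Batch Copy) requires jdbccopywritemode=true.
  if grep -q "'bulkload'[[:space:]]*=[[:space:]]*'true'" "$file" &&
     ! grep -q "'jdbccopywritemode'[[:space:]]*=[[:space:]]*'true'" "$file"; then
    echo "bulkload=true requires jdbccopywritemode=true" >&2
    failed=1
  fi
  return "$failed"
}

# Usage:
# check_sink_options /flink-1.15.4/src/repartition.sql && echo "options look OK"
```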
Upload hologres-connector-flink-repartition.jar to your Flink cluster. This example places it in the root directory of the Flink installation, which is the directory from which you run ./bin/flink. The source code is available in the official Hologres GitHub repository.
Submit the Flink job.
```shell
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 \
  -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample \
  hologres-connector-flink-repartition.jar \
  --sqlFilePath="/flink-1.15.4/src/repartition.sql"
```

| Parameter | Description |
|---|---|
| -Dexecution.runtime-mode | Execution mode. Set to BATCH. For details, see Execution mode. |
| -p | Parallelism (number of concurrent tasks). Set to the shard count of the sink table, or a value evenly divisible by the shard count. |
| -c | Fully qualified name of the main class in hologres-connector-flink-repartition.jar. |
| --sqlFilePath | Path of the repartition.sql file. |

After the job finishes, query the sink table to verify the imported data.
```sql
SELECT * FROM test_sink_customer;
```
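If you submit this job repeatedly, a small wrapper keeps -p tied to the sink table's shard count. This is a minimal sketch, assuming you run it from the Flink root directory and look up the shard count yourself; submit_job and the DRY_RUN switch are hypothetical, and the flink arguments are the ones from the submission command in this article.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the submission command. Run it from the Flink
# root directory. Set DRY_RUN=1 to print the command instead of running it.
submit_job() {
  local shard_count="$1" sql_path="$2"
  case "$shard_count" in
    ''|*[!0-9]*|0)
      echo "shard count must be a positive integer, got '$shard_count'" >&2
      return 1
      ;;
  esac
  # -p is set to the shard count of the sink table.
  local cmd=(
    ./bin/flink run
    -Dexecution.runtime-mode=BATCH
    -p "$shard_count"
    -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample
    hologres-connector-flink-repartition.jar
    --sqlFilePath="$sql_path"
  )
  if [ "${DRY_RUN:-0}" = 1 ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}

# Example: print the command for a sink table with 3 shards.
DRY_RUN=1 submit_job 3 /flink-1.15.4/src/repartition.sql
```

Building the command as a bash array keeps each argument intact even if the SQL file path contains spaces.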
Troubleshooting
Write performance is lower than expected
If write performance is lower than expected, check the following:
- Confirm that parallelism (-p for open source Flink, or Concurrency for Realtime Compute for Apache Flink) matches the shard count of the sink table, or is evenly divisible by it.
- If you are using Fixed Copy (jdbccopywritemode=true, bulkload=false), consider switching to Batch Copy (bulkload=true) for higher throughput.
- If you are using Batch Copy with a table that has a primary key, set target-shards.enabled=true to reduce lock granularity to the shard level and allow concurrent imports. Also make sure the table is empty before writing; otherwise, deduplication during the write degrades performance.
Table locks occur during batch import
Table locks can occur when you use Batch Copy (bulkload=true) to write to a table with a primary key. Set target-shards.enabled=true to reduce lock granularity to the shard level. This allows multiple batch import tasks to run concurrently and reduces load by approximately 66.7% compared with Fixed Copy.
Field type mismatch causes job failure
The Flink source table schema must match the Hologres sink table schema exactly: field names, field count, and types must all correspond. Check that the SELECT statement in repartition.sql produces a result set that matches the column order and types of the sink table.