
Hologres: Import batch data with Flink

Last Updated: Mar 26, 2026

The new Flink connector for Hologres lets you import data into Hologres in batches. Batch import suits scenarios where data timeliness is not critical, such as loading historical data, processing offline data, or aggregating logs. It writes large volumes of data to Hologres in bulk, which improves throughput and resource utilization.

Write modes

Choose a write mode based on your workload before configuring the connector.

Write mode | Parameters | Throughput | Notes
INSERT | jdbccopywritemode=false (default) | Standard | Standard INSERT behavior; supports rollback
Fixed Copy (streaming copy) | jdbccopywritemode=true | Higher than INSERT | Lower latency; lower client memory usage; does not support rollback
Batch Copy | jdbccopywritemode=true, bulkload=true | Higher than Fixed Copy | Best Hologres resource utilization; if the destination table has a primary key, it must be empty before writing

Recommendation: Use Batch Copy with target-shards.enabled=true for the best performance. If the destination table has a primary key, make sure it is empty before writing; otherwise, deduplication during the write degrades performance.
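The table above maps each write mode to a small set of connector options. The following bash function is a minimal sketch of that mapping; the function name and the mode labels insert, fixed-copy and batch-copy are hypothetical, chosen only for this example. It prints the WITH entries for the sink DDL, and for Batch Copy it also enables target-shards.enabled, following the recommendation above.

```bash
# Hypothetical helper: prints the Hologres sink WITH entries for one write mode.
# Mode labels (insert, fixed-copy, batch-copy) are made up for this sketch.
write_mode_options() {
  case "$1" in
    insert)
      # INSERT is the default; jdbccopywritemode=false
      echo "'jdbccopywritemode' = 'false'" ;;
    fixed-copy)
      # COPY mode with bulkload left at its default (false) means Fixed Copy
      echo "'jdbccopywritemode' = 'true'" ;;
    batch-copy)
      # Batch Copy plus target shard writing, as recommended above
      printf '%s\n' \
        "'jdbccopywritemode' = 'true'" \
        "'bulkload' = 'true'" \
        "'target-shards.enabled' = 'true'" ;;
    *)
      echo "unknown write mode: $1" >&2
      return 1 ;;
  esac
}
```

For example, write_mode_options batch-copy prints the three entries used in the example repartition.sql files on this page.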

For real-time import, see Realtime Compute for Apache Flink.

Prerequisites

Before you begin, ensure that you have:

Batch import using Realtime Compute for Apache Flink

  1. In HoloWeb, create a Hologres sink table. For more information, see Connect to HoloWeb and execute queries. This example uses the test_sink_customer table.

    -- Create a Hologres sink table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date",
      orientation="column"
    );
    Note

    The field names and types declared for the sink table in Flink, and the result of the source query, must match those of the Hologres sink table.

  2. Log in to the Realtime Compute for Apache Flink console. On the Jobs page, click Deploy Job. Configure the following parameters, then click Deploy. For a full parameter reference, see Deploy a JAR job.

    Parameter | Description
    Deployment job type | Select JAR.
    Deployment mode | Streaming and batch modes are supported. This example uses batch mode.
    Engine Version | For supported versions, see Engine versions and Lifecycle policies. This example uses vvr-8.0.7-flink-1.17.
    JAR URI | Download hologres-connector-flink-repartition.jar and upload it. The source code is available in the official Hologres GitHub repository.
    Entry point class | com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample
    Entry point main arguments | --sqlFilePath="/flink/usrlib/repartition.sql". Realtime Compute for Apache Flink places attached dependency files under /flink/usrlib/ at runtime.
    Additional dependency files | Upload the repartition.sql file. This SQL script defines the data source, declares the sink table, and contains the Hologres connection configuration. See the example below.

    Example repartition.sql:

    -- sourceDDL: use Flink DataGen as the source.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql: the query result must match the sink table schema (field count and types).
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL: sink table declaration and Hologres connection configuration.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    Note

    For the full list of Hologres connection parameters in repartition.sql, see Hologres Flink Connector parameters.

  3. Click the job name to open the Deployment Details panel. In the Resource Configurations section, set Concurrency to the same value as the shard count of the Hologres sink table.

  4. After the job is submitted, query the sink table to verify the imported data.

    SELECT * FROM test_sink_customer;
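The checks in steps 3 and 4 can also be run from a shell. The following is a minimal bash sketch, assuming the psql client can reach the Hologres endpoint (Hologres is PostgreSQL-compatible) and assuming the system tables hologres.hg_table_properties and hologres.hg_table_group_properties expose a table's table group and that group's shard_count property; verify both against your instance version. The helper names and the HOLO_* environment variables are hypothetical.

```bash
# Hypothetical helpers. HOLO_ENDPOINT (host:port), HOLO_DBNAME, HOLO_USERNAME
# and HOLO_PASSWORD are assumed environment variables, not connector options.

# Runs one SQL statement with psql and prints the bare result (-t -A).
holo_query() {
  local host="${HOLO_ENDPOINT%:*}" port="${HOLO_ENDPOINT##*:}"
  PGPASSWORD="$HOLO_PASSWORD" psql -h "$host" -p "$port" -U "$HOLO_USERNAME" \
    -d "$HOLO_DBNAME" -tAc "$1"
}

# Builds the SQL that looks up the shard count of a table's table group.
# ASSUMPTION: system table and property names as described in the lead-in;
# the table is looked up by name only, so names must be unique across schemas.
shard_count_sql() {
  local table="$1"
  printf '%s\n' \
    "SELECT g.property_value" \
    "FROM hologres.hg_table_properties t" \
    "JOIN hologres.hg_table_group_properties g ON g.tablegroup_name = t.property_value" \
    "WHERE t.table_name = '$table' AND t.property_key = 'table_group'" \
    "  AND g.property_key = 'shard_count';"
}

# Succeeds (exit 0) when the imported row count equals the expected count.
row_count_matches() {
  local actual="$1" expected="$2"
  [[ "$actual" =~ ^[0-9]+$ ]] && (( actual == expected ))
}

# Example usage (values are placeholders):
#   holo_query "$(shard_count_sql test_sink_customer)"   # value for Concurrency
#   rows="$(holo_query 'SELECT COUNT(*) FROM test_sink_customer;')"
#   row_count_matches "$rows" 1000000 && echo "all rows imported"
```

With the example repartition.sql, DataGen stops after 1,000,000 rows ('number-of-rows'), and test_sink_customer has no primary key, so in the simple case the expected count is 1000000.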

Batch import using open source Flink

  1. In HoloWeb, create a Hologres sink table. For more information, see Connect to HoloWeb and execute queries. This example uses the test_sink_customer table.

    -- Create a Hologres sink table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date",
      orientation="column"
    );
    Note

    Set the shard count based on the data volume. For more information, see Manage table groups and shard counts.

  2. Create a repartition.sql file and upload it to your Flink cluster. This example uses /flink-1.15.4/src/repartition.sql as the path. repartition.sql is a Flink SQL script that defines the data source, declares the sink table, and contains the Hologres connection configuration.

    -- sourceDDL: use Flink DataGen as the source.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql: the query result must match the sink table schema (field count and types).
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL: sink table declaration and Hologres connection configuration.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    The key parameters are described in the following table.

    Parameter | Required | Default | Description
    connector | Yes | - | Fixed to hologres.
    dbname | Yes | - | Name of the Hologres database.
    tablename | Yes | - | Name of the Hologres sink table.
    username | Yes | - | Your Alibaba Cloud AccessKey ID. Get one from the AccessKey Management page.
    password | Yes | - | The AccessKey secret corresponding to your AccessKey ID.
    endpoint | Yes | - | The endpoint of the Hologres instance, in host:port format. Find it in the Configurations section on the instance page in the Hologres console. Use the VPC endpoint if your Flink cluster is in the same region; otherwise, use the public endpoint.
    jdbccopywritemode | No | false | Write mode. false: INSERT mode. true: COPY mode (Fixed Copy unless bulkload=true). Fixed Copy provides higher throughput, lower latency, and lower client memory usage than INSERT, but does not support rollback.
    bulkload | No | false | true: enables Batch Copy mode. Requires jdbccopywritemode=true. Batch Copy is more efficient than Fixed Copy and makes better use of Hologres resources. If the destination table has a primary key, it must be empty before writing; otherwise, deduplication during the write degrades performance. Writing to a table with a primary key can also cause table locks. Set target-shards.enabled=true to reduce lock granularity to the shard level, which allows concurrent batch imports and reduces load by approximately 66.7% compared to Fixed Copy.
    target-shards.enabled | No | false | true: enables target shard batch writing. When source data is repartitioned by shard, lock granularity is reduced to the shard level, allowing multiple batch import tasks to run concurrently.
    Note

    For the full list of Hologres connection parameters, see Hologres Flink Connector parameters.

  3. Upload hologres-connector-flink-repartition.jar to your Flink cluster. This example places it in the Flink root directory, which is the directory you run ./bin/flink from in the next step. The source code is available in the official Hologres GitHub repository.

  4. Submit the Flink job.

    ./bin/flink run \
      -Dexecution.runtime-mode=BATCH \
      -p 3 \
      -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample \
      hologres-connector-flink-repartition.jar \
      --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    Parameter | Description
    -Dexecution.runtime-mode | Execution mode. Set to BATCH. For details, see Execution mode.
    -p | Parallelism (number of concurrent tasks). Set to the shard count of the sink table, or a value evenly divisible by the shard count.
    -c | Fully qualified name of the entry point class in hologres-connector-flink-repartition.jar.
    --sqlFilePath | Path of the repartition.sql file.

  5. After the job is submitted, query the sink table to verify the imported data.

    SELECT * FROM test_sink_customer;
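For open source Flink, these steps can be scripted. The following bash sketch writes repartition.sql with the connection values taken from environment variables, so the AccessKey pair is not kept in a template file, and then builds the flink run command with -p set to the sink table's shard count. All variable and function names (HOLO_DBNAME, HOLO_USERNAME, HOLO_PASSWORD, HOLO_ENDPOINT, SHARD_COUNT, FLINK_HOME, DRY_RUN, write_repartition_sql, submit_job) are hypothetical, and the script assumes the JAR sits in the Flink root directory as in step 3.

```bash
# Hypothetical helpers for submitting the example job with open source Flink.

# Writes repartition.sql to $1, filling the connection options from the environment.
write_repartition_sql() {
  local out="$1" var
  for var in HOLO_DBNAME HOLO_USERNAME HOLO_PASSWORD HOLO_ENDPOINT; do
    if [[ -z "${!var:-}" ]]; then
      echo "missing environment variable: $var" >&2
      return 1
    fi
  done
  # Static part copied from the example; quoted 'EOF' disables expansion,
  # so the backticks around date are written literally.
  cat > "$out" <<'EOF'
-- sourceDDL: use Flink DataGen as the source.
CREATE TEMPORARY TABLE source_table
(
  c_custkey     BIGINT
  ,c_name       STRING
  ,c_address    STRING
  ,c_nationkey  INTEGER
  ,c_phone      STRING
  ,c_acctbal    NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment    STRING
)
WITH (
  'connector' = 'datagen'
  ,'rows-per-second' = '10000'
  ,'number-of-rows' = '1000000'
);

-- sourceDql: the query result must match the sink table schema (field count and types).
SELECT *, cast('2024-04-21' as DATE) FROM source_table;

-- sinkDDL: sink table declaration and Hologres connection configuration.
CREATE TABLE sink_table
(
  c_custkey     BIGINT
  ,c_name       STRING
  ,c_address    STRING
  ,c_nationkey  INTEGER
  ,c_phone      STRING
  ,c_acctbal    NUMERIC(15, 2)
  ,c_mktsegment STRING
  ,c_comment    STRING
  ,`date`       DATE
)
EOF
  # Connection options: unquoted EOF expands ${...}. Values are inserted
  # verbatim, so a value containing a single quote would need escaping first.
  cat >> "$out" <<EOF
WITH (
  'connector' = 'hologres'
  ,'dbname' = '${HOLO_DBNAME}'
  ,'tablename' = 'test_sink_customer'
  ,'username' = '${HOLO_USERNAME}'
  ,'password' = '${HOLO_PASSWORD}'
  ,'endpoint' = '${HOLO_ENDPOINT}'
  ,'jdbccopywritemode' = 'true'
  ,'bulkload' = 'true'
  ,'target-shards.enabled' = 'true'
);
EOF
  chmod 600 "$out"   # the generated file still contains the AccessKey secret
}

# Runs (or, with DRY_RUN=1, prints) the flink run command from step 4.
submit_job() {
  local sql_path="$1" flink_home="${FLINK_HOME:-.}"
  if ! [[ "${SHARD_COUNT:-}" =~ ^[1-9][0-9]*$ ]]; then
    echo "set SHARD_COUNT to the shard count of the sink table" >&2
    return 1
  fi
  local -a cmd=(
    "$flink_home/bin/flink" run
    -Dexecution.runtime-mode=BATCH
    -p "$SHARD_COUNT"
    -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample
    "$flink_home/hologres-connector-flink-repartition.jar"
    "--sqlFilePath=$sql_path"
  )
  if [[ "${DRY_RUN:-0}" == 1 ]]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

For example, assuming Flink is installed in /flink-1.15.4 and the connection variables are exported, run write_repartition_sql /flink-1.15.4/src/repartition.sql, then DRY_RUN=1 SHARD_COUNT=3 FLINK_HOME=/flink-1.15.4 submit_job /flink-1.15.4/src/repartition.sql to review the command before running it without DRY_RUN.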

Troubleshooting

Write performance is lower than expected

If write performance is lower than expected, check the following:

  • Confirm that parallelism (-p for open source Flink, or Concurrency for Realtime Compute) matches the shard count of the sink table, or is evenly divisible by it.

  • If you are using Fixed Copy (jdbccopywritemode=true, bulkload=false), consider switching to Batch Copy (bulkload=true) for higher throughput.

  • If you are using Batch Copy with a table that has a primary key, set target-shards.enabled=true to reduce lock granularity to the shard level and allow concurrent imports.
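The checklist above can be sketched as a bash function that takes the job settings as arguments and prints each check that fails. It encodes the parallelism rule exactly as this page states it (equal to the shard count, or evenly divisible by it); the function name and argument order are hypothetical.

```bash
# Hypothetical helper: prints each failed check from the list above.
# Arguments: parallelism, shard count, jdbccopywritemode, bulkload,
#            target-shards.enabled, has_primary_key (booleans as true/false).
diagnose_write_perf() {
  local parallelism="$1" shards="$2" copy="$3" bulkload="$4"
  local target_shards="$5" has_pk="$6" issues=0
  if ! [[ "$parallelism" =~ ^[1-9][0-9]*$ && "$shards" =~ ^[1-9][0-9]*$ ]]; then
    echo "parallelism and shard count must be positive integers"
    return 2
  fi
  # Rule as stated on this page: equal to the shard count, or evenly divisible by it.
  if (( parallelism % shards != 0 )); then
    echo "parallelism $parallelism is neither equal to nor evenly divisible by shard count $shards"
    issues=$((issues + 1))
  fi
  # Fixed Copy: COPY mode without bulkload.
  if [[ "$copy" == true && "$bulkload" != true ]]; then
    echo "Fixed Copy in use: consider Batch Copy (bulkload=true)"
    issues=$((issues + 1))
  fi
  # Batch Copy on a primary-key table without shard-level locking.
  if [[ "$bulkload" == true && "$has_pk" == true && "$target_shards" != true ]]; then
    echo "Batch Copy on a primary-key table: set target-shards.enabled=true"
    issues=$((issues + 1))
  fi
  (( issues == 0 ))
}
```

A zero exit status means none of the three checks found a problem.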

Table locks occur during batch import

Table locks occur when you use Batch Copy (bulkload=true) to write to a table with a primary key. Set target-shards.enabled=true to reduce lock granularity to the shard level. This lets multiple batch import tasks run concurrently and reduces load by approximately 66.7% compared to Fixed Copy.

Field type mismatch causes job failure

The sink table declared in Flink must match the Hologres sink table exactly: field names, field count, and types must all correspond. In addition, check that the SELECT statement in repartition.sql produces a result set that matches the sink table column order and types.
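Comparing the two column lists position by position catches most of these mismatches before the job runs. The following bash sketch assumes you copy each column as a "name type" line from the Hologres DDL and from the Flink sink DDL. It only knows the type pairs used on this page (TEXT with STRING, INT with INTEGER, and identical spellings such as BIGINT, DATE and NUMERIC(15,2)), ignores spaces inside type names, and strips identifier quotes. The function names are hypothetical.

```bash
# Hypothetical helpers for comparing a Hologres DDL with a Flink sink DDL.

# Removes double quotes (Hologres) and backticks (Flink) from an identifier.
strip_quotes() {
  local s="$1" dq='"' bt='`'
  s="${s//$dq/}"
  s="${s//$bt/}"
  printf '%s\n' "$s"
}

# Upper-cases a type, drops spaces, and maps the Hologres spellings used on
# this page to the Flink spellings they are paired with.
normalize_type() {
  local t
  t="$(printf '%s' "$1" | tr '[:lower:]' '[:upper:]' | tr -d ' ')"
  case "$t" in
    TEXT) t=STRING ;;
    INT) t=INTEGER ;;
  esac
  printf '%s\n' "$t"
}

# Compares two newline-separated lists of "name type" lines position by position.
# $1: columns from the Hologres DDL; $2: columns from the Flink sink DDL.
compare_schemas() {
  local -a holo=() flink=()
  local line i hname htype fname ftype status=0
  while IFS= read -r line; do [[ -n "$line" ]] && holo+=("$line"); done <<< "$1"
  while IFS= read -r line; do [[ -n "$line" ]] && flink+=("$line"); done <<< "$2"
  if (( ${#holo[@]} != ${#flink[@]} )); then
    echo "column count differs: Hologres ${#holo[@]}, Flink ${#flink[@]}"
    return 1
  fi
  for (( i = 0; i < ${#holo[@]}; i++ )); do
    read -r hname htype <<< "${holo[i]}"
    read -r fname ftype <<< "${flink[i]}"
    hname="$(strip_quotes "$hname")"
    fname="$(strip_quotes "$fname")"
    if [[ "$hname" != "$fname" ]]; then
      echo "column $((i + 1)): name $hname (Hologres) vs $fname (Flink)"
      status=1
    fi
    if [[ "$(normalize_type "$htype")" != "$(normalize_type "$ftype")" ]]; then
      echo "column $((i + 1)) $hname: type $htype (Hologres) vs $ftype (Flink)"
      status=1
    fi
  done
  return "$status"
}
```

The comparison is positional because the source query's extra column (the cast to DATE) has no name of its own; only its position and type line it up with the date column.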