
Hologres:Use Flink to import data to Hologres in batches

Last Updated:Aug 16, 2024

Hologres provides a new version of the Flink connector plug-in that allows you to import data to Hologres in batches by using Flink. This improves import efficiency and reduces the workload during the import process.

Background information

In big data processing, Hologres is a powerful online analytical processing (OLAP) system that integrates with Flink to provide optimized real-time streaming data processing capabilities. For scenarios that do not require high data timeliness, such as batch loading of historical data, offline data processing, and log aggregation, we recommend that you use Flink to import data to Hologres in batches. Batch import writes a large amount of data to Hologres at a time, which improves import efficiency and resource utilization. You can import data in real time or in batches based on your business requirements and resource status. For more information about how to import data in real time, see Fully managed Flink.

Prerequisites

Use Realtime Compute for Apache Flink to import data in batches

  1. Create a Hologres result table in the HoloWeb console to receive data that is imported by using Realtime Compute for Apache Flink. For more information, see Connect to HoloWeb. In this example, the test_sink_customer table is created.

    -- Create a Hologres result table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date",
      orientation="column"
    );
    Note

    The names and data types of fields in the Flink source table must be consistent with the names and data types of fields in the Hologres result table.
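
    As an optional sanity check after you create the table (a sketch; it assumes the hologres.hg_table_properties system view is available in your Hologres instance), you can list the storage properties that the WITH clause set:

    ```sql
    -- Sketch: list the properties of the result table, such as
    -- distribution_key and orientation. Assumes the
    -- hologres.hg_table_properties system view is available in your instance.
    SELECT property_key, property_value
    FROM hologres.hg_table_properties
    WHERE table_name = 'test_sink_customer';
    ```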

  2. Log on to the Realtime Compute for Apache Flink console. In the left-side navigation pane, click Deployments. On the Deployments page, click Create Deployment. In the Create Deployment dialog box, configure the parameters and click Deploy. For more information about the parameter settings, see the "Create a JAR deployment" section in Create a deployment.

    The following list describes the parameters.

    • Deployment Type: Select JAR.

    • Deployment Mode: Select Stream Mode or Batch Mode. In this example, Batch Mode is selected.

    • Engine Version: Select the version of the Flink engine that you want to use for the deployment. For more information about engine versions, see Engine version and Lifecycle policies. In this example, vvr-8.0.7-flink-1.17 is used.

    • JAR URI: Upload the hologres-connector-flink-repartition package of the open source Flink connector.

      Note

      The open source Flink connector can be used to import data to Hologres in batches. For more information about the open source code of the Flink connector, visit the official sample library of Hologres.

    • Entry Point Class: The entry point class of the program. The main class of the Flink connector is com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample.

    • Entry Point Main Arguments: The path of the repartition.sql file. Realtime Compute for Apache Flink stores additional dependency files in the /flink/usrlib/ directory. In this example, --sqlFilePath="/flink/usrlib/repartition.sql" is entered.

    • Additional Dependencies: Upload the repartition.sql file. The repartition.sql file is a Flink SQL script that defines the source table, declares the result table, and configures the Hologres connection information. The following sample code shows the content of the repartition.sql file in this example.

    -- The data definition language (DDL) statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data. 
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table. 
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- The DDL statement that is used to declare the result table and configure the Hologres connection information. 
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    Note

    For more information about the Hologres connection parameters in the repartition.sql file, see Write data from open source Apache Flink 1.11 or later to Hologres in real time.

  3. Click the name of the desired deployment. On the Configuration tab, modify the deployment configuration in the Resources section and the value of the Parallelism parameter.

    Note

    We recommend that you set the Parallelism parameter to the number of shards in the Hologres result table.
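
    The shard count that this recommendation refers to can be queried from the instance. As a sketch (assuming the hologres.hg_table_group_properties system view is available in your Hologres version):

    ```sql
    -- Sketch: query the shard count of each table group in the current
    -- database. Assumes the hologres.hg_table_group_properties system view
    -- is available; a table inherits the shard count of its table group.
    SELECT tablegroup_name, property_value AS shard_count
    FROM hologres.hg_table_group_properties
    WHERE property_key = 'shard_count';
    ```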

  4. Query data from the Hologres result table.

    After the Flink deployment is submitted, you can query data from the Hologres result table. Sample statement:

    SELECT * FROM test_sink_customer;
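
    Because the DataGen source in this example is configured with 'number-of-rows' = '1000000', a row count is a simple completeness check. This query is illustrative and not part of the original procedure:

    ```sql
    -- Sketch: the DataGen source emits exactly 1,000,000 rows in this example,
    -- so the count is expected to match that number after the batch deployment
    -- finishes successfully.
    SELECT COUNT(*) AS row_count FROM test_sink_customer;
    ```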

Use Apache Flink to import data to Hologres in batches

  1. Create a Hologres result table in the HoloWeb console to receive data that is imported by using Apache Flink. For more information, see Connect to HoloWeb. In this example, the test_sink_customer table is created.

    -- Create a Hologres result table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date",
      orientation="column"
    );
    Note

    You can specify the number of shards based on the data amount. For more information about shards, see User guide of table groups and shard counts.
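
    If the default shard count of the instance does not suit the data amount, one approach (a sketch; the hg_create_table_group function and the table_group property are assumed to be available in your Hologres version) is to create a dedicated table group with an explicit shard count and place the result table in it:

    ```sql
    -- Sketch: create a table group with an explicit shard count (3 in this
    -- example). The hg_create_table_group function is assumed to be
    -- available in your Hologres version.
    CALL hg_create_table_group ('test_tg', 3);

    -- Then add table_group="test_tg" to the WITH clause of the CREATE TABLE
    -- statement so that the result table is created in this table group.
    ```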

  2. Create the repartition.sql file and upload the file to a directory in the Flink cluster. In this example, the file is uploaded to the /flink-1.15.4/src directory. The following sample code shows the content of the repartition.sql file.

    Note

    The repartition.sql file is an SQL script file of Flink that is used to define a source table, declare a result table, and configure Hologres connection information.

    -- The DDL statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data. 
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table. 
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- The DDL statement that is used to declare the result table and configure the Hologres connection information. 
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    Configure the parameters. The following list describes the parameters.

    • connector (required): The type of the result table. Set the value to hologres.

    • dbname (required): The name of the Hologres database.

    • tablename (required): The name of the Hologres table to which data is imported.

    • username (required): The AccessKey ID of the Alibaba Cloud account. You can obtain the AccessKey ID from the AccessKey Pair page.

    • password (required): The AccessKey secret of the Alibaba Cloud account.

    • endpoint (required): The virtual private cloud (VPC) endpoint of your Hologres instance. You can view the endpoint of the Hologres instance on the Configurations tab of the instance details page in the Hologres console.

      Note

      The endpoint must contain the port number and must be in the IP address:Port number format. If the Hologres instance and the Flink deployment reside in the same region, use the VPC endpoint of your Hologres instance. If they reside in different regions, use the public endpoint of your Hologres instance.

    • jdbccopywritemode (optional): The method that is used to write data. Valid values:

      • false: The INSERT statement is used. This is the default value.

      • true: A copy mode is used. The fixed copy mode and the batch copy mode are provided. By default, the fixed copy mode is used.

        Note

        In fixed copy mode, data is written in streaming mode rather than in batches. Compared with writing data by using the INSERT statement, the fixed copy mode provides higher throughput and lower data latency, and consumes less client memory. However, data written in fixed copy mode cannot be retracted.

    • bulkload (optional): Specifies whether to use the batch copy mode to write data. Valid values:

      • true: The batch copy mode is used. This value takes effect only if the jdbccopywritemode parameter is set to true.

        Note

        • Compared with the fixed copy mode, the batch copy mode provides higher efficiency, better resource utilization, and better performance. Select a write mode based on your business requirements.

        • In most cases, a table that has a primary key is locked when data is written to it in batch copy mode. You can set the target-shards.enabled parameter to true to change the table-level lock to a shard-level lock. This way, multiple import tasks on the same table can be performed at the same time. Tests show that, compared with the fixed copy mode, the batch copy mode reduces the load on the Hologres instance by about 66.7% when data is written to a table that has a primary key.

        • If you write data to a table that has a primary key in batch copy mode, the result table must be empty. If the result table is not empty, primary key-based data deduplication negatively affects the write performance.

      • false: The batch copy mode is not used. In this case, the fixed copy mode is used. This is the default value.

    • target-shards.enabled (optional): Specifies whether to enable batch data write at the shard level. Valid values:

      • true: Batch data write at the shard level is enabled. If the source data is partitioned by shard, table-level locks are changed to shard-level locks.

      • false: Batch data write at the shard level is disabled. This is the default value.

    Note

    For more information about the Hologres connection parameters in the repartition.sql file, see Write data from open source Apache Flink 1.11 or later to Hologres in real time.

  3. In the Flink cluster, upload the hologres-connector-flink-repartition package of the open source Flink connector to a directory. In this example, the package is uploaded to the root directory.

    Note

    The open source Flink connector can be used to import data to Hologres in batches. For more information about the open source code of the Flink connector, visit the official sample library of Hologres.

  4. Submit the Flink deployment. Sample code:

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    Parameters in the preceding code:

    • -Dexecution.runtime-mode: the execution mode of the Flink deployment. For more information, see Execution Mode.

    • -p: the parallelism of the Flink deployment. We recommend that you set this parameter to the number of shards in the result table or a value that is divisible by the number of shards in the result table.

    • -c: the main class of the hologres-connector-flink-repartition package.

    • --sqlFilePath: the path of the repartition.sql file.

  5. Query data from the Hologres result table.

    After the Flink deployment is submitted, you can query data from the Hologres result table. Sample statement:

    SELECT * FROM test_sink_customer;