すべてのプロダクト
Search
ドキュメントセンター

Hologres:Flink を使用して Hologres にデータをバッチインポートする

最終更新日:Apr 01, 2025

Hologres は、Flink を使用して Hologres にデータをバッチでインポートできる Flink コネクタプラグインの新しいバージョンを提供します。これにより、インポート効率が向上し、インポートプロセス中のワークロードが軽減されます。

背景情報

ビッグデータ処理において、Hologres は強力なオンライン分析処理(OLAP)システムであり、Flink と統合して最適化されたリアルタイムストリーミングデータ処理機能を提供します。既存データのバッチロード、オフラインデータ処理、ログ集約など、データの適時性がそれほど重要ではないシナリオでは、Flink を使用して Hologres にデータをバッチインポートすることをお勧めします。バッチインポートでは、一度に大量のデータをより効率的かつリソースを節約する方法で Hologres に書き込むことができます。これにより、データのインポート効率とリソース使用率が向上します。ビジネス要件とリソースの状態に基づいて、リアルタイムまたはバッチでデータをインポートできます。リアルタイムでのデータのインポート方法の詳細については、「フルマネージド Flink を使用する」をご参照ください。

前提条件

Realtime Compute for Apache Flink を使用してデータをバッチでインポートする

  1. Realtime Compute for Apache Flink を使用してインポートされたデータを受信するために、HoloWeb コンソールで Hologres 結果テーブルを作成します。詳細については、「HoloWeb に接続する」をご参照ください。この例では、test_sink_customer テーブルが作成されます。

    -- Create a Hologres result table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    説明

    Flink ソーステーブルのフィールドの名前とデータ型は、Hologres 結果テーブルのフィールドの名前とデータ型と一致している必要があります。

  2. Realtime Compute for Apache Flink コンソール にログインします。左側のナビゲーションウィンドウで、[デプロイメント] をクリックします。[デプロイメント] ページで、[デプロイメントの作成] をクリックします。[デプロイメントの作成] ダイアログボックスで、パラメータを構成し、[デプロイ] をクリックします。パラメータ設定の詳細については、「デプロイメントを作成する」の「JAR デプロイメントを作成する」セクションをご参照ください。

    次の表にパラメータを示します。

    パラメータ

    説明

    デプロイメントタイプ

    JAR を選択します。

    デプロイメントモード

    ストリームモードまたはバッチモードを選択します。この例では、バッチモードが選択されています。

    エンジンバージョン

    デプロイメントに使用する Flink エンジンのバージョンを選択します。エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。この例では、vvr-8.0.7-flink-1.17 が使用されています。

    JAR URI

    オープンソース Flink コネクタ hologres-connector-flink-repartition のパッケージをアップロードします。

    説明

    オープンソース Flink コネクタを使用して、Hologres にデータをバッチでインポートできます。 Flink コネクタのオープンソースコードの詳細については、Hologres の公式サンプルライブラリ をご覧ください。

    エントリポイントクラス

    プログラムのエントリポイントクラス。 Flink コネクタのメインクラス名は com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample です。

    エントリポイントのメイン引数

    repartition.sql ファイルのパス パラメータを入力します。 Realtime Compute for Apache Flink の場合、追加の依存ファイルは /flink/usrlib/ に保存されます。この例では、--sqlFilePath="/flink/usrlib/repartition.sql" と入力します。

    追加の依存関係

    repartition.sql ファイルをアップロードします。repartition.sql ファイルは、ソーステーブルの定義、結果テーブルの宣言、および Hologres 接続情報の構成に使用される Flink の SQL スクリプトファイルです。次のサンプルコードは、この例の repartition.sql ファイルの内容を示しています。

    -- The data definition language (DDL) statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data. 
    CREATE TEMPORARY TABLE source_table  -- ソーステーブルを定義するために使用されるデータ定義言語(DDL)文。この例では、Flink の DataGen パブリックテストデータをソースデータとして使用します。
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table. 
    SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- ソーステーブルからデータをクエリするために使用される DQL 文。この DQL 文によって返されるフィールドの数とデータ型は、結果テーブルの DDL 文で宣言されているフィールドの数とデータ型と一致している必要があります。
    
    -- The DDL statement that is used to declare the result table and configure the Hologres connection information. 
    CREATE TABLE sink_table -- 結果テーブルを宣言し、Hologres 接続情報を構成するために使用される DDL 文。
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    説明

    repartition.sql ファイルの Hologres 接続パラメータの詳細については、「オープンソース Apache Flink 1.11 以降から Hologres にデータをリアルタイムで書き込む」をご参照ください。

  3. 目的のデプロイメントの名前をクリックします。[構成] タブで、[リソース] セクションのデプロイメント構成と [並列度] パラメータの値を変更します。

    説明

    [並列度] パラメータを Hologres 結果テーブルのシャード数に設定することをお勧めします。

  4. Hologres 結果テーブルからデータをクエリします。

    Flink デプロイメントが送信された後、Hologres 結果テーブルからデータをクエリできます。サンプル文:

    SELECT * FROM test_sink_customer;

Apache Flink を使用して Hologres にデータをバッチでインポートする

  1. Apache Flink を使用してインポートされたデータを受信するために、HoloWeb コンソールで Hologres 結果テーブルを作成します。詳細については、「HoloWeb に接続する」をご参照ください。この例では、test_sink_customer テーブルが作成されます。

    -- Create a Hologres result table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    説明

    データ量に基づいてシャード数を指定できます。シャードの詳細については、「テーブルグループとシャード数のユーザーガイド」をご参照ください。

  2. repartition.sql ファイルを作成し、Flink クラスタのディレクトリにファイルをアップロードします。この例では、ファイルは /flink-1.15.4/src ディレクトリにアップロードされます。次のサンプルコードは、repartition.sql ファイルの内容を示しています。

    説明

    repartition.sql ファイルは、ソーステーブルの定義、結果テーブルの宣言、および Hologres 接続情報の構成に使用される Flink の SQL スクリプトファイルです。

    -- The DDL statement that is used to define the source table. In this example, the DataGen public test data of Flink is used as the source data.
    CREATE TEMPORARY TABLE source_table -- ソーステーブルを定義するために使用されるデータ定義言語(DDL)文。この例では、Flink の DataGen パブリックテストデータをソースデータとして使用します。
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- The DQL statement that is used to query data from the source table. The quantity and data types of fields that are returned by this DQL statement must be consistent with the quantity and data types of fields that are declared in the DDL statement of the result table.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- ソーステーブルからデータをクエリするために使用される DQL 文。この DQL 文によって返されるフィールドの数とデータ型は、結果テーブルの DDL 文で宣言されているフィールドの数とデータ型と一致している必要があります。
    
    
    -- The DDL statement that is used to declare the result table and configure the Hologres connection information.
    CREATE TABLE sink_table -- 結果テーブルを宣言し、Hologres 接続情報を構成するために使用される DDL 文。
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    パラメータを構成します。次の表にパラメータを示します。

    パラメータ

    必須

    説明

    connector

    はい

    結果テーブルのタイプ。パラメータを hologres に設定します。

    dbname

    はい

    Hologres データベースの名前。

    tablename

    はい

    データをインポートする Hologres テーブルの名前。

    username

    はい

    Alibaba Cloud アカウントの AccessKey ID。

    アクセスキーペア ページから AccessKey ID を取得できます。

    password

    はい

    Alibaba Cloud アカウントの AccessKey シークレット。

    endpoint

    はい

    Hologres インスタンスの VPC エンドポイント。Hologres コンソール のインスタンス詳細ページの [構成] タブで、Hologres インスタンスのエンドポイントを確認できます。

    説明

    エンドポイントにはポート番号が含まれている必要があり、IPアドレス:ポート番号 形式である必要があります。 Hologres インスタンスと Flink デプロイメントが同じリージョンにある場合は、Hologres インスタンスの VPC エンドポイントを使用します。 Hologres インスタンスと Flink デプロイメントが異なるリージョンにある場合は、Hologres インスタンスのパブリックエンドポイントを使用します。

    jdbccopywritemode

    いいえ

    データの書き込み方法。有効な値:

    • false: INSERT 文が使用されます。これはデフォルト値です。

    • true: コピーモードが使用されます。バッチコピーモードと固定コピーモードが提供されています。デフォルトでは、固定コピーモードが使用されます。

      説明

      固定コピーモードでは、データはバッチではなくストリーミングモードで書き込まれます。これにより、INSERT 文を使用したデータ書き込みよりもスループットが高くなり、データ遅延が短縮され、クライアント メモリ リソースの消費量が少なくなります。ただし、固定コピーモードで更新されたデータは取り消すことができません。

    bulkload

    いいえ

    バッチコピーモードを使用してデータを書き込むかどうかを指定します。有効な値:

    • true: バッチコピーモードが使用されます。このパラメータは、jdbccopywritemode パラメータを true に設定した場合にのみ有効になります。このパラメータを false に設定すると、固定コピーモードが使用されます。

      説明
      • 固定コピーモードと比較して、バッチコピーモードは、より高い効率、より良いリソース使用率、およびより良いパフォーマンスを提供します。ビジネス要件に基づいてデータ書き込みモードを選択してください。

      • ほとんどの場合、バッチコピーモードを使用してプライマリキーが構成されたテーブルにデータを書き込むと、テーブルはロックされます。 target-shards.enabled パラメータを true に設定して、テーブルレベルのロックをシャードレベルのロックに変更できます。これにより、同じテーブルで複数のデータインポートタスクを同時に実行できます。固定コピーモードと比較して、バッチコピーモードは、プライマリキーが構成されたテーブルにデータを書き込む際の Hologres インスタンスの負荷を約 66.7% 削減できます。削減はテストによって検証されています。

      • バッチコピーモードを使用してプライマリキーが構成されたテーブルにデータを書き込む場合は、結果テーブルが空である必要があります。結果テーブルが空でない場合、プライマリキーベースのデータ重複除去はデータ書き込みパフォーマンスに悪影響を及ぼします。

    • false: バッチコピーモードは使用されません。これはデフォルト値です。

    target-shards.enabled

    いいえ

    シャードレベルでバッチデータ書き込みを有効にするかどうかを指定します。有効な値:

    • true: シャードレベルでのバッチデータ書き込みが有効になります。ソースデータがシャードによってパーティション分割されている場合、テーブルレベルのロックはシャードレベルのロックに変更されます。

    • false: シャードレベルでのバッチデータ書き込みは無効になります。これはデフォルト値です。

    説明

    repartition.sql ファイルの Hologres 接続パラメータの詳細については、「オープンソース Apache Flink 1.11 以降から Hologres にデータをリアルタイムで書き込む」をご参照ください。

  3. Flink クラスタで、オープンソース Flink コネクタの hologres-connector-flink-repartition パッケージをディレクトリにアップロードします。この例では、パッケージはルートディレクトリにアップロードされます。

    説明

    オープンソース Flink コネクタを使用して、Hologres にデータをバッチでインポートできます。 Flink コネクタのオープンソースコードの詳細については、Hologres の公式サンプルライブラリ をご覧ください。

  4. Flink デプロイメントを送信します。サンプルコード:

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    上記のコードのパラメータ:

    • Dexecution.runtime-mode: Flink デプロイメントの実行モード。詳細については、「実行モード」をご参照ください。

    • p: Flink デプロイメントの並列度。このパラメータは、結果テーブルのシャード数、または結果テーブルのシャード数で割り切れる値に設定することをお勧めします。

    • c: hologres-connector-flink-repartition ファイルのメインクラス名とパス。

    • sqlFilePath: repartition.sql ファイルのパス。

  5. Hologres 結果テーブルからデータをクエリします。

    Flink デプロイメントが送信された後、Hologres 結果テーブルからデータをクエリできます。サンプル文:

    SELECT * FROM test_sink_customer;