すべてのプロダクト
Search
ドキュメントセンター

Hologres:Flink を使用したバッチデータのインポート

最終更新日:Mar 11, 2026

Hologres 向けの新しい Flink コネクタを使用すると、Hologres にバッチデータをインポートできます。この方法は、ペイロードが低く、効率的なデータインポートを実現します。

背景情報

Hologres は強力なオンライン分析処理(OLAP)システムです。Flink と統合することで、強力なリアルタイムデータストリーム処理機能を提供します。データの適時性が重大でないシナリオ(例:既存データの読み込み、オフラインデータの処理、ログの集計など)では、Flink を使用して Hologres にバッチデータをインポートできます。バッチインポートにより、大量のデータを一度に Hologres に書き込むことが可能です。この方法はより効率的で、インポート速度とリソース利用率の両方を向上させます。要件に応じて、リアルタイムインポートとバッチインポートを選択できます。リアルタイムインポートの詳細については、「Flink 完全管理」をご参照ください。

前提条件

Realtime Compute for Apache Flink を使用したバッチインポート

  1. HoloWeb に接続し、クエリを実行して、Flink からのデータを受信する Hologres 結果テーブルを作成します。詳細については、「HoloWeb への接続とクエリの実行」をご参照ください。本トピックでは、test_sink_customer テーブルを例として使用します。

    -- Create a Hologres sink table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
      orientation="column"
    );
    説明

    Flink ソーステーブルのフィールド名および型は、Hologres 結果テーブルのものと一致している必要があります。

  2. Realtime Compute for Apache Flink コンソールにログインします。ジョブページで、ジョブのデプロイをクリックします。デプロイジョブを設定し、デプロイをクリックします。パラメーターの詳細については、「JAR ジョブのデプロイ」をご参照ください。

    主なパラメーターを以下の表に示します。

    パラメーター

    説明

    デプロイジョブタイプ

    JAR を選択します。

    デプロイモード

    ストリーミングモードとバッチモードがサポートされています。本トピックではバッチモードを使用します。

    Engine Version

    エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。本トピックでは、バージョン vvr-8.0.7-flink-1.17 を例として使用します。

    JAR URI

    Apache Flink コネクタをアップロードします:hologres-connector-flink-repartition.jar

    説明

    オープンソース Flink コネクタを使用して、Hologres にバッチデータをインポートできます。Flink コネクタプラグインのソースコードの詳細については、公式 Hologres GitHub リポジトリをご参照ください。

    エントリポイントクラス

    プログラムのエントリポイントクラスです。Flink コネクタの場合、メインクラス名を com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample に指定します。

    エントリポイントメイン引数

    repartition.sql ファイルのパスパラメーターを渡します。Realtime Compute for Apache Flink のランタイム中、アタッチされた依存ファイルのパスは /flink/usrlib/ になります。したがって、完全なパラメーターは --sqlFilePath="/flink/usrlib/repartition.sql" です。

    追加依存ファイル

    repartition.sql ファイルをアップロードします。repartition.sql ファイルは Flink SQL スクリプトファイルであり、データソースを定義し、結果テーブルを宣言し、Hologres の接続情報を含みます。以下に repartition.sql ファイルの例を示します。

    -- sourceDDL. This topic uses the Flink DataGen public test data as the source data.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql. The query statement for the source table must ensure that the query result corresponds to the sink table declared in sinkDDL. This includes the number of fields and the field types.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL. This is the sink table declaration and Hologres connection configuration.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    説明

    repartition.sql ファイル内の Hologres 接続パラメーターの詳細については、「Hologres Flink コネクタパラメーター」をご参照ください。

  3. ジョブ名をクリックして、デプロイ詳細パネルを開きます。リソース設定セクションで、同時実行数を変更します。

    説明

    同時実行数を、Hologres 結果テーブルのシャード数と同じ値に設定してください。

  4. Hologres の結果テーブルをクエリします。

    Flink ジョブを送信後、Hologres に書き込まれたデータをクエリします。以下の文を例として示します。

    SELECT * FROM test_sink_customer;

オープンソース Flink を使用したバッチインポート

  1. HoloWeb に接続し、クエリを実行して、Flink からのデータを受信する Hologres 結果テーブルを作成します。詳細については、「HoloWeb への接続とクエリの実行」をご参照ください。本トピックでは、test_sink_customer テーブルを例として使用します。

    -- Create a Hologres sink table.
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    説明

    データ量に基づいてシャード数を設定してください。シャードの詳細については、「テーブルグループとシャード数の管理」をご参照ください。

  2. repartition.sql ファイルを作成し、Flink クラスター環境内の任意の場所にアップロードします。本トピックでは、/flink-1.15.4/src/repartition.sql を例として使用します。以下に repartition.sql ファイルの例を示します。

    説明

    repartition.sql ファイルは Flink SQL スクリプトファイルであり、データソースを定義し、結果テーブルを宣言し、Hologres の接続情報を含みます。

    -- sourceDDL. This topic uses the Flink DataGen public test data as the source data.
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    -- sourceDql. The query statement for the source table must ensure that the query result corresponds to the sink table declared in sinkDDL. This includes the number of fields and the field types.
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL. This is the sink table declaration and Hologres connection configuration.
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'yourAccessKeyId'
      ,'password' = 'yourAccessKeySecret'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    主なパラメーターを以下に示します。

    パラメーター

    必須

    説明

    connector

    はい

    結果テーブルのタイプです。値は hologres に固定されます。

    dbname

    はい

    Hologres データベースの名前です。

    tablename

    はい

    データを受信する Hologres 内のテーブル名です。

    username

    はい

    ご利用の Alibaba Cloud アカウントの AccessKey ID です。

    AccessKey ID を取得するには、AccessKey 管理ページにアクセスしてください。

    password

    はい

    ご利用の Alibaba Cloud アカウントの AccessKey ID に対応する AccessKey Secret です。

    endpoint

    はい

    Hologres インスタンスの VPC エンドポイントです。Hologres コンソールのインスタンス製品ページにアクセスし、Configurationsセクションからエンドポイントを取得してください。

    説明

    エンドポイントにはポート番号を含める必要があります。形式は ip:port です。アプリケーションと Hologres インスタンスが同一リージョンにある場合は VPC エンドポイントを使用し、そうでない場合はパブリックエンドポイントを使用してください。

    jdbccopywritemode

    いいえ

    データ書き込みモードです。有効な値は次のとおりです。

    • false(デフォルト):INSERT モードを使用します。

    • true:COPY モードを使用します。COPY モードにはストリーミングコピー(Fixed Copy)とバッチコピーが含まれます。デフォルトではストリーミングコピー(Fixed Copy)モードが使用されます。

      説明

      INSERT モードと比較して、Fixed Copy モードはストリーミングパターンを使用するためスループットが高く、データ遅延が低く、データのバッチ処理を必要としないためクライアントのメモリ消費も少ないという特徴があります。ただし、このモードはデータのロールバックをサポートしません。

    bulkload

    いいえ

    バッチコピーモードを使用するかどうかを指定します。有効な値は次のとおりです。

    • true:バッチコピーモードを使用します。このパラメーターは、jdbccopywritemode パラメーターも true に設定されている場合にのみ有効です。それ以外の場合は Fixed Copy モードが使用されます。

      説明
      • ストリーミングコピー(Fixed Copy)と比較して、バッチコピーはより効率的で、Hologres リソースをより有効に活用できるため、データ書き込み時のパフォーマンスが向上します。必要に応じてデータ書き込みモードを選択できます。

      • プライマリキーを持つテーブルにバッチコピーでデータを書き込む場合、テーブルロックが発生する可能性があります。target-shards.enabled パラメーターを true に設定することで、ロックの粒度をシャードレベルまで低下させることができます。これにより、複数のバッチインポートタスクを同時に実行でき、テーブルロックの発生を抑制できます。プライマリキーを持つテーブルにバッチコピーでデータを書き込む場合、この方法は Fixed Copy モードと比較して Hologres インスタンスへのペイロードを大幅に削減できます。テストによると、ペイロードは約 66.7% 削減されることが確認されています。

      • バッチコピーを使用する場合、送信先テーブルにプライマリキーが含まれていると、書き込み前にテーブルを空にしておく必要があります。そうでないと、書き込みプロセス中にプライマリキーに基づく重複排除が行われ、書き込みパフォーマンスに影響を与えます。

    • false(デフォルト):設定は無効です。

    target-shards.enabled

    いいえ

    ターゲットシャードへのバッチ書き込みを有効にするかどうかを指定します。有効な値は次のとおりです。

    • true:ターゲットシャードへのバッチ書き込みを有効にします。ソースデータがシャード単位で再パーティションされている場合、ロックの粒度をシャードレベルまで低下させることができます。

    • false(デフォルト):無効です。

    説明

    repartition.sql ファイル内の Hologres 接続パラメーターの詳細については、「Hologres Flink コネクタパラメーター」をご参照ください。

  3. Flink クラスター環境で、オープンソース Flink コネクタ hologres-connector-flink-repartition.jar を任意のディレクトリにアップロードします。本トピックでは、ルートディレクトリを例として使用します。

    説明

    オープンソース Flink コネクタを使用すると、Hologres にバッチデータをインポートできます。このプラグインのソースコードは、公式 Hologres GitHub リポジトリで公開されています。

  4. Flink ジョブを送信します。以下のコードを例として示します。

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    パラメーターの説明は次のとおりです。

    • Dexecution.runtime-mode: Flink ジョブの実行モードです。詳細については、「実行モード」をご参照ください。

    • p:ジョブの同時実行数です。同時実行数を結果テーブルのシャード数と同じ値、またはシャード数で割り切れる値に設定してください。

    • c:hologres-connector-flink-repartition.jar 内のメインクラスの名前とパスです。

    • sqlFilePath:repartition.sql ファイルのパスです。

  5. Hologres 結果テーブルをクエリします。

    Flink ジョブを送信後、Hologres に書き込まれたデータをクエリします。以下の文を例として示します。

    SELECT * FROM test_sink_customer;