Hologres 向けの新しい Flink コネクタを使用すると、Hologres にバッチデータをインポートできます。この方法は、ペイロードが低く、効率的なデータインポートを実現します。
背景情報
Hologres は強力なオンライン分析処理(OLAP)システムです。Flink と統合することで、強力なリアルタイムデータストリーム処理機能を提供します。データの適時性が重大でないシナリオ(例:既存データの読み込み、オフラインデータの処理、ログの集計など)では、Flink を使用して Hologres にバッチデータをインポートできます。バッチインポートにより、大量のデータを一度に Hologres に書き込むことが可能です。この方法はより効率的で、インポート速度とリソース利用率の両方を向上させます。要件に応じて、リアルタイムインポートとバッチインポートを選択できます。リアルタイムインポートの詳細については、「Flink 完全管理」をご参照ください。
前提条件
Hologres インスタンスを購入済みです。詳細については、「Hologres インスタンスの購入」をご参照ください。
バージョン 1.15 以降の Flink クラスターがデプロイされている必要があります。詳細については、以下のトピックをご参照ください。
オープンソース Flink: Flink をデプロイする。
Alibaba Cloud Realtime Compute for Apache Flink:Realtime Compute for Apache Flink の有効化。
Realtime Compute for Apache Flink を使用したバッチインポート
HoloWeb に接続し、クエリを実行して、Flink からのデータを受信する Hologres 結果テーブルを作成します。詳細については、「HoloWeb への接続とクエリの実行」をご参照ください。本トピックでは、
test_sink_customerテーブルを例として使用します。-- Create a Hologres sink table. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );説明Flink ソーステーブルのフィールド名および型は、Hologres 結果テーブルのものと一致している必要があります。
Realtime Compute for Apache Flink コンソールにログインします。ジョブページで、ジョブのデプロイをクリックします。デプロイジョブを設定し、デプロイをクリックします。パラメーターの詳細については、「JAR ジョブのデプロイ」をご参照ください。
主なパラメーターを以下の表に示します。
パラメーター
説明
デプロイジョブタイプ
JAR を選択します。
デプロイモード
ストリーミングモードとバッチモードがサポートされています。本トピックではバッチモードを使用します。
Engine Version
エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。本トピックでは、バージョン
vvr-8.0.7-flink-1.17を例として使用します。JAR URI
Apache Flink コネクタをアップロードします:hologres-connector-flink-repartition.jar。
説明オープンソース Flink コネクタを使用して、Hologres にバッチデータをインポートできます。Flink コネクタプラグインのソースコードの詳細については、公式 Hologres GitHub リポジトリをご参照ください。
エントリポイントクラス
プログラムのエントリポイントクラスです。Flink コネクタの場合、メインクラス名を
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExampleに指定します。エントリポイントメイン引数
repartition.sqlファイルのパスパラメーターを渡します。Realtime Compute for Apache Flink のランタイム中、アタッチされた依存ファイルのパスは/flink/usrlib/になります。したがって、完全なパラメーターは--sqlFilePath="/flink/usrlib/repartition.sql"です。追加依存ファイル
repartition.sqlファイルをアップロードします。repartition.sqlファイルは Flink SQL スクリプトファイルであり、データソースを定義し、結果テーブルを宣言し、Hologres の接続情報を含みます。以下にrepartition.sqlファイルの例を示します。-- sourceDDL. This topic uses the Flink DataGen public test data as the source data. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- sourceDql. The query statement for the source table must ensure that the query result corresponds to the sink table declared in sinkDDL. This includes the number of fields and the field types. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL. This is the sink table declaration and Hologres connection configuration. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );説明repartition.sqlファイル内の Hologres 接続パラメーターの詳細については、「Hologres Flink コネクタパラメーター」をご参照ください。ジョブ名をクリックして、デプロイ詳細パネルを開きます。リソース設定セクションで、同時実行数を変更します。
説明同時実行数を、Hologres 結果テーブルのシャード数と同じ値に設定してください。
Hologres の結果テーブルをクエリします。
Flink ジョブを送信後、Hologres に書き込まれたデータをクエリします。以下の文を例として示します。
SELECT * FROM test_sink_customer;
オープンソース Flink を使用したバッチインポート
HoloWeb に接続し、クエリを実行して、Flink からのデータを受信する Hologres 結果テーブルを作成します。詳細については、「HoloWeb への接続とクエリの実行」をご参照ください。本トピックでは、
test_sink_customerテーブルを例として使用します。-- Create a Hologres sink table. CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );説明データ量に基づいてシャード数を設定してください。シャードの詳細については、「テーブルグループとシャード数の管理」をご参照ください。
repartition.sqlファイルを作成し、Flink クラスター環境内の任意の場所にアップロードします。本トピックでは、/flink-1.15.4/src/repartition.sqlを例として使用します。以下にrepartition.sqlファイルの例を示します。説明repartition.sqlファイルは Flink SQL スクリプトファイルであり、データソースを定義し、結果テーブルを宣言し、Hologres の接続情報を含みます。-- sourceDDL. This topic uses the Flink DataGen public test data as the source data. CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); -- sourceDql. The query statement for the source table must ensure that the query result corresponds to the sink table declared in sinkDDL. This includes the number of fields and the field types. SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL. This is the sink table declaration and Hologres connection configuration. CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'yourAccessKeyId' ,'password' = 'yourAccessKeySecret' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );主なパラメーターを以下に示します。
パラメーター
必須
説明
connector
はい
結果テーブルのタイプです。値は hologres に固定されます。
dbname
はい
Hologres データベースの名前です。
tablename
はい
データを受信する Hologres 内のテーブル名です。
username
はい
ご利用の Alibaba Cloud アカウントの AccessKey ID です。
AccessKey ID を取得するには、AccessKey 管理ページにアクセスしてください。
password
はい
ご利用の Alibaba Cloud アカウントの AccessKey ID に対応する AccessKey Secret です。
endpoint
はい
Hologres インスタンスの VPC エンドポイントです。Hologres コンソールのインスタンス製品ページにアクセスし、Configurationsセクションからエンドポイントを取得してください。
説明エンドポイントにはポート番号を含める必要があります。形式は
ip:portです。アプリケーションと Hologres インスタンスが同一リージョンにある場合は VPC エンドポイントを使用し、そうでない場合はパブリックエンドポイントを使用してください。jdbccopywritemode
いいえ
データ書き込みモードです。有効な値は次のとおりです。
false(デフォルト):INSERT モードを使用します。
true:COPY モードを使用します。COPY モードにはストリーミングコピー(Fixed Copy)とバッチコピーが含まれます。デフォルトではストリーミングコピー(Fixed Copy)モードが使用されます。
説明INSERT モードと比較して、Fixed Copy モードはストリーミングパターンを使用するためスループットが高く、データ遅延が低く、データのバッチ処理を必要としないためクライアントのメモリ消費も少ないという特徴があります。ただし、このモードはデータのロールバックをサポートしません。
bulkload
いいえ
バッチコピーモードを使用するかどうかを指定します。有効な値は次のとおりです。
true:バッチコピーモードを使用します。このパラメーターは、jdbccopywritemode パラメーターも true に設定されている場合にのみ有効です。それ以外の場合は Fixed Copy モードが使用されます。
説明ストリーミングコピー(Fixed Copy)と比較して、バッチコピーはより効率的で、Hologres リソースをより有効に活用できるため、データ書き込み時のパフォーマンスが向上します。必要に応じてデータ書き込みモードを選択できます。
プライマリキーを持つテーブルにバッチコピーでデータを書き込む場合、テーブルロックが発生する可能性があります。target-shards.enabled パラメーターを true に設定することで、ロックの粒度をシャードレベルまで低下させることができます。これにより、複数のバッチインポートタスクを同時に実行でき、テーブルロックの発生を抑制できます。プライマリキーを持つテーブルにバッチコピーでデータを書き込む場合、この方法は Fixed Copy モードと比較して Hologres インスタンスへのペイロードを大幅に削減できます。テストによると、ペイロードは約 66.7% 削減されることが確認されています。
バッチコピーを使用する場合、送信先テーブルにプライマリキーが含まれていると、書き込み前にテーブルを空にしておく必要があります。そうでないと、書き込みプロセス中にプライマリキーに基づく重複排除が行われ、書き込みパフォーマンスに影響を与えます。
false(デフォルト):設定は無効です。
target-shards.enabled
いいえ
ターゲットシャードへのバッチ書き込みを有効にするかどうかを指定します。有効な値は次のとおりです。
true:ターゲットシャードへのバッチ書き込みを有効にします。ソースデータがシャード単位で再パーティションされている場合、ロックの粒度をシャードレベルまで低下させることができます。
false(デフォルト):無効です。
説明repartition.sqlファイル内の Hologres 接続パラメーターの詳細については、「Hologres Flink コネクタパラメーター」をご参照ください。Flink クラスター環境で、オープンソース Flink コネクタ hologres-connector-flink-repartition.jar を任意のディレクトリにアップロードします。本トピックでは、ルートディレクトリを例として使用します。
説明オープンソース Flink コネクタを使用すると、Hologres にバッチデータをインポートできます。このプラグインのソースコードは、公式 Hologres GitHub リポジトリで公開されています。
Flink ジョブを送信します。以下のコードを例として示します。
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"パラメーターの説明は次のとおりです。
Dexecution.runtime-mode: Flink ジョブの実行モードです。詳細については、「実行モード」をご参照ください。
p:ジョブの同時実行数です。同時実行数を結果テーブルのシャード数と同じ値、またはシャード数で割り切れる値に設定してください。
c:hologres-connector-flink-repartition.jar 内のメインクラスの名前とパスです。
sqlFilePath:
repartition.sqlファイルのパスです。
Hologres 結果テーブルをクエリします。
Flink ジョブを送信後、Hologres に書き込まれたデータをクエリします。以下の文を例として示します。
SELECT * FROM test_sink_customer;