Hologres は、PostgreSQL プロトコルと互換性のあるワンストップのリアルタイム データウェアハウスエンジンです。リアルタイムおよびオフラインの統合シナリオで大規模なデータの書き込み、更新、クエリをサポートします。このトピックでは、Spark を使用して Hologres にデータをバッチ書き込みする際のパフォーマンステスト結果に基づいて、さまざまなバッチ書き込みシナリオに適した書き込みモードを選択する方法について説明します。
リアルタイムのデータ書き込みと更新については、パフォーマンステスト計画と結果について「データ書き込み、データ更新、およびポイントクエリの負荷テストのベストプラクティス」をご参照ください。
バッチ書き込みモードの比較
バッチデータ書き込みシナリオの場合、Hologres は、従来の COPY モード、COPY プロトコルに基づいて開発された FIXED COPY ストリーミングインポートモード、およびバッチインポートをストリーミングインポートに変換する INSERT INTO VALUES モードなど、さまざまな書き込みモードをサポートしています。
次の表は、3 つの書き込みモードの詳細な比較を示しています。
比較項目 | COPY | FIXED COPY | INSERT INTO VALUES | |
概要 | プライマリキーのないテーブルへの COPY ベースのバッチインポート | プライマリキーのあるテーブルへの COPY ベースのバッチインポート | COPY プロトコルに基づいて開発されたストリーミングインポートモード | 基本的なストリーミングインポートモード |
典型的なシナリオ |
|
|
| |
ロックの粒度 | 行ロック | テーブルロック | 行ロック | 行ロック |
データの可視性 | COPY 操作の完了後に表示 | COPY 操作の完了後に表示 | リアルタイムで表示 | リアルタイムで表示 |
パフォーマンス | 高 | 高 | 中 | 中 |
Hologres リソース消費量 | 低 | 低 | 高 | 高 |
クライアントリソース消費量 | 低 | 高 | 低 | 中 |
プライマリキーの競合ポリシー | 該当なし |
|
|
|
バッチ書き込みモードの選択
バッチデータ書き込みシナリオでは、3 つのモードの特性は次のとおりです。
COPY: 従来の COPY モード。最適な書き込みパフォーマンスと最小の Hologres リソース消費を実現します。
FIXED COPY: COPY プロトコルに基づいて開発されたストリーミングインポートモード。行ロックのみを生成し、データはリアルタイムで表示されます。
INSERT INTO VALUES: バッチインポートを従来のストリーミング書き込みに変換します。このモードは、バッチ書き込みシナリオでは明らかな利点がありません。
説明INSERT INTO VALUES モードは、バッチ書き込みシナリオでのみ明らかな利点はありません。ただし、このモードだけがデータの取り消し(削除)機能を備えており、バイナリ ログ データの書き込みなどのシナリオでは依然として必要です。
データのリアルタイムの可視性、ロックの粒度、またはデータソースの負荷に関する特別な要件がない場合は、バッチ書き込みに COPY モードを優先できます。テーブルロックが生成されると、テーブルは複数のデータ書き込みタスクを同時にサポートできないことに注意してください。
Spark を使用したバッチ書き込み: Hologres インスタンスを V2.2.25 以降にアップグレードし、コネクタの 書き込みパラメータ
write.mode
をauto
(デフォルト値) に設定することをお勧めします。システムは最適な書き込みモードを自動的に選択します。Flink を使用したバッチ書き込み: まず、以下の決定木に従って COPY モードまたは FIXED COPY モードを使用するかどうかを決定し、次に以下のパラメータを構成する必要があります。
jdbccopywritemode: このパラメータを TRUE に設定します。これは、INSERT INTO VALUES モードが使用されないことを意味します。
bulkload: TRUE は COPY モードを示し、FALSE は FIXED COPY モードを示します。ビジネス要件に基づいてこのパラメータを構成します。
次の決定木に従って、適切な書き込みモードを選択できます。
バッチ書き込みパフォーマンステスト
テストプロセスでは、Hologres によって開発されたオープンソースコンポーネント Hologres-Spark-Connector (オープンソース) が使用されます。
準備
基本環境の準備
次の環境を準備する必要があります。
Hologres インスタンスと EMR Spark クラスタは同じリージョンに存在し、同じ VPC を使用する必要があります。
V2.2.25 以降の Hologres インスタンスを購入し、データベースを作成します。
EMR Spark クラスタを作成します。Spark のバージョンは 3.3.0 以上である必要があります。詳細については、「クラスタの作成」をご参照ください。
Spark-Connector パッケージをダウンロードします。
Spark が Hologres からデータを読み書きするために必要なコネクタ JAR パッケージ
hologres-connector-spark-3.x
は、Maven Central Repository からダウンロードできます。
このトピックでは、次の環境情報を使用します。
サービス | バージョン | 仕様 |
Hologres | V3.0.30 | 64 コア、256 GB (1 CU = 1 コア、4 GB) |
EMR Spark | EMR V5.18.1、Spark 3.5.3 | 8 コア、32 GB × 8(マスターノード 1 つとコアノード 7 つ) 重要 OSS-HDFS サービスをアクティブにする必要があります。 |
Spark-Connector | 1.5.2 | 該当なし |
テストデータの準備
生データを準備します。
EMR Spark クラスタのマスターノードにログインします。詳細については、「特定の ECS インスタンスに接続してクラスタのマスターノードにログインする」をご参照ください。
TPC-H_Tools_v3.0.0.zip をクリックして TPC-H ツールをダウンロードし、マスターノードの ECS インスタンスにコピーして解凍し、
TPC-H_Tools_v3.0.0/TPC-H_Tools_v3.0.0/dbgen
ディレクトリに移動します。次のコードを実行して、
dbgen
ディレクトリに1 TB
のテストデータセットファイルcustomer.tbl
を生成します。元の tbl ファイルサイズは 23 GB です。./dbgen -s 1000 -T c
次の表は、TPC-H 1 TB データセットの customer テーブルの詳細情報を示しています。
テーブル情報
説明
フィールド数
8
フィールドタイプ
INT、BIGINT、TEXT、DECIMAL
データ行数
150,000,000
テーブルシャード数
40
テストデータを Spark にインポートします。
次のコマンドを実行して、customer.tbl ファイルを Spark クラスタにアップロードします。
hadoop fs -put customer.tbl <spark_resource>
spark_resource は、EMR Spark クラスタの作成時に [クラスタのルートストレージディレクトリ] パラメータに構成されたパスを示します。
Hologres にさまざまなストレージ形式のテーブルを作成します。SQL ステートメントの例:
行と列のハイブリッドストレージ
CREATE TABLE test_table_mixed ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column,row' );
列指向ストレージ (プライマリキーあり)
CREATE TABLE test_table_column ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );
列指向ストレージ (プライマリキーなし)
CREATE TABLE test_table_column_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'column' );
行指向ストレージ (プライマリキーあり)
CREATE TABLE test_table_row ( C_CUSTKEY BIGINT PRIMARY KEY, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );
行指向ストレージ (プライマリキーなし)
CREATE TABLE test_table_row_no_pk ( C_CUSTKEY BIGINT, C_NAME TEXT, C_ADDRESS TEXT, C_NATIONKEY INT, C_PHONE TEXT, C_ACCTBAL DECIMAL(15, 2), C_MKTSEGMENT TEXT, C_COMMENT TEXT ) WITH ( orientation = 'row' );
パフォーマンステスト
テスト構成
このトピックでは、主にさまざまなモードでのインポートパフォーマンスをテストします。
EMR Spark クラスタのマスターノードにログインし (ログイン方法については、「特定の ECS インスタンスに接続してクラスタのマスターノードにログインする」をご参照ください)、ダウンロードした Spark-Connector パッケージをアップロードし、次のコマンドを実行して spark-sql コマンドラインインターフェイス (CLI) に移動します。
説明spark.sql.files.maxPartitionBytes
パラメータの値を調整することで、Spark が HDFS ファイルを読み取る際の同時実行性を制御できます。この例の同時実行性は 40 です。# spark-sql CLI に移動します。 spark-sql --jars <path>/hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.executor.instances=40 \ --conf spark.executor.cores=1 \ --conf spark.executor.memory=4g \ --conf spark.sql.files.maxPartitionBytes=644245094
path は、hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar パッケージが配置されているルートパスを示します。
spark-sql CLI で次の SQL ステートメントを実行して、一時テーブルを作成することでデータを書き込みます。
テストプロセス中にパラメータを複数回調整してさまざまなモードの書き込みパフォーマンスをテストする必要があるため、この例では一時テーブルを使用します。
説明実際のビジネスシナリオでは、カタログ を使用して Hologres テーブルを直接ロードできます。これはより便利です。
-- CSV 形式で一時テーブルを作成します CREATE TEMPORARY VIEW csvtable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "<spark_resources>/customer.tbl", sep "|" ); CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://<hologres_vpc_endpoint>/<database_name>", username "<accesskey_id>", password "<accesskey_secret>", table "<table_name>", direct_connect "false", write.mode "auto", write.insert.thread_size "3", write.insert.batch_size "2048" ); INSERT INTO hologresTable SELECT * FROM csvTable;
次の表にパラメータを示します。
パラメータ
説明
spark_resources
EMR Spark クラスタの作成時に [クラスタのルートストレージディレクトリ] パラメータに構成されたパス。
EMR on ECS コンソール にログインし、ターゲットクラスタ ID をクリックし、[基本情報] タブの [クラスタ情報] セクションでクラスタのルートストレージパスを取得できます。
hologres_vpc_endpoint
手順 2: App Service プランを作成する
Hologres コンソール にログインし、対象のインスタンス ID をクリックして、[インスタンスの詳細] ページの [ネットワーク情報] セクションで VPC エンドポイントを取得できます。たとえば、中国 (杭州) リージョンの VPC エンドポイントは
<Instance ID>-cn-hangzhou-vpc-st.hologres.aliyuncs.com:80
の形式です。database_name
Hologres インスタンス内のデータベースの名前です。
アクセスキー ID
対応する Hologres データベースに対する読み取り権限を持つ AccessKey ID です。
アクセスキーシークレット
対応する Hologres データベースに対する読み取り権限を持つ AccessKey シークレット。
table_name
コピー先の Hologres テーブルの名前です。
write.mode
書き込みモード。有効な値:
auto
: デフォルト値。コネクタが最適なモードを自動的に選択することを示します。insert
: INSERT INTO VALUES モードを使用します。stream
: ストリーミング書き込みに FIXED COPY モードを使用します。bulk_load
: プライマリキーのないテーブルへのバッチ インポートに COPY モードを使用します。bulk_load_on_conflict
: プライマリキーのあるテーブルへのバッチ インポートに COPY モードを使用します。
write.insert.thread_size
書き込みの同時実行数。INSERT INTO VALUES モードを使用する場合にのみ有効です。
write.insert.batch_size
バッチ書き込みサイズ。INSERT INTO VALUES モードを使用する場合にのみ有効です。
write.on_conflict_action
INSERT_OR_REPLACE (デフォルト): プライマリキーの競合が発生した場合に更新します。
INSERT_OR_IGNORE: プライマリキーの競合が発生した場合に無視します。
詳細については、「パラメーター」をご参照ください。
テストシナリオ
テストシナリオ | 利用可能なオプション |
テーブルストレージフォーマット |
|
データ更新方法 |
|
データ書き込み方法 |
|
テスト結果
テスト結果には、次のフィールドが含まれます。
フィールド | 説明 |
タスク実行の合計時間 | Spark タスクの実行合計時間。 このフィールドは、EMR Spark クラスタのマスターノードで spark-sql CLI の INSERT 操作を完了するのにかかった時間を示します。次の図は、時間の取得方法を示しています。 |
データ書き込みの平均時間 | Spark クラスタのスケジューリング、データの読み取り、およびデータの再配布の時間を除く、データ書き込みのみに費やされた平均時間。 Hologres コンソールの HoloWeb で次の SQL 文を実行して、シャードの同時実行数 (count) とデータ書き込みの平均時間 (avg_duration_ms、ミリ秒単位) を取得できます。
パラメーター:
|
Hologres の負荷 | Hologres インスタンスの CPU 使用率。 Hologres コンソールのインスタンスの [監視情報] ページで CPU 使用率を取得できます。 |
Spark の負荷 | EMR ノードの負荷。 EMR on ECS コンソール にログインし、クラスタ ID をクリックして、[監視と診断] タブをクリックします。次に、[メトリック監視] タブをクリックし、次のパラメーターを構成します。
次に、[CPU 使用率] メトリックをクエリします。これが Spark の負荷です。 |
次の表に、詳細なテスト結果を示します。
ストレージ形式 | プライマリキー | 書き込みモード | タスク実行の合計時間 | データ書き込みの平均時間 | Hologres の負荷 | Spark の負荷 |
列指向ストレージ | プライマリキーなし | insert | 241.61s | 232.70s | 92% | 15% |
stream | 228.11s | 222.34s | 100% | 36% | ||
bulk_load | 88.72s | 57.16s | 97% | 47% | ||
プライマリキーあり 無視 | insert | 190.96s | 172.60s | 90% | 14% | |
stream | 149.60s | 142.16s | 100% | 14% | ||
bulk_load_on_conflict | 115.96s | 42.92s | 60% | 75% | ||
プライマリキーあり 置換 | insert | 600.40s | 574.31s | 91% | 5% | |
stream | 550.29s | 540.32s | 100% | 5% | ||
bulk_load_on_conflict | 188.05s | 109.77s | 93% | 78% | ||
行指向ストレージ | プライマリキーなし | insert | 132.38s | 123.79s | 94% | 22% |
stream | 114.41s | 103.81s | 100% | 17% | ||
bulk_load | 68.20s | 41.22s | 98% | 32% | ||
プライマリキーあり 無視 | insert | 190.48s | 170.49s | 89% | 15% | |
stream | 185.46s | 172.48s | 85% | 14% | ||
bulk_load_on_conflict | 117.81s | 47.69s | 58% | 75% | ||
プライマリキーあり 置換 | insert | 177.97s | 170.78s | 93% | 15% | |
stream | 142.44s | 130.16s | 100% | 20% | ||
bulk_load_on_conflict | 137.69s | 65.18s | 92% | 78% | ||
行と列のハイブリッドストレージ | プライマリキーあり 無視 | insert | 172.19s | 158.74s | 86% | 16% |
stream | 150.63s | 149.76s | 100% | 12% | ||
bulk_load_on_conflict | 128.83s | 42.09s | 59% | 79% | ||
プライマリキーあり 置換 | insert | 690.37s | 662.00s | 92% | 5% | |
stream | 625.84s | 623.08s | 100% | 4% | ||
bulk_load_on_conflict | 202.07s | 121.58s | 93% | 80% |
書き込みモードのマッピングは次のとおりです。
insert: INSERT INTO VALUES
stream: FIXED COPY
bulk_load: プライマリキーのないテーブルへの COPY ベースのインポート
bulk_load_on_conflict: プライマリキーのあるテーブルへの COPY ベースのインポート