本ホワイトペーパーでは、Nexmark ベンチマークテストツールを使用して、Realtime Compute for Apache Flink のストリーム処理パフォーマンスを評価する方法を説明します。
パフォーマンスの概要
Nexmark は、ストリーム処理エンジンの業界標準パフォーマンスベンチマークです。フィルタリング、集約、ジョイン、ウィンドウなどの一般的なシナリオをカバーする 19 の標準クエリが含まれています。本ホワイトペーパーでは、Nexmark テストツールを使用し、各クエリで 1 億件の入力レコード をベースラインとして、8 CU 構成の Realtime Compute for Apache Flink の包括的なパフォーマンス評価を実施しました。テスト結果は以下のとおりです。
-
q0、q1、q2 などのシンプルなクエリでは、RPS が毎秒 400 万~650 万レコードに達します。
-
q4、q5、q16 などの複雑な集約およびウィンドウクエリでは、RPS が毎秒 15 万~63 万レコードに達します。
総合的に、Realtime Compute for Apache Flink は、オープンソース Flink と比較して 3.24 倍の Nexmark パフォーマンスを実現します。
テストツール
Nexmark は、ストリーム処理エンジン向けの標準的なパフォーマンスベンチマークテストスイートです。テストモデルは次のとおりです:
-
Nexmark ソーステーブル:指定の TPS でテストデータ (Person、Auction、Bid イベント) を生成します。
-
変換:フィルタリング、変換、集約、結合、ウィンドウなどの一般的なシナリオを網羅する、19 個の標準 Nexmark クエリで構成されます。
-
Blackhole シンクテーブル:外部ストレージによるパフォーマンスへの干渉を排除するため、データを Blackhole シンクに書き込みます。これにより、評価を Flink エンジン自体の処理能力に集中させることができます。
本ホワイトペーパーで用いる Nexmark テストツールは、Realtime Compute for Apache Flink の OpenAPI に基づいて実装されています。ジョブの作成、デプロイメント、モニタリング、結果収集を含む一連のワークフローを自動化します。SQL を手動で記述したり、コンソールでジョブを作成したりする必要はありません。
テスト環境
このテストの Flink ジョブでは、以下の最適化設定を使用します。
|
パラメーター |
値 |
説明 |
|
table.exec.mini-batch.enabled |
true |
ミニバッチ集約を有効にします。 |
|
table.exec.mini-batch.allow-latency |
2 s |
ミニバッチのバッファリング間隔です。 |
|
table.optimizer.distinct-agg.split.enabled |
true |
Distinct 集約の分割最適化を有効にします。 |
|
execution.checkpointing.interval |
3 min |
チェックポイントの間隔です。 |
前提条件
-
Java Development Kit (JDK) 1.8.x 以降がインストールされていること。
-
Realtime Compute for Apache Flink を有効化し、ワークスペースを作成済みであること。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
-
お使いの Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret を取得済みであること。
操作手順
ステップ 1: テストツールのダウンロード
Nexmark テストツールパッケージ nexmark-flink.tar.gz をダウンロードし、解凍します。
解凍後のディレクトリ構造は以下のとおりです。
nexmark-flink/
├── run_nexmark.sh # テストエントリスクリプト
├── nexmark_env.sh # 環境変数設定ファイル (編集が必要)
├── bin/ # ランタイムスクリプト
├── conf/ # Flink ジョブ設定
├── lib/ # JAR ファイル (コンソールにアップロード)
└── queries-vvp/ # Nexmark クエリ SQL ファイル
ステップ 2: Nexmark JAR のアップロード
-
Realtime Compute for Apache Flink コンソール にログインします。
-
対象のプロジェクトスペースをクリックします。左側のナビゲーションペインで、 をクリックします。
-
nexmark-flink-0.2-SNAPSHOT.jarファイルを選択してアップロードします。このファイルは、テストツールのnexmark-flink/libディレクトリにあります。 -
アップロードが完了したら、ファイル名をクリックして OSS アドレスをコピーします。このアドレスは、後の設定手順で使用します。ファイルパスの形式は、ストレージタイプによって異なります。
-
OSS Bucket ストレージ:
oss://<OSS Bucket 名>/artifacts/namespaces/<プロジェクトスペース名>/<ファイル名>例:
oss://oss-test/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar -
フルマネージドストレージ:
oss://flink-fullymanaged-<ワークスペース ID>/artifacts/namespaces/<プロジェクトスペース名>/<ファイル名>例: oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar
ワークスペースのストレージタイプを確認するには、Realtime Compute for Apache Flink 管理コンソールにアクセスし、対象のワークスペースを見つけて、[操作] 列の [詳細] をクリックします。
-
ステップ 3: ランタイムパラメーターの設定
nexmark-flink/nexmark_env.sh ファイルを編集し、以下のパラメーターを設定します。
|
パラメーター |
説明 |
例 |
|
END_POINT |
Realtime Compute for Apache Flink のエンドポイントです。お使いのリージョンのエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。 |
ververica.cn-hangzhou.aliyuncs.com |
|
AK |
Alibaba Cloud アカウントのアクセスキー ID です。 |
- |
|
SK |
Alibaba Cloud アカウントのアクセスキーシークレットです。 |
- |
|
WORK_SPACE |
ワークスペースの ID です。 |
e6a123456789 |
|
NAMESPACE |
プロジェクトスペースの名前です。 |
flink-default |
|
NEXMARK_JAR |
ステップ 2 でアップロードした JAR ファイルの OSS アドレスです。 |
oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar |
|
FLINK_VERSION |
テストする Flink エンジンのバージョンです。 |
vvr-11.5-jdk11-flink-1.20 |
|
QUERIES |
実行するクエリを指定します。複数のクエリを実行する場合は、カンマで区切って指定します (例: |
all |
すべてのクエリを実行すると、時間がかかります。各クエリは、ジョブの作成、データ生成、計算実行などの段階を経ます。本格的なテストを実行する前に、まず単一のクエリ (例: QUERIES を q0 に設定) を実行して、環境設定とパラメーター設定が正しいことを確認することを推奨します。
ステップ 4: テストの実行
-
nexmark-flinkディレクトリで、以下のコマンドを実行します。./run_nexmark.sh -
テストツールは、OpenAPI を使用して Nexmark ジョブを自動的に作成し、実行します。
-
テストが完了すると、各クエリの実行時間がミリ秒単位で表示されます。例えば、以下の出力ではクエリ q0 の実行時間は 13078 ミリ秒です。
INFO com.github.nexmark.flink.vvp.Nexmark - q0 13078 ============================================================================ ✓ Benchmark execution completed successfully ============================================================================
パフォーマンス結果
次の表は、8 CU 構成におけるオープンソース Flink (1.20.4) と Realtime Compute for Apache Flink (vvr-11.5-jdk11-flink-1.20) の Nexmark パフォーマンスを比較したものです。各クエリは 1 億件のレコードを入力として処理します。RPS (秒あたりのレコード数) は、入力レコード数を期間 (秒) で割って算出されます。
次のテストデータは、特定のハードウェア環境および特定のエンジンバージョンで収集したものです。実際のパフォーマンスは、ハードウェアのアップグレードやエンジンの更新によって変動する場合があります。これらの結果は参考情報です。
|
クエリ |
ECS 上のオープンソース Flink バージョン:1.20.4 |
Realtime Compute for Apache Flink バージョン:vvr-11.5-jdk11-flink-1.20 |
|||
|
期間 (ms) |
RPS |
期間 (ms) |
RPS |
オープンソース版との RPS 比 (×) |
|
|
q0 |
58848 |
1,699,293 |
23450 |
4,264,392 |
2.51 |
|
q1 |
57045 |
1,753,002 |
22824 |
4,381,353 |
2.50 |
|
q2 |
51890 |
1,927,154 |
15224 |
6,568,576 |
3.41 |
|
q3 |
84986 |
1,176,664 |
21558 |
4,638,649 |
3.94 |
|
q4 |
553426 |
180,693 |
157117 |
636,468 |
3.52 |
|
q5 |
365636 |
273,496 |
357547 |
279,684 |
1.02 |
|
q7 |
1257452 |
79,526 |
333837 |
299,547 |
3.77 |
|
q8 |
79788 |
1,253,321 |
29939 |
3,340,125 |
2.67 |
|
q9 |
2324518 |
43,020 |
266563 |
375,146 |
8.72 |
|
q10 |
189985 |
526,357 |
51202 |
1,953,049 |
3.71 |
|
q11 |
408384 |
244,868 |
145983 |
685,011 |
2.80 |
|
q12 |
121554 |
822,680 |
36991 |
2,703,360 |
3.29 |
|
q14 |
68903 |
1,451,316 |
20012 |
4,997,002 |
3.44 |
|
q15 |
183709 |
544,339 |
42734 |
2,340,057 |
4.30 |
|
q16 |
917597 |
108,980 |
337293 |
296,478 |
2.72 |
|
q17 |
102847 |
972,318 |
27076 |
3,693,308 |
3.80 |
|
q18 |
574949 |
173,928 |
96335 |
1,038,044 |
5.97 |
|
q19 |
586287 |
170,565 |
95121 |
1,051,293 |
6.16 |
|
q20 |
1340638 |
74,591 |
231482 |
431,999 |
5.79 |
|
q21 |
127089 |
786,850 |
39693 |
2,519,336 |
3.20 |
|
q22 |
94830 |
1,054,519 |
31228 |
3,202,254 |
3.04 |
|
合計 |
9550361 |
15,317,480 |
2383209 |
49,695,131 |
3.24 |
オープンソース Flink のテスト手順
ECS 上のオープンソース Flink で結果を再現するには、以下の手順に従ってください。
環境の準備
以下の構成で、ECS 上の EMR を使用して Flink クラスターを作成します。
-
EMR バージョン:EMR-5.21.0
-
ハードウェア仕様:3 台の ecs.g6a.xlarge インスタンス (4 vCPU / 16 GiB)。マスターノード 1 台とコアノード 2 台を含みます。
-
Hadoop および HDFS サービスを有効にします。
-
すべてのノード間でパスワードレスログインを設定します。たとえば、秘密鍵ファイル (
key.pemなど) をマスターノードにアップロードします。次に、マスターノードの~/.ssh/configファイルに以下の設定を追加します。IP アドレスとファイルパスは、実際の値に置き換えてください。Host 192.168.0.0 HostName 192.168.0.0 User root IdentityFile /path/to/key.pem StrictHostKeyChecking no Host 192.168.0.1 HostName 192.168.0.1 User root IdentityFile /path/to/key.pem StrictHostKeyChecking no Host 192.168.0.2 HostName 192.168.0.2 User root IdentityFile /path/to/key.pem StrictHostKeyChecking nossh を使用して、ノード間のパスワードレスログインが正しく機能していることを確認してください。「bad permissions」エラーが発生した場合は、
chmod 600 /path/to/key.pemを実行して権限を修正してください。
ソフトウェアの準備
-
Flink パッケージ (Apache Flink Downloads) と Nexmark テストパッケージ (nexmark-flink.tgz) をダウンロードします。パッケージをマスターノードにアップロードし、解凍します。
tar -zxvf flink-1.20.4-bin-scala_2.12.tgz tar -zxvf nexmark-flink.tgz mv flink-1.20.4 flink mv nexmark-flink nexmark
-
nexmark/libディレクトリ内の JAR ファイルをflink/libにコピーします。これらの JAR には、Nexmark データジェネレーターが含まれています。cp nexmark/lib/* flink/lib/
-
環境変数を設定します。
~/.bashrcを編集し、以下の設定を追加してから、source ~/.bashrcを実行して変更を反映させます。実際の環境に合わせてパスを設定してください。
export JAVA_HOME=/etc/alternatives/java_sdk_11 export PATH=$JAVA_HOME/bin:$PATH export FLINK_HOME=/mnt/disk1/flink export HADOOP_CLASSPATH=$(/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop classpath)
クラスターの設定と起動
-
Flink ワーカーを設定します。このテストでは 8 つの TaskManager を使用します。マスターノードに 2 つ、各コアノードに 3 つずつデプロイします。
flink/conf/workersを編集し、IP アドレスを実際の値に置き換えてください。192.168.0.0 192.168.0.0 192.168.0.1 192.168.0.1 192.168.0.1 192.168.0.2 192.168.0.2 192.168.0.2
-
flink/conf/config.yamlをnexmark/conf/config.yamlで置き換え、以下の設定項目を更新します。-
jobmanager.rpc.address:マスターノードの IP アドレス (192.168.0.0など)。 -
state.checkpoints.dir:HDFS パス (hdfs:///checkpointsなど)。 -
taskmanager.memory.process.size:4 G
-
-
nexmark/conf/nexmark.yamlを編集し、nexmark.metric.reporter.hostをマスターノードの IP アドレスに設定します。 -
flinkおよびnexmarkディレクトリと環境変数設定を、各コアノードに配布します。IP アドレスは実際の値に置き換えてください。
scp -r flink 192.168.0.1:/mnt/disk1/ scp -r flink 192.168.0.2:/mnt/disk1/ scp -r nexmark 192.168.0.1:/mnt/disk1/ scp -r nexmark 192.168.0.2:/mnt/disk1/ scp ~/.bashrc 192.168.0.1:~/ scp ~/.bashrc 192.168.0.2:~/配布が完了したら、各コアノードで
source ~/.bashrcを実行して、環境変数を反映させます。
-
マスターノードで、Flink クラスターを起動します。
flink/bin/start-cluster.sh
-
Nexmark テスト環境を初期化します。このスクリプトは、各ノードで必要な Metric Reporter を設定します。
nexmark/bin/setup_cluster.sh
リソース制限の設定
Flink クラスターの起動後、cgroups を使用して各 TaskManager プロセスの CPU 使用率を 75% に制限する必要があります。これにより、リソースの競合によるハートビートタイムアウトが原因で TaskManager が切断されるのを防ぎます。
TaskManager が実行されているすべてのノード (マスターノードを含む) で、以下のコマンドを実行してください。
yum install -y libcgroup libcgroup-tools
cgcreate -t root:root -a root:root -g cpu,memory:mygroup
echo 100000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_period_us
echo 300000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_quota_us
echo $((12 * 1024 * 1024 * 1024)) > /sys/fs/cgroup/memory/mygroup/memory.limit_in_bytes
jps | grep TaskManagerRunner | awk '{print $1}' | xargs cgclassify -g cpu,memory:mygroup
ここで、cpu.cfs_quota_us / cpu.cfs_period_us = 300000 / 100000 = 3 です。これは、cgroup が最大 3 CPU コア (4 vCPU の 75%) を使用できることを意味します。
Realtime Compute for Apache Flink のフルマネージド環境では、リソース使用制限を手動で設定する必要はありません。このサービスは、購入したコンピューティングリソースを完全に活用します。
Nexmark の実行
マスターノードで以下のコマンドを実行します。完了するまで待機し、結果を確認してください。
nexmark/bin/run_query.sh q0,q1,q2,q3,q4,q5,q7,q8,q9,q10,q11,q12,q14,q15,q16,q17,q18,q19,q20,q21,q22