すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:パフォーマンス ホワイトペーパー (Nexmark ベンチマーク)

最終更新日:May 23, 2026

本ホワイトペーパーでは、Nexmark ベンチマークテストツールを使用して、Realtime Compute for Apache Flink のストリーム処理パフォーマンスを評価する方法を説明します。

パフォーマンスの概要

Nexmark は、ストリーム処理エンジンの業界標準パフォーマンスベンチマークです。フィルタリング、集約、ジョイン、ウィンドウなどの一般的なシナリオをカバーする 19 の標準クエリが含まれています。本ホワイトペーパーでは、Nexmark テストツールを使用し、各クエリで 1 億件の入力レコード をベースラインとして、8 CU 構成の Realtime Compute for Apache Flink の包括的なパフォーマンス評価を実施しました。テスト結果は以下のとおりです。

  • q0、q1、q2 などのシンプルなクエリでは、RPS が毎秒 400 万~650 万レコードに達します。

  • q4、q5、q16 などの複雑な集約およびウィンドウクエリでは、RPS が毎秒 15 万~63 万レコードに達します。

総合的に、Realtime Compute for Apache Flink は、オープンソース Flink と比較して 3.24 倍の Nexmark パフォーマンスを実現します。

テストツール

Nexmark は、ストリーム処理エンジン向けの標準的なパフォーマンスベンチマークテストスイートです。テストモデルは次のとおりです:

  • Nexmark ソーステーブル:指定の TPS でテストデータ (Person、Auction、Bid イベント) を生成します。

  • 変換:フィルタリング、変換、集約、結合、ウィンドウなどの一般的なシナリオを網羅する、19 個の標準 Nexmark クエリで構成されます。

  • Blackhole シンクテーブル:外部ストレージによるパフォーマンスへの干渉を排除するため、データを Blackhole シンクに書き込みます。これにより、評価を Flink エンジン自体の処理能力に集中させることができます。

本ホワイトペーパーで用いる Nexmark テストツールは、Realtime Compute for Apache Flink の OpenAPI に基づいて実装されています。ジョブの作成、デプロイメント、モニタリング、結果収集を含む一連のワークフローを自動化します。SQL を手動で記述したり、コンソールでジョブを作成したりする必要はありません。

テスト環境

このテストの Flink ジョブでは、以下の最適化設定を使用します。

パラメーター

説明

table.exec.mini-batch.enabled

true

ミニバッチ集約を有効にします。

table.exec.mini-batch.allow-latency

2 s

ミニバッチのバッファリング間隔です。

table.optimizer.distinct-agg.split.enabled

true

Distinct 集約の分割最適化を有効にします。

execution.checkpointing.interval

3 min

チェックポイントの間隔です。

前提条件

  • Java Development Kit (JDK) 1.8.x 以降がインストールされていること。

  • Realtime Compute for Apache Flink を有効化し、ワークスペースを作成済みであること。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

  • お使いの Alibaba Cloud アカウントの AccessKey ID と AccessKey Secret を取得済みであること。

操作手順

ステップ 1: テストツールのダウンロード

Nexmark テストツールパッケージ nexmark-flink.tar.gz をダウンロードし、解凍します。

解凍後のディレクトリ構造は以下のとおりです。

nexmark-flink/
├── run_nexmark.sh          # テストエントリスクリプト
├── nexmark_env.sh          # 環境変数設定ファイル (編集が必要)
├── bin/                    # ランタイムスクリプト
├── conf/                   # Flink ジョブ設定
├── lib/                    # JAR ファイル (コンソールにアップロード)
└── queries-vvp/            # Nexmark クエリ SQL ファイル

ステップ 2: Nexmark JAR のアップロード

  1. Realtime Compute for Apache Flink コンソール にログインします。

  2. 対象のプロジェクトスペースをクリックします。左側のナビゲーションペインで、[ファイル管理] > [リソースのアップロード] をクリックします。

  3. nexmark-flink-0.2-SNAPSHOT.jar ファイルを選択してアップロードします。このファイルは、テストツールの nexmark-flink/lib ディレクトリにあります。

  4. アップロードが完了したら、ファイル名をクリックして OSS アドレスをコピーします。このアドレスは、後の設定手順で使用します。ファイルパスの形式は、ストレージタイプによって異なります。

    • OSS Bucket ストレージoss://<OSS Bucket 名>/artifacts/namespaces/<プロジェクトスペース名>/<ファイル名>

      例: oss://oss-test/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar
    • フルマネージドストレージoss://flink-fullymanaged-<ワークスペース ID>/artifacts/namespaces/<プロジェクトスペース名>/<ファイル名>

      例: oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar

    ワークスペースのストレージタイプを確認するには、Realtime Compute for Apache Flink 管理コンソールにアクセスし、対象のワークスペースを見つけて、[操作] 列の [詳細] をクリックします。

ステップ 3: ランタイムパラメーターの設定

nexmark-flink/nexmark_env.sh ファイルを編集し、以下のパラメーターを設定します。

パラメーター

説明

END_POINT

Realtime Compute for Apache Flink のエンドポイントです。お使いのリージョンのエンドポイントを選択してください。詳細については、「エンドポイント」をご参照ください。

ververica.cn-hangzhou.aliyuncs.com

AK

Alibaba Cloud アカウントのアクセスキー ID です。

-

SK

Alibaba Cloud アカウントのアクセスキーシークレットです。

-

WORK_SPACE

ワークスペースの ID です。

e6a123456789

NAMESPACE

プロジェクトスペースの名前です。

flink-default

NEXMARK_JAR

ステップ 2 でアップロードした JAR ファイルの OSS アドレスです。

oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar

FLINK_VERSION

テストする Flink エンジンのバージョンです。

vvr-11.5-jdk11-flink-1.20

QUERIES

実行するクエリを指定します。複数のクエリを実行する場合は、カンマで区切って指定します (例: q0 または q1,q2,q3)。すべてのクエリを実行するには、値を all に設定します。

all

説明

すべてのクエリを実行すると、時間がかかります。各クエリは、ジョブの作成、データ生成、計算実行などの段階を経ます。本格的なテストを実行する前に、まず単一のクエリ (例: QUERIES を q0 に設定) を実行して、環境設定とパラメーター設定が正しいことを確認することを推奨します。

ステップ 4: テストの実行

  1. nexmark-flink ディレクトリで、以下のコマンドを実行します。

    ./run_nexmark.sh
  2. テストツールは、OpenAPI を使用して Nexmark ジョブを自動的に作成し、実行します。

  3. テストが完了すると、各クエリの実行時間がミリ秒単位で表示されます。例えば、以下の出力ではクエリ q0 の実行時間は 13078 ミリ秒です。

    INFO  com.github.nexmark.flink.vvp.Nexmark - q0 13078
    ============================================================================
    ✓ Benchmark execution completed successfully
    ============================================================================

パフォーマンス結果

次の表は、8 CU 構成におけるオープンソース Flink (1.20.4) と Realtime Compute for Apache Flink (vvr-11.5-jdk11-flink-1.20) の Nexmark パフォーマンスを比較したものです。各クエリは 1 億件のレコードを入力として処理します。RPS (秒あたりのレコード数) は、入力レコード数を期間 (秒) で割って算出されます。

説明

次のテストデータは、特定のハードウェア環境および特定のエンジンバージョンで収集したものです。実際のパフォーマンスは、ハードウェアのアップグレードやエンジンの更新によって変動する場合があります。これらの結果は参考情報です。

クエリ

ECS 上のオープンソース Flink

バージョン:1.20.4

Realtime Compute for Apache Flink

バージョン:vvr-11.5-jdk11-flink-1.20

期間 (ms)

RPS

期間 (ms)

RPS

オープンソース版との RPS 比 (×)

q0

58848

1,699,293

23450

4,264,392

2.51

q1

57045

1,753,002

22824

4,381,353

2.50

q2

51890

1,927,154

15224

6,568,576

3.41

q3

84986

1,176,664

21558

4,638,649

3.94

q4

553426

180,693

157117

636,468

3.52

q5

365636

273,496

357547

279,684

1.02

q7

1257452

79,526

333837

299,547

3.77

q8

79788

1,253,321

29939

3,340,125

2.67

q9

2324518

43,020

266563

375,146

8.72

q10

189985

526,357

51202

1,953,049

3.71

q11

408384

244,868

145983

685,011

2.80

q12

121554

822,680

36991

2,703,360

3.29

q14

68903

1,451,316

20012

4,997,002

3.44

q15

183709

544,339

42734

2,340,057

4.30

q16

917597

108,980

337293

296,478

2.72

q17

102847

972,318

27076

3,693,308

3.80

q18

574949

173,928

96335

1,038,044

5.97

q19

586287

170,565

95121

1,051,293

6.16

q20

1340638

74,591

231482

431,999

5.79

q21

127089

786,850

39693

2,519,336

3.20

q22

94830

1,054,519

31228

3,202,254

3.04

合計

9550361

15,317,480

2383209

49,695,131

3.24

オープンソース Flink のテスト手順

ECS 上のオープンソース Flink で結果を再現するには、以下の手順に従ってください。

環境の準備

以下の構成で、ECS 上の EMR を使用して Flink クラスターを作成します。

  • EMR バージョン:EMR-5.21.0

  • ハードウェア仕様:3 台の ecs.g6a.xlarge インスタンス (4 vCPU / 16 GiB)。マスターノード 1 台とコアノード 2 台を含みます。

  • Hadoop および HDFS サービスを有効にします。

  • すべてのノード間でパスワードレスログインを設定します。たとえば、秘密鍵ファイル (key.pem など) をマスターノードにアップロードします。次に、マスターノードの ~/.ssh/config ファイルに以下の設定を追加します。IP アドレスとファイルパスは、実際の値に置き換えてください。

    Host 192.168.0.0
        HostName 192.168.0.0
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    Host 192.168.0.1
        HostName 192.168.0.1
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    Host 192.168.0.2
        HostName 192.168.0.2
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    ssh を使用して、ノード間のパスワードレスログインが正しく機能していることを確認してください。「bad permissions」エラーが発生した場合は、chmod 600 /path/to/key.pem を実行して権限を修正してください。

ソフトウェアの準備

  1. Flink パッケージ (Apache Flink Downloads) と Nexmark テストパッケージ (nexmark-flink.tgz) をダウンロードします。パッケージをマスターノードにアップロードし、解凍します。

    tar -zxvf flink-1.20.4-bin-scala_2.12.tgz
    tar -zxvf nexmark-flink.tgz
    mv flink-1.20.4 flink
    mv nexmark-flink nexmark
  1. nexmark/lib ディレクトリ内の JAR ファイルを flink/lib にコピーします。これらの JAR には、Nexmark データジェネレーターが含まれています。

    cp nexmark/lib/* flink/lib/
  1. 環境変数を設定します。~/.bashrc を編集し、以下の設定を追加してから、source ~/.bashrc を実行して変更を反映させます。

    実際の環境に合わせてパスを設定してください。
    export JAVA_HOME=/etc/alternatives/java_sdk_11
    export PATH=$JAVA_HOME/bin:$PATH
    export FLINK_HOME=/mnt/disk1/flink
    export HADOOP_CLASSPATH=$(/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop classpath)
  1. Flink ワーカーを設定します。このテストでは 8 つの TaskManager を使用します。マスターノードに 2 つ、各コアノードに 3 つずつデプロイします。

    flink/conf/workers を編集し、IP アドレスを実際の値に置き換えてください。

    192.168.0.0
    192.168.0.0
    192.168.0.1
    192.168.0.1
    192.168.0.1
    192.168.0.2
    192.168.0.2
    192.168.0.2
  1. flink/conf/config.yamlnexmark/conf/config.yaml で置き換え、以下の設定項目を更新します。

    • jobmanager.rpc.address:マスターノードの IP アドレス (192.168.0.0 など)。

    • state.checkpoints.dir:HDFS パス (hdfs:///checkpoints など)。

    • taskmanager.memory.process.size4 G

  1. nexmark/conf/nexmark.yaml を編集し、nexmark.metric.reporter.host をマスターノードの IP アドレスに設定します。

  2. flink および nexmark ディレクトリと環境変数設定を、各コアノードに配布します。

    IP アドレスは実際の値に置き換えてください。
    scp -r flink 192.168.0.1:/mnt/disk1/
    scp -r flink 192.168.0.2:/mnt/disk1/
    scp -r nexmark 192.168.0.1:/mnt/disk1/
    scp -r nexmark 192.168.0.2:/mnt/disk1/
    scp ~/.bashrc 192.168.0.1:~/
    scp ~/.bashrc 192.168.0.2:~/

    配布が完了したら、各コアノードで source ~/.bashrc を実行して、環境変数を反映させます。

  1. マスターノードで、Flink クラスターを起動します。

    flink/bin/start-cluster.sh
  1. Nexmark テスト環境を初期化します。このスクリプトは、各ノードで必要な Metric Reporter を設定します。

    nexmark/bin/setup_cluster.sh

リソース制限の設定

Flink クラスターの起動後、cgroups を使用して各 TaskManager プロセスの CPU 使用率を 75% に制限する必要があります。これにより、リソースの競合によるハートビートタイムアウトが原因で TaskManager が切断されるのを防ぎます。

TaskManager が実行されているすべてのノード (マスターノードを含む) で、以下のコマンドを実行してください。

yum install -y libcgroup libcgroup-tools
cgcreate -t root:root -a root:root -g cpu,memory:mygroup
echo 100000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_period_us
echo 300000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_quota_us
echo $((12 * 1024 * 1024 * 1024)) > /sys/fs/cgroup/memory/mygroup/memory.limit_in_bytes
jps | grep TaskManagerRunner | awk '{print $1}' | xargs cgclassify -g cpu,memory:mygroup

ここで、cpu.cfs_quota_us / cpu.cfs_period_us = 300000 / 100000 = 3 です。これは、cgroup が最大 3 CPU コア (4 vCPU の 75%) を使用できることを意味します。

説明

Realtime Compute for Apache Flink のフルマネージド環境では、リソース使用制限を手動で設定する必要はありません。このサービスは、購入したコンピューティングリソースを完全に活用します。

Nexmark の実行

マスターノードで以下のコマンドを実行します。完了するまで待機し、結果を確認してください。

nexmark/bin/run_query.sh q0,q1,q2,q3,q4,q5,q7,q8,q9,q10,q11,q12,q14,q15,q16,q17,q18,q19,q20,q21,q22