このトピックでは、Flink JAR のストリームジョブとバッチジョブを作成、デプロイ、開始する方法について説明します。これにより、Realtime Compute for Apache Flink における JAR ジョブのワークフローを理解できます。
前提条件
-
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用して Flink コンソールにアクセスする場合は、必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。
-
Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
ステップ 1: JAR パッケージの開発
Realtime Compute for Apache Flink コンソールは、JAR パッケージの開発環境を提供していません。ローカル環境でコードを開発、コンパイル、パッケージ化する必要があります。環境依存関係の設定、コネクタの使用、Object Storage Service (OSS) から追加の依存ファイルを読み取る方法については、「JAR ジョブの開発」をご参照ください。
ローカル開発で使用する Flink のバージョンが、ステップ 3: JAR ジョブのデプロイで選択するエンジンバージョンと同じであることを確認してください。また、依存パッケージの範囲にもご注意ください。
Flink JAR ジョブをすぐに開始できるように、このトピックでは、単語の出現頻度をカウントするテスト用の JAR パッケージとデータファイルを提供します。これらをダウンロードして、以降のステップで使用できます。
-
FlinkQuickStart-1.0-SNAPSHOT.jar をクリックして、テスト JAR パッケージをダウンロードします。
ソースコードを表示したい場合は、FlinkQuickStart.zip をクリックしてダウンロードし、コンパイルしてください。
-
Shakespeare をクリックして、Shakespeare データファイルをダウンロードします。
ステップ 2: テスト JAR パッケージとデータファイルのアップロード
-
対象のワークスペースの [操作] 列にある [コンソール] をクリックします。
-
左側のナビゲーションウィンドウで、[ファイル管理] をクリックします。
-
[リソースのアップロード] をクリックして、デプロイメント用の JAR パッケージとデータファイルをアップロードします。
このトピックでは、ステップ 1 でダウンロードした FlinkQuickStart-1.0-SNAPSHOT.jar と Shakespeare ファイルを使用します。ファイルストレージパスの詳細については、「ファイル管理」をご参照ください。
ステップ 3: JAR ジョブのデプロイ
ストリームジョブ
-
で、[ジョブのデプロイ] をクリックして [JAR ジョブ] を選択します。
-
デプロイメント情報を入力します。

パラメーター
説明
例
デプロイメントモード
ストリームを選択します。
ストリーム
デプロイメント名
JAR ジョブの名前を入力します。
flink-streaming-test-jar
エンジンバージョン
ジョブの Flink エンジンバージョン。
[推奨] または [安定] タグが付いたバージョンを使用してください。これらのバージョンは、より高い信頼性とパフォーマンスを提供します。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.9-flink-1.17
JAR URI
ステップ 2 でリソース管理にアップロードした FlinkQuickStart-1.0-SNAPSHOT.jar ファイルを選択します。右側の
アイコンをクリックして、独自の JAR パッケージを選択してアップロードすることもできます。ファイルがすでに [ファイル管理] に存在する場合、再度アップロードする必要はありません。対象のファイルを選択するだけです。
説明Ververica Runtime (VVR) 8.0.6 以降では、Flink ワークスペースを有効化した際にアタッチしたバケットへのアクセスのみがサポートされます。他のバケットへのアクセスはサポートされていません。
-
エントリポイントクラス
プログラムのエントリポイントクラス。JAR パッケージにメインクラスが指定されていない場合は、エントリポイントクラスの完全なパスを入力します。
このトピックで提供されるテスト JAR パッケージには、ストリームジョブとバッチジョブの両方のコードが含まれています。したがって、ここではストリームジョブのエントリポイントを指定する必要があります。
org.example.WordCountStreaming
エントリーポイントのメイン引数
入力パラメーターを入力します。これらのパラメーターは main メソッドで呼び出されます。
この例では、入力データファイル Shakespeare のストレージパスを入力します。
--input oss://<your-attached-OSS-Bucket-name>/artifacts/namespaces/<project-name>/Shakespeare[ファイル管理] から Shakespeare ファイルの完全なパスをコピーできます。
デプロイメントターゲット
ドロップダウンリストから、ターゲットの [リソースキュー] または [セッションクラスター] (本番環境での使用は非推奨) を選択します。詳細については、「リソースキューの管理」および「ステップ 1: セッションクラスターの作成」をご参照ください。
重要セッションクラスターにデプロイされたジョブは、監視とアラートの表示や設定、自動チューニングの有効化をサポートしていません。本番環境ではセッションクラスターを使用しないでください。セッションクラスターは、開発およびテスト環境として使用できます。詳細については、「ジョブのデバッグ」をご参照ください。
default-queue
その他のパラメーターの詳細については、「ジョブのデプロイ」をご参照ください。
-
[デプロイ] をクリックします。
バッチジョブ
-
で、[ジョブのデプロイ] をクリックし、[JAR ジョブ] を選択します。
-
デプロイメント情報を入力します。

パラメーター
説明
例
デプロイメントモード
バッチを選択します。
バッチ
デプロイメント名
JAR ジョブの名前を入力します。
flink-batch-test-jar
エンジンのバージョン
ジョブの Flink エンジンバージョン。
[推奨] または [安定] タグが付いたバージョンを使用してください。これらのバージョンは、より高い信頼性とパフォーマンスを提供します。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.9-flink-1.17
[JAR URI]
ステップ 2 でリソース管理にアップロードした FlinkQuickStart-1.0-SNAPSHOT.jar ファイルを選択します。右側の
アイコンをクリックして、独自の JAR パッケージを選択してアップロードすることもできます。-
エントリポイントクラス
プログラムのエントリポイントクラス。JAR パッケージにメインクラスが指定されていない場合は、[エントリポイントクラス] の完全なパスを入力します。
このトピックで提供されるテスト JAR パッケージには、ストリームジョブとバッチジョブの両方のコードが含まれています。したがって、ここではバッチジョブのエントリポイントを指定する必要があります。
org.example.WordCountBatch
[エントリポイントのメイン引数]
入力パラメーターを入力します。これらのパラメーターは main メソッドで呼び出されます。
この例では、入力データファイル Shakespeare と出力データファイル batch-quickstart-test-output.txt のストレージパスを入力します。
説明出力ファイルの完全なパスと名前を指定するだけで、事前にストレージサービスに出力ファイルを作成する必要はありません。出力ファイルのディレクトリパスは、入力ファイルと同じである必要があります。
--input oss://<your attached OSS bucket name>/artifacts/namespaces/<project name>/Shakespeare--output oss://<your attached OSS bucket name>/artifacts/namespaces/<project name>/batch-quickstart-test-output.txt[ファイル管理] で Shakespeare ファイルの完全なパスを直接コピーできます。
Deployment ターゲット
ドロップダウンリストから、ターゲットの [リソースキュー] または [セッションクラスター] (本番環境での使用は非推奨) を選択します。詳細については、「リソースキューの管理」および「ステップ 1: セッションクラスターの作成」をご参照ください。
重要セッションクラスターにデプロイされたジョブは、監視とアラートの表示や設定、自動チューニングの有効化をサポートしていません。本番環境ではセッションクラスターを使用しないでください。セッションクラスターは、開発およびテスト環境として使用できます。詳細については、「ジョブのデバッグ」をご参照ください。
default-queue
その他のパラメーターの詳細については、「ジョブのデプロイ」をご参照ください。
-
[デプロイ] をクリックします。
ステップ 4: ジョブの開始と結果の表示
ストリームジョブ
-
で、対象のジョブを見つけ、[操作] 列の [開始] をクリックします。

-
[ステートレス開始] を選択し、[開始] をクリックします。ジョブの開始方法の詳細については、「ジョブの開始」をご参照ください。
-
ジョブのステータスが [実行中] に変わったら、ストリームジョブの結果を表示できます。
.out で終わる TaskManager ログファイルで、「shakespeare」を検索して Flink の計算結果を表示できます。

バッチジョブ
TaskManager.out ログには最大 2,000 レコードまでしか表示できません。この制限のため、ストリームジョブとバッチジョブの結果レコード数は異なります。この制限の詳細については、「Print」をご参照ください。
(任意) ステップ 5: ジョブの停止
ジョブを変更して変更を有効にするには、ジョブを再デプロイしてから停止し、再起動する必要があります。変更の例としては、コードの変更、WITH パラメーターの追加または削除、ジョブバージョンの変更などがあります。また、ジョブが状態を再利用できない場合、新しいジョブを開始したい場合、または動的に有効にならないパラメーターを更新する場合にも、ジョブを停止して再起動する必要があります。ジョブの停止方法の詳細については、「ジョブの停止」をご参照ください。
参照
-
ジョブを開始する前にジョブリソースを設定したり、ジョブがオンラインになった後に変更したりできます。基本 (粗粒度) とエキスパート (細粒度) の 2 つのリソース割り当てモードがサポートされています。詳細については、「ジョブリソースの設定」をご参照ください。
-
Flink ジョブのパラメーターを動的に更新できます。これにより、設定変更がより迅速に有効になり、ジョブの停止と再起動によるビジネスの中断を減らすことができます。詳細については、「動的スケーリングとパラメーター更新」をご参照ください。
-
ジョブログレベルを設定し、異なるログレベルに対して個別の出力を指定できます。詳細については、「ジョブログ出力の設定」をご参照ください。
-
簡単な例に従って、SQL ジョブの完全な開発フローを迅速に学習できます。詳細については、「Flink SQL ジョブのクイックスタート」をご参照ください。
-
Hologres を使用してリアルタイムデータウェアハウスを構築する方法については、「Hologres を使用したリアルタイムデータウェアハウスの構築」をご参照ください。
-
Paimon と StarRocks を使用してストリーミングデータレイクハウスを構築する方法については、「Paimon と StarRocks を使用したストリーミングデータレイクハウスの構築」をご参照ください。

