このトピックでは、Realtime Compute for Apache Flink を使用した Flink Python ストリームジョブおよびバッチジョブのデプロイと起動手順について説明します。
前提条件
-
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用する場合、Flink コンソールへのアクセスに必要な権限が付与されていることを確認してください。詳細については、「権限管理」をご参照ください。
-
Flink ワークスペースを作成しました。詳細については、「Apache Flink 向け Realtime Compute の有効化」をご参照ください。
ステップ 1:Python コードファイルの準備
Realtime Compute for Apache Flink コンソール には Python 開発環境は提供されていません。そのため、ジョブはローカルで開発する必要があります。ジョブのデバッグ方法やコネクタの使用方法については、「Python ジョブの開発」をご参照ください。
ローカル開発で使用する Flink のバージョンは、「ステップ 3:Python ジョブのデプロイメント」で選択するエンジンバージョンと一致していることを確認してください。カスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR パッケージ、データファイルなど、Python ジョブで他の依存関係を使用する方法については、「Python 依存関係の使用」をご参照ください。
このトピックでは、開始に役立つサンプル Python ファイルおよびデータファイルを提供しています。サンプルジョブは単語の出現頻度をカウントするものです。以下の手順で使用するために、これらのファイルをダウンロードしてください。
-
Python テストジョブをダウンロードできます。
-
ストリームジョブ:word_count_streaming.py
-
バッチジョブ:word_count_batch.py
-
-
Shakespeare をクリックして Shakespeare データファイルをダウンロードします。
ステップ 2:Python ファイルおよびデータファイルのアップロード
-
Realtime Compute for Apache Flink コンソール にログインします。
-
対象のワークスペースの 操作 列で、コンソール をクリックします。
-
左側のナビゲーションウィンドウで、ファイル管理 をクリックします。
-
リソースのアップロード をクリックし、Python ファイルおよびデータファイルをアップロードします。
ステップ 1 でダウンロードしたサンプル Python ファイルおよびデータファイルをアップロードします。ファイルの保存パスについては、「ファイル管理」をご参照ください。
ステップ 3:Python ジョブのデプロイメント
ストリームジョブ
-
オペレーションセンター の ページで、
-
デプロイメント情報を入力します。

パラメーター
説明
例
デプロイモード
「ストリーム」を選択します。
ストリーム
デプロイ名
Python ジョブの名前を入力します。
flink-streaming-test-python
エンジンバージョン
ジョブ用の Flink エンジンバージョンです。
信頼性およびパフォーマンスを向上させるため、推奨 または 安定 ラベルが付いたバージョンを使用してください。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.9-flink-1.17
Python ファイルパス
word_count_streaming.py をクリックしてサンプル Python ファイルをダウンロードし、右側の
アイコンをクリックしてファイルを選択・アップロードします。ファイルが既に ファイル管理 に存在する場合は、再度アップロードする必要はありません。ファイルを直接選択してください。
-
エントリモジュール
プログラムのエントリクラスです。
-
Python ジョブファイルが .py ファイルの場合、このフィールドは必須ではありません。
-
ジョブファイルが .zip ファイルの場合、`word_count` のようにエントリモジュールを入力します。
必須ではありません
エントリポイントのメイン引数
入力パラメーターを入力します。これらのパラメーターは main メソッド内で呼び出されます。
この例では、入力データファイル `Shakespeare` のストレージパスを入力します。
--input oss://<your-oss-bucket-name>/artifacts/namespaces/<project-name>/ShakespeareShakespeare ファイルの完全なパスは、ファイル管理 からコピーできます。
デプロイ先
ドロップダウンリストから、対象の リソースキュー または セッションクラスター(本番環境向けではありません)を選択します。詳細については、「リソースキューの管理」および「セッションクラスターの作成」をご参照ください。
重要セッションクラスターにデプロイされたジョブは、モニタリングおよびアラート(またはデータ曲線)、モニタリングおよびアラート設定、自動チューニングをサポートしません。セッションクラスターは本番環境での使用を避けてください。開発およびテスト用途でのみ使用可能です。詳細については、「ジョブのデバッグ」をご参照ください。
default-queue
構成パラメーターの詳細については、「ジョブのデプロイメント」をご参照ください。
-
-
デプロイ をクリックします。
バッチジョブ
-
オペレーションセンター の ページで、ジョブのデプロイメント をクリックし、Python ジョブ を選択します。
-
デプロイメント情報を入力します。

パラメーター
説明
例
デプロイモード
「バッチ」を選択します。
バッチ
デプロイ名
Python ジョブの名前を入力します。
flink-batch-test-python
エンジンバージョン
ジョブ用の Flink エンジンバージョンです。
信頼性およびパフォーマンスを向上させるため、推奨 または 安定 ラベルが付いたバージョンを使用してください。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.9-flink-1.17
Python ファイルパス
word_count_batch.py をクリックしてサンプル Python ファイルをダウンロードし、右側の
アイコンをクリックしてファイルを選択・アップロードします。-
エントリモジュール
プログラムのエントリとして機能するクラスです。
-
Python ジョブファイルが .py ファイルの場合、このフィールドは必須ではありません。
-
ジョブファイルが .zip ファイルの場合、`word_count` のようにエントリモジュールを入力します。
必須ではありません
エントリポイントのメイン引数
入力パラメーターを入力します。これらのパラメーターは main メソッド内で呼び出されます。
このトピックでは、入力データファイル Shakespeare のパスおよび出力データディレクトリ `batch-quickstart-test-output` のパスを入力します。
説明出力ディレクトリの名前のみを指定すれば十分です。事前にストレージサービスで出力ディレクトリを作成する必要はありません。ただし、出力ディレクトリの親ディレクトリは入力ファイルの親ディレクトリと同じである必要があります。
--input oss://<your attached OSS Bucket name>/artifacts/namespaces/<project name>/Shakespeare--output oss://<your attached OSS Bucket name>/artifacts/namespaces/<project name>/python-batch-quickstart-test-outputShakespeare ファイルの完全なパスは、ファイル管理 からコピーできます。
デプロイ先
ドロップダウンリストから、対象の リソースキュー または セッションクラスター(本番環境向けではありません)を選択します。詳細については、「リソースキューの管理」および「セッションクラスターの作成」をご参照ください。
重要セッションクラスターにデプロイされたジョブは、モニタリングおよびアラート、モニタリングおよびアラート設定、自動チューニングをサポートしません。セッションクラスターは本番環境での使用を避けてください。開発およびテスト用途でのみ使用可能です。詳細については、「ジョブのデバッグ」をご参照ください。
default-queue
構成パラメーターの詳細については、「ジョブのデプロイメント」をご参照ください。
-
-
デプロイ をクリックします。
ステップ 4:Python ジョブの起動および Flink 計算結果の確認
ストリームジョブ
-
ページで、対象ジョブの 操作 列にある 起動 をクリックします。

-
ステートレス起動 を選択し、起動 をクリックします。ジョブの起動方法については、「ジョブの起動」をご参照ください。
ジョブを起動すると、そのステータスが 実行中 または 完了 に変わります。これはジョブが正常に実行されていることを示します。このトピックのサンプル Python ジョブでは、最終的なジョブステータスは 完了 となります。
-
ジョブステータスが 実行中 に変わると、サンプルストリームジョブの計算結果を確認できます。
重要このトピックのサンプル Python ジョブでは、ストリームジョブのステータスが 完了 に変わると、ジョブ結果が削除されます。したがって、計算結果を確認できるのは、ジョブステータスが 実行中 のときのみです。
TaskManager の、拡張子が .out で終わるログファイル内を `shakespeare` で検索し、Flink の計算結果を確認します。

バッチジョブ
-
ページで、対象ジョブを見つけ、起動 をクリックします。

-
ジョブの起動 ダイアログボックスで、起動 をクリックします。ジョブの起動方法については、「ジョブの起動」をご参照ください。
-
ジョブステータスが 完了 に変わると、サンプルバッチジョブの計算結果を確認できます。
OSS コンソール にログインします。oss://<your OSS Bucket name>/artifacts/namespaces/<project name>/python-batch-quickstart-test-output フォルダーで、ジョブの起動日時を名前に持つフォルダーをクリックします。その後、オブジェクトファイル名をクリックし、表示されるパネル内の ダウンロード をクリックします。

バッチジョブの結果は .ext ファイルです。ファイルをダウンロード後、テキストエディターまたは Microsoft Office Word で開いて結果を確認できます。以下の図は、計算結果の例を示しています。

(任意)ステップ 5:ジョブの終了
ジョブを変更し、その変更を反映させたい場合は、ジョブを再デプロイしたうえで、停止および再起動する必要があります。変更の例としては、コードの変更、WITH パラメーターの追加または削除、ジョブバージョンの変更などが挙げられます。また、ジョブが状態を再利用できない場合、新しいジョブを開始したい場合、または動的に適用されないパラメーターを更新した場合にも、ジョブを停止および再起動する必要があります。ジョブの停止方法については、「ジョブの停止」をご参照ください。
参照
-
ジョブの起動前または実行中に、ジョブリソースの構成を変更できます。サポートされるリソース割り当てモードには、基本モード(粗粒度)およびエキスパートモード(細粒度)の 2 種類があります。詳細については、「ジョブリソースの構成」をご参照ください。
-
Realtime Compute for Apache Flink では、Flink ジョブパラメーターの動的更新がサポートされています。この機能により、パラメーター構成をより迅速に有効化でき、ジョブの停止および再起動を伴わないため、業務への影響を軽減できます。詳細については、「動的スケーリングおよびパラメーターの動的更新」をご参照ください。
-
ジョブのログレベルを構成し、異なるログレベルごとに個別の出力を指定できます。詳細については、「ジョブログ出力の構成」をご参照ください。
-
SQL ジョブの開発プロセスの完全な例については、「Flink SQL ジョブのクイックスタート」をご参照ください。