すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:PyFlink ジョブのクイックスタート

最終更新日:Mar 07, 2026

このトピックでは、Realtime Compute for Apache Flink を使用した Flink Python ストリームジョブおよびバッチジョブのデプロイと起動手順について説明します。

前提条件

  • Resource Access Management (RAM) ユーザーまたは RAM ロールを使用する場合、Flink コンソールへのアクセスに必要な権限が付与されていることを確認してください。詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースを作成しました。詳細については、「Apache Flink 向け Realtime Compute の有効化」をご参照ください。

ステップ 1:Python コードファイルの準備

Realtime Compute for Apache Flink コンソール には Python 開発環境は提供されていません。そのため、ジョブはローカルで開発する必要があります。ジョブのデバッグ方法やコネクタの使用方法については、「Python ジョブの開発」をご参照ください。

重要

ローカル開発で使用する Flink のバージョンは、「ステップ 3:Python ジョブのデプロイメント」で選択するエンジンバージョンと一致していることを確認してください。カスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR パッケージ、データファイルなど、Python ジョブで他の依存関係を使用する方法については、「Python 依存関係の使用」をご参照ください。

このトピックでは、開始に役立つサンプル Python ファイルおよびデータファイルを提供しています。サンプルジョブは単語の出現頻度をカウントするものです。以下の手順で使用するために、これらのファイルをダウンロードしてください。

  • Shakespeare をクリックして Shakespeare データファイルをダウンロードします。

ステップ 2:Python ファイルおよびデータファイルのアップロード

  1. Realtime Compute for Apache Flink コンソール にログインします。

  2. 対象のワークスペースの 操作 列で、コンソール をクリックします。

  3. 左側のナビゲーションウィンドウで、ファイル管理 をクリックします。

  4. リソースのアップロード をクリックし、Python ファイルおよびデータファイルをアップロードします。

    ステップ 1 でダウンロードしたサンプル Python ファイルおよびデータファイルをアップロードします。ファイルの保存パスについては、「ファイル管理」をご参照ください。

ステップ 3:Python ジョブのデプロイメント

ストリームジョブ

  1. オペレーションセンタージョブ O&M ページで、ジョブのデプロイメント > Python ジョブ

  2. デプロイメント情報を入力します。

    py_流_zh.jpg

    パラメーター

    説明

    デプロイモード

    「ストリーム」を選択します。

    ストリーム

    デプロイ名

    Python ジョブの名前を入力します。

    flink-streaming-test-python

    エンジンバージョン

    ジョブ用の Flink エンジンバージョンです。

    信頼性およびパフォーマンスを向上させるため、推奨 または 安定 ラベルが付いたバージョンを使用してください。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.9-flink-1.17

    Python ファイルパス

    word_count_streaming.py をクリックしてサンプル Python ファイルをダウンロードし、右側の 上传 アイコンをクリックしてファイルを選択・アップロードします。

    ファイルが既に ファイル管理 に存在する場合は、再度アップロードする必要はありません。ファイルを直接選択してください。

    -

    エントリモジュール

    プログラムのエントリクラスです。

    • Python ジョブファイルが .py ファイルの場合、このフィールドは必須ではありません。

    • ジョブファイルが .zip ファイルの場合、`word_count` のようにエントリモジュールを入力します。

    必須ではありません

    エントリポイントのメイン引数

    入力パラメーターを入力します。これらのパラメーターは main メソッド内で呼び出されます。

    この例では、入力データファイル `Shakespeare` のストレージパスを入力します。

    --input oss://<your-oss-bucket-name>/artifacts/namespaces/<project-name>/Shakespeare

    Shakespeare ファイルの完全なパスは、ファイル管理 からコピーできます。

    デプロイ先

    ドロップダウンリストから、対象の リソースキュー または セッションクラスター(本番環境向けではありません)を選択します。詳細については、「リソースキューの管理」および「セッションクラスターの作成」をご参照ください。

    重要

    セッションクラスターにデプロイされたジョブは、モニタリングおよびアラート(またはデータ曲線)、モニタリングおよびアラート設定、自動チューニングをサポートしません。セッションクラスターは本番環境での使用を避けてください。開発およびテスト用途でのみ使用可能です。詳細については、「ジョブのデバッグ」をご参照ください。

    default-queue

    構成パラメーターの詳細については、「ジョブのデプロイメント」をご参照ください。

  3. デプロイ をクリックします。

バッチジョブ

  1. オペレーションセンタージョブ O&M ページで、ジョブのデプロイメント をクリックし、Python ジョブ を選択します。

  2. デプロイメント情報を入力します。

    py_批_zh.jpg

    パラメーター

    説明

    デプロイモード

    「バッチ」を選択します。

    バッチ

    デプロイ名

    Python ジョブの名前を入力します。

    flink-batch-test-python

    エンジンバージョン

    ジョブ用の Flink エンジンバージョンです。

    信頼性およびパフォーマンスを向上させるため、推奨 または 安定 ラベルが付いたバージョンを使用してください。詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.9-flink-1.17

    Python ファイルパス

    word_count_batch.py をクリックしてサンプル Python ファイルをダウンロードし、右側の 上传 アイコンをクリックしてファイルを選択・アップロードします。

    -

    エントリモジュール

    プログラムのエントリとして機能するクラスです。

    • Python ジョブファイルが .py ファイルの場合、このフィールドは必須ではありません。

    • ジョブファイルが .zip ファイルの場合、`word_count` のようにエントリモジュールを入力します。

    必須ではありません

    エントリポイントのメイン引数

    入力パラメーターを入力します。これらのパラメーターは main メソッド内で呼び出されます。

    このトピックでは、入力データファイル Shakespeare のパスおよび出力データディレクトリ `batch-quickstart-test-output` のパスを入力します。

    説明

    出力ディレクトリの名前のみを指定すれば十分です。事前にストレージサービスで出力ディレクトリを作成する必要はありません。ただし、出力ディレクトリの親ディレクトリは入力ファイルの親ディレクトリと同じである必要があります。

    --input oss://<your attached OSS Bucket name>/artifacts/namespaces/<project name>/Shakespeare

    --output oss://<your attached OSS Bucket name>/artifacts/namespaces/<project name>/python-batch-quickstart-test-output

    Shakespeare ファイルの完全なパスは、ファイル管理 からコピーできます。

    デプロイ先

    ドロップダウンリストから、対象の リソースキュー または セッションクラスター(本番環境向けではありません)を選択します。詳細については、「リソースキューの管理」および「セッションクラスターの作成」をご参照ください。

    重要

    セッションクラスターにデプロイされたジョブは、モニタリングおよびアラート、モニタリングおよびアラート設定、自動チューニングをサポートしません。セッションクラスターは本番環境での使用を避けてください。開発およびテスト用途でのみ使用可能です。詳細については、「ジョブのデバッグ」をご参照ください。

    default-queue

    構成パラメーターの詳細については、「ジョブのデプロイメント」をご参照ください。

  3. デプロイ をクリックします。

ステップ 4:Python ジョブの起動および Flink 計算結果の確認

ストリームジョブ

  1. オペレーションセンター > ジョブ O&M ページで、対象ジョブの 操作 列にある 起動 をクリックします。

    py_部署_zh.jpg

  2. ステートレス起動 を選択し、起動 をクリックします。ジョブの起動方法については、「ジョブの起動」をご参照ください。

    ジョブを起動すると、そのステータスが 実行中 または 完了 に変わります。これはジョブが正常に実行されていることを示します。このトピックのサンプル Python ジョブでは、最終的なジョブステータスは 完了 となります。

  3. ジョブステータスが 実行中 に変わると、サンプルストリームジョブの計算結果を確認できます。

    重要

    このトピックのサンプル Python ジョブでは、ストリームジョブのステータスが 完了 に変わると、ジョブ結果が削除されます。したがって、計算結果を確認できるのは、ジョブステータスが 実行中 のときのみです。

    TaskManager の、拡張子が .out で終わるログファイル内を `shakespeare` で検索し、Flink の計算結果を確認します。

    image.png

バッチジョブ

  1. オペレーションセンター > ジョブ O&M ページで、対象ジョブを見つけ、起動 をクリックします。

    py_部署批_zh.jpg

  2. ジョブの起動 ダイアログボックスで、起動 をクリックします。ジョブの起動方法については、「ジョブの起動」をご参照ください。

  3. ジョブステータスが 完了 に変わると、サンプルバッチジョブの計算結果を確認できます。

    OSS コンソール にログインします。oss://<your OSS Bucket name>/artifacts/namespaces/<project name>/python-batch-quickstart-test-output フォルダーで、ジョブの起動日時を名前に持つフォルダーをクリックします。その後、オブジェクトファイル名をクリックし、表示されるパネル内の ダウンロード をクリックします。

    下载

    バッチジョブの結果は .ext ファイルです。ファイルをダウンロード後、テキストエディターまたは Microsoft Office Word で開いて結果を確認できます。以下の図は、計算結果の例を示しています。result

(任意)ステップ 5:ジョブの終了

ジョブを変更し、その変更を反映させたい場合は、ジョブを再デプロイしたうえで、停止および再起動する必要があります。変更の例としては、コードの変更、WITH パラメーターの追加または削除、ジョブバージョンの変更などが挙げられます。また、ジョブが状態を再利用できない場合、新しいジョブを開始したい場合、または動的に適用されないパラメーターを更新した場合にも、ジョブを停止および再起動する必要があります。ジョブの停止方法については、「ジョブの停止」をご参照ください。

参照