本トピックでは、Flink Python API(PyFlink)ジョブの背景情報、注意事項、開発方法、デバッグ方法、およびコネクタの使用方法について説明します。
背景情報
PyFlink ジョブはローカル環境で開発します。開発完了後、Realtime Compute for Apache Flink コンソールからジョブをデプロイおよび起動し、ビジネス結果を確認します。この一連の手順の詳細なチュートリアルについては、「Flink Python ジョブのクイックスタート」をご参照ください。
開発環境の要件
Ververica Runtime(VVR)8.0.11 より前のバージョンには Python 3.7.9 が含まれています。VVR 8.0.11 以降のバージョンには Python 3.9.21 が含まれています。
説明ローカル開発環境では、対象の VVR エンジンにプリインストール済みの Python バージョンと同一のバージョンを使用することを推奨します。
PyFlink をインストールします。PyFlink のバージョンは、対象の VVR エンジンで使用される Apache Flink のバージョンと一致させる必要があります。たとえば、デプロイページで
vvr-8.0.9-flink-1.17を選択する場合、apache-flink==1.17.*をインストールします。pip install apache-flink==1.17.2IDE をインストールします。PyCharm または VS Code を推奨します。
PyFlink ジョブはオフラインで開発し、その後 Realtime Compute for Apache Flink コンソールでデプロイおよび実行します。
制限事項
PyFlink ジョブを開発する際は、デプロイおよびネットワーク環境によって課せられる以下の要件に注意してください:
Apache Flink 1.13 以降のみがサポートされています。
Flink ワークスペースには、pandas、NumPy、PyArrow などの一般的なライブラリを含むプリインストール済みの Python 環境が含まれています。完全なリストについては、本トピック末尾の「プリインストール済みソフトウェアパッケージ」をご参照ください。
Flink 実行環境では JDK 8 および JDK 11 のみがサポートされています。PyFlink ジョブでサードパーティ製 JAR を使用する場合は、これらの JDK バージョンとの互換性を確認してください。
VVR 4.x では Scala 2.11 のみがサポートされています。VVR 6.x 以降では Scala 2.12 のみがサポートされています。PyFlink ジョブでサードパーティ製 JAR を使用する場合は、JAR の依存関係が適切な Scala バージョンと一致していることを確認してください。
ジョブの開発
Table API/SQL と DataStream API の選択
PyFlink では Table API/SQL および DataStream API の両方がサポートされています。Table API/SQL の使用を推奨します。その理由は以下のとおりです:
優れたパフォーマンス:Table API/SQL の実行計画は最適化後に JVM 内で完全に実行されます。一方、DataStream API では JVM と Python プロセス間でレコード単位のシリアル化およびデシリアル化が発生するため、大きなオーバーヘッドが発生します。
より豊富な機能:Table API/SQL では、コネクタ、データ形式、ウィンドウ関数などに対する完全なサポートが提供されます。また、SQL ジョブと同一のコネクタエコシステムを共有します。
コミュニティによる推奨:Apache Flink コミュニティでは、PyFlink 開発において Table API/SQL が優先されています。
SQL で複雑なカスタムロジックを表現できない場合にのみ、DataStream API を使用してください。
開発の参考情報
以下に示すリソースを活用して、Flink ビジネスロジックをローカルで開発してください。開発完了後、コードを Realtime Compute for Apache Flink コンソールにアップロードし、ジョブをデプロイします。
Apache Flink 1.20 におけるビジネスロジックの開発方法については、「Flink Python API 開発ガイド」をご参照ください。
Apache Flink のコーディング中に発生する一般的な問題とその解決方法については、「よくある質問」をご参照ください。
プロジェクト構造
推奨される Python ジョブプロジェクトの構造は以下のとおりです:
my-flink-python-project/
├── my_job.py # メインジョブファイル
├── udfs.py # ユーザー定義関数(任意)
├── requirements.txt # サードパーティ製 Python 依存関係(任意)
└── config.properties # 構成ファイル(任意)依存関係の管理
PyFlink ジョブでカスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR、およびデータファイルを使用する方法については、「Python 依存関係の使用」をご参照ください。
ユーザー定義関数(UDF)
以下の例では、文字列を匿名化する Python UDF(ユーザー定義関数)の開発方法を示します:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
"""電話番号をマスク:先頭 3 桁と末尾 4 桁を保持し、中間の桁を **** で置き換えます。"""
if phone is None or len(phone) != 11:
return phone
return phone[:3] + '****' + phone[7:]この UDF を SQL ジョブで使用するには、以下のとおりです:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;UDF の登録、更新、削除方法については、「ユーザー定義関数(UDF)の管理」をご参照ください。
コネクタの使用
Flink がサポートするコネクタの一覧については、「サポートされているコネクタ」をご参照ください。コネクタを使用するには、以下の手順を実行します:
Realtime Compute for Apache Flink コンソール にログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで、[ファイル管理] をクリックします。
[Upload Artifact] をクリックし、対象のコネクタの Python パッケージを選択します。
Realtime Compute for Apache Flink から提供されるカスタムコネクタまたは組み込みコネクタをアップロードできます。Flink コネクタ向け公式 Python パッケージのダウンロードリンクについては、「コネクタ一覧」をご参照ください。
[]ページで、[]をクリックし、[追加の依存関係]フィールドで対象のコネクタ用のPythonパッケージを選択し、他のパラメーターを設定して、ジョブをデプロイします。
デプロイ済みのジョブの名前をクリックします。[デプロイメントの詳細] タブの [実行時パラメーター設定] セクションで、[編集] をクリックし、[その他の設定] に Python コネクタパッケージへのパスを追加します。
ジョブが複数のコネクタパッケージ(例:connector-1.jar、connector-2.jar)に依存する場合、以下のように設定します:
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'組み込みコネクタ、データ形式、またはカタログ(VVR 11.2 以降でのみ利用可能)を使用する場合は、**[パラメーター]** の下にある **[その他の構成]** フィールドに構成を追加します。例:
## 複数の組み込みコネクタを使用 pipeline.used-builtin-connectors: kafka;sls ## 複数のデータ形式を使用 pipeline.used-builtin-formats: avro;parquet ## 複数の既存カタログを使用 pipeline.used-builtin-catalogs: catalogname1;catalogname2
コネクタの使用に関する詳細なサンプルコードについては、「完全なサンプルコード」をご参照ください。
ジョブのデバッグ
Python ユーザー定義関数(UDF)を実装する際は、logging モジュールを使用してトラブルシューティング用のログ情報を出力します。以下のコードスニペットはその例です:
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jログ出力後、TaskManager のログファイルで出力を確認できます。
ローカルデバッグ
デフォルトでは、Realtime Compute for Apache Flink はインターネットにアクセスできません。そのため、テスト目的でオンラインデータソースに直接接続できない場合があります。ローカルデバッグには、以下のアプローチを推奨します:
単体テスト:UDF のロジックの正しさを検証するために、独立した単体テストを実行します。
ローカル実行:ファイルやメモリ内データなどのローカルデータソースを用いて入力をシミュレートし、ジョブをローカルで実行して処理ロジックを検証します。例:
from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # ローカルデータソースを使用したテスト ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)]) ds.key_by(lambda x: x[0]).sum(1).print() env.execute("local_test")リモートデバッグ:オンラインデータソースを用いたデバッグについては、「コネクタを用いたジョブのローカル実行およびデバッグ」をご参照ください。
ジョブのデプロイメント
PyFlink ジョブの開発が完了したら、Realtime Compute for Apache Flink コンソールにアップロードしてデプロイします。以下の手順に従ってください:
Realtime Compute for Apache Flink コンソール にログインし、対象のワークスペースに移動します。
左側のナビゲーションウィンドウで、ファイル管理 をクリックし、Python ジョブファイル(.py または .zip)をアップロードします。また、サードパーティ製の依存関係や構成ファイルも必要に応じてアップロードします。
ページで、 をクリックし、デプロイ情報に入力します。
パラメーター
説明
Python ファイルパス
アップロード済みの Python ジョブファイルを選択します。
エントリーモジュール
.py ファイルの場合は空欄のままにしてください。.zip ファイルの場合は、エントリーモジュール名(例:
my_job)を入力します。追加の依存関係
必要に応じて、コネクタ JAR や構成ファイルを選択します。
Python ライブラリー
必要に応じて、サードパーティ製 Python パッケージ(.whl または .zip)を選択します。
Python アーカイブ
必要に応じて、カスタム Python 仮想環境(.zip)を選択します。
デプロイ をクリックします。
デプロイパラメーターの詳細については、「ジョブのデプロイメント」をご参照ください。
完全なサンプルコード
この例では、Kafka からデータを読み取り、処理し、MySQL に結果を書き込む Python ストリーミングジョブを示します。参考としてご利用ください。
この例にはチェックポイントや再起動戦略の構成は含まれていません。これらはデプロイ後に **[構成]** タブから追加できます。詳細については、「ジョブデプロイメント設定の構成」をご参照ください。
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def kafka_to_mysql():
# 実行環境の作成
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Kafka ソーステーブルの作成
t_env.execute_sql("""
CREATE TABLE kafka_source (
`id` INT,
`name` STRING,
`score` INT,
`event_time` TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'student_topic',
'properties.bootstrap.servers' = 'your-kafka-broker:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
# MySQL 結果テーブルの作成
t_env.execute_sql("""
CREATE TABLE mysql_sink (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
'table-name' = 'student',
'username' = 'your_username',
'password' = 'your_password'
)
""")
# スコアが 60 以上であるレコードをフィルターし、MySQL に書き込み
t_env.execute_sql("""
INSERT INTO mysql_sink
SELECT id, name, score
FROM kafka_source
WHERE score >= 60
""")
if __name__ == '__main__':
kafka_to_mysql()プリインストール済みソフトウェアパッケージ
VVR-11
Flink ワークスペースにインストールされているソフトウェアパッケージを以下に示します。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
Flink ワークスペース環境にプリインストール済みのソフトウェアパッケージを以下に示します。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
Flink ワークスペース環境にプリインストール済みのソフトウェアパッケージを以下に示します。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
参照
PyFlink ジョブの開発プロセス全体のチュートリアルについては、「Flink Python ジョブのクイックスタート」をご参照ください。
Flink Python ジョブでカスタム Python 仮想環境、サードパーティ製 Python ライブラリー、JAR、およびデータファイルを使用する方法については、「Python 依存関係の使用」をご参照ください。
Realtime Compute for Apache Flink では、SQL ジョブおよび DataStream ジョブもサポートされています。これらのジョブタイプの開発ガイドについては、「ジョブ開発の概要」および「JAR ジョブの開発」をご参照ください。