このトピックでは、Flink Python API ジョブの背景情報、制限事項、開発メソッド、デバッグテクニック、およびコネクタの使用方法について説明します。
背景情報
Flink Python ジョブはローカルで開発します。開発後、Flink 開発コンソールでジョブをデプロイして開始し、ビジネスの結果を確認します。完全なワークフローについては、「Flink Python ジョブのクイックスタート」をご参照ください。
開発環境の要件
Ververica Runtime (VVR) 8.0.11 より前のバージョンには Python 3.7.9 が含まれています。VVR 8.0.11 以降のバージョンには Python 3.9.21 が含まれています。
説明ローカルの Python バージョンは、ターゲット VVR エンジンにプリインストールされている Python バージョンと一致させてください。
ターゲット VVR エンジンが使用する Flink バージョンと一致するバージョンの PyFlink をインストールしてください。たとえば、デプロイページで
vvr-8.0.9-flink-1.17を選択した場合、apache-flink==1.17.*をインストールします。pip install apache-flink==1.17.2IDE をインストールします。PyCharm または VS Code を推奨します。
Python ジョブの開発はオフラインで完了させ、その後リアルタイムコンピューティング管理コンソールでジョブをデプロイして実行します。
制限事項
デプロイメントとネットワーク環境の制約により、Python ジョブを開発する際には、以下の制限事項に従ってください:
Apache Flink 1.13 以降のみがサポートされています。
Flink ワークスペースには、pandas、NumPy、PyArrow などの一般的なライブラリを含む Python 環境がプリインストールされています。詳細については、このトピックの最後にある「プリインストールされているソフトウェアパッケージリスト」をご参照ください。
Flink ランタイム環境は JDK 8 と JDK 11 のみをサポートしています。Python ジョブがサードパーティの JAR に依存している場合は、それらに互換性があることを確認してください。
VVR 4.x は Scala 2.11 のみをサポートしています。VVR 6.x 以降は Scala 2.12 のみをサポートしています。Python ジョブがサードパーティの JAR に依存している場合は、JAR の依存関係が適切な Scala バージョンと一致していることを確認してください。
ジョブの開発
Table API/SQL と DataStream API の選択
PyFlink は Table API/SQL と DataStream API の両方をサポートしています。最初に Table API/SQL を使用することを推奨します。理由は以下の通りです:
パフォーマンスの向上:Table API/SQL の実行計画は最適化されており、すべて JVM 内で実行されます。DataStream API は、JVM と Python プロセス間で各レコードのシリアル化とデシリアル化が必要であり、大きなオーバーヘッドが発生します。
より完全な機能:Table API/SQL は、コネクタ、データ形式、ウィンドウ関数をより完全にサポートしており、SQL ジョブと同じコネクタエコシステムを共有しています。
コミュニティの方向性:Apache Flink コミュニティは、PyFlink の開発を主に Table API/SQL に集中させています。
DataStream API は、SQL で表現できない複雑なカスタムロジックの場合にのみ使用してください。
開発リファレンス
Flink のビジネスコードをローカルで開発するには、以下のドキュメントを使用してください。開発後、コードを Flink 開発コンソールにアップロードし、ジョブをデプロイします。
Apache Flink 1.20 のビジネスコード開発については、「Flink Python API 開発ガイド」をご参照ください。
Apache Flink のコーディング中によくある問題とその解決策については、「よくある質問」をご参照ください。
プロジェクト構造
Python ジョブには、以下のプロジェクト構造を推奨します:
my-flink-python-project/
├── my_job.py # メインジョブファイル
├── udfs.py # ユーザー定義関数 (オプション)
├── requirements.txt # サードパーティの Python 依存関係 (オプション)
└── config.properties # 設定ファイル (オプション)依存関係の管理
カスタム Python 仮想環境、サードパーティの Python パッケージ、JAR、およびデータファイルを Python ジョブで使用する方法については、「Python 依存関係の使用」をご参照ください。
ユーザー定義関数 (UDF)
次の例は、文字列をマスキングする Python UDF の開発方法を示しています:
from pyflink.table import DataTypes
from pyflink.table.udf import udf
@udf(result_type=DataTypes.STRING())
def mask_phone(phone: str):
"""電話番号をマスキングします: 最初の 3 桁と最後の 4 桁を保持し、中間を **** に置き換えます"""
if phone is None or len(phone) != 11:
return phone
return phone[:3] + '****' + phone[7:]この UDF を SQL ジョブで使用するには、次のようにします:
CREATE TEMPORARY FUNCTION mask_phone AS 'udfs.mask_phone' LANGUAGE PYTHON;
INSERT INTO sink_table
SELECT name, mask_phone(phone) AS masked_phone
FROM source_table;UDF の登録、更新、削除の手順については、「ユーザー定義関数 (UDF) の管理」をご参照ください。
コネクタの使用
サポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。コネクタを使用するには、次の手順に従ってください:
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列にある [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[ファイル管理] をクリックします。
[アーティファクトのアップロード] をクリックし、ターゲットコネクタの Python パッケージを選択します。
カスタムコネクタまたは組み込みの Flink コネクタをアップロードできます。公式 Flink コネクタの Python パッケージのダウンロードリンクについては、「コネクタリスト」をご参照ください。
ページで、 をクリックします。[追加の依存関係] フィールドでコネクタパッケージを選択し、他のパラメーターを設定してジョブをデプロイします。
デプロイしたジョブ名をクリックします。[デプロイメント詳細] タブの [ランタイムパラメーター] セクションで、[編集] をクリックします。[その他の設定] フィールドに、Python コネクタパッケージへのパスを追加します。
ジョブが複数のコネクタパッケージ (例:connector-1.jar と connector-2.jar) に依存している場合は、次のように設定します:
pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'組み込みのコネクタ、データ形式、およびカタログ (VVR 11.2 以降のみ) を使用するには、[ランタイムパラメーター] の [その他の設定] フィールドに次の構成を追加します:
pipeline.used-builtin-connectors: kafka;sls pipeline.used-builtin-formats: avro;parquet
ジョブのデバッグ
Python ユーザー定義関数の実装では、logging モジュールを使用してログ情報を出力し、トラブルシューティングに役立てます。例:
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + jロギング後、TaskManager のログファイルで出力を確認します。
ローカルデバッグ
デフォルトでは、Realtime Compute for Apache Flink はパブリックネットワークにアクセスできないため、コードがオンラインのデータソースに直接接続してテストできない場合があります。ローカルでのデバッグには、次のいずれかのアプローチを使用します:
単体テスト:ユーザー定義関数 (UDF) を個別にテストしてロジックを検証します。
ローカル実行:ファイルやインメモリデータなどのローカルデータソースを使用して入力をシミュレートし、ジョブをローカルで実行して処理ロジックを検証します。例:
from pyflink.datastream import StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() # ローカルデータソースを使用してテスト ds = env.from_collection([('Alice', 1), ('Bob', 2), ('Alice', 3)]) ds.key_by(lambda x: x[0]).sum(1).print() env.execute("local_test")リモートデバッグ:オンラインデータソースを使用してデバッグするには、「コネクタベースのジョブをローカルで実行およびデバッグする」をご参照ください。
ジョブのデプロイ
Python ジョブを開発した後、リアルタイムコンピューティングコンソールにアップロードしてデプロイします。次の手順に従ってください:
リアルタイムコンピューティングコンソールにログインし、対象のワークスペースに移動します。
左側のナビゲーションウィンドウで、[ファイル管理] をクリックします。Python ジョブファイル (.py または .zip) を、サードパーティの依存関係や設定ファイルとともにアップロードします。
ページで、 をクリックし、デプロイ情報を入力します。
パラメーター
説明
Python ファイルパス
アップロードした Python ジョブファイルを選択します。
エントリモジュール
ジョブファイルが .py ファイルの場合は空白のままにします。.zip ファイルの場合は、
my_jobのようにエントリモジュール名を入力します。追加の依存関係
コネクタの JAR や設定ファイルをここで選択します。
Python ライブラリ
サードパーティの Python パッケージ (.whl または .zip) をここで選択します。
Python アーカイブ
カスタム Python 仮想環境 (.zip) をここで選択します。
[デプロイ] をクリックします。
デプロイパラメーターの詳細については、「ジョブのデプロイ」をご参照ください。
完全なサンプルコード
この例は、Kafka からデータを読み取り、処理し、MySQL に書き込む Python ストリーミングジョブを示しています。参照用としてのみご使用ください。
この例には、チェックポイントや再起動ポリシーの構成は含まれていません。これらの設定は、デプロイ後に [デプロイメント詳細] ページでカスタマイズできます。詳細については、「ジョブのデプロイ情報の構成」をご参照ください。
import logging
import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
def kafka_to_mysql():
# 実行環境の作成
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Kafka ソーステーブルの作成
t_env.execute_sql("""
CREATE TABLE kafka_source (
`id` INT,
`name` STRING,
`score` INT,
`event_time` TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'student_topic',
'properties.bootstrap.servers' = 'your-kafka-broker:9092',
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
""")
# MySQL シンクテーブルの作成
t_env.execute_sql("""
CREATE TABLE mysql_sink (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://your-mysql-host:3306/my_database',
'table-name' = 'student',
'username' = 'your_username',
'password' = 'your_password'
)
""")
# スコアが 60 以上のレコードをフィルタリングして MySQL に書き込む
t_env.execute_sql("""
INSERT INTO mysql_sink
SELECT id, name, score
FROM kafka_source
WHERE score >= 60
""")
if __name__ == '__main__':
kafka_to_mysql()プリインストールされているソフトウェアパッケージリスト
VVR-11
Flink ワークスペースには、以下のソフトウェアパッケージがインストールされています。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.48.0 |
avro-python3 | 1.10.2 |
brotlipy | 0.7.0 |
certifi | 2022.12.7 |
cffi | 1.15.1 |
charset-normalizer | 2.0.4 |
cloudpickle | 2.2.1 |
conda | 22.11.1 |
conda-content-trust | 0.1.3 |
conda-package-handling | 1.9.0 |
crcmod | 1.7 |
cryptography | 38.0.1 |
Cython | 3.0.12 |
dill | 0.3.1.1 |
dnspython | 2.7.0 |
docopt | 0.6.2 |
exceptiongroup | 1.3.0 |
fastavro | 1.12.1 |
fasteners | 0.20 |
find_libpython | 0.5.0 |
grpcio | 1.56.2 |
grpcio-tools | 1.56.2 |
hdfs | 2.7.3 |
httplib2 | 0.22.0 |
idna | 3.4 |
importlib_metadata | 8.7.0 |
iniconfig | 2.1.0 |
isort | 6.1.0 |
numpy | 1.24.4 |
objsize | 0.6.1 |
orjson | 3.9.15 |
packaging | 25.0 |
pandas | 2.3.3 |
pemja | 0.5.5 |
pip | 22.3.1 |
pluggy | 1.0.0 |
proto-plus | 1.26.1 |
protobuf | 4.25.8 |
py-spy | 0.4.0 |
py4j | 0.10.9.7 |
pyarrow | 11.0.0 |
pyarrow-hotfix | 0.6 |
pycodestyle | 2.14.0 |
pycosat | 0.6.4 |
pycparser | 2.21 |
pydot | 1.4.2 |
pymongo | 4.15.4 |
pyOpenSSL | 22.0.0 |
pyparsing | 3.2.5 |
PySocks | 1.7.1 |
pytest | 7.4.4 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2025.11.3 |
requests | 2.32.5 |
ruamel.yaml | 0.18.16 |
ruamel.yaml.clib | 0.2.14 |
setuptools | 70.0.0 |
six | 1.16.0 |
tomli | 2.3.0 |
toolz | 0.12.0 |
tqdm | 4.64.1 |
typing_extensions | 4.15.0 |
tzdata | 2025.2 |
urllib3 | 1.26.13 |
wheel | 0.38.4 |
zipp | 3.23.0 |
zstandard | 0.25.0 |
VVR-8
Flink ワークスペースには、以下のソフトウェアパッケージがインストールされています。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.43.0 |
avro-python3 | 1.9.2.1 |
certifi | 2025.7.9 |
charset-normalizer | 3.4.2 |
cloudpickle | 2.2.0 |
crcmod | 1.7 |
Cython | 0.29.24 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 1.4.7 |
fasteners | 0.19 |
find_libpython | 0.4.1 |
grpcio | 1.46.3 |
grpcio-tools | 1.46.3 |
hdfs | 2.7.3 |
httplib2 | 0.20.4 |
idna | 3.10 |
isort | 6.0.1 |
numpy | 1.21.6 |
objsize | 0.5.2 |
orjson | 3.10.18 |
pandas | 1.3.5 |
pemja | 0.3.2 |
pip | 22.3.1 |
proto-plus | 1.26.1 |
protobuf | 3.20.3 |
py4j | 0.10.9.7 |
pyarrow | 8.0.0 |
pycodestyle | 2.14.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.2.3 |
python-dateutil | 2.9.0 |
pytz | 2025.2 |
regex | 2024.11.6 |
requests | 2.32.4 |
setuptools | 58.1.0 |
six | 1.17.0 |
typing_extensions | 4.14.1 |
urllib3 | 2.5.0 |
wheel | 0.33.4 |
zstandard | 0.23.0 |
VVR-6
Flink ワークスペースには、以下のソフトウェアパッケージがインストールされています。
ソフトウェアパッケージ | バージョン |
apache-beam | 2.27.0 |
avro-python3 | 1.9.2.1 |
certifi | 2024.8.30 |
charset-normalizer | 3.3.2 |
cloudpickle | 1.2.2 |
crcmod | 1.7 |
Cython | 0.29.16 |
dill | 0.3.1.1 |
docopt | 0.6.2 |
fastavro | 0.23.6 |
future | 0.18.3 |
grpcio | 1.29.0 |
hdfs | 2.7.3 |
httplib2 | 0.17.4 |
idna | 3.8 |
importlib-metadata | 6.7.0 |
isort | 5.11.5 |
jsonpickle | 2.0.0 |
mock | 2.0.0 |
numpy | 1.19.5 |
oauth2client | 4.1.3 |
pandas | 1.1.5 |
pbr | 6.1.0 |
pemja | 0.1.4 |
pip | 20.1.1 |
protobuf | 3.17.3 |
py4j | 0.10.9.3 |
pyarrow | 2.0.0 |
pyasn1 | 0.5.1 |
pyasn1-modules | 0.3.0 |
pycodestyle | 2.10.0 |
pydot | 1.4.2 |
pymongo | 3.13.0 |
pyparsing | 3.1.4 |
python-dateutil | 2.8.0 |
pytz | 2024.1 |
requests | 2.31.0 |
rsa | 4.9 |
setuptools | 47.1.0 |
six | 1.16.0 |
typing-extensions | 3.7.4.3 |
urllib3 | 2.0.7 |
wheel | 0.42.0 |
zipp | 3.15.0 |
参考文献
Flink Python ジョブ開発プロセスの完全なウォークスルーについては、「Flink Python ジョブのクイックスタート」をご参照ください。
Flink Python ジョブでカスタム Python 仮想環境、サードパーティの Python パッケージ、JAR、およびデータファイルを使用する方法の詳細については、「Python 依存関係の使用」をご参照ください。
Realtime Compute for Apache Flink は、SQL および DataStream ジョブもサポートしています。開発ガイドについては、「ジョブ開発マップ」および「JAR ジョブの開発」をご参照ください。